您现在访问的是微软AZURE全球版技术文档网站,若需要访问由世纪互联运营的MICROSOFT AZURE中国区技术文档网站,请访问 https://docs.azure.cn.

使用 .NET Framework 将事件发送到 Azure 事件中心或从其接收事件Send events to or receive events from Azure Event Hubs using .NET Framework

Azure 事件中心是一个大数据流式处理平台和事件引入服务,每秒能够接收和处理数百万个事件。Azure Event Hubs is a Big Data streaming platform and event ingestion service, capable of receiving and processing millions of events per second. 事件中心可以处理和存储分布式软件和设备生成的事件、数据或遥测。Event Hubs can process and store events, data, or telemetry produced by distributed software and devices. 可以使用任何实时分析提供程序或批处理/存储适配器转换和存储发送到数据中心的数据。Data sent to an event hub can be transformed and stored using any real-time analytics provider or batching/storage adapters. 有关事件中心的详细概述,请参阅事件中心概述事件中心功能For detailed overview of Event Hubs, see Event Hubs overview and Event Hubs features.

本教程展示了如何在 C# 中创建 NET Framework 控制台应用程序,以便将事件发送到事件中心或从其接收事件。This tutorial shows how to create .NET Framework console applications in C# to send events to or receive events from an eventhub.

必备组件Prerequisites

若要完成本教程,需要具备以下先决条件:To complete this tutorial, you need the following prerequisites:

  • Microsoft Visual Studio 2019Microsoft Visual Studio 2019.
  • 创建事件中心命名空间和事件中心Create an Event Hubs namespace and an event hub. 第一步是使用 Azure 门户创建事件中心类型的命名空间,并获取应用程序与事件中心进行通信所需的管理凭据。The first step is to use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. 要创建命名空间和事件中心,请按照此文中的步骤操作。To create a namespace and an event hub, follow the procedure in this article. 然后,按照以下文章中的说明获取事件中心命名空间的连接字符串获取连接字符串Then, get the connection string for the event hub namespace by following instructions from the article: Get connection string. 本教程后面的步骤将使用此连接字符串。You use the connection string later in this tutorial.

发送事件Send events

本部分展示了如何创建 .NET Core 控制台应用程序来将事件发送到事件中心。This section shows you how to create a .NET Framework console application to send events to an event hub.

创建控制台应用程序Create a console application

在 Visual Studio 中,使用 控制台应用程序 项目模板创建一个新的 Visual C# 桌面应用项目。In Visual Studio, create a new Visual C# Desktop App project using the Console Application project template. 将该项目命名为 SenderName the project Sender.

创建控制台应用程序

添加事件中心 NuGet 包Add the Event Hubs NuGet package

  1. 在解决方案资源管理器中,右键单击“Sender”项目,并单击“为解决方案管理 NuGet 包”。 In Solution Explorer, right-click the Sender project, and then click Manage NuGet Packages for Solution.

  2. 单击“浏览” 选项卡,并搜索 WindowsAzure.ServiceBusClick the Browse tab, then search for WindowsAzure.ServiceBus. 单击“安装” 并接受使用条款。Click Install, and accept the terms of use.

    安装服务总线 NuGet 包

    Visual Studio 下载、安装 Azure 服务总线库 NuGet 包并添加对它的引用。Visual Studio downloads, installs, and adds a reference to the Azure Service Bus library NuGet package.

编写代码以将消息发送到事件中心Write code to send messages to the event hub

  1. 在 Program.cs 文件顶部添加以下 using 语句:Add the following using statements at the top of the Program.cs file:

    using System.Threading;
    using Microsoft.ServiceBus.Messaging;
    
  2. 将以下字段添加到 Program 类,并将占位符值分别替换成在上一节中创建的事件中心的名称和前面保存的命名空间级别连接字符串。Add the following fields to the Program class, substituting the placeholder values with the name of the event hub you created in the previous section, and the namespace-level connection string you saved previously. 可以在 Azure 门户中从“事件中心”页面上的 RootManageSharedAccessKey 下的“连接字符串-主要” 密钥下复制你的事件中心的连接字符串。You can copy connection string for your event hub from Connection string-primary key under RootManageSharedAccessKey on the Event Hub page in the Azure portal. 有关详细步骤,请参阅获取连接字符串For detailed steps, see Get connection string.

    static string eventHubName = "Your Event Hub name";
    static string connectionString = "namespace connection string";
    
  3. 将以下方法添加到 Program 类:Add the following method to the Program class:

    static void SendingRandomMessages()
    {
        var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
        while (true)
        {
            try
            {
                var message = Guid.NewGuid().ToString();
                Console.WriteLine("{0} > Sending message: {1}", DateTime.Now, message);
                eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes(message)));
            }
            catch (Exception exception)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("{0} > Exception: {1}", DateTime.Now, exception.Message);
                Console.ResetColor();
            }
    
            Thread.Sleep(200);
        }
    }
    

    此方法不断将事件发送到事件中心,延迟为 200 毫秒。This method continuously sends events to your event hub with a 200-ms delay.

  4. 最后,在 Main 方法中添加以下行:Finally, add the following lines to the Main method:

    Console.WriteLine("Press Ctrl-C to stop the sender process");
    Console.WriteLine("Press Enter to start now");
    Console.ReadLine();
    SendingRandomMessages();
    
  5. 运行程序,并确保没有任何错误。Run the program, and ensure that there are no errors.

接收事件Receive events

在本部分中,你将编写一个 .NET Framework 控制台应用程序,以使用事件处理程序主机从事件中心接收消息。In this section, you write a .NET Framework console application that receives messages from an event hub using the Event Processor Host. 事件处理程序主机是一个 .NET 类,它通过从事件中心管理持久检查点和并行接收来简化从那些事件中心接收事件的过程。The Event Processor Host is a .NET class that simplifies receiving events from event hubs by managing persistent checkpoints and parallel receives from those event hubs. 使用事件处理程序主机,可跨多个接收方拆分事件,即使在不同节点中托管时也是如此。Using the Event Processor Host, you can split events across multiple receivers, even when hosted in different nodes.

为事件处理程序主机创建存储帐户Create a storage account for Event Processor Host

事件处理程序主机是一个智能代理,它通过管理持久性检查点和并行接收操作,来简化从事件中心接收事件的过程。The Event Processor Host is an intelligent agent that simplifies receiving events from Event Hubs by managing persistent checkpoints and parallel receives. 对于检查点,事件处理程序主机需要一个存储帐户。For checkpointing, the Event Processor Host requires a storage account. 以下示例演示如何创建存储帐户,以及如何获取其密钥以进行访问:The following example shows how to create a storage account and how to get its keys for access:

  1. 在 Azure 门户中,选择屏幕左上角的“创建资源”。 In the Azure portal, and select Create a resource at the top left of the screen.

  2. 选择“存储”,然后选择“存储帐户 - Blob、文件、表、队列” 。Select Storage, then select Storage account - blob, file, table, queue.

    选择存储帐户

  3. 在“创建存储帐户”页中执行以下步骤: On the Create storage account page, take the following steps:

    1. 输入存储帐户的名称。Enter a name for the storage account.

    2. 选择包含事件中心的 Azure 订阅。Choose an Azure subscription that contains the event hub.

    3. 选择包含事件中心的资源组。Select the resource group that has the event hub.

    4. 选择可在其中创建资源的位置。Select a location in which to create the resource.

    5. 然后单击“查看 + 创建” 。Then click Review + create.

      创建存储帐户 - 页面

  4. 在“查看 + 创建” 页上查看值,然后选择“创建”。 On the Review + create page, review the values, and select Create.

    查看存储帐户设置,然后执行创建操作

  5. 在看到后部署成功消息中,选择转到资源页的顶部。After you see the Deployments Succeeded message, select Go to resource at the top of the page. 还可以通过从资源列表中选择存储帐户来启动“存储帐户”页。You can also launch the Storage Account page by selecting your storage account from the resource list.

    从部署中选择存储帐户

  6. 在“概要” 窗口中选择“Blob” 。In the Essentials window, select Blobs.

    选择 Blob 服务

  7. 选择顶部的“+ 容器” ,为容器输入名称,然后选择“确定” 。Select + Container at the top, enter a name for the container, and select OK.

    创建 Blob 容器

  8. 在左侧菜单中选择”访问密钥” ,然后复制 key1 的值。Select Access keys in the left-side menu, and copy the value of key1.

    将以下值保存到记事本或其他某个临时位置。Save the following values to Notepad or some other temporary location.

    • 存储帐户的名称Name of the storage account
    • 存储帐户的访问密钥Access key for the storage account
    • 容器的名称Name of the container

创建控制台应用程序Create a console application

在 Visual Studio 中,使用 控制台应用程序 项目模板创建一个新的 Visual C# 桌面应用项目。In Visual Studio, create a new Visual C# Desktop App project using the Console Application project template. 将该项目命名为 ReceiverName the project Receiver.

创建控制台应用程序

添加事件中心 NuGet 包Add the Event Hubs NuGet package

  1. 在解决方案资源管理器中,右键单击“Receiver”项目,并单击“为解决方案管理 NuGet 包”。 In Solution Explorer, right-click the Receiver project, and then click Manage NuGet Packages for Solution.

  2. 单击“浏览” 选项卡,并搜索 Microsoft Azure Service Bus Event Hub - EventProcessorHostClick the Browse tab, then search for Microsoft Azure Service Bus Event Hub - EventProcessorHost. 单击“安装” 并接受使用条款。Click Install, and accept the terms of use.

    搜索事件处理器主机 NuGet 包

    Visual Studio 下载、安装 Azure 服务总线事件中心 - EventProcessorHost NuGet 包及其所有依赖项并添加对它们的引用。Visual Studio downloads, installs, and adds a reference to the Azure Service Bus Event Hub - EventProcessorHost NuGet package, with all its dependencies.

实现 IEventProcessor 接口Implement the IEventProcessor interface

  1. 右键单击 Receiver 项目,单击“添加”,并单击“类” 。Right-click the Receiver project, click Add, and then click Class. 将新类命名为 SimpleEventProcessor,然后单击“添加”以创建该类 。Name the new class SimpleEventProcessor, and then click Add to create the class.

    添加 SimpleEventProcessor 类

  2. 在 SimpleEventProcessor.cs 文件的顶部添加以下语句:Add the following statements at the top of the SimpleEventProcessor.cs file:

    using Microsoft.ServiceBus.Messaging;
    using System.Diagnostics;
    
  3. 用以下代码替换该类的正文:Substitute the following code for the body of the class:

    class SimpleEventProcessor : IEventProcessor
    {
      Stopwatch checkpointStopWatch;
    
      async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
      {
          Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
          if (reason == CloseReason.Shutdown)
          {
              await context.CheckpointAsync();
          }
      }
    
      Task IEventProcessor.OpenAsync(PartitionContext context)
      {
          Console.WriteLine("SimpleEventProcessor initialized.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
          this.checkpointStopWatch = new Stopwatch();
          this.checkpointStopWatch.Start();
          return Task.FromResult<object>(null);
      }
    
      async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
      {
          foreach (EventData eventData in messages)
          {
              string data = Encoding.UTF8.GetString(eventData.GetBytes());
    
              Console.WriteLine(string.Format("Message received.  Partition: '{0}', Data: '{1}'",
                  context.Lease.PartitionId, data));
          }
    
          //Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
          if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
          {
              await context.CheckpointAsync();
              this.checkpointStopWatch.Restart();
          }
      }
    }
    

    此类由 EventProcessorHost 调用,用于处理从事件中心接收的事件 。This class is called by the EventProcessorHost to process events received from the event hub. SimpleEventProcessor 类使用秒表定期对 EventProcessorHost 上下文调用检查点方法 。The SimpleEventProcessor class uses a stopwatch to periodically call the checkpoint method on the EventProcessorHost context. 此操作确保接收方重启时,其丢失的处理工作不会超过五分钟。This processing ensures that, if the receiver is restarted, it loses no more than five minutes of processing work.

更新 Main 方法以使用 SimpleEventProcessorUpdate the Main method to use SimpleEventProcessor

  1. Program 类中,在文件顶部添加以下 using 语句:In the Program class, add the following using statement at the top of the file:

    using Microsoft.ServiceBus.Messaging;
    
  2. Program 类中的 Main 方法替换为以下代码,从而替换为以前保存的事件中心名称和命名空间级别连接字符串,以及在前面部分复制的存储帐户和密钥。Replace the Main method in the Program class with the following code, substituting the event hub name and the namespace-level connection string that you saved previously, and the storage account and key that you copied in the previous sections.

    static void Main(string[] args)
    {
      string eventHubConnectionString = "{Event Hubs namespace connection string}";
      string eventHubName = "{Event Hub name}";
      string storageAccountName = "{storage account name}";
      string storageAccountKey = "{storage account key}";
      string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", storageAccountName, storageAccountKey);
    
      string eventProcessorHostName = Guid.NewGuid().ToString();
      EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
      Console.WriteLine("Registering EventProcessor...");
      var options = new EventProcessorOptions();
      options.ExceptionReceived += (sender, e) => { Console.WriteLine(e.Exception); };
      eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options).Wait();
    
      Console.WriteLine("Receiving. Press enter key to stop worker.");
      Console.ReadLine();
      eventProcessorHost.UnregisterEventProcessorAsync().Wait();
    }
    
  3. 运行程序,并确保没有任何错误。Run the program, and ensure that there are no errors.

后续步骤Next steps

请阅读以下文章:Read the following articles: