Programming guide for Azure Event Hubs

This article discusses some common scenarios in writing code using Azure Event Hubs. It assumes a preliminary understanding of Event Hubs. For a conceptual overview of Event Hubs, see the Event Hubs overview.

Event publishers

You send events to an event hub either using HTTP POST or via an AMQP 1.0 connection. Which one to use depends on the specific scenario being addressed. AMQP 1.0 connections are metered as brokered connections in Service Bus. Because they provide a persistent messaging channel, they are more appropriate in scenarios with frequent, higher message volumes and lower latency requirements.

When using the .NET managed APIs, the primary constructs for publishing data to Event Hubs are the EventHubClient and EventData classes. EventHubClient provides the AMQP communication channel over which events are sent to the event hub. The EventData class represents an event, and is used to publish messages to an event hub. This class includes the body, some metadata, and header information about the event. Other properties are added to the EventData object as it passes through an event hub.

Get started

The .NET classes that support Event Hubs are provided in the Microsoft.Azure.EventHubs NuGet package. You can install it using Solution Explorer in Visual Studio, or the Package Manager Console in Visual Studio. To use the console, issue the following command in the Package Manager Console window:

Install-Package Microsoft.Azure.EventHubs

Create an event hub

You can use the Azure portal, Azure PowerShell, or Azure CLI to create an event hub. For details, see Create an Event Hubs namespace and an event hub using the Azure portal.

Create an Event Hubs client

The primary class for interacting with Event Hubs is Microsoft.Azure.EventHubs.EventHubClient. You can instantiate this class using the CreateFromConnectionString method, as shown in the following example:

private const string EventHubConnectionString = "Event Hubs namespace connection string";
private const string EventHubName = "event hub name";

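// Point the namespace-level connection string at a specific event hub.
var connectionStringBuilder = new EventHubsConnectionStringBuilder(EventHubConnectionString)
{
    EntityPath = EventHubName
};

// eventHubClient is assumed to be a field declared elsewhere in the class.
eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());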

Send events to an event hub

You send events to an event hub by creating an EventHubClient instance and sending the events asynchronously via its SendAsync method. This method takes a single EventData instance parameter and asynchronously sends it to an event hub.

Event serialization

The EventData class has two overloaded constructors that take the event data payload as bytes: either a byte array or a byte array segment. When using JSON with EventData, you can use Encoding.UTF8.GetBytes() to retrieve the byte array for a JSON-encoded string. For example:

for (var i = 0; i < numMessagesToSend; i++)
{
    var message = $"Message {i}";
    Console.WriteLine($"Sending message: {message}");
    await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
}

Partition key

Note

If you aren't familiar with partitions, see this article.

When sending event data, you can specify a value that is hashed to produce a partition assignment. You specify the partition using the PartitionSender.PartitionID property. However, the decision to use partitions implies a choice between availability and consistency.
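As a rough illustration of the two options, the sketch below sends one event with a partition key and one event directly to a partition. It assumes the Microsoft.Azure.EventHubs package is installed and that eventHubClient was created as shown earlier; the class name, the key "device-42", and the partition ID "0" are hypothetical values chosen for the example.

```csharp
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;

public static class PartitionedSendSketch
{
    public static async Task SendWithKeyAndPartitionAsync(EventHubClient eventHubClient)
    {
        var body = Encoding.UTF8.GetBytes("reading from device-42");

        // Option 1: supply a partition key. The key is hashed to pick a partition,
        // so events that share a key land in the same partition.
        await eventHubClient.SendAsync(new EventData(body), "device-42");

        // Option 2: pin events to one partition by ID ("0" is a hypothetical ID).
        PartitionSender sender = eventHubClient.CreatePartitionSender("0");
        try
        {
            await sender.SendAsync(new EventData(body));
        }
        finally
        {
            // Release the underlying link when the sender is no longer needed.
            await sender.CloseAsync();
        }
    }
}
```

Either option ties the event to a single partition, so both are subject to the availability trade-off discussed in this section.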

Availability considerations

Using a partition key is optional, and you should consider carefully whether or not to use one. If you don't specify a partition key when publishing an event, a round-robin assignment is used. In many cases, using a partition key is a good choice if event ordering is important. When you use a partition key, these partitions require availability on a single node, and outages can occur over time; for example, when compute nodes reboot and patch. As such, if you set a partition ID and that partition becomes unavailable for some reason, an attempt to access the data in that partition will fail. If high availability is most important, do not specify a partition key; in that case events are sent to partitions using the round-robin model described previously. In this scenario, you are making an explicit choice between availability (no partition ID) and consistency (pinning events to a partition ID).

Another consideration is handling delays in processing events. In some cases, it might be better to drop data and retry than to try to keep up with processing, which can cause further downstream processing delays. For example, with a stock ticker it's better to wait for complete, up-to-date data, but in a live chat or VoIP scenario you'd rather have the data quickly, even if it isn't complete.

Given these availability considerations, in these scenarios you might choose one of the following error handling strategies:

  • Stop (stop reading from Event Hubs until things are fixed)
  • Drop (messages aren't important, drop them)
  • Retry (retry the messages as you see fit)
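The three strategies above could be expressed in application code roughly as follows. This is only a sketch that does not depend on the Event Hubs SDK: the ErrorStrategy enum, the HandleAsync helper, the attempt limit, and the delay values are all hypothetical names and numbers introduced for illustration.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical enum naming the three strategies from the list above.
public enum ErrorStrategy { Stop, Drop, Retry }

public static class ErrorHandlingSketch
{
    // Runs processMessage under the chosen strategy.
    // Returns true if the caller should continue with the next message,
    // false if the caller should stop reading.
    public static async Task<bool> HandleAsync(
        Func<Task> processMessage, ErrorStrategy strategy, int maxAttempts = 3)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await processMessage();
                return true;
            }
            catch (Exception) when (strategy == ErrorStrategy.Drop)
            {
                // Drop: the message is not important, skip it and move on.
                return true;
            }
            catch (Exception) when (strategy == ErrorStrategy.Retry && attempt < maxAttempts)
            {
                // Retry: wait a little longer after each failed attempt.
                await Task.Delay(TimeSpan.FromMilliseconds(100 * attempt));
            }
            catch (Exception)
            {
                // Stop, or retries exhausted: signal the caller to stop reading.
                return false;
            }
        }
    }
}
```

A reader loop would call HandleAsync once per message and break out of the loop when it returns false.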

For more information and a discussion about the trade-offs between availability and consistency, see Availability and consistency in Event Hubs.

Batch event send operations

Sending events in batches can help increase throughput. You can use the CreateBatch API to create a batch to which data objects can later be added for a SendAsync call.

A single batch must not exceed the 1 MB limit of an event. Additionally, each message in the batch uses the same publisher identity. It is the responsibility of the sender to ensure that the batch does not exceed the maximum event size. If it does, a client Send error is generated. You can use the helper method EventHubClient.CreateBatch to ensure that the batch does not exceed 1 MB. You get an empty EventDataBatch from the CreateBatch API and then use TryAdd to add events to construct the batch.
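One way to use CreateBatch and TryAdd together is sketched below: events are added until TryAdd reports that the batch is full, at which point the batch is sent and a new one is started. The sketch assumes the Microsoft.Azure.EventHubs package and an existing eventHubClient; the class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;

public static class BatchSendSketch
{
    public static async Task SendInBatchesAsync(
        EventHubClient eventHubClient, IEnumerable<string> messages)
    {
        EventDataBatch batch = eventHubClient.CreateBatch();

        foreach (var message in messages)
        {
            var eventData = new EventData(Encoding.UTF8.GetBytes(message));

            // TryAdd returns false when adding the event would exceed the size limit.
            if (batch.TryAdd(eventData))
            {
                continue;
            }

            // The current batch is full: send it, then start a new batch.
            if (batch.Count > 0)
            {
                await eventHubClient.SendAsync(batch);
            }

            batch = eventHubClient.CreateBatch();
            if (!batch.TryAdd(eventData))
            {
                // The event does not fit even into an empty batch.
                throw new InvalidOperationException("A single event exceeds the batch size limit.");
            }
        }

        // Send whatever remains in the last, partially filled batch.
        if (batch.Count > 0)
        {
            await eventHubClient.SendAsync(batch);
        }
    }
}
```

Letting TryAdd decide when a batch is full avoids estimating serialized sizes by hand.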

Send asynchronously and send at scale

You send events to an event hub asynchronously. Sending asynchronously increases the rate at which a client is able to send events. SendAsync returns a Task object. You can use the RetryPolicy class on the client to control client retry options.
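The following sketch shows one possible way to combine these ideas: it sets an exponential retry policy on the client and then starts several sends without awaiting each one individually. It assumes the Microsoft.Azure.EventHubs package and a connection string that already includes the event hub name (EntityPath); the class name, method name, and backoff values are hypothetical choices for the example.

```csharp
using System;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;

public static class AsyncSendSketch
{
    public static async Task SendManyAsync(string connectionString, int count)
    {
        var client = EventHubClient.CreateFromConnectionString(connectionString);

        // Retry transient failures with exponential backoff:
        // minimum backoff 1 second, maximum backoff 30 seconds, at most 5 retries.
        client.RetryPolicy = new RetryExponential(
            TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(30), 5);

        try
        {
            // Each SendAsync call returns a Task; start them all, then await them together.
            var sends = Enumerable.Range(0, count)
                .Select(i => client.SendAsync(
                    new EventData(Encoding.UTF8.GetBytes($"Message {i}"))));

            await Task.WhenAll(sends);
        }
        finally
        {
            await client.CloseAsync();
        }
    }
}
```

Because the sends run concurrently, the order in which they complete is not guaranteed in this sketch.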

Event consumers

The EventProcessorHost class processes data from Event Hubs. You should use this implementation when building event readers on the .NET platform. EventProcessorHost provides a thread-safe, multi-process-safe runtime environment for event processor implementations, and it also provides checkpointing and partition lease management.

To use the EventProcessorHost class, you can implement IEventProcessor. This interface contains four methods:

  • OpenAsync
  • CloseAsync
  • ProcessErrorAsync
  • ProcessEventsAsync
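A minimal IEventProcessor implementation might look like the sketch below, which covers the interface's four methods (OpenAsync, CloseAsync, ProcessErrorAsync, and ProcessEventsAsync). It assumes the Microsoft.Azure.EventHubs and Microsoft.Azure.EventHubs.Processor packages are installed and that event bodies are UTF-8 text; checkpointing after every received set of events is a simplification for illustration, not a recommendation.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;

public class SimpleEventProcessor : IEventProcessor
{
    // Called when the host acquires the lease for a partition.
    public Task OpenAsync(PartitionContext context)
    {
        Console.WriteLine($"Processor opened. Partition: '{context.PartitionId}'");
        return Task.CompletedTask;
    }

    // Called when the lease is lost or the host shuts down.
    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine($"Processor closing. Partition: '{context.PartitionId}', Reason: '{reason}'");
        return Task.CompletedTask;
    }

    // Called when the underlying receiver reports an error.
    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        Console.WriteLine($"Error on partition '{context.PartitionId}': {error.Message}");
        return Task.CompletedTask;
    }

    // Called with each set of events received from the partition.
    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        if (messages != null)
        {
            foreach (var eventData in messages)
            {
                // Body is an ArraySegment<byte>; decode it as UTF-8 text (an assumption).
                var data = Encoding.UTF8.GetString(
                    eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
                Console.WriteLine($"Partition '{context.PartitionId}': {data}");
            }
        }

        // Record progress so that a later reader of this partition can resume from here.
        return context.CheckpointAsync();
    }
}
```

In practice, checkpointing less often (for example, every few minutes or every N events) reduces the number of writes to the storage account.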

To start event processing, instantiate EventProcessorHost, providing the appropriate parameters for your event hub. For example:

Note

EventProcessorHost and its related classes are provided in the Microsoft.Azure.EventHubs.Processor package. Add the package to your Visual Studio project by following instructions in this article or by issuing the following command in the Package Manager Console window: Install-Package Microsoft.Azure.EventHubs.Processor

var eventProcessorHost = new EventProcessorHost(
        EventHubName,
        PartitionReceiver.DefaultConsumerGroupName,
        EventHubConnectionString,
        StorageConnectionString,
        StorageContainerName);

Then, call RegisterEventProcessorAsync to register your IEventProcessor implementation with the runtime:

await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();

At this point, the host attempts to acquire a lease on every partition in the event hub using a "greedy" algorithm. These leases last for a given timeframe and must then be renewed. As new nodes (worker instances, in this case) come online, they place lease reservations, and over time the load shifts between nodes as each attempts to acquire more leases.

[Figure: Event Processor Host]

Over time, an equilibrium is established. This dynamic capability enables CPU-based autoscaling to be applied to consumers for both scale-up and scale-down. Because Event Hubs does not have a direct concept of message counts, average CPU utilization is often the best mechanism to measure back-end or consumer scale. If publishers begin to publish more events than consumers can process, the CPU increase on consumers can be used to trigger autoscaling of the worker instance count.

The EventProcessorHost class also implements an Azure Storage-based checkpointing mechanism. This mechanism stores the offset on a per-partition basis, so that each consumer can determine what the last checkpoint from the previous consumer was. As partitions transition between nodes via leases, this is the synchronization mechanism that facilitates load shifting.

Publisher revocation

In addition to the advanced run-time features of EventProcessorHost, Event Hubs enables publisher revocation in order to block specific publishers from sending events to an event hub. This feature is useful if a publisher token has been compromised, or if a software update is causing a publisher to behave inappropriately. In these situations, the publisher's identity, which is part of its SAS token, can be blocked from publishing events.

For more information about publisher revocation and how to send to Event Hubs as a publisher, see the Event Hubs Large Scale Secure Publishing sample.

Next steps

To learn more about Event Hubs scenarios, visit these links: