Azure Event Hubs のプログラミング ガイドProgramming guide for Azure Event Hubs

この記事では、Azure Event Hubs を使用してコードを作成する一般的なシナリオについて説明します。This article discusses some common scenarios in writing code using Azure Event Hubs. Event Hubs の予備知識があることを前提としています。It assumes a preliminary understanding of Event Hubs. Event Hub の概要/概念については、「 Event Hubs 概要」を参照してください。For a conceptual overview of Event Hubs, see the Event Hubs overview.

イベント発行元Event publishers

イベントは HTTP POST か AMQP 1.0 接続を使用して、イベント ハブに送信します。You send events to an event hub either using HTTP POST or via an AMQP 1.0 connection. 何をいつ利用するかは、解決対象の具体的なシナリオによります。The choice of which to use and when depends on the specific scenario being addressed. AMQP 1.0 接続は Service Bus の仲介型接続として課金され、頻繁にメッセージ量が多くなり、低遅延の要件があるシナリオに適しています。固定のメッセージング チャンネルが提供されるためです。AMQP 1.0 connections are metered as brokered connections in Service Bus and are more appropriate in scenarios with frequent higher message volumes and lower latency requirements, as they provide a persistent messaging channel.

.NET のマネージド API を使用する場合、Event Hubs にデータを発行するための主なコンストラクトは EventHubClient クラスと EventData クラスになります。When using the .NET managed APIs, the primary constructs for publishing data to Event Hubs are the EventHubClient and EventData classes. EventHubClient は、イベントがイベント ハブに送信されるときに使われる AMQP 通信チャンネルを提供します。EventHubClient provides the AMQP communication channel over which events are sent to the event hub. EventData クラスはイベントを表し、イベント ハブにメッセージを発行するために使用されます。The EventData class represents an event, and is used to publish messages to an event hub. このクラスには、本文、いくつかのメタデータ、イベントに関するヘッダー情報が含まれます。This class includes the body, some metadata, and header information about the event. その他のプロパティは EventData オブジェクトに追加され、イベント ハブに渡されます。Other properties are added to the EventData object as it passes through an event hub.

作業開始Get started

Event Hubs をサポートする .NET クラスが Microsoft.Azure.EventHubs NuGet パッケージ内に用意されています。The .NET classes that support Event Hubs are provided in the Microsoft.Azure.EventHubs NuGet package. Visual Studio ソリューション エクスプローラーまたは Visual Studio の パッケージ マネージャー コンソールを使用してインストールできます。You can install using the Visual Studio Solution explorer, or the Package Manager Console in Visual Studio. これを行うには、 パッケージ マネージャー コンソール のウィンドウに次のコマンドを入力します。To do so, issue the following command in the Package Manager Console window:

Install-Package Microsoft.Azure.EventHubs

イベント ハブの作成Create an event hub

Azure Portal、Azure PowerShell、または Azure CLI を使用して、Event Hubs を作成できます。You can use the Azure portal, Azure PowerShell, or Azure CLI to create Event Hubs. 詳細については、「Azure Portal を使用して Event Hubs 名前空間とイベント ハブを作成する」をご覧ください。For details, see Create an Event Hubs namespace and an event hub using the Azure portal.

Event Hub クライアントの作成Create an Event Hubs client

Event Hubs とやり取りするための主要クラスは Microsoft.Azure.EventHubs.EventHubClient です。The primary class for interacting with Event Hubs is Microsoft.Azure.EventHubs.EventHubClient. 次の例のように、CreateFromConnectionString メソッドを使用してこのクラスをインスタンス化できます。You can instantiate this class using the CreateFromConnectionString method, as shown in the following example:

private const string EventHubConnectionString = "Event Hubs namespace connection string";
private const string EventHubName = "event hub name";

var connectionStringBuilder = new EventHubsConnectionStringBuilder(EventHubConnectionString)
{
    EntityPath = EventHubName

};
eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());

イベント ハブにイベントを送信するSend events to an event hub

EventHubClient インスタンスを作成し、それを SendAsync メソッドで送信することで、イベント ハブにイベントを非同期で送信します。You send events to an event hub by creating an EventHubClient instance and sending it asynchronously via the SendAsync method. このメソッドは EventData インスタンス パラメーターを 1 つ受け取り、それをイベント ハブに非同期的に送信します。This method takes a single EventData instance parameter and asynchronously sends it to an event hub.

イベントのシリアル化Event serialization

EventData クラスにはオーバーロードされたコンストラクターが 2 つあります。これらのコンストラクターは、イベント データ のペイロードを表すさまざまなパラメーター、バイト配列、またはバイト配列を受け取ります。The EventData class has two overloaded constructors that take a variety of parameters, bytes or a byte array, that represent the event data payload. JSON と共に EventData を使用するときには、JSON でエンコードされた文字列のバイト配列を取得するのに Encoding.UTF8.GetBytes() を使用できます。When using JSON with EventData, you can use Encoding.UTF8.GetBytes() to retrieve the byte array for a JSON-encoded string. 例:For example:

for (var i = 0; i < numMessagesToSend; i++)
{
    var message = $"Message {i}";
    Console.WriteLine($"Sending message: {message}");
    await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
}

パーティション キーPartition key

注意

パーティションをよく知らない場合は、この記事を参照してください。If you aren't familiar with partitions, see this article.

イベント データを送信するときに、パーティション割り当てを生成するためにハッシュされる値を指定できます。When sending event data, you can specify a value that is hashed to produce a partition assignment. PartitionSender.PartitionID プロパティを使用して、パーティションを指定します。You specify the partition using the PartitionSender.PartitionID property. ただし、パーティションを使用するという決定は、可用性と整合性のどちらを優先するかを選択することを意味します。However, the decision to use partitions implies a choice between availability and consistency.

可用性に関する考慮事項Availability considerations

パーティション キーを使用するかどうかは任意であり、慎重に検討する必要があります。Using a partition key is optional, and you should consider carefully whether or not to use one. イベントを発行するときにパーティション キーを指定しないと、ラウンド ロビン割り当てが使用されます。If you don't specify a partition key when publishing an event, a round-robin assignment is used. 多くの場合、イベントの順序設定が必要であればパーティション キーの使用をお勧めします。In many cases, using a partition key is a good choice if event ordering is important. パーティション キーを使用すると、これらのパーティションでは単一のノードに対する可用性が必要になるため、時間が経つにつれて、コンピューティング ノードの再起動時やパッチ適用時などに障害が発生する可能性があります。When you use a partition key, these partitions require availability on a single node, and outages can occur over time; for example, when compute nodes reboot and patch. そのため、パーティション ID を設定した場合にそのパーティションがなんらかの理由で使用不能になると、そのパーティション内のデータにアクセスできなくなります。As such, if you set a partition ID and that partition becomes unavailable for some reason, an attempt to access the data in that partition will fail. 高可用性がもっとも重要な場合は、パーティション キーを指定しないでください。指定を行うと、以前に説明したラウンドロビン モデルを使用してイベントがパーティションに送信されるようになります。If high availability is most important, do not specify a partition key; in that case events are sent to partitions using the round-robin model described previously. このシナリオでは、可用性 (パーティション ID なし) と整合性 (イベントをパーティション ID に固定) のどちらを優先するかを明確に選択することになります。In this scenario, you are making an explicit choice between availability (no partition ID) and consistency (pinning events to a partition ID).

別の検討事項として、イベントの処理の遅れへの対処があります。Another consideration is handling delays in processing events. 場合によっては、処理が遅れないようにするよりも、データを破棄して再試行した方が良いこともあります。前者では、ダウンストリームの処理がさらに遅れる可能性があります。In some cases, it might be better to drop data and retry than to try to keep up with processing, which can potentially cause further downstream processing delays. たとえば、株式相場表示機では最新のデータが揃うまで待つ方が適切ですが、ライブ チャットや VOIP のシナリオでは不完全でもデータを素早く用意する必要があります。For example, with a stock ticker it's better to wait for complete up-to-date data, but in a live chat or VOIP scenario you'd rather have the data quickly, even if it isn't complete.

こうした可用性に関する考慮事項を踏まえて、これらのシナリオでは次のエラー処理方法のいずれかを選択してください。Given these availability considerations, in these scenarios you might choose one of the following error handling strategies:

  • 停止 (問題が解決するまで Event Hubs からの読み取りを停止する)Stop (stop reading from Event Hubs until things are fixed)
  • 破棄 (重要ではないメッセージを破棄する)Drop (messages aren’t important, drop them)
  • 再試行 (表示されるメッセージが適切になるまで再試行する)Retry (retry the messages as you see fit)

可用性と一貫性の間のトレードオフに関する情報と詳細については、「Event Hubs における可用性と一貫性」を参照してください。For more information and a discussion about the trade-offs between availability and consistency, see Availability and consistency in Event Hubs.

イベントのバッチ送信処理Batch event send operations

イベントをバッチ送信すると、スループット向上の役に立ちます。Sending events in batches can help increase throughput. CreateBatch API を使用して、SendAsync 呼び出し用のデータ オブジェクトを後で追加できるバッチを作成できます。You can use the CreateBatch API to create a batch to which data objects can later be added for a SendAsync call.

単一のバッチは、イベントの 1 MB 制限を超えてはなりません。A single batch must not exceed the 1 MB limit of an event. また、バッチの各メッセージでは同じ発行元 ID が使用されます。Additionally, each message in the batch uses the same publisher identity. バッチが最大イベント サイズを超えないようにすることは送信元の責任となります。It is the responsibility of the sender to ensure that the batch does not exceed the maximum event size. 超えた場合、クライアント 送信 エラーが生成されます。If it does, a client Send error is generated. ヘルパー メソッド EventHubClient.CreateBatch を使用して、バッチが 1 MB を超えないようにします。You can use the helper method EventHubClient.CreateBatch to ensure that the batch does not exceed 1 MB. CreateBatch API から空の EventDataBatch を取得し、TryAdd を使用してイベントを追加し、バッチを構築します。You get an empty EventDataBatch from the CreateBatch API and then use TryAdd to add events to construct the batch.

非同期送信と大規模送信Send asynchronously and send at scale

イベントは、イベント ハブに非同期に送信されます。You send events to an event hub asynchronously. 非同期送信を利用すると、クライアントがイベントを送信できる速度が上がります。Sending asynchronously increases the rate at which a client is able to send events. SendAsyncTask オブジェクトを返します。SendAsync returns a Task object. クライアントで RetryPolicy クラスを使用して、クライアント側の再試行オプションを制御できます。You can use the RetryPolicy class on the client to control client retry options.

イベント コンシューマーEvent consumers

EventProcessorHost クラスは Event Hubs からのデータを処理します。The EventProcessorHost class processes data from Event Hubs. .NET プラットフォームでのイベント リーダーを作成するときには、この実装を使用すべきです。You should use this implementation when building event readers on the .NET platform. EventProcessorHost はイベント プロセッサ実装のためにスレッドセーフでマルチプロセスの安全なランタイム環境を提供します。さらに、その環境では、チェックポイント処理とパーティション リースの管理が提供されます。EventProcessorHost provides a thread-safe, multi-process, safe runtime environment for event processor implementations that also provides checkpointing and partition lease management.

EventProcessorHost クラスを使用するために、IEventProcessor を実装できます。To use the EventProcessorHost class, you can implement IEventProcessor. このインターフェイスには 4 つのメソッドが含まれています。This interface contains four methods:

イベント処理を開始するには、 EventProcessorHostをインスタンス化し、イベント ハブの適切なパラメーターを提供します。To start event processing, instantiate EventProcessorHost, providing the appropriate parameters for your event hub. 例:For example:

注意

EventProcessorHost およびその関連クラスは Microsoft.Azure.EventHubs.Processor パッケージ内に用意されています。EventProcessorHost and its related classes are provided in the Microsoft.Azure.EventHubs.Processor package. この記事の手順に従うか、パッケージ マネージャー コンソールのウィンドウで Install-Package Microsoft.Azure.EventHubs.Processor コマンドを発行して、パッケージをご自分の Visual Studio プロジェクトに追加します。Add the package to your Visual Studio project by following instructions in this article or by issuing the following command in the Package Manager Console window:Install-Package Microsoft.Azure.EventHubs.Processor.

var eventProcessorHost = new EventProcessorHost(
        EventHubName,
        PartitionReceiver.DefaultConsumerGroupName,
        EventHubConnectionString,
        StorageConnectionString,
        StorageContainerName);

次に、RegisterEventProcessorAsync を呼び出して、IEventProcessor の実装をランタイムに登録します。Then, call RegisterEventProcessorAsync to register your IEventProcessor implementation with the runtime:

await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();

この時点で、ホストは「どん欲な」アルゴリズムを利用して、イベント ハブにあるすべてのパーティションでリースの取得を試行します。At this point, the host attempts to acquire a lease on every partition in the event hub using a "greedy" algorithm. これらのリースは一定の期間存続しますが、その後、更新する必要があります。These leases last for a given timeframe and must then be renewed. 新しいノード (この場合は worker インスタンス) がオンラインになると、新しいノードはリースを予約し、時間と共にリースの追加取得を試行し、負荷がノード間を移動します。As new nodes, worker instances in this case, come online, they place lease reservations and over time the load shifts between nodes as each attempts to acquire more leases.

イベント プロセッサ ホスト

時間と共に、均衡が確立されます。Over time, an equilibrium is established. この動的機能により、スケールアップとスケールダウンの両方で、CPU に基づく自動スケールがコンシューマーに適用されます。This dynamic capability enables CPU-based autoscaling to be applied to consumers for both scale-up and scale-down. イベント ハブにはメッセージ カウントの直接的概念がないため、平均的な CPU 利用率が、多くの場合、バックエンドまたはコンシューマー スケールを測定する最良のメカニズムとなります。Because Event Hubs does not have a direct concept of message counts, average CPU utilization is often the best mechanism to measure back end or consumer scale. 発行元がコンシューマーが処理できる数を超えたイベントを発行し始めた場合、コンシューマーの CPU 増加を利用し、worker インスタンス カウントを自動拡張できます。If publishers begin to publish more events than consumers can process, the CPU increase on consumers can be used to cause an auto-scale on worker instance count.

EventProcessorHost クラスは Azure ストレージベースのチェックポイント処理メカニズムも実装します。The EventProcessorHost class also implements an Azure storage-based checkpointing mechanism. このメカニズムはパーティションごとにオフセットを保存します。そのため、各コンシューマーは前回のコンシューマーが保存した内容から、最後のチェックポイントを判断できます。This mechanism stores the offset on a per partition basis, so that each consumer can determine what the last checkpoint from the previous consumer was. パーティションがリースによってノード間を移動するにつれて、負荷移動を円滑にする同期メカニズムとなります。As partitions transition between nodes via leases, this is the synchronization mechanism that facilitates load shifting.

発行元失効Publisher revocation

EventProcessorHost の高度なランタイム機能に加え、Event Hubs は特定の発行元がイベント ハブにイベントを発行するのを防ぐ目的で発行元失効を有効にします。In addition to the advanced run-time features of EventProcessorHost, Event Hubs enables publisher revocation in order to block specific publishers from sending event to an event hub. このような機能は、発行元のトークンが侵害されたり、ソフトウェア更新によって不適切な動作が発生したりする場合に便利です。These features are useful if a publisher token has been compromised, or a software update is causing them to behave inappropriately. そのような状況では、SAS トークンの一部である発行元 ID を利用してイベントの発行をブロックできます。In these situations, the publisher's identity, which is part of their SAS token, can be blocked from publishing events.

発行元失効の詳細のほか、発行元として Event Hubs に送信する方法の詳細については、Event Hubs の大規模で安全な発行に関するサンプルを参照してください。For more information about publisher revocation and how to send to Event Hubs as a publisher, see the Event Hubs Large Scale Secure Publishing sample.

次の手順Next steps

Event Hubs シナリオに関する詳細については、次のリンク先を参照してください。To learn more about Event Hubs scenarios, visit these links: