アプリケーションの複数のインスタンス間でパーティション負荷のバランスを取る

イベント処理アプリケーションをスケーリングするには、アプリケーションのインスタンスを複数実行し、それらのインスタンス間で負荷のバランスを取ります。 以前のバージョンと非推奨のバージョンでは、EventProcessorHost を使うと、イベントを受信するときに、プログラムの複数のインスタンスとチェックポイント イベントの間で負荷のバランスを取ることができました。 新しいバージョン (5.0 以降) では EventProcessorClient (.NET および Java) または EventHubConsumerClient (Python および JavaScript) を使用して、同じ処理を実行できます。 開発モデルは、イベントを使用することでより簡単に作成できます。 イベント ハンドラーを登録することで、目的のイベントをサブスクライブできます。 古いバージョンのクライアント ライブラリを使用している場合は、.NETJavaPythonJavaScript の各移行ガイドを参照してください。

この記事では、イベント ハブからイベントを読み取るためにクライアント アプリケーションの複数のインスタンスを使用するサンプル シナリオについて説明します。 また、イベント プロセッサ クライアントの機能について詳しく説明します。これにより、一度に複数のパーティションからイベントを受信し、同じイベント ハブとコンシューマー グループを使用する他のコンシューマーと負荷のバランスを取ることができます。

Note

Event Hubs をスケーリングするための鍵となるのは、"パーティション分割されたコンシューマー" のアイデアです。 競合コンシューマー パターンとは対照的に、パーティション分割されたコンシューマー パターンは、競合のボトルネックを除去し、エンド ツー エンドの並列処理を容易にすることによって、高スケールを可能にします。

サンプル シナリオ

シナリオの例として、10 万件の家を監視するホーム セキュリティ企業を考えてみましょう。 この会社では、各家庭に設置された動体検知器、ドアや窓の開閉センサー、ガラス破損検知器などのさまざまなセンサーから常にデータを取得しています。 この会社では、住民がほぼリアルタイムで自宅の様子を監視できる Web サイトを開設しています。

各センサーにより、データがイベント ハブにプッシュされます。 イベント ハブは、16 個のパーティションで構成されます。 使用側では、これらのイベントを読み取り、統合し (フィルター、集計など)、集計をストレージ BLOB にダンプし、ユーザー フレンドリな Web ページに投影できるメカニズムが必要です。

コンシューマー アプリケーション

分散環境でコンシューマーを設計する場合、シナリオで次の要件を扱う必要があります。

  1. スケール: 複数のコンシューマーを作成します。それぞれのコンシューマーは、いくつかの Event Hubs のパーティションからの読み取りの所有権を保持します。
  2. 負荷分散: コンシューマーを動的に増減します。 たとえば、新しいセンサーの種類 (たとえば、一酸化炭素検知器) が各家庭に追加されると、イベントの数が増加します。 その場合は、オペレーター (人間) がコンシューマー インスタンスの数を増やします。 すると、コンシューマーのプールにより、それ自体が所有するパーティションの数を再調整して、新しく追加されたコンシューマーと負荷を共有することができます。
  3. 失敗時のシームレスな再開: ホストとなっている仮想マシンが突然クラッシュしたなどの理由でコンシューマー (コンシューマー A) が失敗した場合、コンシューマー A が所有しているパーティションを他のコンシューマーが選択して続行できます。 また、"チェックポイント" または "オフセット" と呼ばれる継続ポイントは、コンシューマー A が失敗した正確なポイントであるか、その少し前のポイントである必要があります。
  4. イベントの使用: 前の 3 つのポイントではコンシューマーの管理を扱っていますが、イベントを使用して実用的な操作を行うには、コードが必要です。 たとえば、イベントを集計し、BLOB ストレージにアップロードするなどです。

イベント プロセッサまたはコンシューマー クライアント

これらの要件を満たすために独自のソリューションを構築する必要はありません。 この機能は、Azure Event Hubs SDK によって提供されます。 .NET または Java SDK では、イベント プロセッサ クライアント (EventProcessorClient) を使用し、Python と JavaScript SDK で EventHubConsumerClient を使用します。 以前のバージョンの SDK では、イベント プロセッサ ホスト (EventProcessorHost) がこれらの機能をサポートしていました。

ほとんどの運用シナリオでは、イベントの読み取りと処理にイベント プロセッサ クライアントを使用することをお勧めします。 プロセッサ クライアントは、イベント ハブのすべてのパーティションにわたって、パフォーマンスが高く、フォールト トレラントな方法でイベントを処理しながら、その進行状況にチェックポイントを設定する手段を提供するための堅牢なエクスペリエンスを提供することを目的としています。 イベント プロセッサ クライアントは、特定のイベント ハブ用にコンシューマー グループのコンテキスト内で協調的に動作できます。 クライアントは、インスタンスがそのグループに対して使用可能または使用不可能になると、自動的に作業の配布と分散を管理します。

パーティションの所有権

通常、イベント プロセッサ インスタンスは、1 つまたは複数のパーティションからのイベントを所有および処理します。 パーティションの所有権は、イベント ハブとコンシューマー グループの組み合わせに関連付けられているすべてのアクティブなイベント プロセッサ インスタンス間で均等に分散されます。

各イベント プロセッサには一意識別子が与えられ、チェックポイント ストアのエントリを追加または更新することで、パーティションの所有権を要求します。 すべてのイベント プロセッサ インスタンスによって、このストアとの定期的な通信が行われ、自身の処理状態が更新されるとともに、他のアクティブなインスタンスについての学習が行われます。 このデータは、アクティブなプロセッサ間で負荷を分散するために使用されます。 新しいインスタンスは、処理プールに参加してスケール アップできます。 障害またはスケール ダウンによってインスタンスがダウンした場合、パーティションの所有権は、他のアクティブなプロセッサに正常に転送されます。

チェックポイント ストアのパーティションの所有権レコードでは、Event Hubs 名前空間、イベント ハブ名、コンシューマー グループ、イベント プロセッサ識別子 (所有者とも呼ばれます)、パーティション ID、および最終変更時刻が追跡されます。

Event Hubs 名前空間 イベント ハブ名 コンシューマー グループ 所有者 Partition ID 最終変更時刻
mynamespace.servicebus.windows.net myeventhub myconsumergroup 3be3f9d3-9d9e-4c50-9491-85ece8334ff6 0 2020-01-15T01:22:15
mynamespace.servicebus.windows.net myeventhub myconsumergroup f5cc5176-ce96-4bb4-bbaa-a0e3a9054ecf 1 2020-01-15T01:22:17
mynamespace.servicebus.windows.net myeventhub myconsumergroup 72b980e9-2efc-4ca7-ab1b-ffd7bece8472 2 2020-01-15T01:22:10
:
:
mynamespace.servicebus.windows.net myeventhub myconsumergroup 844bd8fb-1f3a-4580-984d-6324f9e208af 15 2020-01-15T01:22:00

各イベント プロセッサ インスタンスは、パーティションの所有権を取得し、最後に認識されたチェックポイントからパーティションの処理を開始します。 プロセッサで障害が発生した場合 (VM がシャットダウンした場合)、他のインスタンスは最終変更時刻を確認することによってこれを検出します。 非アクティブなインスタンスによって以前所有されていたパーティションの所有権を取得しようとする試みは、他のインスタンスによって行われます。 チェックポイント ストアを使用すると、1 つのインスタンスのみがパーティションの所有権の要求に成功することが保証されます。 そのため、特定の時点で、1 つのパーティションからイベントを受け取るプロセッサは最大で 1 つです。

メッセージを受信する

イベント プロセッサを作成するときは、イベントとエラーを処理する関数を指定します。 イベントを処理する関数を呼び出すたびに、特定のパーティションから 1 つのイベントが配信されます。 このイベントの処理はユーザーが行う必要があります。 コンシューマーによってすべてのメッセージが 1 回以上処理されることを確認する場合は、再試行ロジックを含む独自のコードを作成する必要があります。 ただし、有害メッセージについて注意してください。

これは迅速に済ませることをお勧めします。 つまり、できる限り最小限の処理に留めます。 ストレージへの書き込みとルーティングを行う必要がある場合、2 つのコンシューマー グループを使用して 2 つのイベント プロセッサを所有することをお勧めします。

Checkpoint

チェックポイント処理とは、イベント プロセッサがパーティション内の最後に正常に処理されたイベントの位置をマークまたはコミットするために使用する処理です。 通常、チェックポイントのマーク付けはイベントを処理する関数内で実行され、コンシューマー グループ内のパーティションごとに発生します。

イベント プロセッサがパーティションから切断されると、別のインスタンスが、そのコンシューマー グループ内のそのパーティションの最後のプロセッサによって以前にコミットされたチェックポイントからパーティションの処理を再開できます。 プロセッサは接続の際に、このオフセットをイベント ハブに渡して、読み取りを開始する場所を指定します。 このように、チェックポイント処理を使用することで、ダウンストリーム アプリケーションごとにイベントに "完了" のマークを付けると共に、イベント プロセッサがダウンしたときに回復性をもたらすことができます。 このチェックポイント処理で、より小さなオフセットを指定すると、古いデータに戻ることができます。

イベントを処理済みとしてマークするためにチェックポイントが実行されると、チェックポイント ストア内のエントリが、イベントのオフセットとシーケンス番号で追加または更新されます。 ユーザーは、チェックポイントを更新する頻度を決定する必要があります。 正常に処理された各イベントの後に更新すると、基になっているチェックポイント ストアへの書き込み操作がトリガーされるため、パフォーマンスとコストへの影響が生じる可能性があります。 すべての単一イベントをチェックポイント処理することは、キューに格納されたメッセージング パターンを暗示しています。その場合は、イベント ハブよりも Service Bus キューの方がより適切なオプションになる可能性があります。 Event Hubs の背後にあるのは、"1 回以上" 大規模な配信を受ける、という考え方です。 ダウンストリームのシステムにべき等性を持たせることで、同じイベントが複数回受信される結果になるエラーまたは再起動から容易に復旧できます。

チェックポイント ストアとして Azure Blob Storage を使用する場合は、次の推奨事項に従ってください。

  • コンシューマー グループごとに個別のコンテナーを使用します。 同じストレージ アカウントを使用できますが、各グループごとに 1 つのコンテナーを使用します。
  • コンテナーを他の何かに使用しないでください。また、ストレージ アカウントも他の何かに使用しないでください。
  • ストレージ アカウントは、デプロイされたアプリケーションが配置されているのと同じリージョンに存在する必要があります。 アプリケーションがオンプレミスの場合は、可能な中で最も近いリージョンを選択することを試みてください。

Azure portal の [ストレージ アカウント] ページの [Blob service] セクションで、次の設定が無効になっていることを確認してください。

  • 階層型名前空間
  • BLOB の論理的な削除
  • バージョン管理

スレッドの安全性とプロセッサのインスタンス

既定では、イベントを処理する関数は、特定のパーティションに対して順番に呼び出されます。 後続のイベントと同じパーティションからのこの関数に対する呼び出しは、メッセージ ポンプが他のスレッドのバックグラウンドで引き続き実行されるため、バックグラウンドでキューに配置されます。 異なるパーティションからのイベントは同時に処理できるため、パーティションをまたがってアクセスされる共有状態は同期される必要があります。

次のステップ

次のクイック スタートを参照してください。