Microsoft Azure

イベント ストリーム指向のシステムの台頭

Christopher Bennage

最近はデータに関する話題が多くなっています。データによって、情報に基づく意思決定が可能になります。ビッグ データによって、そのような意思決定が的確になります。ビッグ データのストリームを利用すれば、そのように的確な意思決定をタイムリーに行えるようになります。このように途絶えることなく流れるデータのストリームをイベント ストリームと呼びます。そして、ますます一般的になっているのが、こうしたイベント ストリームの処理を主な目的とするソフトウェア システムをビルドすることです。

業界や分野が異なっても、このようなイベント ストリーム指向のシステムには、はっきりとした共通アーキテクチャ パターンがあります。最新のイベント ストリーム指向システムのアーキテクチャ パターンが果たす役割は、従来のオンプレミス エンタープライズ システムの n 層アーキテクチャが果たす基本的な役割と同じです。今回は、この基本パターンを簡単に調べるところから始めます。

パターンについて

まず、イベントという用語の意味をはっきりさせておきましょう。イベントとは、システムで起きていることを知らせる少量のデータにすぎません。バイト単位から KB 単位の比較的小さなサイズのデータです。メッセージやテレメトリ、または単にデータと呼ばれることもあります。

次に、イベントには発生元 (プロデューサー) があります。ほぼすべてのものがイベント プロデューサーになり得ます。ネットに接続可能な自動車、スマート サーモスタット、ゲーム器本体、個人用フィットネス機器、自己診断イベントを生成するソフトウェア システムなどがその例です。重要なのは、大半のイベント ストリーム指向のシステムは多くのイベント プロデューサーに対応する必要があると認識しておくことです。

多くのシステムは、数万~数千万以上の膨大な数のイベント プロデューサーを想定します。つまり、このようなシステムは大容量かつ高速という性質を併せ持つことになります。大容量とは全体として多くのデータに対処することを意味し、高速とはデータが頻繁に生成されることを意味します。

イベントには利用者 (コンシューマー) もいます。イベント ストリーム指向のシステムで実際に中心となるのがこのイベント コンシューマーです。イベント コンシューマーは、イベントの分析、解釈、およびイベントへの対応を行います。代表的なシステムでのコンシューマーの数は、数個~数十個になる可能性があります。イベントが特定のコンシューマーにルーティングされることはありません。各コンシューマーが同じ一連のイベントを受け取ります。Microsoft Azure で考えた場合、クラウドのサービスがコンシューマーに相当します。

ここで例を挙げて考えてみましょう。金融トランザクションを表すイベント ストリームがあるとします。このシナリオでのイベント プロデューサーは、小売店の POS システムです。この状況では、このストリームを分析して不正行為を検出し、警告を発するコンシューマーが考えられます。同じストリームを分析して、ジャストインタイムでサプライチェーンの最適化を行うコンシューマーも考えられます。このイベントを長期間の外部ストレージに保管して、後で分析できるようにするコンシューマーもあります。

大量かつ高速というイベントの性質と、このようなプロデューサーとコンシューマーのパターンを併せて考えると、いくつか興味深い疑問が生じます。

  • コンシューマーがイベント生成急増の影響を受けないようにする方法はあるか。つまり、イベント生成量が利用量を超え場合にどのように対処するか。
  • 高速に発生するイベントを個々のイベント コンシューマーに合わせてスケール変換する方法はあるか。

この問題に対して重要な役割を果たすのがイベント ブローカーです (図 1 参照)。最近リリースされた Microsoft Azure の Event Hubs がまさにこの役割を担当します。

Microsoft Azure の Event Hubs アーキテクチャ
図 1 Microsoft Azure の Event Hubs アーキテクチャ

ここからは、Event Hubs などのブローカーを使用して、上記の問題を実際に解決する方法について説明します。

Event Hubs とは

Event Hubs は、下流のコンシューマーの準備が完了するまで、イベントを取得して保持しておく弾力性を提供します。イベント ストリームの転送レートの変化を Event Hubs が効率的に平準化するため、イベントの急増についてコンシューマーが考慮する必要はありません。このような平準化が行われないと、受信側のコンシューマーに影響が及び、問題が出始めます。

ブローカーを使用することで、イベント プロデューサーとイベント コンシューマーが互いに分離されます。プロデューサーとコンシューマーの間に別の仲介者を必要とする、さらに高度なバージョンのアーキテクチャ パターンでは、このような分離が特に重要です。Event Hubs は、アーキテクチャの中で構成の境界として機能します。Event Hubs を介して相互作用するすべてのコンポーネントは、互いにコンポーネント固有の情報を必要としません。

ここまでの説明では、Event Hubs と、同じような分離を提供する従来のエンタープライズ メッセージング サービスと混同するかもしれません。しかし、いくつか大きな違いがあり、Event Hubs はこのアーキテクチャ パターン向けに最適化されています。

コンシューマーの独立性

Event Hubs では、パブリッシュ/サブスクライブのモデルが使用されます。各コンシューマーは、同じイベント ストリームに対して個別のビューを所持します。複数のコンシューマーがいる従来のメッセージング システムの場合、関係するコンシューマーごとにメッセージがコピーされます。速度と領域の点では非効率になる可能性がありますが、各コンシューマーが独自の "受信トレイ" を所持するメリットがあります。コンシューマーがメッセージを処理すると、受信トレイからそのメッセージが削除されます。コンシューマーはそれぞれ自身の受信トレイに独自のコピーを持つため、別のコンシューマーに影響を与えることはありません。

Event Hubs の場合は、イベントの不変のセットが 1 つあります。不変であるため、各コンシューマーはイベントのコピーが 1 つだけ必要になります。また、コンシューマーがシステムからイベントを削除することはありません。すべてのコンシューマーがイベントの同じセットを見ていることになります。このため、コンシューマーは、イベント ストリーム内でのイベントの現在位置を自身で管理する必要があります。イベントの現在位置は、イベント ストリームのオフセットを追跡することで管理します。実際には、このための API が SDK に組み込まれています。

時間を基準とする保持

従来のメッセージング システムでは、メッセージの処理が完了したタイミングをシステムに伝えるのはコンシューマーの役割です。この通知によって、システムはメッセージを削除できるようになります。Event Hubs のコンシューマーはイベント ストリーム内のイベントの位置をコンシューマー自体で管理するため、コンシューマーがイベントの処理を完了したタイミングを Event Hubs が知る方法はあるでしょうか。端的に言えば、イベント処理の完了は通知されません。Event Hubs を使用する場合はイベントを保持する期間を構成します。イベントはその保持期間格納されます。つまり、イベントの有効期限は独自に管理され、コンシューマーの操作とは無関係です。

時間を基準に保持されることから、コンシューマーはこの保持期間が切れる前にイベントを調べて処理する必要があります。時間を基準とする保持期間があるために、各コンシューマーは時間内に処理しなければならないというプレッシャーを受けます。さいわい、Event Hubs の基になる設計では、個々のコンシューマーが必要に応じてスケールを変換できるようになっています。

Event Hubs では、イベント ストリームを物理的にパーティション分割することで、このスケール変換をサポートします。Event Hubs をプロビジョニングする際にパーティションの数を設定します。詳細については、http://azure.microsoft.com/ja-jp/documentation/services/service-bus/ の公式ドキュメントを参照してください。

イベントは Event Hubs に発行されるときに、パーティションに配置されます。特定のイベントは、1 つのパーティションにしか配置されません。既定では、イベントがラウンドロビン方式で各パーティションに均等に分散されます。このため、パーティションとイベントの関係を提供するメカニズムが用意されています。最も一般的なのはイベントにパーティション キーのプロパティを設定できるようにして、同じキーを持つすべてのイベントが同じパーティションに配布されるようにします。

イベントのストリームをパーティションに分割することが、時間を基準とする保持にどのように役立つのでしょう。Event Hubs を実際に利用する場合、コンシューマー グループという用語が使われます。各コンシューマーは実際には複数のインスタンスから構成されるためコンシューマー グループと呼ばれます。そして各コンシューマー グループを構成する各インスタンスが 1 つのパーティションに対応します。つまり、コンシューマー グループがコンシューマー全体を表し、コンシューマーのインスタンスが特定のパーティションに関係するグループのメンバーを表します。

このため、コンシューマー グループは、ストリームのイベントを並列処理できます。グループ内の各コンシューマー インスタンスは、別のインスタンスとは無関係にパーティションを処理します。コンシューマー インスタンスをすべて 1 台のコンピューターに常駐させ、各インスタンスを互いに独立して実行させることができます。すべてのコンシューマー インスタンスを複数のコンピューターに分散させることも、コンシューマー インスタンスを実行する専用のコンピューターを指定することもできます。このようにして、Event Hubs では、これまでのパターンにまつわるコンシューマーの競合に関する典型的な問題を回避しています。

ここでは、「分離」が重要な考え方になります。まず、イベント プロデューサーとイベント コンシューマーが相互に分離されます。これにより、柔軟なアーキテクチャ構成や、負荷の平準化が可能になります。次に、コンシューマー グループが相互に分離され、グループ間で障害が連鎖する機会を減らしています。さらに、特定のコンシューマー グループ内のインスタンスが相互に分離され、個別のコンシューマー グループ内での水平方向のスケール変換が実現されます。

Event Hubs の使用

Event Hubs に着手するにあたり優れたチュートリアルがいくつかあります。公式ドキュメント (http://azure.microsoft.com/ja-jp/documentation/services/service-bus/) を確認し、利用するプラットフォームにあったチュートリアルを参照してください。

まず、Event Hubs をプロビジョニングする必要があります。手順は簡単です。試用版の Microsoft Azure アカウントを使用して簡単に試すことができます。Microsoft Azure の管理ポータルで、[サービス バス] セクションに移動します。サービス バスの名前空間を作成する必要があります (まだ作成していない場合)。作成後、Event Hubs の作成手順が記載された [イベント ハブ] というタブが表示されます (図 2 参照)。

Event Hubs の作成
図 2 Event Hubs の作成

また、使用開始前に、Event Hubs の共有アクセス ポリシーを設定することも必要です。このようなポリシーによって、Event Hubs のセキュリティを管理します。ポータルで、今作成した Event Hubs に移動し、[構成] タブをクリックします。

アクセス許可の [管理] を選択し、ポリシーに「スーパー」や「運用環境では使用しない」といった名前を付けます。名前を付けた後、[ダッシュボード] タブに切り替えて、画面下の [接続情報] をクリックします。表示される画面で、接続文字列と Event Hubs に付けた名前をメモします。

イベントの作成

以下に示すコードでは、.NET SDK を使用していますが、HTTP または AMQP をサポートする任意のプラットフォームを使用できます。Microsoft Azure Service Bus NuGet パッケージを参照する必要があります。必要なクラスは、Microsoft.ServiceBus.Messaging 名前空間にあります。以下のようにクライアントを作成して、イベントを作成および送信します。

var client = EventHubClient.CreateFromConnectionString (
  connectionString,
  eventHubName);
var body = Encoding.UTF8.GetBytes("My first event");
var eventData = new EventData (body);
await client.SendAsync (eventData);

簡単なコードですが、指摘しておくべき興味深い点がいくつかあります。イベントの本文は単なるバイトの配列です。このようなイベントを処理するすべてのコンシューマー グループは、バイト配列を解釈する方法を理解する必要があります。おそらく、本文のシリアル化解除する方法を判断するためのなんらかのヒントも必要です。イベントを送信する前に、メタデータを添付することができます。

eventData.Properties.Add ("event-type", "utf8string");

プロデューサーとコンシューマー グループの両方が把握しているキーと値を使用します。イベントのセットを同じパーティションに配信する場合は、パーティション キーを設定します。

eventData.PartitionKey = "something-meaningful-to-your-domain";

イベントとパーティションとの間に関係性がなければ、パフォーマンスが向上します。しかし、場合によっては、関連するイベントのセットを 1つのコンシューマー インスタンスにルーティングして処理する必要が生じます。特定のパーティションのイベントは、受信順に受け取ることが保証されます。Event Hubs の異なるパーティションのイベントを受け取る順序を保証する簡単な方法はありません。このようなことから、多くの場合、特定のパーティションとイベントの関係を設定することになります。

たとえば、スマート カーを実現するとしたら、特定の車のイベントはすべて同じパーティションに配布することになるでしょう。このような場合はパーティション キーに車両識別番号 (VIN) を選択します。スマート ビルディングを対象にするシステムでは、おそらく各ビルの中の数百ものデバイスが生成するイベントに対処することになります。このような場合は、パーティション キーにビル自体の ID を使用し、同じビルにあるすべてのデバイスで生成されるすべてのイベントを同じパーティションに配布します。

概して、イベントとパーティションの関係を使用する場合はリスクを伴うため、慎重に使用します。パーティション キーを適切に選択しないと、各パーティションに配布されるイベントが均等ではなくなります。その結果、最終的にはコンシューマー グループでスケーリングの問題が発生することになります。ただし、パーティションの関係のニーズを回避するために、システム設計を何回変更してもかまいません。

イベントの利用

これをすべて管理する方法について心配になるかもしれません。コンシューマー グループでは、イベント ストリームのオフセットを管理しなければなりません。各グループでは、パーティションごとに 1 つのインスタンスを所持する必要があります。さいわいなことに、そのための API があります。

NuGet パッケージ Microsoft Azure Service Bus Event Hub-EventProcessorHost を参照します。必要なクラスは、Microsoft.ServiceBus.Messaging 名前空間にあります。まず、IEventProcessor インターフェイスを 1 つ実装します。

イベント プロセッサーを実装後、EventProcessorHost のインスタンスを作成してイベント プロセッサーを登録します。ホストでは、すべての基本作業が自動的に処理されます。ホストを起動すると、Event Hubs が調査され、パーティションの数が確認されます。その後、利用可能なパーティションごとに、イベント プロセッサーのインスタンスが 1 つ作成されます。

実装する必要があるメソッドは 3 つです。最初の 2 つが OpenAsync メソッドと CloseAsync メソッドです。イベント プロセッサーのインスタンスがパーティションのリースを最初に許可されるときに、ホストが OpenAsync メソッドを呼び出します。つまり、イベント プロセッサーのインスタンスは、コンシューマー グループの対象パーティションのみにアクセスします。同様に、リースを終了するとき、またはシャットダウンされるときに、ホストは CloseAsync を呼び出します。最初は以下のような非常にシンプルな実装から始めます。

public Task OpenAsync(PartitionContext context)
{
  return Task.FromResult(true);
}
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
  return Task.FromResult(true);
}

どちらのメソッドも PartitionContext 引数を受け取ります。残りのメソッドも同様に PartitionContext 引数を受け取ります。イベント プロセッサーにリースされた特定のパーティションの詳細を確認する場合は、この引数を調べることができます。最後のメソッドは、実際にイベントを受信するメソッドです (図 3 参照)。

図 3 イベントを配信する最後のメソッド

public async Task ProcessEventsAsync (PartitionContext context, 
  IEnumerable<EventData> messages)
{
  foreach (var message in messages)
  {
    var eventType = message.Properties["event-type"];
    var bytes = message.GetBytes();
    if (eventType.ToString() == "utf8string") {
      var body = System.Text.Encoding.UTF8.GetString (bytes);
      // Do something interesting with the body
    } else {
      // Record that you don't know what to do with this event
    }
  }
  await context.CheckpointAsync();
  // This is not production-ready code
}

ご覧のとおり、コードは簡単です。列挙可能なイベントのセットを受信して、反復しながら必要な作業を実行します。また、メソッドの最後で context.CheckpointAsync の呼び出しも行っています。これにより、このイベント セットの処理を正常に終了し、チェックポイントを記録することをホストに指示します。チェックポイントは、バッチ内の最終イベントのオフセットです。

この方法により、コンシューマー グループは、パーティションごとにどのイベントの処理が完了したかを管理できます。ホストを開始すると、ホストは利用可能なパーティションのリースの取得を試みます。パーティションの処理が始まると、そのパーティションのチェックポイント情報を調べます。最終チェックポイントのオフセットよりも新しいイベントだけが各プロセッサーに送信されます。

ホストは、コンピューター間の負荷を自動的に平準化する機能も提供します。たとえば、16 個のパーティションを持つ Event Hubs があるとします。つまり、イベント プロセッサーのインスタンスが 16 個 (パーティションごとに 1 個) あります。1 台のコンピューターでホストを実行する場合は、同じコンピューターに 16 個のインスタンスが作成されます。2 台目のコンピューターで別のホストを開始し、そのホストを同じコンシューマー グループに含めた場合、2 つのホストは、2 台のコンピューター間でイベント プロセッサーのインスタンスを均等に分散する処理を開始します。最終的には、1 台のコンピューターあたり 8 個のイベント プロセッサーのインスタンスが存在するようになります。同様に、2 台目のコンピューターをシャットダウンすると、利用できなくなったパーティションが最初のホストに引き継がれます。

IEventProcessor の実装が MyEventProcessor だとします。この場合、ホストのインスタンスは、以下のように簡単に作成できます。

var host = new EventProcessorHost(
  hostName,
  eventHubName,
  consumerGroupName,
  eventHubConnectionString,
  checkpointConnectionString);
await host.RegisterEventProcessorAsync<MyEventProcessor>();

eventHubConnectionString 値および eventHubName 値は、前の例でイベントを送信する際に使用した値と同じです。接続文字列は、必要なものだけに使用を制限する共有アクセス ポリシーと一緒に使用することをお勧めします。

hostName は、EventProcessorHost のインスタンスを特定します。クラスター (複数のコンピューター) でホストを実行する場合は、ホストを実行するコンピューターの ID を反映する名前を指定するようにします。

consumerGroupName 引数は、このホストが表す論理コンシューマー グループを特定します。定数 EventHubConsumerGroup.DefaultGroupName を使用して参照できる、既定のコンシューマー グループがあります。他の名前を使用する場合は、最初にコンシューマー グループをプロビジョニングする必要があります。これを行うには、Microsoft.ServiceBus.NamespaceManager のインスタンスを作成し、CreateConsumerGroupAsync などのメソッドを使用します。

最後に、checkpointConnectionString を使用して、Microsoft Azure Storage のアカウントに接続文字列を提供する必要があります。このストレージ アカウントによって、ホストは、パーティションとイベントのオフセットに関連するすべての状態を管理します。この状態は、調査可能なプレーン テキストとして BLOB に格納されます。

Event Hubs に最初から統合されている Microsoft Azure サービスもあります。Microsoft Azure の Stream Analytics (現在プレビュー段階) では、Event Hubs から送られるイベント ストリームを変換して分析するための SQL に似た宣言型の構文が提供されます。同様に、Event Hubs は、人気急上昇中の Apache Storm を提供し、Microsoft Azure のプレビュー版として HDInsight 経由で利用できるようになっています。

まとめ

今回説明したアーキテクチャ パターンはごく基本的なものです。実際のシステムを実装する場合、検討すべき検討事項は他にも数多くあります。たとえば、高度なセキュリティ、イベント プロデューサーのプロビジョニングと管理、プロトコル変換、送信などを検討します。とは言うものの、Event Hubs などのイベント ブローカーを使用してシステムを構築するのに必要な基本的な考え方は理解できたと思います。


Christopher Bennage は、Microsoft patterns & practices チームの開発者です。彼は、コンピューターを使用してプログラムを作成することを楽しんでいます。

この記事のレビューに協力してくれたマイクロソフト技術スタッフの Mostafa Elhemali および Dan Rosanova に心より感謝いたします。