競合コンシューマー パターン

複数の同時実行コンシューマーが、同じメッセージング チャネルで受信したメッセージを処理できるようにします。 このパターンでは、システムは複数のメッセージを同時に処理して、スループットを最適化し、スケーラビリティと可用性を向上させ、ワークロードのバランスを取ることができます。

コンテキストと問題

クラウドで実行されるアプリケーションには、多数の要求を処理することが求められます。 各要求を同期的に処理するのではなく、アプリケーションがメッセージング システムを介して、要求を非同期で処理する別のサービス (コンシューマー サービス) に要求を渡すのが一般的な手法です。 この方法では、要求が処理されている間に、アプリケーションのビジネス ロジックがブロックされないようにすることができます。

多くの理由から、要求の数が経時的に大きく変わる可能性があります。 ユーザー アクティビティや複数のテナントから集約された要求の急増により、予測不可能なワークロードが発生する場合があります。 ピーク時には、システムは 1 秒あたり何百件もの要求を処理する必要がありますが、他の時間帯は要求数が非常に少ない可能性があります。 さらに、これらの要求を処理するために実行される作業の性質は非常に多様であると考えられます。 コンシューマー サービスのインスタンスを 1 つしか使用していない場合、そのインスタンスが要求で溢れてしまう可能性があります。また、アプリケーションから送信されるメッセージの流入によって、メッセージング システムが過負荷になることもあります。 この変動するワークロードを処理するために、システムはコンシューマー サービスの複数のインスタンスを実行できます。 ただし、各メッセージが 1 つのコンシューマーにのみ配信されるように、これらのコンシューマーを調整する必要があります。 また、コンシューマー間でワークロードを負荷分散して、インスタンスがボトルネックにならないようにする必要があります。

解決策

メッセージ キューを使用して、アプリケーションとコンシューマー サービスのインスタンス間の通信チャネルを実装します。 アプリケーションは要求をメッセージの形でキューに入れ、コンシューマー サービス インスタンスはキューからメッセージを受け取って処理します。 この方法を使用すると、コンシューマー サービス インスタンスの同じプールで、アプリケーションのインスタンスからのメッセージを処理できます。 次の図は、メッセージ キューを使用した複数のサービス インスタンスへの処理の分散を示しています。

メッセージ キューを使用した複数のサービス インスタンスへの処理の分散

このソリューションには次の利点があります。

  • アプリケーション インスタンスから送信される要求量が大幅に変動する場合でも要求を処理できる負荷平準化システムがあります。 キューは、アプリケーション インスタンスとコンシューマー サービス インスタンス間のバッファーとして機能します。 これは、「Queue-based Load Leveling pattern」(キューベースの負荷平準化パターン) で説明されているよう、アプリケーションとサービス インスタンスの両方の可用性と応答性に影響を最小限に抑えるために役立ちます。 一部の実行時間が長い処理が必要なメッセージの処理では、コンシューマー サービスの他のインスタンスによる他のメッセージの同時処理が回避されません。

  • そのため、信頼性が向上します。 プロデューサーがこのパターンを使用するのではなく、コンシューマーと直接通信し、コンシューマーの監視は行わない場合、コンシューマーが失敗したときにメッセージが失われたり、処理に失敗したりする可能性が高くなります。 このパターンでは、メッセージは特定のサービス インスタンスに送信されません。 失敗したサービス インスタンスによってプロデューサーはブロックされず、機能している任意のサービス インスタンスがメッセージを処理できます。

  • コンシューマー間、またはプロデューサー インスタンスとコンシューマー インスタンス間に複雑な調整は必要ありません。 メッセージ キューによって、各メッセージは少なくとも 1 回は配信されます。

  • スケーラブルです。 メッセージ量の変動に応じて、システムはコンシューマー サービスのインスタンス数を動的に増減することができます。

  • メッセージ キューでトランザクションの読み取り操作を提供する場合、回復性を改善できます。 コンシューマー サービス インスタンスがトランザクション操作の一環としてメッセージの読み取りと処理を行い、コンシューマー サービス インスタンスが失敗した場合、このパターンによってメッセージはキューに返され、コンシューマー サービスの別インスタンスで受け取り、処理することができます。

問題と注意事項

このパターンの実装方法を決めるときには、以下の点に注意してください。

  • メッセージの順序付け。 コンシューマー サービス インスタンスでメッセージを受け取る順序は保証されていません。また、メッセージが作成された順序を反映しているとは限りません。 メッセージが処理される順序への依存を排除できるので、メッセージ処理がべき等になるようにシステムを設計します。 詳細については、Jonathan Oliver のブログ「Idempotency Patterns」(べき等パターン) を参照してください。

    Microsoft Azure Service Bus Queues は、メッセージ セッションを使用して、保証された先入れ先出しの順序を実装することができます。 詳細については、「セッションを使用するメッセージング パターン」を参照してください。

  • 回復性に対応するサービスの設計。 失敗したサービス インスタンスを検出して再起動するようにシステムを設計する場合、必要に応じて、サービス インスタンスがべき等操作として実行する処理を実装し、単一のメッセージが複数回取得および処理される影響を最小限に抑えます。

  • 有害メッセージの検出。 不適切な形式のメッセージ、または使用できないリソースにアクセスする必要があるタスクによって、サービス インスタンスが失敗する可能性があります。 システムでこのようなメッセージがキューに返されないように防ぎ、代わりにこれらのメッセージをキャプチャしてどこか別の場所に格納して、必要に応じて分析できるようにすることをお勧めします。

  • 結果の処理。 メッセージを処理するサービス インスタンスは、メッセージを生成するアプリケーション ロジックから完全に切り離されているので、直接通信できない可能性があります。 サービス インスタンスから、アプリケーション ロジックに戻す必要がある結果が生成される場合、この情報は、両方からアクセスできる場所に保存する必要があります。 アプリケーション ロジックで不完全なデータが取得されないようにするために、システムで処理が完了したときを示す必要があります。

    Azure を使用している場合、ワーカー プロセスで専用のメッセージ返信キューを使用することで、アプリケーション ロジックに結果を戻すことができます。 アプリケーション ロジックで、このような結果を元のメッセージと関連付けられる必要があります。 このシナリオの詳細については、「Asynchronous Messaging Primer」(非同期メッセージングの基本) を参照してください。

  • メッセージング システムのスケーリング。 大規模なソリューションの場合、1 つのメッセージ キューが大量のメッセージで一杯になり、システムのボトルネックになる可能性があります。 このような場合は、メッセージング システムをパーティション分割し、特定のプロデューサーからのメッセージを特定のキューに送信するか、負荷分散を使用して、複数のメッセージ キュー全体にメッセージを分散させることを検討します。

  • メッセージング システムの信頼性を確保。 信頼できるメッセージング システムは、アプリケーションがメッセージをキューに格納した後に、メッセージが失われないことを保証する必要があります。 これは、すべてのメッセージを少なくとも 1 回配信するために重要です。

このパターンを使用する状況

このパターンは次の状況で使用します。

  • アプリケーションのワークロードは、非同期に実行できる複数のタスクに分割されます。
  • タスクは独立しており、並列して実行できます。
  • 作業量の変動が大きい場合、スケーラブルなソリューションが必要です。
  • ソリューションは高可用性を提供する必要があります。また、タスクの処理が失敗した場合に回復できる必要があります。

このパターンが適さない状況

  • アプリケーションのワークロードを個別のタスクに分離することが容易ではない場合、またはタスク間の依存度が高い場合。
  • タスクを同期して実行する必要があり、アプリケーション ロジックで 1 つのタスクが完了するまで待ってから続行する必要がある場合。
  • 特定の順序でタスクを実行する必要がある場合。

一部のメッセージング システムは、プロデューサーがメッセージをグループ化し、そのすべてを同じコンシューマーが処理するように確保するセッションをサポートしています。 このメカニズムを優先度が付けられたメッセージ (優先度付けがサポートされている場合) に使用して、プロデューサーから単一のコンシューマーに対して順番にメッセージを配信するメッセージの順序付けのフォームを実装することができます。

Azure には、Service Bus キューと Azure 関数キュー トリガーが用意されています。これらは、組み合わされるとこのクラウド設計パターンの直接実装となります。 Azure Functions はトリガーとバインドを使用して Azure Service Bus と統合されます。 Service Bus と統合すると、発行元によって送信されるキュー メッセージを利用する関数を構築できます。 発行元のアプリケーションはメッセージをキューに投稿します。(Azure 関数として実装される) コンシューマーは、そのキューからメッセージを取得して処理できます。

Service Bus キューでは回復性のために、コンシューマーがキューからメッセージを取得するときに PeekLock モードを使用できます。このモードでは、メッセージが実際に削除されるのではなく、他のコンシューマーに対して非表示にされるだけです。 Azure Functions Runtime は PeekLock モードでメッセージを受け取ります。関数が成功するとメッセージで完了が呼び出され、関数が失敗すると破棄が呼び出されます。メッセージは再び表示され、別のコンシューマーが取得できるようになります。 関数が実行されている限り、関数の実行時間が PeekLock タイムアウトよりも長くなると、ロックが自動的に更新されます。

Azure 関数はキューの深さに基づいてスケールアウト/インでき、すべてがキューの競合するコンシューマーとして動作します。 関数の複数のインスタンスが作成されると、メッセージのプルと処理が個別に行われて、それらすべてが競合します。

Azure Service Bus キューの使用の詳細については、「Service Bus のキュー、トピック、サブスクリプション」を参照してください。

キューによってトリガーされる Azure 関数については、「Azure Functions の Azure Service Bus トリガー」を参照してください。

次のコードは、新しいメッセージを作成し、QueueClient インスタンスを使用して Service Bus キューに送信する方法を示しています。

private string serviceBusConnectionString = ...;
...

  public async Task SendMessagesAsync(CancellationToken  ct)
  {
   try
   {
    var msgNumber = 0;

    var queueClient = new QueueClient(serviceBusConnectionString, "myqueue");

    while (!ct.IsCancellationRequested)
    {
     // Create a new message to send to the queue
     string messageBody = $"Message {msgNumber}";
     var message = new Message(Encoding.UTF8.GetBytes(messageBody));

     // Write the body of the message to the console
     this._logger.LogInformation($"Sending message: {messageBody}");

     // Send the message to the queue
     await queueClient.SendAsync(message);

     this._logger.LogInformation("Message successfully sent.");
     msgNumber++;
    }
   }
   catch (Exception exception)
   {
    this._logger.LogException(exception.Message);
   }
  }

次のコード サンプルは、メッセージ メタデータを読み取って Service Bus キュー メッセージをログに記録するコンシューマー (C# Azure 関数として記述されます) を示しています。 それを Service Bus キューにバインドするために ServiceBusTrigger 属性がどのように使用されるかに注目してください。

[FunctionName("ProcessQueueMessage")]
public static void Run(
    [ServiceBusTrigger("myqueue", Connection = "ServiceBusConnectionString")]
    string myQueueItem,
    Int32 deliveryCount,
    DateTime enqueuedTimeUtc,
    string messageId,
    ILogger log)
{
    log.LogInformation($"C# ServiceBus queue trigger function consumed message: {myQueueItem}");
    log.LogInformation($"EnqueuedTimeUtc={enqueuedTimeUtc}");
    log.LogInformation($"DeliveryCount={deliveryCount}");
    log.LogInformation($"MessageId={messageId}");
}

このパターンを実装する場合は、次のパターンとガイダンスが関連している可能性があります。

  • 非同期メッセージングの基本。 メッセージ キューは、非同期通信メカニズムです。 コンシューマー サービスがアプリケーションに返信を送信する必要がある場合、状況に応じて何らかの形式の応答メッセージングを実装します。 「Asynchronous Messaging Primer」(非同期メッセージングの基本) では、メッセージ キューを使用して要求/返信メッセージングを実装する方法が説明されています。

  • 自動スケール ガイダンス。 キュー アプリケーションの投稿メッセージの長さは変動するので、コンシューマー サービスのインスタンスを開始および停止できることがあります。 自動スケールは、ピーク時処理中のスループットの維持に役立ちます。

  • Compute Resource Consolidation パターン。 複数のインスタンスのコンシューマー サービスを 1 つのプロセスに統合して、コストと管理のオーバーヘッドを軽減できることがあります。 「Compute Resource Consolidation」(コンピューティング リソース統合パターン) では、この手法に従う場合の利点とトレードオフについて説明しています。

  • キュー ベースの負荷平準化パターン。 メッセージ キューを導入すると、システムに回復性が加わり、アプリケーション インスタンスからの変動が大きい要求量をサービス インスタンスで処理できるようになります。 メッセージ キューはバッファーとして機能し、負荷が平準化されます。 「Queue-based Load Leveling pattern」(キューベースの負荷平準化パターン) では、このスキーマについて詳しく説明しています。