サブプロトコルを使用して信頼性の高い WebSocket を作成する

間欠的なネットワークの問題が原因で WebSocket クライアント接続が切断されると、メッセージが失われる恐れがあります。 Pub/Sub システムでは、パブリッシャーはサブスクライバーから分離されています。そのためパブリッシャーは、サブスクライバーでの接続の切断またはメッセージの損失を検出できない場合があります。 間欠的なネットワークの問題を克服し、信頼性の高いメッセージ配信を維持することは、クライアントにとって不可欠です。 それを実現するには、リライアブル Azure Web PubSub サブプロトコルを活用して、信頼性の高い WebSocket クライアントを作成します。

リライアブル プロトコル

Web PubSub サービスでは、2 つのリライアブル サブプロトコル (json.reliable.webpubsub.azure.v1protobuf.reliable.webpubsub.azure.v1) がサポートされています。 クライアントでは、信頼性を実現するために、サブプロトコルのパブリッシャー、サブスクライバー、復旧の部分に従う必要があります。 サブプロトコルの正しい実装に失敗すると、メッセージ配信が期待どおりに動作しないか、プロトコル違反が原因でサービスでクライアントを終了する可能性があります。

簡単な方法 - クライアント SDK の使用

信頼性の高いクライアントを作成する最も簡単な方法は、クライアント SDK を使用することです。 クライアント SDK で Web PubSub クライアント仕様を実装し、既定で json.reliable.webpubsub.azure.v1 を使用します。 クイック スタートについては、 クライアント 間の発行/サブスクライブに関するページを参照してください。

難しい方法 - 手作業での実装

以下のチュートリアルでは、Web PubSub クライアント仕様の実装における重要な部分について説明します。 このガイドは、すぐに始めたいと考えているユーザーではなく、信頼性の実現の原則を知りたいユーザーを対象にしています。 すぐに始めるには、クライアント SDK を使用してください。

初期化

リライアブル サブプロトコルを使用するためには、WebSocket 接続の構築時にサブプロトコルを設定する必要があります。 JavaScript では、次のコードを使用できます。

  • Json リライアブル サブプロトコルを使用する。

    var pubsub = new WebSocket(
      "wss://test.webpubsub.azure.com/client/hubs/hub1",
      "json.reliable.webpubsub.azure.v1"
    );
    
  • Protobuf リライアブル サブプロトコルを使用する。

    var pubsub = new WebSocket(
      "wss://test.webpubsub.azure.com/client/hubs/hub1",
      "protobuf.reliable.webpubsub.azure.v1"
    );
    

接続の回復

接続の復旧は信頼性の実現の基礎であり、json.reliable.webpubsub.azure.v1 および protobuf.reliable.webpubsub.azure.v1 プロトコルを使用する場合に実装する必要があります。

Websocket 接続は TCP に依存します。 接続が切断されない場合、メッセージは失われずに、順番に配信されます。 切断された接続でメッセージが失われるのを防ぐために、Web PubSub サービスでは、グループとメッセージの情報を含む接続状態の情報を保持します。 この情報は、接続の復旧時にクライアントを復元するために使用されます

クライアントでリライアブル サブプロトコルを使用してサービスに再接続する際、クライアントは connectionIdreconnectionToken を含んだ Connected メッセージを受信します。 connectionId では、サービスにおける接続のセッションを識別します。

{
  "type": "system",
  "event": "connected",
  "connectionId": "<connection_id>",
  "reconnectionToken": "<reconnection_token>"
}

WebSocket 接続が切断されると、クライアントでは同一のセッションを復元するために、同じ connectionId を使用して再接続を試みる必要があります。 クライアントでサーバーとネゴシエートして access_token を取得する必要はありません。 代わりに、接続を復旧するには、クライアントでサービス ホスト名、connection_idreconnection_token を使用して、WebSocket 接続の要求をサービスに対して直接行う必要があります。

wss://<service-endpoint>/client/hubs/<hub>?awps_connection_id=<connection_id>&awps_reconnection_token=<reconnection_token>

ネットワークの問題がまだ解消していない場合、接続の復旧は失敗する可能性があります。 クライアントでは、次の状況になるまで再接続し続ける必要があります。

  1. WebSocket 接続が状態コード 1008 で閉じる。 この状態コードは connectionId がサービスから削除されたことを意味します。
  2. 復旧の失敗が 1 分を超えて発生し続ける。

公開元

イベント ハンドラーにイベントを送信するか、他のクライアントにメッセージを発行するクライアントをパブリッシャーと呼びます。 パブリッシャーでは、メッセージの発行が成功したかどうかに関する肯定応答を Web PubSub サービスから受信するように、メッセージで ackId を設定する必要があります。

ackId はメッセージの識別子であり、それぞれの新しいメッセージで一意の ID を使用する必要があります。 元の ackId は、メッセージを再送信する際に使用する必要があります。

グループ送信メッセージのサンプル:

{
  "type": "sendToGroup",
  "group": "group1",
  "dataType": "text",
  "data": "text data",
  "ackId": 1
}

確認応答のサンプル:

{
  "type": "ack",
  "ackId": 1,
  "success": true
}

success: true を含む ack 応答が Web PubSub サービスから返された場合、サービスによるメッセージの処理は完了しています。また、クライアントでは、すべてのサブスクライバーにメッセージが配信されると想定して構いません。

サービスで一時的な内部エラーが発生し、メッセージをサブスクライバーに送信できない場合、パブリッシャーは success: false を含む ack を受信します。 パブリッシャーでは、エラーを読み取って、メッセージを再送信するかどうかを判断する必要があります。 メッセージを再送信する場合は、同一の ackId を使用する必要があります。

{
  "type": "ack",
  "ackId": 1,
  "success": false,
  "error": {
    "name": "InternalServerError",
    "message": "Internal server error"
  }
}

Message Failure

WebSocket 接続が切断されたためにサービスの ack 応答が失われた場合、パブリッシャーでは復旧後に同一の ackId を使用してメッセージを再送信する必要があります。 そのメッセージが以前にサービスで処理された場合は、Duplicate エラーを含む ack が送信されます。 パブリッシャーでは、このメッセージの再送信を停止する必要があります。

{
  "type": "ack",
  "ackId": 1,
  "success": false,
  "error": {
    "name": "Duplicate",
    "message": "Message with ack-id: 1 has been processed"
  }
}

Message duplicated

サブスクライバー

イベント ハンドラーまたはパブリッシャーからメッセージを受信するクライアントをサブスクライバーと呼びます。 ネットワークの問題が原因で接続が切断されると、サブスクライバーに送信されたメッセージの数を Web PubSub サービスで把握できなくなります。 サブスクライバーで最後に受信したメッセージを特定するために、サービスでは sequenceId を含むデータ メッセージを送信します。 サブスクライバーではシーケンス ack メッセージで応答します。

シーケンス ACK のサンプル:

{
  "type": "sequenceAck",
  "sequenceId": 1
}

sequenceId は、connection-id セッション内の uint64 の増分値です。 サブスクライバーでは、受信した最大の sequenceId を記録し、より大きな sequenceId を持つメッセージのみを受信して、より小さいか等しい sequenceId を持つメッセージを削除する必要があります。 サブスクライバーでは、既にサブスクライバーで受信済みのメッセージの再配信をサービスでスキップできるように、記録した最大の sequenceId に肯定応答する必要があります。 たとえば、サブスクライバーで sequenceId: 5 を持つ sequenceAck で応答した場合、サービスでは 5 より大きい sequenceId を持つメッセージのみを再送信します。

すべてのメッセージは、WebSocket 接続が切断されるまで、サブスクライバーに配信されます。 sequenceId を使用すると、セッション内の WebSocket 接続の全体においてサブスクライバーで受信したメッセージの数を、サービスで把握できます。 WebSocket 接続の切断後、サービスではサブスクライバーからの肯定応答がないメッセージを再配信します。 このサービスには、限られた数の未確認のメッセージが格納されます。 メッセージの数が制限を超えると、サービスで WebSocket 接続を閉じて、セッションを削除します。 したがってサブスクライバーは、できるだけ速やかに sequenceId に対して肯定応答する必要があります。