Azure Stream Analytics でのクエリの並列処理の活用

この記事では、Azure Stream Analytics で並列処理を活用する方法を示します。 入力パーティションの構成と分析クエリ定義のチューニングによって Stream Analytics ジョブをスケールする方法について説明します。 前提条件として、「ストリーミング ユニットを効率的に使用できるようにジョブを最適化する」で説明されているストリーミング ユニットの概念について理解しておく必要があります。

Stream Analytics ジョブの構成について教えてください。

Stream Analytics のジョブ定義には少なくとも、入力、クエリ、出力が含まれています。 入力は、ジョブがデータ ストリームを読み取る場所です。 クエリは、データ入力ストリームを変換するために使用されます。出力は、ジョブ結果の送信先です。

入力と出力のパーティション

パーティション分割すると、パーティション キーに基づいてデータをサブセットに分割できます。 入力 (たとえば、Event Hubs) がキーによってパーティション分割されている場合、Stream Analytics ジョブに入力を追加するとき、このパーティション キーを指定することを強くお勧めします。 Stream Analytics ジョブのスケーリングでは、入力と出力でパーティションを利用します。 Stream Analytics ジョブでは、さまざまなパーティションを同時に使用し、書き込みを実行できるので、スループットが向上します。

入力

Azure Stream Analytics のすべての入力では、パーティション分割を利用できます。

  • EventHub (互換性レベルが 1.1 以上の場合、PARTITION BY キーワードを使用してパーティション キーを明示的に設定する必要があります)
  • IoT Hub (互換性レベルが 1.1 以上の場合、PARTITION BY キーワードを使用してパーティション キーを明示的に設定する必要があります)
  • BLOB ストレージ

出力

Azure Stream Analytics を使用するときは、出力でパーティション分割を利用できます。

  • Azure Data Lake Storage
  • Azure Functions
  • Azure テーブル
  • Blob Storage (パーティション キーを明示的に設定できます)
  • Cosmos DB (パーティション キーを明示的に設定する必要があります)
  • Event Hubs (パーティション キーを明示的に設定する必要があります)
  • IoT Hub (パーティション キーを明示的に設定する必要があります)
  • Service Bus
  • オプションのパーティション分割を使用した SQL および Azure Synapse Analytics: 詳細については、Azure SQL Database への出力に関するページを参照してください。

Power BI では、パーティション分割がサポートされていません。 ただし、このセクションの説明に従って入力をパーティション分割することはできます

パーティションの詳細については、次の記事をご覧ください。

驚異的並列ジョブ

驚異的並列ジョブは、Azure Stream Analytics において最もスケーラブルなシナリオです。 入力の 1 つのパーティションを、出力の 1 つのパーティションに対するクエリの 1 つのインスタンスに接続します。 この並列処理には次の要件があります。

  1. クエリ ロジックが同じクエリ インスタンスによって処理される同じキーに依存する場合、イベントが入力の同じパーティションに送信されるようにする必要があります。 Event Hubs または IoT Hub の場合、イベント データに PartitionKey 値が設定されている必要があります。 代わりに、パーティション分割された送信元を使用することもできます。 Blob Storage の場合、イベントが同じパーティション フォルダーに送信される必要があります。 たとえば、クエリ インスタンスで userID 別にデータを集計するとき、パーティション キーとして userID を使用し、入力イベント ハブがパーティション分割されます。 ただし、クエリ ロジックで、同じクエリ インスタンスによって処理される同じキーが不要の場合は、この要件を無視してかまいません。 このロジックの例として、単純な select-project-filter クエリがあります。

  2. 次の手順は、クエリをパーティション分割することです。 互換性レベル 1.2 以上 (推奨) のジョブの場合、入力設定でカスタム列をパーティション キーとして指定でき、ジョブは PARTITION BY 句がなくても自動的に並列化されます。 互換性レベル 1.0 または 1.1 のジョブでは、クエリのあらゆる手順で PartitionId 別のパーティションを使用する必要があります。 複数のステップが許可されますが、すべてのステップが同じキーでパーティション分割されている必要があります。

  3. Stream Analytics でサポートされている出力のほとんどでは、パーティション分割が活用されます。 パーティション分割をサポートしていない種類の出力を使用する場合、ジョブは "驚異的並列" になりません。 イベント ハブの出力については、パーティション キー列がクエリで使用されているパーティション キーと同じに設定されていることを確認してください。 詳しくは、「出力」セクションをご覧ください。

  4. 入力パーティションの数が出力パーティションの数と同じである必要があります。 Blob Storage 出力では、パーティションをサポートでき、アップストリーム クエリのパーティション構成を継承します。 Blob Storage のパーティション キーを指定すると、データが入力パーティションごとにパーティション分割されるため、結果も完全に並列になります。 完全な並列ジョブを可能にするパーティション値の例を次に示します。

    • 8 個のイベント ハブ入力パーティションと 8 個のイベント ハブ出力パーティション
    • 8 個のイベント ハブ入力パーティションと Blob Storage 出力
    • 8 個のイベント ハブ 入力パーティションと任意のカーディナリティのカスタム フィールドによってパーティション分割された Blob Storage 出力
    • 8 個の Blob Storage 入力パーティションと Blob Storage 出力
    • 8 個の Blob Storage 入力パーティションと 8 個のイベント ハブ出力パーティション

以下のセクションでは、驚異的並列であるシナリオの例を示します。

単純なクエリ

  • 次の内容を入力します。8 個のパーティションがあるイベント ハブ
  • 出力:8 個のパーティションがあるイベント ハブ ([パーティション キー列] は "PartitionId" を使用するように設定する必要があります)

クエリ:

    --Using compatibility level 1.2 or above
    SELECT TollBoothId
    FROM Input1
    WHERE TollBoothId > 100
    
    --Using compatibility level 1.0 or 1.1
    SELECT TollBoothId
    FROM Input1 PARTITION BY PartitionId
    WHERE TollBoothId > 100

このクエリは単純なフィルターです。 そのため、イベント ハブに送信される入力のパーティション分割を気にする必要はありません。 互換性レベルが 1.2 より前のジョブには、上記の要件 2 を満たすために、PARTITION BY PartitionId 句を含める必要があることに注意してください。 出力については、パーティション キーが PartitionId に設定されたイベント ハブ出力をジョブで構成する必要があります。 最後に、入力パーティションと出力パーティションの数が同じであることを確認します。

グループ化キーが含まれたクエリ

  • 次の内容を入力します。8 個のパーティションがあるイベント ハブ
  • 出力:BLOB ストレージ

クエリ:

    --Using compatibility level 1.2 or above
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    
    --Using compatibility level 1.0 or 1.1
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

このクエリにはグループ化キーが含まれています。 そのため、グループ化されたイベントは、同じイベント ハブ パーティションに送信される必要があります。 この例では TollBoothID でグループ化するので、イベントがイベント ハブに送信されるときに、パーティション キーとして TollBoothID が使用されることを確認する必要があります。 その後 ASA で、PARTITION BY PartitionId を使用して、このパーティション構成から継承し、完全な並列処理を有効にすることができます。 出力は Blob Storage であるため、要件 4. に記載されているように、パーティション キー値の構成を気にする必要はありません。

驚異的並列では "ない" シナリオの例

前のセクションでは、驚異的並列のシナリオをいくつか紹介しました。 このセクションでは、驚異的並列のすべての要件を満たしているわけではないシナリオについて説明します。

パーティション数の不一致

  • 次の内容を入力します。8 個のパーティションがあるイベント ハブ
  • 出力:32 個のパーティションがあるイベント ハブ

入力パーティション数と出力パーティション数が一致しない場合、クエリに関係なく、トポロジは驚異的並列ではありません。 ただし、一部のレベルまたは並列処理を得ることもできます。

パーティション分割されていない出力を使用したクエリ

  • 次の内容を入力します。8 個のパーティションがあるイベント ハブ
  • 出力:Power BI

現在、Power BI 出力ではパーティション分割がサポートされていません。 そのため、このシナリオは驚異的並列ではありません。

PARTITION BY 値が異なる複数ステップのクエリ

  • 次の内容を入力します。8 個のパーティションがあるイベント ハブ
  • 出力:8 個のパーティションがあるイベント ハブ
  • 互換性レベル:1.0 または 1.1

クエリ:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId, PartitionId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1 Partition By TollBoothId
    GROUP BY TumblingWindow(minute, 3), TollBoothId

ご覧のように、2 番目のステップは TollBoothId をパーティション キーとして使用しています。 このステップは最初のステップと異なるので、シャッフルを実行する必要があります。

PARTITION BY 値が異なる複数ステップのクエリ

  • 次の内容を入力します。8 個のパーティションがあるイベント ハブ
  • 出力:8 個のパーティションがあるイベント ハブ ([パーティション キー列] は "TollBoothId" を使用するように設定する必要があります)
  • 互換性レベル - 1.2 以上

クエリ:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

互換性レベル 1.2 以上では、既定で並列クエリの実行が可能です。 たとえば、前のセクションのクエリは、"TollBoothId" 列が入力パーティション キーとして設定されている限りパーティション分割されます。 PARTITION BY PartitionId 句は不要です。

ジョブのストリーミング ユニットの最大数を計算する

Stream Analytics ジョブで使用できるストリーミング ユニットの合計数は、ジョブに定義されたクエリのステップ数と各ステップのパーティション数によって異なります。

クエリでのステップ

1 つのクエリに 1 つ以上のステップを含めることができます。 各ステップは、WITH キーワードで定義されたサブクエリです。 次のクエリの SELECT ステートメントのように、WITH キーワードの外にあるクエリ (1 つのクエリのみ) もステップとしてカウントされます。

クエリ:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )
    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute,3), TollBoothId

このクエリには 2 つのステップがあります。

注意

このクエリについては、この記事で後ほど詳しく説明します。

ステップをパーティション分割する

ステップをパーティション分割するには、次の条件を満たす必要があります。

  • 入力ソースはパーティション分割する。
  • クエリの SELECT ステートメントは、パーティション分割された入力ソースから読み取る。
  • ステップ内のクエリに PARTITION BY キーワードを含める。

クエリをパーティション分割すると、入力イベントが処理されて個々のパーティション グループに集計され、グループごとに出力イベントが生成されます。 集計を結合する場合は、パーティション分割されていない 2 つ目のステップを集計用に作成する必要があります。

ジョブのストリーミング ユニットの最大数を計算する

パーティション分割されていないステップは、Stream Analytics ジョブの 6 個のストリーミング ユニット (SU) にスケールアップできます。 さらに、パーティション分割されたステップでパーティションごとに 6 SU を追加できます。 いくつかのを次の表に示します。

クエリ ジョブの最大 SU 数
  • クエリに 1 つのステップが含まれている。
  • ステップはパーティション分割されていない。
6
  • 入力データ ストリームは 16 個にパーティション分割されている。
  • クエリに 1 つのステップが含まれている。
  • ステップはパーティション分割されていない。
96 (6 * 16 パーティション)
  • クエリに 2 つのステップが含まれている。
  • どのステップもパーティション分割されていない。
6
  • 入力データ ストリームは 3 つにパーティション分割されている。
  • クエリに 2 つのステップが含まれている。 入力ステップはパーティション分割されているが、2 番目のステップはされていない。
  • SELECT ステートメントはパーティション分割された入力から読み取る。
24 (パーティション分割されたステップ用に 18 + パーティション分割されていないステップ用に 6)

スケーリングの例

次のクエリでは、3 つのブースがある料金所を 3 分間に通過する車の台数を計算します。 このクエリは、最大 6 個の SU にスケールできます。

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

クエリに使用する SU を増やすには、入力データ ストリームとクエリの両方をパーティション分割する必要があります。 データ ストリーム パーティションが 3 に設定されているので、変更を加えた次のクエリを最大 18 個の SU にスケールできます。

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

クエリがパーティション分割されている場合、入力イベントは処理されて個々のパーティション グループに集計されます。 出力イベントは、それぞれのグループに対しても生成されます。 GROUP BY フィールドが入力データ ストリームのパーティション キーでない場合、パーティション分割を実行すると予期しない結果になることがあります。 たとえば、前のクエリの TollBoothId フィールドは Input1 のパーティション キーではありません。 そのため、TollBooth 1 のデータが複数のパーティションに分散される可能性があります。

Input1 の各パーティションは、Stream Analytics によって個別に処理されます。 その結果、同じタンブリング ウィンドウで同じ料金所ブースの複数の通過台数レコードが作成されます。 入力パーティション キーを変更できない場合は、次の例のように、パーティション分割されていないステップを追加してパーティション間の値を集計すると、この問題を解決できます。

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

このクエリは 24 個の SU にスケールできます。

Note

2 つのストリームを結合する場合は、結合に使用する列のパーティション キーでストリームがパーティション分割されていることを確認します。 また、両方のストリームに同じ数のパーティションがあることも確認します。

大規模な高いスループットの実現

大規模な高いスループットを維持するために、驚異的並列ジョブが必要ですが十分ではありません。 すべてのストレージ システムおよびその対応する Stream Analytics 出力では、可能な限り最適な書き込みスループットを実現する方法がそれぞれ異なっています。 あらゆる大規模シナリオと同じようにいくつかの課題があり、それらは適切な構成を使用して解決できます。 このセクションでは、いくつかの一般的な出力の構成について説明し、1 秒あたり 1,000、5,000、および 10,000 のイベントの取り込み率を実現するためのサンプルを提供します。

以下の観察ではステートレス (パススルー) クエリを使用する Stream Analytics ジョブが使用され、これは Event Hub、Azure SQL DB、または Cosmos DB に書き込む基本的な JavaScript UDF です。

イベント ハブ

取り込み率 (1 秒あたりのイベント数) ストリーミング ユニット数 出力リソース
1,000 1 2 TU
5,000 6 6 TU
10,000 12 10 TU

Event Hub ソリューションは、ストリーミング ユニット (SU) とスループットの観点から直線的にスケールするため、Stream Analytics からのデータを分析およびストリーム配信するための最も効率的かつパフォーマンスの高い方法となります。 ジョブは最大 192 SU までスケールアップでき、これは概算して最大 200 MB/秒、あるいは 1 日あたり 19 兆個のイベントの処理に換算されます。

Azure SQL

取り込み率 (1 秒あたりのイベント数) ストリーミング ユニット数 出力リソース
1,000 3 S3
5,000 18 P4
10,000 36 P6

Azure SQL では、パーティション分割の継承と呼ばれる並列書き込みがサポートされていますが、既定では有効になっていません。 ただし、パーティション分割の継承と完全な並列クエリを一緒に有効にしても、高いスループットを実現するには十分でない場合があります。 SQL 書き込みスループットは、データベース構成およびテーブル スキーマに大きく依存します。 SQL 出力パフォーマンスに関する記事に、書き込みスループットを最大限に高めることができるパラメーターの詳細が記載されています。 Azure SQL Database への Azure Stream Analytics 出力に関する記事に記載されているように、このソリューションは 8 個のパーティションを超える完全な並列パイプラインとして直線的にスケールせず、SQL 出力の前に再分割が必要な場合があります (INTO に関する記事を参照してください)。 高い IO 率と、数分おきに発生するログ バックアップからのオーバーヘッドを維持するために、Premium SKU が必要です。

Cosmos DB

取り込み率 (1 秒あたりのイベント数) ストリーミング ユニット数 出力リソース
1,000 3 20,000 RU
5,000 24 60,000 RU
10,000 48 120,000 RU

Stream Analytics から出力される Cosmos DB は、互換性レベル 1.2 のネイティブ統合を使用するように更新されました。 互換性レベル 1.2 は、新しいジョブの既定の互換性レベルである 1.1 に比べてスループットが著しく高く、RU 消費量が低下しています。 ソリューションは /deviceId 上にパーティション分割された CosmosDB コンテナーを使用し、ソリューションの残りの部分は同じように構成されています。

Azure の大規模なストリーミングのサンプルはどれも、負荷をシミュレートするテスト クライアントによってデータが取り込まれるイベント ハブを入力として使用します。 各入力イベントは 1KB の JSON ドキュメントであるため、構成された取り込み率からスループット レートに簡単に換算できます (1MB/秒、5MB/秒、および10MB/秒)。 イベントは、最大 1,000 台のデバイス向けに以下の JSON データ (短縮された形式) を送信する IoT デバイスをシミュレートします。

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 51.3068118685458,
        "moreData22": 45.34076957651598
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-1554",
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}

Note

構成は、ソリューションで使用されるさまざまなコンポーネントによって変更される可能性があります。 見積もりの精度を高めるには、ご使用のシナリオに合わせてサンプルをカスタマイズしてください。

ボトルネックの特定

Azure Stream Analytics ジョブの [メトリックス] ウィンドウを使用して、パイプラインのボトルネックを特定します。 スループットについての [Input/Output Events](入出力イベント) および [透かしの遅延] または [Backlogged Events](バックログされたイベント) を確認して、ジョブが入力速度に対応しているかどうかを確認します。 イベント ハブのメトリックスについては、 [Throttled Requests] (スロットルされた要求数) を検索し、必要に応じてしきい値ユニットを調整します。 Cosmos DB メトリックスについては、スループットの下の [パーティション キーの範囲ごとの使用された最大 RU/秒] を確認して、パーティション キーの範囲が均一に消費されていることを確認します。 Azure SQL DB については、 [ログ IO] および [CPU] を監視します。

ヘルプの参照

詳細については、Azure Stream Analytics に関する Microsoft QA 質問ページを参照してください。

次のステップ