イベント処理について理解する

完了

Azure Stream Analytics は、ストリーミング データの複合イベント処理と分析のためのサービスです。 Stream Analytics は、次の目的に使用されます。

  • Azure イベント ハブ、Azure IoT Hub、Azure Storage Blob コンテナーなどの "入力" からデータを取り込みます。
  • "クエリ" を使用してデータ値の選択、プロジェクション、集計を行うことでデータを処理します。
  • Azure Data Lake Gen 2、Azure SQL Database、Azure Synapse Analytics、Azure Functions、Azure イベント ハブ、Microsoft Power BI などの "出力" に結果を書き込みます。

入力、クエリ、出力を含む Stream Analytics ジョブを示す図

いったん開始されると、Stream Analytics クエリが連続的に実行され、入力に到着した新しいデータが処理されて、結果が出力に格納されます。

Stream Analytics では、完全に 1 回のイベント処理、および 1 回以上のイベント配信を保証し、イベントが失われないようにしています。 これにはイベントの配信に失敗した場合のために、回復機能が組み込まれています。 また、Stream Analytics には、ご使用のジョブの状態を保持するために組み込みのチェックポイント機能があり、反復可能な結果を提供しています。 Azure Stream Analytics はサービスとしてのプラットフォーム (PaaS) サービスであるため、完全に管理されており、高い信頼性があります。 さまざまなソースと出力先が組み込みで統合されており、柔軟なプログラミング モデルが用意されています。 Stream Analytics エンジンではメモリ内で計算ができるため、高いパフォーマンスを発揮します。

Azure Stream Analytics ジョブとクラスター

Azure Stream Analytics を使用する最も簡単な方法は、Azure サブスクリプションで Stream Analytics "ジョブ" を作成し、その入力と出力を構成して、ジョブでデータの処理に使用されるクエリを定義することです。 クエリは、構造化照会言語 (SQL) 構文を使用して表され、参照値を提供するために複数のデータ ソースから静的参照データを組み込み、入力から取り込まれたストリーミング データと組み合わせることができます。

ストリーム プロセスの要件が複雑な場合、またはリソースを集中的に消費する場合は、ストリーム分析 "クラスター" を作成できます。クラスターは、Stream Analytics ジョブと同じ処理エンジンを基盤として使用しますが、専用のテナント内にあり (したがって、処理は他のユーザーの影響を受けません)、特定のシナリオに適したバランスでスループットとコストを定義できる構成可能なスケーラビリティを備えています。

入力

Azure Stream Analytics では、次の種類の入力からデータを取り込むことができます。

  • Azure Event Hubs
  • Azure IoT Hub
  • Azure BLOB ストレージ
  • Azure Data Lake Storage Gen2

入力は通常、ストリーミング データのソースを参照するために使用されます。これは、新しいイベント レコードが追加されると処理されます。 さらに、静的データの取り込みに使用される "参照" 入力を定義して、リアルタイムのイベント ストリーム データを拡張することもできます。 たとえば、気象局ごとに一意の ID を含むリアルタイム気象観測データのストリームを取り込み、そのデータを、気象局の ID をよりわかりやすい名前に一致させる静的参照入力で拡張することができます。

出力

出力は、ストリーム処理の結果が送信される宛先です。 Azure Stream Analytics では、次の用途に使用できるさまざまな出力がサポートされています。

  • さらに分析するためにストリーム処理の結果を保持する。たとえば、データ レイクやデータ ウェアハウスに読み込みます。
  • データ ストリームのリアルタイムの視覚化を表示する。たとえば、Microsoft Power BI でデータセットにデータを追加します。
  • ダウンストリーム処理のためにフィルター処理または要約されたイベントを生成する。たとえば、ストリーム処理の結果をイベント ハブに書き込みます。

クエリ

ストリーム処理のロジックは、クエリにカプセル化されます。 クエリは、1 つ以上の入力から (FROM) データ フィールドを選択 (SELECT) し、データをフィルター処理または集計し、結果を出力に (INTO) 書き込む SQL ステートメントを使用して定義されます。 たとえば、次のクエリでは、weather-events 入力からのイベントをフィルター処理して、temperature の値が 0 未満のイベントからのデータのみを含め、その結果を cold-temps 出力に書き込みます。

SELECT observation_time, weather_station, temperature
INTO cold-temps
FROM weather-events TIMESTAMP BY observation_time
WHERE temperature < 0

EventProcessedUtcTime という名前のフィールドは、Azure Stream Analytics クエリによってイベントが処理される時刻を定義するために自動的に作成されます。 このフィールドを使ってイベントのタイムスタンプを確認することも、またこの例で示されているように、TIMESTAMP BY 句を使って別の DateTime フィールドを明示的に指定することもできます。 ストリーミング データの読み取り元の入力に応じて、1 つ以上の潜在的なタイムスタンプ フィールドが自動的に作成される場合があります。たとえば、Event Hubs 入力を使用すると、EventQueuedUtcTime という名前のフィールドが生成され、イベント ハブ キューでイベントを受信した時刻が記録されます。

タイムスタンプとして使用されるフィールドは、テンポラル ウィンドウでデータを集計する場合に重要になります。これについては、次に説明します。