Azure Stream Analytics を使用したコードなしストリーム処理 (プレビュー)

Azure Stream Analytics を使用して、Azure Event Hubs でリアルタイムのデータ ストリームを処理できます。 コード エディターなしを使用すると、1 行のコードを記述せずに Stream Analytics ジョブを簡単に開発できます。 数分で、次のような多くのシナリオに取り組むジョブを開発して実行できます。

  • Azure Synapse SQL へのフィルター処理と取り込み
  • Azure Data Lake Storage Gen2 での Parquet 形式での Event Hubs データのキャプチャ
  • Azure Cosmos DB でのデータの具体化

このエクスペリエンスでは、入力ソースに接続してストリーミング データをすばやく確認できるキャンバスが提供されます。 その後、Azure で選択した宛先に書き込む前に変換できます。

次の操作を行うことができます。

  • 入力スキーマを変更する
  • 結合やフィルターなどのデータ準備操作を実行する
  • グループ化操作のためのタイム ウィンドウ集計 (タンブリング ウィンドウ、ホッピング ウィンドウ、セッション ウィンドウ) などの高度なシナリオに取り組む

Stream Analytics ジョブを作成して実行すると、運用ワークロードを簡単に運用化できます。 監視とトラブルシューティングのために適切な組み込みメトリックのセットを使用します。 Stream Analytics ジョブは、実行中の価格モデルに従って課金されます。

前提条件

コード エディターなしを使用して Stream Analytics ジョブを開発する前に、これらの要件を満たす必要があります。

  • Azure Event Hubs 名前空間と、書き込むターゲットの宛先リソースは、パブリックにアクセスでき、Azure Virtual Network 内に存在することはできません。
  • ストリーミング入力リソースおよび出力リソースにアクセスするには、必要なアクセス許可がなければなりません。
  • Azure Stream Analytics リソースを作成および変更するには、アクセス許可を維持する必要があります。

Azure Stream Analytics ジョブ

Stream Analytics ジョブは、ストリーミング入力変換出力の 3 つの主要コンポーネントに基づいて構築されています。 複数の入力、複数の変換を使用した並列分岐、複数の出力など、必要な数のコンポーネントを含めることができます。 詳細については、Azure Stream Analytics のドキュメントを参照してください。

コード エディターなしを使用して Stream Analytics ジョブを簡単に作成するには、Event Hubs インスタンスを開きます。 [データの処理] を選択し、任意のテンプレートを選択します。

Screenshot showing navigation to create a new Stream Analytics job.

次のスクリーンショットは、完了済みの Stream Analytics ジョブを示しています。 作成中に使用できるすべてのセクションが強調表示されます。

Screenshot showing the authoring interface sections.

  1. リボン - リボンでは、セクションはクラシック/分析プロセスの順序に従います。Event Hubs は、入力 (データソースとも呼ばれる)、変換 (ETL 操作のストリーミング)、出力、進行状況を保存するためのボタン、およびジョブを開始するためのボタンとして機能します。
  2. [ダイアグラム] ビュー - 入力から操作、出力まで、Stream Analytics ジョブをグラフィカルに表現します。
  3. 作業ウィンドウ - [ダイアグラム] ビューで選択したコンポーネントに応じて、入力、変換、または出力を変更するための設定があります。
  4. データのプレビュー、作成エラー、ランタイム エラーのタブ - 表示されているタイルごとに、データのプレビューには、その手順の結果が表示されます (入力の場合はライブ、変換と出力の場合はオンデマンド)。 このセクションでは、開発中のジョブで発生するおそれのある作成エラーまたは警告の概要も表示されます。 各エラーまたは警告を選択すると、その変換が選択されます。

ストリーミング入力としての Event Hubs

Azure Event Hubs は、ビッグ データのストリーミング プラットフォームであり、イベント インジェスト サービスです。 1 秒間に何百万ものイベントを受信して処理することができます。 イベント ハブに送信されたデータは、任意のリアルタイム分析プロバイダーやバッチ処理/ストレージ アダプターを使用して、変換および保存できます。

ジョブの入力としてイベント ハブを構成するには、[イベント ハブ] 記号を選択します。 構成および接続用の作業ウィンドウを含むタイルが [ダイアグラム] ビューに表示されます。

Event Hubs の資格情報を設定し、 [接続] を選択した後、フィールド名がわかっている場合は [+ フィールドの追加] を使用して手動でフィールドを追加することができます。 代わりに受信メッセージのサンプルに基づいてフィールドとデータ型を自動的に検出するには、 [フィールドの自動検出] を選択します。 必要に応じて、歯車記号を選択して、資格情報を編集できます。 Stream Analytics ジョブでフィールドが検出されると、それらが一覧に表示されます。 ダイアグラム ビューの下にある [データのプレビュー] テーブルには、受信メッセージのライブ プレビューも表示されます。

各フィールドの横にある 3 つのドット記号を選択して、いつでもフィールド名を編集したり、データ型を削除または変更したりすることができます。 さらに、次の図に示すように、受信メッセージから、入れ子になったフィールドを展開、選択、および編集することもできます。

Screenshot showing Event Hub fields where you add, remove, and edit the fields.

使用できるデータの種類は次のとおりです。

  • DateTime - ISO 形式の日付と時刻フィールド
  • Float - 10 進数
  • Int - 整数
  • Record - 複数のレコードを含む入れ子になったオブジェクト
  • String - テキスト

変換

ストリーミング データの変換はバッチ データの変換とは本質的に異なります。 ほとんどすべてのストリーミング データには時間コンポーネントがあり、関連するデータの準備タスクに影響を与えます。

ストリーミング データ変換をジョブに追加するには、その変換のリボンにある変換記号を選択します。 それぞれのタイルがダイアグラム ビューにドロップされます。 それを選択すると、その変換用の作業ウィンドウが表示され、構成することができます。

Assert

フィルター変換を使用して、入力のフィールドの値に基づいてイベントをフィルター処理します。 データ型 (数値またはテキスト) によっては、変換で、選択された条件に一致する値が保持されます。

Screenshot showing the Filter event fields view.

注意

すべてのタイル内には、変換の準備を完了するために他に何が必要であるかについて情報が表示されます。 たとえば、新しいタイルを追加すると、Set-up required というメッセージが表示されます。 ノード コネクタがない場合は、[エラー] または [警告] メッセージが表示されます。

フィールドの管理

フィールドの管理変換では、入力または別の変換から受信するフィールドの追加、削除、または名前の変更を行うことができます。 作業ウィンドウの設定で、 [フィールドの追加] を選択して新しいフィールドを追加するか、またはすべてのフィールドを一度に追加するかを選択できます。

Screenshot showing the Manage fields view.

ヒント

タイルを構成すると、ダイアグラム ビューで、タイル自体の設定を簡単に確認できます。 たとえば、前の画像の [フィールドの管理] 領域では、最初の 3 つのフィールドが管理されており、それらに新しい名前が割り当てられているのがわかります。 各タイルには、それに関連する情報が含まれます。

Aggregate

集計変換を使用すると、一定期間に新しいイベントが発生するたびに、集計 ( [合計][最小][最大] 、または [平均] ) を計算できます。 この操作により、データ内の他のディメンションに基づいて集計のフィルター処理またはスライスを行うこともできます。 同じ変換に 1 つ以上の集計を含めることができます。

集計を追加するには、変換記号を選択します。 次に、入力を接続し、集計を選択して、フィルターまたはスライス ディメンションを追加し、集計を計算する期間を選択します。 この例では、過去 10 秒間の通行料金の合計を、車両の出発地の州別に計算します。

Screenshot showing the Aggregate view.

同じ変換に別の集計を追加するには、 [集計関数の追加] を選択します。 フィルターまたはスライスは、変換内のすべての集計に適用されることに注意してください。

Join

結合変換を使用することにより、選択したフィールドのペアに基づいて、2 つの入力からのイベントを組み合わせます。 フィールドのペアを選択しない場合、結合は、既定で時間に基づきます。 この既定によって、この変換をバッチ変換とは異なるものにしています。

通常の結合と同様に、結合ロジックにはさまざまなオプションがあります。

  • 内部結合 - ペアが一致する両方のテーブルからのレコードのみが含まれます。 この例では、それはライセンス プレートが両方の入力と一致している場所になります。
  • 左外部結合 - 左側 (最初) のテーブルからのすべてのレコードと、フィールドのペアと一致する、2 番目のテーブルからのレコードのみが含まれます。 一致するものがない場合、2 番目の入力からのフィールドは空白になります。

結合の種類を選択するには、作業ウィンドウで、優先する種類の記号を選択します。

最後に、結合を計算する期間を選択します。 この例では、結合は、過去 10 秒間を対象とします。 期間が長くなるほど、出力の頻度が少なくなり、変換に使用される処理リソースが増えることに注意してください。

既定では、両方のテーブルのすべてのフィールドが含まれます。 出力でプレフィックスの left (最初のノード) と right (2 番目のノード) は、ソースを区別するのに役立ちます。

Screenshot showing the Join view.

グループ化

グループ化変換を使用して、特定の時間ウィンドウ内のすべてのイベントの集計を計算します。 1 つまたは複数のフィールドの値でグループ化することができます。 これは、集計変換に似ていますが、より多くの集計のためのオプションが用意されています。 より複雑な時間ウィンドウ オプションも含まれています。 さらに、集計と同様に、変換あたり複数の集計を追加できます。

変換で使用できる集計は次のとおりです。

  • Average
  • Count
  • [最大]
  • 最小
  • パーセンタイル (連続および不連続)
  • Standard Deviation
  • Sum
  • Variance

この変換を構成するには:

  1. 優先する集計を選択します。
  2. 集計するフィールドを選択します。
  3. 別のディメンションまたはカテゴリに対する集計計算を取得する場合は、オプションのグループ化フィールドを選択します。 たとえば、[状態] です。
  4. 時間ウィンドウの関数を選択します。

同じ変換に別の集計を追加するには、 [集計関数の追加] を選択します。 [グループ化] フィールドとウィンドウ関数は、変換内のすべての集計に適用されることに注意してください。

Screenshot showing the Group by view.

時間ウィンドウの終了のタイム スタンプは、参照のために変換出力の一部として提供されます。 Stream Analytics ジョブでサポートされる時間枠の詳細については、「ウィンドウ関数 (Azure Stream Analytics)」を参照してください。

Union

和集合変換を使用して、2 つ以上の入力を接続して、共有フィールド (同じ名前とデータ型を持つ) を含むイベントを 1 つのテーブルに追加します。 一致しないフィールドは削除され、出力に含まれません。

Expand

配列の展開とは、配列内の値ごとに新しい行を作成することです。

Screenshot showing the Expand view.

ストリーミング出力

現在、コードなしのドラッグ アンド ドロップ エクスペリエンスでは、処理されたリアルタイム データを格納するための 3 つの出力がサポートされています。

Screenshot showing Streaming output options.

Azure Data Lake Storage Gen2

Data Lake Storage Gen2 によって、Azure Storage は、Azure 上にエンタープライズ データ レイクを構築するための基盤となります。 それは、数百ギガビットのスループットを維持しながら、数ペタバイトの情報を提供するように開始から設計されています。 これにより、大量のデータを簡単に管理できます。 Azure Blob Storage を使用すると、大量の非構造化データをクラウドに保存する場合に、コスト効果の高いスケーラブルなソリューションを実現できます。

Stream Analytics ジョブの出力として ADLS Gen2 を選択し、ジョブの出力を送信するコンテナーを選択します。 Stream Analytics ジョブの Azure Data Lake Gen2 出力の詳細については、「Azure Stream Analytics からの BLOB ストレージと Azure Data Lake Gen2 出力」を参照してください。

Azure Synapse Analytics

Azure Stream Analytics ジョブを使用すると、Azure Synapse Analytics 内の専用 SQL プール テーブルに出力でき、最大 200 MB/秒のスループット レートを処理できます。これにより、レポートやダッシュボードなどのワークロードに対して、最も要求の厳しいリアルタイム分析とホットパス データ処理がサポートされます。

重要

Stream Analytics ジョブに出力として専用 SQL プール テーブルを追加するには、事前にそれが存在している必要があります。 テーブルのスキーマを、使用するジョブの出力内のフィールドとその型と一致させる必要があります。

Stream Analytics ジョブの出力として Synapse を選択し、ジョブの出力を送信する SQL プール テーブルを選択します。 Stream Analytics ジョブの Synapse 出力の詳細については、「Azure Stream Analytics からの Azure Synapse Analytics 出力」を参照してください。

Azure Cosmos DB

Azure Cosmos DB はグローバル分散型データベース サービスです。世界各地に対応する、制限のないエラスティックなスケーリング、スキーマに依存しないデータ モデルでの豊富なクエリと自動インデックス作成機能を提供します。

Stream Analytics ジョブの出力として [CosmosDB] を選択します。 Stream Analytics ジョブの Cosmos DB 出力の詳細については、「Azure Stream Analytics からの Azure Cosmos DB 出力」を参照してください。

データのプレビューとエラー

コードなしドラッグ アンド ドロップ エクスペリエンスには、ストリーミング データの分析パイプラインの作成、トラブルシューティング、パフォーマンスの評価を行うのに役立つツールが用意されています。

入力のライブ データ プレビュー

イベント ハブに接続し、ダイアグラム ビュー ([データ プレビュー] タブ) でそのタイルを選択すると、次のすべての条件が当てはまる場合に、受信するデータのライブ プレビューが表示されます。

  • データがプッシュされている。
  • 入力が正しく構成されている。
  • フィールドが追加された。

次のスクリーンショットに示すように、特定のものを表示またはドリルダウンする場合は、プレビューを一時停止することができます (1)。 または、完了したら、再度開始できます。

また、特定のレコード (テーブル内の セル) の詳細を、それを選択し、[詳細の表示]/[詳細の非表示] を選択して表示することもできます (2)。 スクリーンショットに、レコードの入れ子になったオブジェクトの詳細ビューが示されています。

Screenshot showing the Data Preview tab where you can pause the streaming preview and show/hide details.

変換および出力の静的プレビュー

ダイアグラム ビューでステップを追加して設定したら、[静的プレビューの取得] を選択して、それらの動作をテストできます。

Screenshot showing the Get static preview option.

その後、Stream Analytics ジョブはすべての変換と出力を評価して、それらが正しく構成されていることを確認します。 次に、Stream Analytics により、次の図に示すように、結果が静的データ プレビューに表示されます。

Screenshot showing the Data Preview tab where you can refresh the static preview.

プレビューを更新するには、 [静的プレビューの更新] を選択します (1)。 プレビューを更新すると、Stream Analytics ジョブは入力から新しいデータを受け取り、すべての変換を評価します。 その後、実行した可能性のある更新プログラムが再度出力されます。 [詳細の表示]/[詳細の非表示] オプションも使用できます (2)。

作成エラー

作成エラーや警告がある場合は、次のスクリーンショットに示すように、[作成エラー] タブにそれらが一覧表示されます。 この一覧には、エラーや警告の詳細、カードの種類 (入力、変換、または出力)、エラー レベル、エラーまたは警告の説明が含まれます。

Screenshot showing the Authoring errors tab that shows a list of example errors.

実行時エラー

ランタイム エラーは、警告/エラー/重大レベルのエラーです。 これらのエラーは、トラブルシューティングのために Stream Analytics ジョブ トポロジ/構成を編集する場合に役立ちます。 次のスクリーンショットの例では、ユーザーが正しくないテーブル名で Synapse 出力を構成しています。 ユーザーがジョブを開始しましたが、出力テーブルのスキーマ定義が見つからないことを示すランタイム エラーがあります。

Screenshot showing the Runtime errors tab where you can select a timespan to filter error events.

Stream Analytics ジョブの開始

ジョブの Event Hubs、操作、ストリーミング出力を構成したら、ジョブを保存して開始します。

Screenshot showing the Save and Start options.

  • 出力の開始時刻 - ジョブを開始するときは、ジョブが出力の作成を開始する時刻を選択します。
    • 今すぐ - 出力イベント ストリームの開始点をジョブの開始時刻と同じにします。
    • カスタム - 出力の開始点を選択できます。
    • 最終停止時刻 - このオプションは、以前にジョブが開始されたが、手動で停止されたか失敗した場合に使用できます。 このオプションを選択すると、データが失われないように、最後の出力時刻を使用してジョブが再開されます。
  • ストリーミング ユニット - ストリーミング ユニットは、実行中にジョブに割り当てられたコンピューティングとメモリの量を表します。 選択する SU の数がわからない場合は、3 つから始めて、必要に応じて調整することをお勧めします。
  • 出力データ エラー処理 - 出力データ エラー処理ポリシーは、Stream Analytics ジョブで生成された出力イベントがターゲット シンクのスキーマに準拠しないときにのみ適用されます。 [再試行][ドロップ] を選択することでこのポリシーを構成できます。 詳細については、「Azure Stream Analytics の出力エラー ポリシー」を参照してください。
  • 開始 - Stream Analytics ジョブを開始します。

Screenshot showing the Start Stream Analytics job window where you review the job configuration and start the job.

Stream Analytics ジョブの一覧表示

[データの処理]>[Stream Analytics ジョブ] の下にコードなしのドラッグ アンド ドロップで作成されたすべての Stream Analytics ジョブのリストが表示されます。

Screenshot showing the Stream Analytics job list where you review job status.

  • フィルター - ジョブ名で一覧をフィルター処理できます。
  • 更新 - リストは現在自動更新されません。 リストを更新し、最新の状態を表示するには、このオプションを使用します。
  • ジョブ名 - ジョブの作成の最初の手順で指定した名前。 編集することはできません。 ジョブ名を選択して、ジョブをコードなしのドラッグ アンド ドロップ エクスペリエンスで開きます。このエクスペリエンスでは、ジョブを停止して編集し、再開できます。
  • 状態 - ジョブの状態。 一覧の上部にある [更新] を選択すると、最新の状態が表示されます。
  • ストリーミング ユニット - ジョブの開始時に選択されたストリーミング ユニットの数。
  • 出力の透かし - ジョブによって生成されたデータの活動状態のインジケーター。 タイムスタンプより前のすべてのイベントは既に計算されています。
  • ジョブの監視 - [オープン メトリック] を選択して、この Stream Analytics ジョブに関連するメトリックを表示します。 Stream Analytics ジョブの監視に使用できるメトリックの詳細については、「Stream Analytics で使用できるメトリック」を参照してください。
  • 操作 - ジョブを開始、停止、または削除します。

次の手順

事前定義されたテンプレートを使用して一般的なシナリオに対処するためにコードなしエディターを使用する方法について説明します。