チュートリアル: Parquet 形式で Event Hubs データをキャプチャし、Azure Synapse Analytics を使用して分析する

このチュートリアルでは、Stream Analytics のノー コード エディターを使用して、Event Hubs のデータを Parquet 形式で Azure Data Lake Storage Gen2 にキャプチャするジョブを作成する方法を示します。

このチュートリアルでは、以下の内容を学習します。

  • イベント ハブにサンプル イベントを送信するイベント ジェネレーターをデプロイする
  • ノー コード エディターを使用して Stream Analytics のジョブを作成する
  • 入力データとスキーマを確認する
  • イベント ハブ データをキャプチャする Azure Data Lake Storage Gen2 を構成する
  • Stream Analytics ジョブの実行
  • Azure Synapse Analytics を使用して Parquet ファイルのクエリを実行する

前提条件

始める前に、次の手順が完了していることを確認してください。

ノー コード エディターを使用して Stream Analytics のジョブを作成する

  1. TollApp イベント ジェネレーターがデプロイされたリソース グループがある場所を探します。

  2. Azure Event Hubs 名前空間を選択します。

  3. [Event Hubs 名前空間] ページで、左側のメニューの [エンティティ] の下にある [Event Hubs] を選択します。

  4. entrystream インスタンスを選択します。

    イベント ハブの選択を示すスクリーンショット。

  5. [Event Hubs のインスタンス] ページで、左側のメニューの [機能] セクションで [データの処理] を選択します。

  6. [Capture data to ADLS Gen2 in Parquet format] (ADLS Gen2 へのデータを Parquet 形式でキャプチャする) タイルの [開始] を選択します。

    [データを Parquet 形式で ADLS Gen2 にキャプチャする] タイルの選択を示すスクリーンショット。

  7. ジョブに parquetcapture という名前を付け、[作成] を選択します。

    [新しい Stream Analytics ジョブ] ページのスクリーンショット。

  8. イベント ハブの構成ページで、次の設定を確認し、[接続] を選択します。

    • "コンシューマー グループ": 既定値

    • 入力データの "シリアル化の種類": JSON

    • ジョブでイベント ハブへの接続に使用される "認証モード": 接続文字列。

      イベント ハブの構成ページのスクリーンショット。

  9. 数秒以内に、サンプルの入力データとそのスキーマが表示されます。 フィールドの削除、フィールド名の変更、データ型の変更を実行することもできます。

    フィールドとデータのプレビューを示すスクリーンショット。

  10. キャンバス上の[Azure Data Lake Storage Gen2] タイルを選択し、次を指定して構成します。

    • Azure Data Lake Gen2 アカウントが配置されているサブスクリプション
    • ストレージ アカウント名。これは、「前提条件」セクションで行われた Azure Synapse Analytics ワークスペースで使用されたのと同じ ADLS Gen2 アカウントである必要があります。
    • Parquet ファイルが作成されるコンテナー。
    • {date}/{time} に設定されたパス パターン
    • 既定の yyyy-mm-dd および HH としての日付と時刻のパターン。
    • [接続] を選択します

    Data Lake Storage の構成設定を示すスクリーンショット。

  11. 上部のリボンで [保存] を選択してジョブを保存し、[開始] を選択してジョブを実行します。 ジョブが開始されたら、右上隅にある [X] を選択して [Stream Analytics ジョブ] ページを閉じます。

    [Stream Analytics ジョブの開始] ページを示すスクリーンショット。

  12. これで、ノー コード エディターを使用して作成されたすべての Stream Analytics ジョブの一覧が表示されます。 そして 2 分以内に、ジョブは [実行中] の状態になります。 ページの [最新の情報に更新] ボタンを選択すると、状態が [作成済み] -> [開始] -> [実行中] に変わります。

    Stream Analytics ジョブの一覧を示すスクリーンショット。

Azure Data Lake Storage Gen 2 アカウントの出力を表示する

  1. 前の手順で使用した Azure Data Lake Storage Gen2 アカウントを見つけます。

  2. 前の手順で使用したコンテナーを選択します。 前の手順で使用した {date}/{time} パス パターンに基づいて作成された Parquet ファイルが表示されます。

    Azure Data Lake Storage Gen 2 でキャプチャした Parquet ファイルを示すスクリーンショット。

Azure Synapse Analytics を使用してキャプチャしたデータに Parquet 形式でクエリを実行する

Azure Synapse Spark を使用してクエリを実行する

  1. Azure Synapse Analytics ワークスペースを見つけて、Synapse Studio を開きます。

  2. まだ存在しない場合は、ワークスペースにサーバーレス Apache Spark プールを作成します。

  3. Synapse Studio で [開発] ハブに移動し、新しい Notebook を作成します。

  4. 新しいコード セルを作成し、以下のコードをそのセルに貼り付けます。 containeradlsname を、前の手順で使用したコンテナーと ADLS Gen2 アカウントの名前に置き換えます。

    %%pyspark
    df = spark.read.load('abfss://container@adlsname.dfs.core.windows.net/*/*/*.parquet', format='parquet')
    display(df.limit(10))
    df.count()
    df.printSchema()
    
  5. ツール バーの [アタッチ先] で、ドロップダウン リストから Spark プールを選択します。

  6. [すべて実行] を選択して結果を確認します。

    Azure Synapse Analytics の Spark 実行結果のスクリーンショット。

Azure Synapse サーバーレス SQLを使用したクエリ

  1. [開発] ハブで、新しいSQL スクリプトを作成します。

    新しい SQL スクリプト メニューが選択された開発者ページを示すスクリーンショット。

  2. 次のスクリプトを貼り付け、組み込みのサーバーレス SQL エンドポイントを使用して実行します。 containeradlsname を、前の手順で使用したコンテナーと ADLS Gen2 アカウントの名前に置き換えます。

    SELECT
        TOP 100 *
    FROM
        OPENROWSET(
            BULK 'https://adlsname.dfs.core.windows.net/container/*/*/*.parquet',
            FORMAT='PARQUET'
        ) AS [result]
    

    Azure Synapse Analytics の SQL スクリプトの結果のスクリーンショット。

リソースをクリーンアップする

  1. Event Hubs のインスタンスの場所を探し、[データの処理] セクションの Stream Analytics ジョブの一覧を確認します。 実行中のジョブをすべて停止します。
  2. TollApp イベント ジェネレーターのデプロイ時に使用したリソース グループに移動します。
  3. [リソース グループの削除] を選択します。 リソース グループの名前を入力して削除することを確認します。

次のステップ

このチュートリアルでは、ノー コード エディターを使用して Stream Analytics ジョブを作成し、Event Hubs データ ストリームを Parquet 形式でキャプチャする方法について説明しました。 その後、Azure Synapse Analytics を使用して、Synapse Spark と Synapse SQL の両方を使用して Parquet ファイルのクエリを実行しました。