Share via


Apache Kafka から Azure Data Explorer にデータを取り込む

Apache Kafka は、システムまたはアプリケーション間でデータを確実に移動するリアルタイム ストリーミング データ パイプラインを構築するための分散ストリーミング プラットフォームです。 Kafka Connect は、Apache Kafka と他のデータ システムとの間でスケーラブルかつ高い信頼性でデータをストリーム配信するためのツールです。 Kusto Kafka シンクは Kafka からのコネクタとして機能し、コードを使用する必要はありません。 シンク コネクタ jar を Git リポジトリ または Confluent Connector Hub からダウンロードします。

この記事では、自己完結型 Docker セットアップを使用して Kafka クラスターと Kafka コネクタ クラスターのセットアップを簡略化して、Kafka でデータを取り込む方法について説明します。

詳細については、コネクタの Git リポジトリバージョンの詳細を参照してください。

前提条件

Microsoft Entra サービス プリンシパルを作成する

Microsoft Entra サービス プリンシパルは、次の例のように、Azure portalまたはプログラムによって作成できます。

このサービス プリンシパルは、Kusto でテーブルのデータを書き込むコネクタによって使用される ID です。 後で、Kusto リソースにアクセスするためのアクセス許可をこのサービス プリンシパルに付与します。

  1. Azure CLI を使用して Azure サブスクリプションにサインインします。 次に、ブラウザーで認証します。

    az login
    
  2. プリンシパルをホストするサブスクリプションを選択します。 この手順は、複数のサブスクリプションがある場合に必要です。

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. サービス プリンシパルを作成します。 この例では、サービス プリンシパルを my-service-principal と呼びます。

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. 返された JSON データから、今後使用するために appId、、 password、 を tenant コピーします。

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Microsoft Entra アプリケーションとサービス プリンシパルが作成されました。

ターゲット テーブルを作成する

  1. クエリ環境から、次のコマンドを使用して というテーブル Storms を作成します。

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. 次のコマンドを使用して、取り込まれたデータに対応するテーブル マッピング Storms_CSV_Mapping を作成します。

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. テーブルに インジェスト バッチ処理ポリシーを 作成して、キューに登録されたインジェスト待機時間を構成できます。

    ヒント

    インジェスト バッチ処理ポリシーはパフォーマンス オプティマイザーであり、3 つのパラメーターを含みます。 最初の条件が満たされると、Azure Data Explorer テーブルへのインジェストがトリガーされます。

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. 「Microsoft Entra サービス プリンシパルを作成する」のサービス プリンシパルを使用して、データベースを操作するアクセス許可を付与します。

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

ラボを実行する

次のラボは、データの作成の開始、Kafka コネクタの設定、およびコネクタを使用した Azure Data Explorer へのこのデータのストリーミングのエクスペリエンスをユーザーに提供するように設計されています。 その後、取り込まれたデータを確認できます。

Git リポジトリをクローンする

ラボの Git リポジトリをクローンします。

  1. お使いのマシン上にローカル ディレクトリを作成します。

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. リポジトリをクローンします。

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

クローンされたリポジトリの内容

次のコマンドを実行して、クローンされたリポジトリの内容を一覧表示します。

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

この検索の結果は次のとおりです。

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

クローンされたリポジトリ内のファイルを確認する

以下のセクションでは、上記のファイル ツリーのファイルの重要な部分について説明します。

adx-sink-config.json

このファイルには、特定の構成の詳細を更新する Kusto シンク プロパティ ファイルが含まれています。

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Azure Data Explorer のセットアップに従って、aad.auth.authorityaad.auth.appidaad.auth.appkeykusto.tables.topics.mapping (データベース名)、kusto.ingestion.urlkusto.query.url の各属性の値を置き換えてください。

connector - Dockerfile

このファイルには、コネクタ インスタンス用の Docker イメージを生成するコマンドが含まれています。 これには、Git リポジトリのリリース ディレクトリからのコネクタのダウンロードが含まれています。

storm-events-producer ディレクトリ

このディレクトリには、ローカルの "StormEvents.csv" ファイルを読み取り、そのデータを Kafka トピックに発行する Go プログラムがあります。

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

コンテナーを開始する

  1. ターミナルで、コンテナーを開始します。

    docker-compose up
    

    プロデューサー アプリケーションによって、storm-events トピックへのイベントの送信が開始されます。 次のようなログが表示されます。

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. ログを確認するには、別のターミナルで次のコマンドを実行します。

    docker-compose logs -f | grep kusto-connect
    

コネクタを開始する

Kafka Connect REST 呼び出しを使用して、コネクタを開始します。

  1. 別のターミナルで、次のコマンドを使用してシンク タスクを起動します。

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. 状態を確認するには、別のターミナルで次のコマンドを実行します。

    curl http://localhost:8083/connectors/storm/status
    

コネクタにより、Azure Data Explorer へのインジェスト プロセスがキューに格納され始めます。

注意

ログ コネクタの問題がある場合は、issue を作成します

データのクエリを実行して確認する

データ インジェストを確認する

  1. Storms テーブルにデータが到着するのを待ちます。 データの転送を確認するには、その行数を確認します。

    Storms | count
    
  2. インジェスト プロセスでエラーが発生していないことを確認します。

    .show ingestion failures
    

    データを確認したら、クエリをいくつか試してみてください。

データにクエリを実行する

  1. すべてのレコードを表示するには、次のクエリを実行します。

    Storms
    
  2. whereproject を使用して、特定のデータをフィルター処理します。

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. summarize 演算子を使用します。

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Azure Data Explorerでの Kafka クエリ縦棒グラフの結果のスクリーンショット。

クエリの例とガイダンスの詳細については、「KQL でクエリを記述する」およびドキュメントKusto 照会言語参照してください

リモート アクセスの

リセットするには、次の手順を実行します。

  1. コンテナーを停止します (docker-compose down -v)
  2. [削除] (drop table Storms)
  3. Storms テーブルを再作成します
  4. テーブル マッピングを再作成します
  5. コンテナーを再起動します (docker-compose up)

リソースをクリーンアップする

Azure Data Explorer リソースを削除するには、az cluster delete または az Kusto database delete を使用します。

az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>

Kafka シンク コネクタのチューニング

インジェスト バッチ処理ポリシーを使用するために Kafka シンク コネクタを調整します。

  • Kafka シンク flush.size.bytes のサイズ制限を 1 MB から調整し、10 MB または 100 MB の増分ずつ増やします。
  • Kafka シンクを使用する場合、データは 2 回集計されます。 コネクタ側のデータはフラッシュ設定に従って集計され、Azure Data Explorer サービス側ではバッチ処理ポリシーに従って集計されます。 バッチ処理時間が短すぎてコネクタとサービスの両方でデータを取り込めなかった場合は、バッチ処理時間を長くする必要があります。 バッチ処理サイズを 1 GB に設定し、必要に応じて 100 MB ずつ増減します。 たとえば、フラッシュ サイズが 1 MB で、バッチ処理ポリシー サイズが 100 MB の場合、100 MB のバッチが Kafka Sink コネクタによって集計された後、Azure Data Explorer サービスによって 100 MB のバッチが取り込まれます。 バッチ処理ポリシー時間が 20 秒で、Kafka シンク コネクタが 20 秒の期間内に 50 MB をフラッシュする場合、サービスは 50 MB のバッチを取り込みます。
  • インスタンスと Kafka パーティションを追加することでスケーリングできます。 tasks.max をパーティションの数まで増やします。 flush.size.bytes 設定のサイズの BLOB に生成するのに十分なデータがある場合は、パーティションを作成します。 BLOB が小さい場合、バッチは制限時間に達すると処理されるため、パーティションは十分なスループットを受け取りません。 パーティションの数が多い場合は、処理オーバーヘッドが増えます。