Azure Event Hubs データ接続

Azure Event Hubs は、ビッグ データのストリーミング プラットフォームとなるイベント インジェスト サービスです。 Azure Data Explorer は、お客様が管理する Event Hubs からの継続的なインジェストを提供します。

Event Hubs のインジェスト パイプラインは、いくつかのステップで、Azure Data Explorer にイベントを転送します。 最初に、Azure portal でイベント ハブを作成します。 次に、特定の形式のデータが、指定されたインジェスト プロパティを使用して取り込まれるターゲット テーブルを Azure Data Explorer に作成します。 Event Hubs 接続はイベント ルーティングを認識している必要があります。 データは、イベント システム プロパティのマッピングに従って、選択したプロパティを使用して埋め込むことができます。 Event Hubs への接続を作成して、イベント ハブを作成し、イベントを送信します。 このプロセスは、Azure portalC# または Python によるプログラム、または Azure Resource Manager テンプレートを使用して管理できます。

Azure Data Explorer でのデータ インジェストに関する一般的な情報については、「Azure Data Explorer のデータ インジェスト概要」を参照してください。

Azure Data Explorer のデータ接続認証メカニズム

注意事項

マネージド ID のアクセス許可がデータ ソースから削除された場合、データ接続は無効になり、そのデータ ソースからデータをフェッチすることはできません。

  • キーベースのデータ接続: マネージド ID がデータ接続で指定されていない場合、接続は自動的にキーベースの認証に設定されます。 キーベースの接続では、リソース接続文字列 (Azure Event Hubs 接続文字列など) を使用してデータがフェッチされます。 Azure Data Explorer では、指定したリソースのリソース接続文字列が生成され、データ接続に安全に保存されます。 その後、接続文字列を使用して、データ ソースからデータがフェッチされます。

注意事項

キーがローテーションされると、データ接続は無効になり、データ ソースからデータをフェッチすることはできません。 この問題を解決するには、データ接続を更新または再作成します。

データ形式

  • データは EventData オブジェクトの形式でイベント ハブから読み取られます。

  • サポートされる形式を確認してください。

    注意

  • GZip 圧縮アルゴリズムを使用してデータを圧縮できます。 CompressionCompressionを使って動的に指定するか、静的なデータ接続設定で指定することができます。

    Note

    圧縮形式 (Avro、Parquet、ORC、ApacheAvro、および W3CLOGFILE) については、データ圧縮はサポートされません。 カスタム エンコードおよび埋め込みシステム プロパティは、圧縮データではサポートされていません。

Event Hubs のプロパティ

Azure Data Explorer では、次の Event Hubs プロパティがサポートされています。

Note

メタデータをイベントに関連付けるために使用される Event Hubs カスタム プロパティの取り込みはサポートされていません。 カスタム プロパティを取り込む必要がある場合は、イベント データの本文でそれらを送信します。 詳細については「カスタム プロパティの取り込み」を参照してください。

インジェストのプロパティ

インジェスト プロパティは、インジェスト プロセス、データのルーティング先と処理方法を指示します。 EventData.Properties を使用して、イベント インジェストのインジェスト プロパティを指定できます。 以下のプロパティを設定できます。

Note

プロパティ名は大文字と小文字が区別されます。

プロパティ 説明
データベース 大文字と小文字が区別される、ターゲット データベースの名前。 既定では、データはデータ接続に関連付けられているターゲット データベースに取り込まれます。 既定のデータベースをオーバーライドし、別のデータベースにデータを送信するには、このプロパティを使用します。 これを行うには、最初にマルチデータベース接続として接続を設定する必要があります。
テーブル 大文字と小文字が区別される、既存のターゲット テーブルの名前。 [Data Connection] ペインで設定された Table をオーバーライドします。
形式 データ形式。 [Data Connection] ペインで設定された Data format をオーバーライドします。
IngestionMappingReference 使用する既存のインジェスト マッピングの名前。 [Data Connection] ペインで設定された Column mapping をオーバーライドします。
圧縮 データ圧縮、None (既定)、または GZip 圧縮。
エンコード データ エンコード (既定値は UTF8)。 .NET でサポートされているエンコードのいずれかを指定できます。
タグ JSON 配列文字列として書式設定された、取り込まれたデータに関連付けられるタグの一覧。 タグを使用すると、パフォーマンスに影響します。
RawHeaders イベント ソースが Kafka であり、Azure Data Explorerがバイト配列の逆シリアル化を使用して他のルーティング プロパティを読み取る必要があることを示します。 値は無視されます。

注意

データ接続の作成後にエンキューされたイベントのみが取り込まれたます。

イベント ルーティング

クラスターへのデータ接続を作成するときに、取り込まれたデータの送信先のルーティングを指定できます。 既定のルーティングは、ターゲット データベースに関連付けられている接続文字列で指定されたターゲット テーブルです。 データに対する既定のルーティングは、"静的ルーティング" と呼ばれることもあります。 前述のイベント データのプロパティを設定することで、データの代替ルーティングを指定できます。

イベント データを別のデータベースにルーティングする

代替データベースへのデータのルーティングは、既定では無効になっています。 データを別のデータベースに送信するには、最初に、マルチデータベース接続として接続を設定する必要があります。 これは、Azure portalC#Python、または ARM テンプレートで行うことができます。 データベース ルーティングを許可するために使用されるユーザー、グループ、サービス プリンシパル、またはマネージド ID には、少なくともクラスターに対する共同作成者ロールと書き込みアクセス許可が必要です。

別のデータベースを指定するには、"データベース" のインジェストのプロパティを設定します。

警告

マルチデータベースのデータ接続として接続を設定せずに別のデータベースを指定すると、インジェストが失敗します。

イベント データを別のテーブルにルーティングする

各イベントに別のテーブルを指定するには、TableFormatCompression、およびインジェストのプロパティのマッピングを設定します。 EventData.Properties の指定に従って、取り込まれたデータが接続で動的にルーティングされ、このイベントの静的プロパティがオーバーライドされます。

次の例では、イベント ハブの詳細を設定し、気象メトリック データを代替データベース (MetricsDB) とテーブル (WeatherMetrics) に送信する方法を示します。 データは JSON 形式で、mapping1 はテーブル WeatherMetrics で事前定義されています。

// This sample uses Azure.Messaging.EventHubs which is a .Net Framework library.
await using var producerClient = new EventHubProducerClient("<eventHubConnectionString>");
// Create the event and add optional "dynamic routing" properties
var eventData = new EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(
    new { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = 32 }
)));
eventData.Properties.Add("Database", "MetricsDB");
eventData.Properties.Add("Table", "WeatherMetrics");
eventData.Properties.Add("Format", "json");
eventData.Properties.Add("IngestionMappingReference", "mapping1");
eventData.Properties.Add("Tags", "['myDataTag']");
var events = new[] { eventData };
// Send events
await producerClient.SendAsync(events);

イベント システム プロパティのマッピング

イベントがエンキューされるときに、Event Hubs サービスによって設定されたプロパティがシステム プロパティに格納されます。 イベント ハブへのデータ接続では、選択した一連のシステム プロパティを、特定のマッピングに基づいてテーブルに読み込まれたデータに埋め込むことができます。

注意

  • システム プロパティの埋め込みは、json および表形式 (つまり JSONMultiJSONCSVTSVPSVSCsvSOHsvTSVE) でサポートされています。
  • サポートされていない形式 (例: TXT や、ParquetAvro などの 圧縮形式) を使用してもデータは取り込まれますが、プロパティは無視されます。
  • イベント ハブ メッセージの圧縮が設定されている場合、システム プロパティの埋め込みはサポートされません。 このようなシナリオでは、該当するエラーが生成され、データは取り込まれません。
  • 表形式データの場合、システム プロパティは単一レコードのイベント メッセージでのみサポートされます。
  • json データの場合、システム プロパティは複数レコードのイベント メッセージでもサポートされます。 このような場合、システム プロパティは、イベント メッセージの最初のレコードにのみ追加されます。
  • マッピングの場合 CSV 、プロパティは、データ接続の作成に記載されている順序でレコードの先頭に追加されます。 将来変更される可能性があるため、これらのプロパティの順序に依存しないでください。
  • JSON マッピングの場合、システム プロパティの表のプロパティ名に従ってプロパティが追加されます。

Event Hubs サービスでは、次のシステム プロパティが公開されます。

プロパティ データ型 説明
x-opt-enqueued-time datetime イベントがエンキューされた UTC 時刻
x-opt-sequence-number long イベント ハブのパーティション ストリーム内のイベントの論理シーケンス番号
x-opt-offset string イベント ハブのパーティション ストリームからのイベントのオフセット。 このオフセット識別子は、イベント ハブ ストリームのパーティション内で一意です
x-opt-publisher string 発行元の名前 (発行元のエンドポイントにメッセージが送信された場合)
x-opt-partition-key string イベントが格納されている、対応するパーティションのパーティション キー

IoT Central イベント ハブを操作するときに、IoT Hub のシステム プロパティーをペイロードに埋め込むこともできます。 完全な一覧については、IoT Hub のシステム プロパティに関する記事を参照してください。

テーブルの [データ ソース] セクションで [イベント システムのプロパティ] を選択した場合は、テーブルのスキーマとマッピングにプロパティを含める必要があります。

スキーマ マッピングの例

テーブル スキーマ マッピングの例

データに 3 つの列 (TimespanMetric、および Value) が含まれており、含めるプロパティが x-opt-enqueued-time および x-opt-offset の場合は、次のコマンドを使用してテーブル スキーマを作成または変更します。

    .create-merge table TestTable (TimeStamp: datetime, Metric: string, Value: int, EventHubEnqueuedTime:datetime, EventHubOffset:string)

CSV マッピングの例

次のコマンドを実行して、レコードの先頭にデータを追加します。 序数値に注意してください。

    .create table TestTable ingestion csv mapping "CsvMapping1"
    '['
    '   { "column" : "Timespan", "Properties":{"Ordinal":"2"}},'
    '   { "column" : "Metric", "Properties":{"Ordinal":"3"}},'
    '   { "column" : "Value", "Properties":{"Ordinal":"4"}},'
    '   { "column" : "EventHubEnqueuedTime", "Properties":{"Ordinal":"0"}},'
    '   { "column" : "EventHubOffset", "Properties":{"Ordinal":"1"}}'
    ']'

JSON マッピングの例

データは、システム プロパティのマッピングを使用して追加されます。 これらのコマンドを実行します。

    .create table TestTable ingestion json mapping "JsonMapping1"
    '['
    '    { "column" : "Timespan", "Properties":{"Path":"$.timestamp"}},'
    '    { "column" : "Metric", "Properties":{"Path":"$.metric"}},'
    '    { "column" : "Value", "Properties":{"Path":"$.value"}},'
    '    { "column" : "EventHubEnqueuedTime", "Properties":{"Path":"$.x-opt-enqueued-time"}},'
    '    { "column" : "EventHubOffset", "Properties":{"Path":"$.x-opt-offset"}}'
    ']'

Event Hub Capture Avro ファイルのスキーマ マッピング

イベント ハブ データを使う方法の 1 つは、Azure Blob Storage か Azure Data Lake Storage の Azure Event Hubs を通じてイベントをキャプチャすることです。 その後、Azure Data Explorer の Event Grid データ接続を使って、書き込まれたキャプチャ ファイルを取り込むことができます。

キャプチャしたファイルのスキーマは、イベント ハブに送信された元のイベントのスキーマとは異なります。 この違いを考慮した上で、宛先テーブルのスキーマを設計する必要があります。 具体的には、イベントのペイロードはキャプチャ ファイルではバイト配列で表され、この配列は Event Grid の Azure Data Explorer データ接続では自動的にデコードされません。 Event Hub Avro キャプチャ データのファイル スキーマに関する特定の情報については、「Azure Event Hubs でキャプチャされた Avro ファイルの操作」を参照してください。

イベント ペイロードを正しくデコードするには:

  1. キャプチャしたイベントの Body フィールドを、宛先テーブルの型 dynamic の列にマップします。
  2. unicode_codepoints_to_string() 関数を使用して、バイト配列を読み取り可能な文字列に変換する更新ポリシーを適用します。

カスタム プロパティの取り込み

Event Hubs からイベントを取り込む場合、データはイベント データ オブジェクトの body セクションから取得されます。 ただし、Event Hubs の カスタム プロパティは、オブジェクトの properties セクションで定義され、取り込まれません。 顧客のプロパティを取り込むには、オブジェクトのbody セクションのデータに埋め込む必要があります。

次の例では、Event Hubs (左側) によって "定義された" カスタムプロパティ customProperty を含むイベント データ オブジェクトと、インジェスト (右側) に必要な "埋め込み" プロパティを比較しています。

{
"body":{
"value": 42
},
"properties":{
"customProperty": "123456789"
}
}
{
"body":{
"value": 42,
"customProperty": "123456789"
}
}

次のいずれかの方法を使用して、イベント データ オブジェクトの body セクション内のデータにカスタム プロパティを埋め込むことができます。

  • Event Hubs では、イベント データ オブジェクトを作成するときに、オブジェクトの body セクションのデータの一部としてカスタム プロパティを埋め込みます。
  • Azure Stream Analytics を使用して、イベント ハブからのイベントを処理し、カスタム プロパティをイベント データに埋め込みます。 Azure Stream Analytics から、Azure Data Explorer 出力コネクタを使用してデータをネイティブに取り込んだり、データを別のイベント ハブにルーティングして、そこから自分のクラスターにルーティングしたりすることができます。
  • Azure Functions を使用して、カスタム プロパティを追加し、データを取り込みます。

リージョンをまたがる Event Hub データ接続

最適なパフォーマンスを得るには、クラスターと同じリージョンに次のすべてのリソースを作成します。 他の代替手段がない場合は、 Premium または Dedicated Event Hub レベルの使用を検討してください。 Event Hub レベルの比較については、 こちらを参照してください

イベント ハブの作成

まだ用意していない場合は、イベント ハブを作成します。 イベント ハブへの接続は、Azure portalC# または Python によるプログラム、または Azure Resource Manager テンプレートを使用して管理できます。

注意

  • イベント ハブの作成後にパーティションを動的に追加する機能は、Event Hubs の Premium と Dedicated レベルでのみ使用できます。 パーティション数を選択する場合は、長期間にわたるスケールを考慮してください。
  • コンシューマー グループは、コンシューマーごとに一意である "必要があります"。 Azure Data Explorer 接続専用のコンシューマー グループを作成します。

送信イベント

データを生成してイベント ハブに送信するサンプル アプリをご覧ください。

サンプル データの生成方法の例については、「イベント ハブから Azure Data Explorer にデータを取り込む」を参照してください。

geo ディザスター リカバリー ソリューションの設定

イベント ハブには、geo ディザスター リカバリー ソリューションが用意されています。 Azure Data Explorer では Alias イベント ハブ名前空間がサポートされていません。 ソリューションに geo ディザスター リカバリーを実装するには、2 つのイベント ハブ データ接続を作成します。1 つはプライマリ名前空間用で、もう 1 つはセカンダリ名前空間用です。 Azure Data Explorer は、両方のイベント ハブ接続をリッスンします。

注意

プライマリ名前空間からセカンダリ名前空間へのフェールオーバーを実装するのは、ユーザーの責任です。