Azure Event Hubs

Azure Event Hubs は、数百万のイベントを収集、変換、保存する、ハイパースケールのテレメトリ インジェスト サービスです。 分散型のストリーミング プラットフォームとして、待機時間が短く、保持時間を設定可能なため、膨大な量のテレメトリをクラウドに送信し、パブリッシュ/サブスクライブ セマンティクスを使用して、複数のアプリケーションからデータを読み込むことができます。

この記事では、Azure Event Hubs と Azure Databricks クラスターの間で構造化ストリームを使用する方法を説明します。

Note

Azure Event Hubs は、Apache Kafka と互換性のあるエンドポイントを提供します。これを Databricks Runtime で使用できる構造化ストリーミング Kafka コネクタとともに使用して、Azure Event Hubs からのメッセージを処理することができます。 Databricks では、構造化ストリーミング Kafka コネクタを使用して、Azure Event Hubs からのメッセージを処理することをお勧めします。

要件

現在のリリースのサポートについては、Azure Event Hubs Spark コネクタ プロジェクトの readme ファイルの最新リリースの項を参照してください。

  1. Maven 座標 com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17 を使用して、Azure Databricks ワークスペース内にライブラリを作成します。

    注意

    このコネクタは定期的に更新されており、さらに新しいバージョンが利用できる可能性があります。Maven リポジトリから最新のコネクタをプルすることをお勧めします

  2. 作成したライブラリをクラスターにインストールします。

スキーマ

レコードのスキーマは次のとおりです。

Type
body binary
partition string
offset string
sequenceNumber long
enqueuedTime timestamp
publisher string
partitionKey string
properties map[string,json]

body は常にバイト配列として提供されます。 body 列を明示的に逆シリアル化するには、cast("string") を使用します。

クイック スタート

簡単な例 WordCount から始めましょう。 次のノートブックは、構造化ストリームを Azure Event Hubs と一緒に使用して WordCount を実行するために必要なすべてを備えています。

Azure Event Hubs WordCount と構造化ストリーム ノートブック

ノートブックを入手

構成

このセクションでは、Event Hubs を操作するために必要な構成設定について説明します。

Azure Event Hubs を使用した構造化ストリームの構成に関する詳細なガイダンスについては、Microsoft が開発した「構造化ストリーミングおよび Azure Event Hubs 統合ガイド」を参照してください。

構造化ストリームの使用に関する詳細なガイダンスについては、「Azure Databricks でのストリーミング」を参照してください。

接続文字列

Event Hubs サービスに接続するには、Event Hubs 接続文字列が必要です。 Event Hubs インスタンスの接続文字列は、Azure portal から、またはライブラリ内の ConnectionStringBuilder を使用して取得できます。

Azure ポータル

Azure portal から接続文字列を取得するときに、この文字列が EntityPath キーを持つ場合と持たない場合があります。 以下を検討してください。

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

EventHubs に接続するには、EntityPath が存在する必要があります。 接続文字列にない場合、心配しないでください。 これにより対応されます。

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

ConnectionStringBuilder

または、ConnectionStringBuilder を使用して、接続文字列を作成できます。

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

EventHubsConf で、Event Hubs と関連するすべての構成が発生します。 EventHubsConf を作成するには、次の接続文字列を渡す必要があります。

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

有効な接続文字列の取得の詳細については、「接続文字列」を参照してください。

構成の完全な一覧については、「EventHubsConf」を参照してください。 作業の開始に使用する構成のサブセットを次に示します。

オプション Default クエリの種類 説明
consumerGroup String “$Default” ストリーミングとバッチ コンシューマー グループは、イベント ハブ全体のビューです。 コンシューマー グループを使用することにより、複数のコンシューマー アプリケーションは、イベント ストリームの個別のビューをそれぞれ保有し、独自のペースで独自のオフセットによってストリームを別々に読み取ることができます。 詳細については、Microsoft のドキュメントに関するページを参照してください。
startingPosition EventPosition ストリームの開始 ストリーミングとバッチ 構造化ストリーム ジョブの開始位置。 オプションの読み取り順序については、「startingPositions」を参照してください。
maxEventsPerTrigger long partitionCount

* 1000
ストリーミング クエリ トリガー間隔ごとに処理されるイベントの最大数に対するレート制限。 指定されたイベントの総数は、異なるボリュームのパーティション間で比例的に分割されます。

オプションごとに、EventHubsConf に対応する設定が存在します。 次に例を示します。

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf では、ユーザーは EventPosition クラスを使用して開始位置 (および終了位置) を指定できます。 EventPosition は、イベント ハブ パーティション内のイベントの位置を定義します。 位置には、エンキューされた時間、オフセット、シーケンス番号、ストリームの始点、またはストリームの末尾を指定できます。

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

特定の位置から開始 (または終了) する場合は、正しい EventPosition を作成し、EventHubsConf に設定します。

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Azure Event Hubs を使用した運用環境での構造化ストリーム

ストリーミング クエリを実稼働環境で実行するときは、ノートブックをクラスターにアタッチし、ストリーミング クエリを対話形式で実行するときよりも、高い堅牢性とアップタイムを保証する必要がある場合があります。 次のノートブックをインポートして実行し、Azure Event Hubs と Azure Databricks を使用して、実稼働環境で構造化ストリームを構成して実行する方法のデモを行います。

詳細については、「構造化ストリーミングの運用に関する考慮事項」を参照してください。

Azure Event Hubs ノートブックを使用した運用環境での構造化ストリーム

ノートブックを入手