Azure Event HubsAzure Event Hubs

Azure Event Hubsは、数百万のイベントを収集、変換、保存するハイパースケールテレメトリインジェストサービスです。Azure Event Hubs is a hyper-scale telemetry ingestion service that collects, transforms, and stores millions of events. 分散型のストリーミング プラットフォームとして、待機時間が短く、保持時間を設定可能なため、膨大な量のテレメトリをクラウドに送信し、パブリッシュ/サブスクライブ セマンティクスを使用して、複数のアプリケーションからデータを読み込むことができます。As a distributed streaming platform, it gives you low latency and configurable time retention, which enables you to ingress massive amounts of telemetry into the cloud and read the data from multiple applications using publish-subscribe semantics.

このトピックでは、Azure Event Hubs と Azure Databricks クラスターで構造化ストリーミングを使用する方法について説明します。This topic explains how to use Structured Streaming with Azure Event Hubs and Azure Databricks clusters.

前提条件Requirements

Microsoft によって開発された Azure Event Hubs Spark コネクタには、 3.5 LTS以上の Databricks Runtime が必要です。The Azure Event Hubs Spark Connector, developed by Microsoft, requires Databricks Runtime 3.5 LTS or above.

現在のリリースサポートについては、Azure Event Hubs Spark コネクタプロジェクトのreadme ファイルの「最新リリース」を参照してください。For current release support, see “Latest Releases” in the Azure Event Hubs Spark Connector project readme file.

  1. Maven 座標 com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.6を使用して、Azure Databricks ワークスペースにライブラリを作成します。Create a library in your Azure Databricks workspace using the Maven coordinate com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.6.

    注意

    このコネクタは定期的に更新され、より新しいバージョンが利用可能になる可能性があります。 Maven リポジトリから最新のコネクタをプルすることをお勧めします。This connector is updated regularly, and a more recent version may be available: we recommend that you pull the latest connector from the Maven repository

  2. 作成したライブラリをクラスターにインストールします。Install the created library into your cluster.

スキーマSchema

レコードのスキーマは次のとおりです。The schema of the records is:

Column タイプType
bodybody binarybinary
partitionpartition stringstring
offsetoffset stringstring
sequenceNumbersequenceNumber longlong
enqueuedTimeenqueuedTime timestamptimestamp
publisherpublisher stringstring
partitionKeypartitionKey stringstring
プロパティproperties map [string, json]map[string,json]

body は、常にバイト配列として提供されます。The body is always provided as a byte array. body 列を明示的に逆シリアル化するには、cast("string") を使用します。Use cast("string") to explicitly deserialize the body column.

クイック スタートQuick Start

まず、WordCount の簡単な例を見てみましょう。Let’s start with a quick example: WordCount. 次の notebook は、Azure Event Hubs で構造化ストリーミングを使用して WordCount を実行するために必要なものです。The following notebook is all that it takes to run WordCount using Structured Streaming with Azure Event Hubs.

Azure Event Hubs WordCount と構造化ストリーミング notebookAzure Event Hubs WordCount with Structured Streaming notebook

ノートブックを取得するGet notebook

構成Configuration

このセクションでは、Event Hubs を操作するために必要な構成設定について説明します。This section discusses the configuration settings you need to work with Event Hubs.

Azure Event Hubs を使用した構造化ストリーミングの構成に関する詳細なガイダンスについては、Microsoft が開発した構造化ストリーミングと azure Event Hubs 統合ガイドを参照してください。For detailed guidance on configuring Structured Streaming with Azure Event Hubs, see the Structured Streaming and Azure Event Hubs Integration Guide developed by Microsoft.

構造化ストリーミングの使用に関する詳細なガイダンスについては、「構造化ストリーミング」を参照してください。For detailed guidance on using Structured Streaming, see Structured Streaming.

接続文字列Connection string

Event Hubs サービスに接続するには、Event Hubs 接続文字列が必要です。An Event Hubs connection string is required to connect to the Event Hubs service. Event Hubs インスタンスの接続文字列は、 Azure portalから取得することも、ライブラリの ConnectionStringBuilder を使用して取得することもできます。You can get the connection string for your Event Hubs instance from the Azure portal or by using the ConnectionStringBuilder in the library.

Azure portalAzure portal

Azure portal から接続文字列を取得した場合、EntityPath キーが含まれている場合とない場合があります。When you get the connection string from the Azure portal, it may or may not have the EntityPath key. 以下を検討してください。Consider:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

EventHubs に接続するには、EntityPath が存在している必要があります。To connect to your EventHubs, an EntityPath must be present. 接続文字列に含まれていない場合は、心配しないでください。If your connection string doesn’t have one, don’t worry. これによって、次の処理が行われます。This will take care of it:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

ConnectionStringBuilderConnectionStringBuilder

または、ConnectionStringBuilder を使用して接続文字列を作成することもできます。Alternatively, you can use the ConnectionStringBuilder to make your connection string.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConfEventHubsConf

Event Hubs に関連するすべての構成は、EventHubsConf で発生します。All configuration relating to Event Hubs happens in your EventHubsConf. EventHubsConfを作成するには、接続文字列を渡す必要があります。To create an EventHubsConf, you must pass a connection string:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

有効な接続文字列の取得の詳細については、「接続文字列」を参照してください。See Connection String for more information about obtaining a valid connection string.

構成の完全な一覧については、「 EventHubsConf」を参照してください。For a complete list of configurations, see EventHubsConf. 開始する_構成のサブセット_を次に示します。Here is a subset of configurations to get you started:

オプションOption ValueValue DefaultDefault クエリの種類Query type descriptionDescription
consumerGroupconsumerGroup StringString "$Default"“$Default” ストリーミングとバッチStreaming and batch コンシューマーグループは、event hub 全体のビューです。A consumer group is a view of an entire event hub. コンシューマー グループを使用することにより、複数のコンシューマー アプリケーションは、イベント ストリームの個別のビューをそれぞれ保有し、独自のペースで独自のオフセットによってストリームを別々に読み取ることができます。Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. 詳細については、 Microsoft のドキュメントを参照してください。More information is available in the Microsoft documentation.
startingPositionstartingPosition EventPositionEventPosition ストリームの開始Start of stream ストリーミングとバッチStreaming and batch 構造化ストリーミングジョブの開始位置。The starting position for your Structured Streaming job. オプションの読み取り順序の詳細については、「 startingPositions 」を参照してください。See startingPositions for information about the order in which options are read.
maxEventsPerTriggermaxEventsPerTrigger longlong partitionCountpartitionCount
* 1000* 1000
ストリーミングクエリStreaming query トリガー間隔ごとに処理されるイベントの最大数のレート制限。Rate limit on maximum number of events processed per trigger interval. 指定されたイベントの合計数は、異なるボリュームのパーティション間で均等に分割されます。The specified total number of events will be proportionally split across partitions of different volume.

オプションごとに、EventHubsConf に対応する設定が存在します。For each option, there exists a corresponding setting in EventHubsConf. 例えば次が挙げられます。For example:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPositionEventPosition

EventHubsConf を使用すると、ユーザーは EventPosition クラスを使用して開始位置 (終了) を指定できます。EventHubsConf allows users to specify starting (and ending) positions with the EventPosition class. EventPosition は、イベントハブパーティション内のイベントの位置を定義します。EventPosition defines the position of an event in an Event Hub partition. 位置には、エンキューされた時刻、オフセット、シーケンス番号、ストリームの先頭、またはストリームの末尾を指定できます。The position can be an enqueued time, offset, sequence number, the start of the stream, or the end of the stream.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Specifies any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

特定の位置で開始 (または終了) する場合は、適切な EventPosition を作成し、EventHubsConf に設定するだけです。If you would like to start (or end) at a specific position, simply create the correct EventPosition and set it in your EventHubsConf:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Azure Event Hubs を使用した運用環境の構造化ストリーミングProduction Structured Streaming with Azure Event Hubs

運用環境でストリーミングクエリを実行する場合は、単に notebook をクラスターにアタッチし、ストリーミングクエリを対話形式で実行する場合よりも、堅牢性とアップタイムの保証が必要になります。When you run streaming queries in production, you probably want more robustness and uptime guarantees than you would have when you simply attach a notebook to a cluster and run your streaming queries interactively. 次の notebook をインポートして実行し、Azure Event Hubs と Azure Databricks を使用して運用環境で構造化ストリーミングを構成して実行する方法を説明します。Import and run the following notebook for a demonstration of how to configure and run Structured Streaming in production with Azure Event Hubs and Azure Databricks.

詳細については、「運用環境での構造化ストリーミング」を参照してください。For more information, see Structured Streaming in Production.

Azure Event Hubs notebook を使用した運用環境の構造化ストリーミングProduction Structured Streaming with Azure Event Hubs notebook

ノートブックを取得するGet notebook

エンドツーエンドの Event Hubs ストリーミングのチュートリアルEnd-to-end Event Hubs streaming tutorial

Event Hubs を使用してクラスターにデータをストリーミングするエンドツーエンドの例については、「チュートリアル: Event Hubs を使用して Azure Databricks にデータをストリーム配信する」を参照してください。For an end-to-end example of streaming data into a cluster using Event Hubs, see Tutorial: Stream data into Azure Databricks using Event Hubs.