Azure Event Hubs

Azure Event Hubs is a hyper-scale telemetry ingestion service that collects, transforms, and stores millions of events. As a distributed streaming platform, it gives you low latency and configurable time retention, which enables you to ingress massive amounts of telemetry into the cloud and read the data from multiple applications using publish-subscribe semantics.

This article explains how to use Structured Streaming with Azure Event Hubs and Azure Databricks clusters.

Note

Azure Event Hubs provides an endpoint compatible with Apache Kafka. Databricks recommends using the Structured Streaming Kafka connector, available in Databricks Runtime, to process messages from Azure Event Hubs through that endpoint.

Requirements

For current release support, see “Latest Releases” in the Azure Event Hubs Spark Connector project readme file.

  1. Create a library in your Azure Databricks workspace using the Maven coordinate com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17.

    Note

    This connector is updated regularly, and a more recent version may be available. We recommend that you pull the latest connector from the Maven repository.

  2. Install the created library into your cluster.

Schema

The schema of the records is:

Column          Type
body            binary
partition       string
offset          string
sequenceNumber  long
enqueuedTime    timestamp
publisher       string
partitionKey    string
properties      map[string,json]

The body is always provided as a byte array. Use cast("string") to explicitly deserialize the body column.
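As an illustration of that cast, here is a minimal sketch for a notebook cell. The helper name decodeBody is hypothetical, and the sketch assumes the event payloads are UTF-8 text; binary payloads such as Avro would need their own decoder instead.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: replaces the binary `body` column with its string form.
// Assumes the payload bytes are UTF-8 encoded text.
def decodeBody(events: DataFrame): DataFrame =
  events.withColumn("body", col("body").cast("string"))
```

All other columns (partition, offset, enqueuedTime, and so on) pass through unchanged, so the result can be filtered or parsed further, for example with from_json when the body holds JSON.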

Configuration

This section discusses the configuration settings you need to work with Event Hubs.

For detailed guidance on configuring Structured Streaming with Azure Event Hubs, see the Structured Streaming and Azure Event Hubs Integration Guide developed by Microsoft.

For detailed guidance on using Structured Streaming, see Streaming on Azure Databricks.

Connection string

An Event Hubs connection string is required to connect to the Event Hubs service. You can get the connection string for your Event Hubs instance from the Azure portal or by using the ConnectionStringBuilder in the library.

Azure portal

When you get the connection string from the Azure portal, it may or may not have the EntityPath key. Consider:

// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val withEntityPath = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

To connect to your event hub, the connection string must contain an EntityPath. If yours doesn’t have one, use the ConnectionStringBuilder to add it:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build
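Whether that extra step is needed can be checked up front. The following is a minimal sketch in plain Scala. The helper name hasEntityPath is hypothetical and not part of the connector, and the case-insensitive key match is an assumption made for leniency.

```scala
// Hypothetical helper: true when the connection string already names an event hub.
// Splits the string into its semicolon-separated key=value segments and looks for
// an EntityPath key (matched case-insensitively, which is an assumption).
def hasEntityPath(connectionString: String): Boolean =
  connectionString
    .split(';')
    .map(_.trim)
    .exists(_.toLowerCase.startsWith("entitypath="))
```

A string for which hasEntityPath returns false can then be passed through ConnectionStringBuilder with setEventHubName, as in the snippet above.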

ConnectionStringBuilder

Alternatively, you can use the ConnectionStringBuilder to make your connection string.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

All configuration relating to Event Hubs happens in your EventHubsConf. To create an EventHubsConf, you must pass a connection string:

import org.apache.spark.eventhubs.EventHubsConf

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

See Connection String for more information about obtaining a valid connection string.

For a complete list of configurations, see EventHubsConf. Here is a subset of configurations to get you started:

Option Value Default Query type Description
consumerGroup String “$Default” Streaming and batch A consumer group is a view of an entire event hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. More information is available in the Microsoft documentation.
startingPosition EventPosition Start of stream Streaming and batch The starting position for your Structured Streaming job. See startingPositions for information about the order in which options are read.
maxEventsPerTrigger long partitionCount * 1000 Streaming query Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume.

For each option, there exists a corresponding setting in EventHubsConf. For example:

import org.apache.spark.eventhubs._

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)
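To show where such a configuration ends up, here is a sketch of a streaming read that uses it. It assumes the connector library from the Requirements section is installed on the cluster. The connection string and consumer group are placeholders, and in a Databricks notebook the getOrCreate call simply returns the session that already exists.

```scala
import org.apache.spark.eventhubs._
import org.apache.spark.sql.SparkSession

// In a Databricks notebook this returns the existing `spark` session.
val spark = SparkSession.builder.getOrCreate()

// Placeholder connection string; it must include an EntityPath.
val eventHubsConf = EventHubsConf("<your-connection-string>")
  .setConsumerGroup("sample-cg")   // assumes this consumer group exists on the event hub
  .setMaxEventsPerTrigger(10000)   // cap on events read per trigger, across all partitions

// The connector reads its settings from the key-value map produced by toMap.
val events = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()
```

The resulting DataFrame has the columns listed in the Schema section.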

EventPosition

EventHubsConf allows users to specify starting (and ending) positions with the EventPosition class. EventPosition defines the position of an event in an Event Hub partition. The position can be an enqueued time, offset, sequence number, the start of the stream, or the end of the stream.

import java.time.Instant

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

If you would like to start (or end) at a specific position, simply create the correct EventPosition and set it in your EventHubsConf:

import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
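Starting and ending positions can also be combined. The sketch below configures a batch query over roughly the last hour of events. The one-hour window and the names oneHourAgo, batchConf, and lastHour are illustrative assumptions. The ending position is set explicitly for clarity and applies to batch queries only.

```scala
import java.time.{Duration, Instant}

import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}
import org.apache.spark.sql.SparkSession

// In a Databricks notebook this returns the existing `spark` session.
val spark = SparkSession.builder.getOrCreate()

// Illustrative window: events enqueued during the last hour.
val oneHourAgo = Instant.now.minus(Duration.ofHours(1))

val batchConf = EventHubsConf("<event-hub-connection-string>")
  .setStartingPosition(EventPosition.fromEnqueuedTime(oneHourAgo))
  .setEndingPosition(EventPosition.fromEndOfStream)

// spark.read (rather than readStream) runs a one-off batch query.
val lastHour = spark.read
  .format("eventhubs")
  .options(batchConf.toMap)
  .load()
```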