Integrating Apache Spark with Azure Event Hubs

Azure Event Hubs seamlessly integrates with Apache Spark to enable building distributed streaming applications. This integration supports Spark Core, Spark Streaming, and Structured Streaming. The Event Hubs connector for Apache Spark is available on GitHub. This library is also available for use in Maven projects from the Maven Central Repository.
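
For example, if you manage dependencies with sbt, the connector can be pulled from Maven Central with a declaration like the following. This is a minimal sketch; the version shown is an assumption, so check the repository for the coordinates that match your Spark and Scala versions:

// build.sbt: pull the Event Hubs connector from Maven Central.
// The version is illustrative; pick the release matching your Spark and Scala versions.
libraryDependencies += "com.microsoft.azure" %% "azure-eventhubs-spark" % "2.3.22"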

This article describes how to create a continuous application in Azure Databricks. While the examples use Azure Databricks, Spark clusters are also available with HDInsight.

The example in this article uses two Scala notebooks: one for streaming events from an event hub and another for sending events back to it.

Prerequisites

Before you begin, you need:

- An Azure subscription with an Event Hubs namespace and an event hub.
- An Azure Databricks workspace with a running Spark cluster.
- The Event Hubs connector for Apache Spark, attached to the cluster as a library (available from the Maven Central Repository, as noted above).

Stream events from your event hub using the following code:

import org.apache.spark.eventhubs._

// To connect to an event hub, EntityPath is required as part of the connection string.
// Here, we assume that the connection string from the Azure portal does not have the EntityPath part.
val connectionString = ConnectionStringBuilder("{EVENT HUB CONNECTION STRING FROM AZURE PORTAL}")
  .setEventHubName("{EVENT HUB NAME}")
  .build
val ehConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

// Create a streaming DataFrame that reads from the specified event hub.
val reader = spark.readStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .load()

// The body column is binary by default; cast it to a string.
// ($-notation comes from spark.implicits._, which Databricks notebooks import automatically.)
val eventhubs = reader.select($"body" cast "string")

eventhubs.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()
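
Beyond the starting position, EventHubsConf exposes further settings such as the consumer group and per-trigger rate limits. The following sketch is illustrative, not exhaustive; it reuses the connectionString from above, and the consumer group, timestamp, and limit are placeholder values:

import java.time.Instant
import org.apache.spark.eventhubs._

// Illustrative sketch: additional EventHubsConf settings (placeholder values).
val tunedConf = EventHubsConf(connectionString)
  .setConsumerGroup("$Default")          // read through a specific consumer group
  .setStartingPosition(                  // replay from a point in time instead of the stream's end
    EventPosition.fromEnqueuedTime(Instant.parse("2024-01-01T00:00:00Z")))
  .setMaxEventsPerTrigger(10000)         // cap events processed per micro-batch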

The following code sends events to your event hub with the Spark batch APIs. You can also write a streaming query to send events to the event hub; a sketch of a streaming write follows the batch example below:

import org.apache.spark.eventhubs._
import org.apache.spark.sql.functions._

// To connect to an event hub, EntityPath is required as part of the connection string.
// Here, we assume that the connection string from the Azure portal does not have the EntityPath part.
val connectionString = ConnectionStringBuilder("{EVENT HUB CONNECTION STRING FROM AZURE PORTAL}")
  .setEventHubName("{EVENT HUB NAME}")
  .build
val ehConf = EventHubsConf(connectionString)

// Create random strings as the body of the message.
val bodyColumn = concat(lit("random number: "), rand()).as("body")

// Write 200 rows to the specified event hub.
val df = spark.range(200).select(bodyColumn)
df.write
  .format("eventhubs")
  .options(ehConf.toMap)
  .save()
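
As a sketch of the streaming write mentioned above: a streaming query uses the same eventhubs format, but Structured Streaming additionally requires a checkpoint location. Here, streamingDf and the checkpoint path are placeholders for your own streaming DataFrame and storage location:

// Sketch: continuously forward a streaming DataFrame (with a string "body" column)
// to the event hub. streamingDf and the checkpoint path are placeholders.
val query = streamingDf.writeStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .option("checkpointLocation", "/checkpoints/eventhubs-writer") // required for streaming writes
  .start()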

Next steps

Now you know how to set up a scalable, fault-tolerant stream using the Event Hubs connector for Apache Spark. To learn more about using Event Hubs with Structured Streaming and Spark Streaming, see the connector's documentation on GitHub.