Spark Streaming: Process events from Azure Event Hubs with an Apache Spark cluster on HDInsight

In this article, you learn some concepts related to streaming with Apache Spark, and then create a streaming solution that involves the following steps:

  1. You use a standalone application to ingest messages into an Azure Event Hub.

  2. You retrieve the messages from the Event Hub in real time by using an application running on a Spark cluster on Azure HDInsight.

  3. You route the data to different outputs such as an Azure Storage blob, a Hive table, or an Azure SQL database table.

Prerequisites

Spark streaming concepts

For an in-depth explanation of how streaming is handled in Apache Spark, see Apache Spark Streaming Overview. HDInsight brings the same streaming functions to a Spark cluster on Azure.

What does this solution do?

To create the streaming solution in this article, you perform the following steps:

  1. Create an Azure Event Hub that will receive a stream of events.

  2. Run a local standalone application that generates events and pushes them to the Azure Event Hub. The sample application that does this is published at https://github.com/hdinsight/eventhubs-sample-event-producer.

  3. Run a streaming application remotely on a Spark cluster that reads streaming events from the Azure Event Hub and pushes them out to different locations (an Azure Storage blob, a Hive table, and an Azure SQL database table).

Create Azure Event Hub

  1. Log on to the Azure Portal, and click New at the top left of the screen.

  2. Click Internet of Things, then click Event Hubs.

    Create event hub

  3. In the Create namespace blade, enter a namespace name and choose the pricing tier (Basic or Standard). Also, choose an Azure subscription, resource group, and location in which to create the resource. Click Create to create the namespace.

    Create event hub

    Note

    You should select the same Location as your Apache Spark cluster in HDInsight to reduce latency and costs.

  4. In the Event Hubs namespace list, click the newly-created namespace.

  5. In the namespace blade, click Event Hubs, and then click + Event Hub to create a new Event Hub.

    Create event hub

  6. Type a name for your Event Hub, set the partition count to 10, and set the message retention to 1. This solution does not archive the messages, so you can leave the rest of the settings at their defaults, and then click Create.

    Create event hub

  7. The newly created Event Hub is listed in the Event Hub blade.

  8. Back in the namespace blade (not the specific Event Hub blade), click Shared access policies, and then click RootManageSharedAccessKey.

  9. Click the copy button to copy the RootManageSharedAccessKey primary key and connection string to the clipboard. Save these values to use later in this article.
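    The namespace-level connection string has the following general format; the namespace name shown here is a placeholder, and the SharedAccessKey value is unique to your namespace:

     Endpoint=sb://mysbnamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<primary-key>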

Send messages to an Azure Event Hub using a Scala application

In this section you use a standalone local Scala application that generates a stream of events and sends them to the Azure Event Hub that you created in the previous step. This application is available on GitHub at https://github.com/hdinsight/eventhubs-sample-event-producer. The steps here assume that you have already forked this GitHub repository. A sketch of the core send loop appears at the end of this section.

  1. Make sure you have the following installed on the computer where you are running this application.

    • Oracle Java Development Kit. You can install it from here.
    • A Java IDE. This article uses IntelliJ IDEA 15.0.1. You can install it from here.
  2. Open the application, EventhubsSampleEventProducer, in IntelliJ IDEA.

  3. Build the project. From the Build menu, click Make Project. Depending on your IntelliJ IDEA configuration, the output jar is created under \classes\artifacts.

    Tip

    You can also use an option available in IntelliJ IDEA to create the project directly from a GitHub repository. To understand how to use that approach, use the instructions in the next section for guidance. Note that many of the steps described in the next section do not apply to the Scala application that you build in this step. For example:

    • You do not have to update the pom.xml to include the Spark version, because this application has no dependency on Spark.
    • You do not have to add dependency jars to the project library; they are not required for this project.
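For reference, the core send loop of such a producer looks roughly like the following sketch. It assumes the legacy com.microsoft.azure.eventhubs Java client used by the sample repository; class and method names may differ in newer SDK versions, and the namespace, Event Hub name, policy name, and key are placeholders that you replace with your own values.

import java.nio.charset.StandardCharsets

import com.microsoft.azure.eventhubs.{ConnectionStringBuilder, EventData, EventHubClient}

import scala.util.Random

object SampleEventProducer {
  def main(args: Array[String]): Unit = {
    // Placeholder values -- replace with the namespace, Event Hub, and send policy you created earlier.
    val connectionString = new ConnectionStringBuilder(
      "mysbnamespace", "myeventhub", "mysendpolicy", "<policy key>").toString

    // Connect to the Event Hub.
    val client = EventHubClient.createFromConnectionStringSync(connectionString)

    // Send an endless stream of random 32-character messages.
    while (true) {
      val payload = Random.alphanumeric.take(32).mkString
      client.sendSync(new EventData(payload.getBytes(StandardCharsets.UTF_8)))
    }
  }
}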

Receive messages from the Event Hub using a streaming application running on a Spark cluster

A sample Scala application that receives the events and routes them to different destinations is available at https://github.com/hdinsight/spark-streaming-data-persistence-examples. Follow the steps below to update the application and create the output jar.

  1. Launch IntelliJ IDEA and from the launch screen select Check out from Version Control and then click Git.

    Get sources from Git

  2. In the Clone Repository dialog box, provide the URL to the Git repository to clone from, specify the directory to clone to, and then click Clone.

    Clone from Git

  3. Follow the prompts until the project is completely cloned. Press Alt + 1 to open the Project View. It should resemble the following.

    Project View

  4. Make sure the application code is compiled with Java 8. To ensure this, click File, click Project Structure, and on the Project tab, make sure Project language level is set to 8 - Lambdas, type annotations, etc.

    Project structure

  5. Open pom.xml and make sure the Spark version is correct. Under the <properties> node, look for the following snippet and verify the Spark version.

     <scala.version>2.11.8</scala.version>
     <scala.compat.version>2.11.8</scala.compat.version>
     <scala.binary.version>2.11</scala.binary.version>
     <spark.version>2.0.0</spark.version>
    
  6. The application requires a JDBC driver jar as a dependency. This jar is required to write the messages received from the Event Hub into an Azure SQL database. You can download v4.1 or later of this jar file from here. Add a reference to this jar in the project library by performing the following steps:

    1. From IntelliJ IDEA window where you have the application open, click File, click Project Structure, and then click Libraries.
    2. Click the add icon (add icon), click Java, and then navigate to the location where you downloaded the JDBC driver jar. Follow the prompts to add the jar file to the project library.

      add missing dependencies

    3. Click Apply.
  7. Create the output jar file. Perform the following steps.

    1. In the Project Structure dialog box, click Artifacts and then click the plus symbol. From the pop-up dialog box, click JAR, and then click From modules with dependencies.

      Create JAR

    2. In the Create JAR from Modules dialog box, click the ellipsis (ellipsis) against the Main Class.
    3. In the Select Main Class dialog box, select any of the available classes and then click OK.

      Create JAR

    4. In the Create JAR from Modules dialog box, make sure that the option to extract to the target JAR is selected, and then click OK. This creates a single JAR with all dependencies.

      Create JAR

    5. The Output Layout tab lists all the jars that are included as part of the Maven project. You can select and delete the ones on which the Scala application has no direct dependency. For the application we are creating here, you can remove all but the last one (microsoft-spark-streaming-examples compile output). Select the jars to delete and then click the Delete icon (delete icon).

      Create JAR

      Make sure the Build on make check box is selected, which ensures that the jar is created every time the project is built or updated. Click Apply.

    6. In the Output Layout tab, right at the bottom of the Available Elements box, you have the SQL JDBC jar that you added earlier to the project library. You must add this to the Output Layout tab. Right-click the jar file, and then click Extract Into Output Root.

      Extract dependency jar

      The Output Layout tab should now look like this.

      Final output tab

      In the Project Structure dialog box, click Apply and then click OK.

    7. From the menu bar, click Build, and then click Make Project. You can also click Build Artifacts to create the jar. The output jar is created under \classes\artifacts.

      Create JAR

Run the applications remotely on a Spark cluster using Livy

We use Livy to run the streaming application remotely on a Spark cluster. For a detailed discussion of how to use Livy with an HDInsight Spark cluster, see Submit jobs remotely to an Apache Spark cluster on Azure HDInsight. Before you can start running remote jobs to stream events using Spark, there are a couple of things you should do:

  1. Start the local standalone application to generate events and send them to the Event Hub. Use the following command to do so:

     java -cp com-microsoft-azure-eventhubs-client-example.jar com.microsoft.eventhubs.client.example.EventhubsClientDriver --eventhubs-namespace "mysbnamespace" --eventhubs-name "myeventhub" --policy-name "mysendpolicy" --policy-key "<policy key>" --message-length 32 --thread-count 32 --message-count -1
    
  2. Copy the streaming jar (spark-streaming-data-persistence-examples.jar) to the Azure Blob storage associated with the cluster. This makes the jar accessible to Livy. You can use AzCopy, a command-line utility, to do so; a sample command follows this list. There are many other clients you can use to upload data. You can find more about them at Upload data for Hadoop jobs in HDInsight.

  3. Install CURL on the computer from which you run these applications. We use CURL to invoke the Livy endpoints to run the jobs remotely.
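As mentioned in step 2, you can use AzCopy to upload the streaming jar. With the older Windows release of AzCopy, the command looks roughly like the following; the local path, storage account name, container name, and storage key are placeholders:

AzCopy /Source:C:\Temp\artifacts /Dest:https://mystorageaccount.blob.core.windows.net/mycontainer/example/jars /DestKey:<storage-account-key> /Pattern:spark-streaming-data-persistence-examples.jar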

Run the applications to receive the events into an Azure Storage Blob as text

Open a command prompt, navigate to the directory where you installed CURL, and run the following command (replace username/password and cluster name):

curl -k --user "admin:mypassword1!" -v -H "Content-Type: application/json" -X POST --data @C:\Temp\inputBlob.txt "https://mysparkcluster.azurehdinsight.net/livy/batches"

The parameters in the file inputBlob.txt are defined as follows:

{ "file":"wasbs:///example/jars/spark-streaming-data-persistence-examples.jar", "className":"com.microsoft.spark.streaming.examples.workloads.EventhubsEventCount", "args":["--eventhubs-namespace", "mysbnamespace", "--eventhubs-name", "myeventhub", "--policy-name", "myreceivepolicy", "--policy-key", "<put-your-key-here>", "--consumer-group", "$default", "--partition-count", 10, "--batch-interval-in-seconds", 20, "--checkpoint-directory", "/EventCheckpoint", "--event-count-folder", "/EventCount/EventCount10"], "numExecutors":20, "executorMemory":"1G", "executorCores":1, "driverMemory":"2G" }

Let us understand what the parameters in the input file are:

  • file is the path to the application jar file on the Azure storage account associated with the cluster.
  • className is the name of the class in the jar.
  • args is the list of arguments required by the class.
  • numExecutors is the number of executors used by Spark to run the streaming application. This should always be at least twice the number of Event Hub partitions.
  • executorMemory, executorCores, driverMemory are parameters used to assign required resources to the streaming application.
Note

You do not need to create the output folders (EventCheckpoint, EventCount/EventCount10) that are used as parameters. The streaming application creates them for you.
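
For reference, the EventhubsEventCount workload is conceptually similar to the following sketch. It assumes the spark-streaming-eventhubs connector that the sample project builds against; the EventHubsUtils.createUnionStream helper and the configuration key names come from that connector and may differ in other versions.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.eventhubs.EventHubsUtils

object EventCountSketch {
  def main(args: Array[String]): Unit = {
    // Event Hub connection details -- placeholders matching the Livy arguments above.
    val eventhubsParams = Map(
      "eventhubs.namespace"           -> "mysbnamespace",
      "eventhubs.name"                -> "myeventhub",
      "eventhubs.policyname"          -> "myreceivepolicy",
      "eventhubs.policykey"           -> "<put-your-key-here>",
      "eventhubs.consumergroup"       -> "$default",
      "eventhubs.partition.count"     -> "10",
      "eventhubs.checkpoint.dir"      -> "/EventCheckpoint",
      "eventhubs.checkpoint.interval" -> "20")

    // A 20-second batch interval, matching --batch-interval-in-seconds.
    val ssc = new StreamingContext(new SparkConf().setAppName("EventCountSketch"), Seconds(20))
    ssc.checkpoint("/EventCheckpoint")

    // One receiver per partition, unioned into a single DStream of raw event bodies.
    val stream = EventHubsUtils.createUnionStream(ssc, eventhubsParams)

    // Count the events in each batch and save the counts under /EventCount/EventCount10.
    stream.count().saveAsTextFiles("/EventCount/EventCount10/eventCount")

    ssc.start()
    ssc.awaitTermination()
  }
}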

When you run the command, you should see an output like the following:

< HTTP/1.1 201 Created
< Content-Type: application/json; charset=UTF-8
< Location: /18
< Server: Microsoft-IIS/8.5
< X-Powered-By: ARR/2.5
< X-Powered-By: ASP.NET
< Date: Tue, 01 Dec 2015 05:39:10 GMT
< Content-Length: 37
<
{"id":1,"state":"starting","log":[]}* Connection #0 to host mysparkcluster.azurehdinsight.net left intact

Make a note of the batch ID in the last line of the output (in this example it is '1'). To verify that the application runs successfully, look at the Azure storage account associated with the cluster; you should see the /EventCount/EventCount10 folder created there. This folder contains blobs that capture the number of events processed within the time period specified for the batch-interval-in-seconds parameter.

The application will continue to run until you kill it. To do so, use the following command:

curl -k --user "admin:mypassword1!" -v -X DELETE "https://mysparkcluster.azurehdinsight.net/livy/batches/1"

Run the applications to receive the events into an Azure Storage Blob as JSON

Open a command prompt, navigate to the directory where you installed CURL, and run the following command (replace username/password and cluster name):

curl -k --user "admin:mypassword1!" -v -H "Content-Type: application/json" -X POST --data @C:\Temp\inputJSON.txt "https://mysparkcluster.azurehdinsight.net/livy/batches"

The parameters in the file inputJSON.txt are defined as follows:

{ "file":"wasbs:///example/jars/spark-streaming-data-persistence-examples.jar", "className":"com.microsoft.spark.streaming.examples.workloads.EventhubsToAzureBlobAsJSON", "args":["--eventhubs-namespace", "mysbnamespace", "--eventhubs-name", "myeventhub", "--policy-name", "myreceivepolicy", "--policy-key", "<put-your-key-here>", "--consumer-group", "$default", "--partition-count", 10, "--batch-interval-in-seconds", 20, "--checkpoint-directory", "/EventCheckpoint", "--event-count-folder", "/EventCount/EventCount10", "--event-store-folder", "/EventStore10"], "numExecutors":20, "executorMemory":"1G", "executorCores":1, "driverMemory":"2G" }

The parameters are similar to what you specified for the text output in the previous step. Again, you do not need to create the output folders (EventCheckpoint, EventCount/EventCount10) that are used as parameters. The streaming application creates them for you.

After you run the command, you can look at your Azure storage account associated with the cluster and you should see the /EventStore10 folder created there. Open any file prefixed with part- and you should see the processed events in JSON format.
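
The EventhubsToAzureBlobAsJSON workload differs from the event-count example mainly in how each batch is persisted. The following sketch shows that part only; it assumes the unioned DStream created in the earlier sketch, and the EventContent case class and persistAsJson helper are illustrative names rather than the exact ones used in the sample repository.

import java.nio.charset.StandardCharsets

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.DStream

object JsonSink {
  // One string column holding the raw event body.
  case class EventContent(EventDetails: String)

  // 'stream' is the DStream[Array[Byte]] returned by EventHubsUtils.createUnionStream in the earlier sketch.
  def persistAsJson(stream: DStream[Array[Byte]], eventStoreFolder: String): Unit = {
    stream.map(bytes => EventContent(new String(bytes, StandardCharsets.UTF_8)))
      .foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
          val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
          import spark.implicits._
          // Each batch is appended as JSON part files, for example under /EventStore10.
          rdd.toDF().write.mode(SaveMode.Append).json(eventStoreFolder)
        }
      }
  }
}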

Run the applications to receive the events into a Hive table

To run the application that streams events into a Hive table, you need the following additional components:

  • datanucleus-api-jdo-3.2.6.jar
  • datanucleus-rdbms-3.2.9.jar
  • datanucleus-core-3.2.10.jar
  • hive-site.xml

The .jar files are available on your HDInsight Spark cluster at /usr/hdp/current/spark-client/lib. The hive-site.xml is available at /usr/hdp/current/spark-client/conf.

You can use WinSCP to copy these files from the cluster to your local computer. You can then use other tools to copy these files to the storage account associated with the cluster. For more information on how to upload files to the storage account, see Upload data for Hadoop jobs in HDInsight.
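
For example, assuming the default SSH user name sshuser and the cluster name used elsewhere in this article, the following scp commands copy the files to the current directory on your local computer:

scp "sshuser@mysparkcluster-ssh.azurehdinsight.net:/usr/hdp/current/spark-client/lib/datanucleus-*.jar" .
scp sshuser@mysparkcluster-ssh.azurehdinsight.net:/usr/hdp/current/spark-client/conf/hive-site.xml .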

Once you have copied over the files to your Azure storage account, open a command prompt, navigate to the directory where you installed CURL, and run the following command (replace username/password and cluster name):

curl -k --user "admin:mypassword1!" -v -H "Content-Type: application/json" -X POST --data @C:\Temp\inputHive.txt "https://mysparkcluster.azurehdinsight.net/livy/batches"

The parameters in the file inputHive.txt are defined as follows:

{ "file":"wasbs:///example/jars/spark-streaming-data-persistence-examples.jar", "className":"com.microsoft.spark.streaming.examples.workloads.EventhubsToHiveTable", "args":["--eventhubs-namespace", "mysbnamespace", "--eventhubs-name", "myeventhub", "--policy-name", "myreceivepolicy", "--policy-key", "<put-your-key-here>", "--consumer-group", "$default", "--partition-count", 10, "--batch-interval-in-seconds", 20, "--checkpoint-directory", "/EventCheckpoint", "--event-count-folder", "/EventCount/EventCount10", "--event-hive-table", "EventHiveTable10" ], "jars":["wasbs:///example/jars/datanucleus-api-jdo-3.2.6.jar", "wasbs:///example/jars/datanucleus-rdbms-3.2.9.jar", "wasbs:///example/jars/datanucleus-core-3.2.10.jar"], "files":["wasbs:///example/jars/hive-site.xml"], "numExecutors":20, "executorMemory":"1G", "executorCores":1, "driverMemory":"2G" }

The parameters are similar to what you specified for the text output in the previous steps. Again, you do not need to create the output folders (EventCheckpoint, EventCount/EventCount10) or the output Hive table (EventHiveTable10) that are used as parameters. The streaming application creates them for you. Note that the jars and files options include paths to the .jar files and the hive-site.xml file that you copied over to the storage account.
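
Conceptually, the EventhubsToHiveTable workload persists each batch much like the JSON example, except that it writes to a Hive table through a Hive-enabled SparkSession. The following sketch is illustrative only; it assumes the unioned DStream from the earlier sketch, and the EventRow case class and persistToHive helper are hypothetical names.

import java.nio.charset.StandardCharsets

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.DStream

object HiveSink {
  case class EventRow(EventDetails: String)

  // Append each batch of events to a Hive table, for example EventHiveTable10.
  def persistToHive(stream: DStream[Array[Byte]], hiveTableName: String): Unit = {
    stream.map(bytes => EventRow(new String(bytes, StandardCharsets.UTF_8)))
      .foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
          val spark = SparkSession.builder
            .config(rdd.sparkContext.getConf)
            .enableHiveSupport()   // relies on hive-site.xml being available (the "files" option above)
            .getOrCreate()
          import spark.implicits._
          rdd.toDF().write.mode(SaveMode.Append).saveAsTable(hiveTableName)
        }
      }
  }
}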

To verify that the Hive table was successfully created, you can SSH into the cluster and run Hive queries. For instructions, see Use Hive with Hadoop in HDInsight with SSH. Once you are connected using SSH, you can run the following command to verify that the Hive table, EventHiveTable10, is created.

show tables;

You should see an output similar to the following:

OK
eventhivetable10
hivesampletable

You can also run a SELECT query to view the contents of the table.

SELECT * FROM eventhivetable10 LIMIT 10;

You should see an output like the following:

ZN90apUSQODDTx7n6Toh6jDbuPngqT4c
sor2M7xsFwmaRW8W8NDwMneFNMrOVkW1
o2HcsU735ejSi2bGEcbUSB4btCFmI1lW
TLuibq4rbj0T9st9eEzIWJwNGtMWYoYS
HKCpPlWFWAJILwR69MAq863nCWYzDEw6
Mvx0GQOPYvPR7ezBEpIHYKTKiEhYammQ
85dRppSBSbZgThLr1s0GMgKqynDUqudr
5LAWkNqorLj3ZN9a2mfWr9rZqeXKN4pF
ulf9wSFNjD7BZXCyunozecov9QpEIYmJ
vWzM3nvOja8DhYcwn0n5eTfOItZ966pa
Time taken: 4.434 seconds, Fetched: 10 row(s)

Run the applications to receive the events into an Azure SQL database table

Before running this step, make sure you have an Azure SQL database created. For instructions, see Create a SQL database in minutes. To complete this section, you need values for database name, database server name, and the database administrator credentials as parameters. You do not need to create the database table though. The streaming application creates that for you.

Open a command prompt, navigate to the directory where you installed CURL, and run the following command:

curl -k --user "admin:mypassword1!" -v -H "Content-Type: application/json" -X POST --data @C:\Temp\inputSQL.txt "https://mysparkcluster.azurehdinsight.net/livy/batches"

The parameters in the file inputSQL.txt are defined as follows:

{ "file":"wasbs:///example/jars/spark-streaming-data-persistence-examples.jar", "className":"com.microsoft.spark.streaming.examples.workloads.EventhubsToAzureSQLTable", "args":["--eventhubs-namespace", "mysbnamespace", "--eventhubs-name", "myeventhub", "--policy-name", "myreceivepolicy", "--policy-key", "<put-your-key-here>", "--consumer-group", "$default", "--partition-count", 10, "--batch-interval-in-seconds", 20, "--checkpoint-directory", "/EventCheckpoint", "--event-count-folder", "/EventCount/EventCount10", "--sql-server-fqdn", "<database-server-name>.database.windows.net", "--sql-database-name", "mysparkdatabase", "--database-username", "sparkdbadmin", "--database-password", "<put-password-here>", "--event-sql-table", "EventContent" ], "numExecutors":20, "executorMemory":"1G", "executorCores":1, "driverMemory":"2G" }

To verify that the application runs successfully, you can connect to the Azure SQL database using SQL Server Management Studio. For instructions on how to do that, see Connect to SQL Database with SQL Server Management Studio. Once you are connected to the database, you can navigate to the EventContent table that was created by the streaming application. You can run a quick query to get the data from the table. Run the following query:

SELECT * FROM EventContent

You should see output similar to the following:

00046b0f-2552-4980-9c3f-8bba5647c8ee
000b7530-12f9-4081-8e19-90acd26f9c0c
000bc521-9c1b-4a42-ab08-dc1893b83f3b
00123a2a-e00d-496a-9104-108920955718
0017c68f-7a4e-452d-97ad-5cb1fe5ba81b
001KsmqL2gfu5ZcuQuTqTxQvVyGCqPp9
001vIZgOStka4DXtud0e3tX7XbfMnZrN
00220586-3e1a-4d2d-a89b-05c5892e541a
0029e309-9e54-4e1b-84be-cd04e6fce5ec
003333cf-874f-4045-9da3-9f98c2b4ea49
0043c07e-8d73-420a-9af7-1fcb94575356
004a11a9-0c2c-4bc0-a7d5-2e0ebd947ab9

See also

Scenarios

Create and run applications

Tools and extensions

Manage resources