Process events from Azure Event Hubs with Storm on HDInsight (Java)

Learn how to use Azure Event Hubs with Storm on HDInsight. This example uses Java-based components to read and write data in Azure Event Hubs.

Azure Event Hubs allows you to process massive amounts of data from websites, apps, and devices. The Event Hub spout makes it easy to use Apache Storm on HDInsight to analyze this data in real time. You can also write data to Event Hubs from Storm by using the Event Hubs bolt.

Prerequisites

  • An Apache Storm on HDInsight cluster version 3.6. For more information, see Get started with Storm on HDInsight cluster.

    Important

    Linux is the only operating system used on HDInsight version 3.4 or greater. For more information, see HDInsight retirement on Windows.

  • An Azure Event Hub.

  • Oracle Java Developer Kit (JDK) version 8 or equivalent, such as OpenJDK.

  • Maven: a project build system for Java projects.

  • A text editor or integrated development environment (IDE).

    Note

    Your editor or IDE may have specific functionality for working with Maven that is not addressed in this document. For information about the capabilities of your editing environment, see the documentation for the product you are using.

  • The ssh and scp commands. These are used to copy files to the HDInsight cluster. On Windows, you can get these through Bash on Windows 10.

Understanding the example

The hdinsight-java-storm-eventhub example contains two topologies:

The resources/writer.yaml topology writes random data to an Azure Event Hub. The data is generated by the DeviceSpout component and consists of a random device ID and device value, so the topology simulates hardware that emits a string ID and a numeric value.
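
The project contains the actual DeviceSpout implementation. As a rough, hypothetical sketch of what such a spout does (the class name, output field name, and value ranges here are assumptions, not the repository code), it looks something like this:

// Hypothetical sketch of a device-emulator spout. See DeviceSpout in the
// project for the real implementation; names and value ranges are assumed.
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class DeviceSpoutSketch extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private Random random;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        // Emit one JSON document per call: a random device ID and value.
        String json = String.format("{ \"deviceId\": \"%s\", \"deviceValue\": %d }",
                UUID.randomUUID(), random.nextInt(100));
        collector.emit(new Values(json));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // A single field holding the JSON string handed to the Event Hub bolt.
        declarer.declare(new Fields("message"));
    }
}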

The resources/reader.yaml topology reads data from the Event Hub (the data written by the writer topology), parses the JSON data, and then logs the deviceId and deviceValue data.

The data is formatted as a JSON document before it is written to Event Hub, and when read by the reader it is parsed out of JSON and into tuples. The JSON format is as follows:

{ "deviceId": "unique identifier", "deviceValue": some value }

Project configuration

The pom.xml file contains the configuration information for this Maven project. The sections of interest are:

Event Hub components

The component that reads from and writes to Azure Event Hubs is located in the HDInsight repository. The following section in the pom.xml file loads the component from this repository:

<repositories>
    <repository>
        <id>hdinsight-examples</id>
        <url>http://raw.github.com/hdinsight/mvn-repo/master</url>
    </repository>
</repositories>

The EventHubs Storm Spout dependency

<dependency>
    <groupId>com.microsoft</groupId>
    <artifactId>eventhubs</artifactId>
    <version>${storm.eventhub.version}</version>
</dependency>

This XML defines a dependency for the eventhubs package, which contains both a spout for reading from Event Hubs and a bolt for writing to them.
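
If you wire a topology in plain Java rather than Flux, the package could be used as in the following minimal sketch. The constructor arguments mirror the ones the YAML topologies later in this document pass in; the policy, namespace, hub name, and partition values are placeholders:

// Minimal sketch of creating the spout and bolt directly in Java. The
// constructor arguments mirror the Flux YAML below; values are placeholders.
import org.apache.storm.eventhubs.bolt.EventHubBolt;
import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
import org.apache.storm.eventhubs.spout.EventHubSpout;
import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;

public class EventHubComponentsSketch {
    // Spout that reads events from the hub: policy name, policy key,
    // namespace, hub name, and partition count.
    public static EventHubSpout createSpout() {
        EventHubSpoutConfig config = new EventHubSpoutConfig(
                "reader", "your_key_here", "your_namespace_here", "storm", 2);
        return new EventHubSpout(config);
    }

    // Bolt that writes events to the hub: policy name, policy key,
    // namespace, target address, and hub name.
    public static EventHubBolt createBolt() {
        EventHubBoltConfig config = new EventHubBoltConfig(
                "writer", "your_key_here", "your_namespace_here",
                "servicebus.windows.net", "storm");
        return new EventHubBolt(config);
    }
}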

The maven-compiler-plugin

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
        <source>1.8</source>
        <target>1.8</target>
    </configuration>
</plugin>

This XML configures the project to generate output for Java 8, which is used by HDInsight 3.5 or higher.

The maven-shade-plugin

<!-- build an uber jar -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.3</version>
    <configuration>
        <transformers>
            <!-- Keep us from getting a can't overwrite file error -->
            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
        <!-- Keep us from getting a bad signature error -->
        <filters>
            <filter>
                <artifact>*:*</artifact>
                <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                </excludes>
            </filter>
        </filters>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>

This XML configures the project to package the output into an uber jar. The jar contains both the project code and the required dependencies. It is also used to:

  • Rename license files for the dependencies.
  • Exclude security/signatures.
  • Ensure that multiple implementations of the same interface are merged into one entry.

These configuration settings prevent errors at runtime.

Topology definitions

This example uses the Flux framework, which uses YAML to define topologies. The primary benefit is that the topology isn't hard-coded in Java: because the definition is YAML, you can change it before submitting the topology, without recompiling anything.
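
For comparison, hand-wiring the reader topology in Java would look roughly like the following sketch (the configuration values are placeholders; with Flux, none of this code has to be written or recompiled):

// Rough Java equivalent of reader.yaml, shown for comparison only; Flux
// replaces this wiring with the YAML definition.
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.eventhubs.spout.EventHubSpout;
import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
import org.apache.storm.flux.wrappers.bolts.LogInfoBolt;
import com.microsoft.example.ParserBolt;

public class ReaderTopologySketch {
    public static TopologyBuilder build() {
        int partitions = 2; // must match the Event Hub partition count

        EventHubSpoutConfig config = new EventHubSpoutConfig(
                "reader", "your_key_here", "your_namespace_here", "storm", partitions);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("eventhub-spout", new EventHubSpout(config), partitions);
        builder.setBolt("parser-bolt", new ParserBolt(), partitions)
                .shuffleGrouping("eventhub-spout");
        builder.setBolt("log-bolt", new LogInfoBolt(), 1)
                .shuffleGrouping("parser-bolt");
        return builder;
    }
}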

writer.yaml:

---
# Topology that writes to Event Hubs
name: "eventhubwriter"

components:
  # Configure the Event Hub bolt
  - id: "eventhubbolt-config"
    className: "org.apache.storm.eventhubs.bolt.EventHubBoltConfig"
    constructorArgs:
      # These are populated from the .properties file when the topology is started
      - "${eventhub.write.policy.name}"
      - "${eventhub.write.policy.key}"
      - "${eventhub.namespace}"
      - "servicebus.windows.net"
      - "${eventhub.name}"

spouts:
  - id: "device-emulator-spout"
    className: "com.microsoft.example.DeviceSpout"
    parallelism: ${eventhub.partitions}

bolts:
  - id: "eventhub-bolt"
    className: "org.apache.storm.eventhubs.bolt.EventHubBolt"
    constructorArgs:
      - ref: "eventhubbolt-config" # config declared in components section
    # parallelism hint. This should be the same as the number of partitions for your Event Hub, so we read it from the dev.properties file passed at run time.
    parallelism: ${eventhub.partitions}

  # Log information
  - id: "log-bolt"
    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
    parallelism: 1

# How data flows through the components
streams:
  - name: "spout -> eventhub" # just a string used for logging
    from: "device-emulator-spout"
    to: "eventhub-bolt"
    grouping:
        type: SHUFFLE

  - name: "spout -> logger"
    from: "device-emulator-spout"
    to: "log-bolt"
    grouping:
        type: SHUFFLE

reader.yaml:

---
# Topology that reads from Event Hubs
name: "eventhubreader"

components:
  # Configure the Event Hub spout
  - id: "eventhubspout-config"
    className: "org.apache.storm.eventhubs.spout.EventHubSpoutConfig"
    constructorArgs:
      # These are populated from the .properties file when the topology is started
      - "${eventhub.read.policy.name}"
      - "${eventhub.read.policy.key}"
      - "${eventhub.namespace}"
      - "${eventhub.name}"
      - ${eventhub.partitions}

spouts:
  - id: "eventhub-spout"
    className: "org.apache.storm.eventhubs.spout.EventHubSpout"
    constructorArgs:
      - ref: "eventhubspout-config" # config declared in components section
    # parallelism hint. This should be the same as the number of partitions for your Event Hub, so we read it from the dev.properties file passed at run time.
    parallelism: ${eventhub.partitions}

bolts:
  # Log information
  - id: "log-bolt"
    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
    parallelism: 1

  # Parses from JSON into tuples
  - id: "parser-bolt"
    className: "com.microsoft.example.ParserBolt"
    parallelism: ${eventhub.partitions}

# How data flows through the components
streams:
  - name: "spout -> parser" # just a string used for logging
    from: "eventhub-spout"
    to: "parser-bolt"
    grouping:
        type: SHUFFLE

  - name: "parser -> log-bolt"
    from: "parser-bolt"
    to: "log-bolt"
    grouping:
        type: SHUFFLE

Tell the topology about Event Hub

At run time, the dev.properties file is used to pass the Event Hub configuration to the topology. The following example is the default contents of the file:

eventhub.write.policy.name: writer
eventhub.write.policy.key: your_key_here
eventhub.read.policy.name: reader
eventhub.read.policy.key: your_key_here
eventhub.namespace: your_namespace_here
eventhub.name: storm
eventhub.partitions: 2

Configure environment variables

The following environment variables may be set when you install Java and the JDK on your development workstation. However, you should check that they exist and that they contain the correct values for your system.

  • JAVA_HOME - should point to the directory where the Java Development Kit (JDK) is installed. For example, on a Unix or Linux distribution, it should have a value similar to /usr/lib/jvm/java-8-oracle. On Windows, it would have a value similar to c:\Program Files\Java\jdk1.8.0
  • PATH - should contain the following paths:

    • JAVA_HOME (or the equivalent path)
    • JAVA_HOME\bin (or the equivalent path)
    • The directory where Maven is installed

Configure Event Hub

Event Hubs is the data source for this example. Use the following steps to create an Event Hub.

  1. From the Azure Classic Portal, select NEW > Service Bus > Event Hub > Custom Create.

  2. On the Add a new Event Hub screen, enter an Event Hub Name. Select the Region to create the hub in, and then create a namespace or select an existing one. Finally, click the Arrow to continue.

    Note

    Select the same Location as your Storm on HDInsight server to reduce latency and costs.

  3. On the Configure Event Hub screen, enter the Partition count and Message Retention values. For this example, use a partition count of 10 and a message retention of 1. Note the partition count because you need this value later.

  4. After the event hub has been created, select the namespace, select Event Hubs, and then select the event hub that you created earlier.

  5. Select Configure, then create two new access policies by using the following information:

    Name      Permissions
    writer    Send
    reader    Listen

    After you create the permissions, select the Save icon at the bottom of the page. These shared access policies are used to read from and write to the Event Hub.

  6. After you save the policies, use the Shared access key generator at the bottom of the page to retrieve the key for the writer and reader policies. Save these keys.

Download and build the project

  1. Download the project from GitHub: hdinsight-java-storm-eventhub. You can either download the package as a zip archive, or use git to clone the project locally.

  2. Modify the dev.properties file with the configuration for your Event Hub: the policy names and keys, the namespace, the Event Hub name, and the partition count you noted earlier.

  3. Use the following to build and package the project:

     mvn package
    

    This command downloads the required dependencies, builds the project, and then packages it. The output is stored in the target directory as EventHubExample-1.0-SNAPSHOT.jar.

Test locally

Since these topologies only read from and write to Event Hubs, you can test them locally if you have a Storm development environment. Use the following steps to run them locally:

  1. Run the writer:

     storm jar EventHubExample-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local -R /writer.yaml --filter dev.properties
    
  2. Run the reader:

     storm jar EventHubExample-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local -R /reader.yaml --filter dev.properties
    
Tip
  • --local: Run the topology in local mode (non-distributed).
  • -R /writer.yaml: Load the topology definition from the resources packaged in the jar. If the topology is a file on the local file system, specify the path to it as the last parameter instead.
  • --filter dev.properties: Use the contents of dev.properties to fill in the values in the topology definitions. For example, ${eventhub.read.policy.name}.

Output is logged to the console when running locally. Use Ctrl+C to stop the topology.

Deploy the topologies

  1. Use SCP to copy the jar package to your HDInsight cluster. Replace USERNAME with the SSH user for your cluster. Replace CLUSTERNAME with the name of your HDInsight cluster:

     scp ./target/EventHubExample-1.0-SNAPSHOT.jar USERNAME@CLUSTERNAME-ssh.azurehdinsight.net:.
    

    If you used a password for your SSH account, you are prompted to enter the password. If you used an SSH key with the account, you may need to use the -i parameter to specify the path to the key file. For example, scp -i ~/.ssh/id_rsa ./target/EventHubExample-1.0-SNAPSHOT.jar USERNAME@CLUSTERNAME-ssh.azurehdinsight.net:.

    This command copies the file to the home directory of your SSH user on the cluster.

  2. Once the file has finished uploading, use SSH to connect to the HDInsight cluster. Replace USERNAME with the name of your SSH login. Replace CLUSTERNAME with your HDInsight cluster name:

     ssh USERNAME@CLUSTERNAME-ssh.azurehdinsight.net
    
    Note

    If you used a password for your SSH account, you are prompted to enter the password. If you used an SSH key with the account, you may need to use the -i parameter to specify the path to the key file. The following example loads the private key from ~/.ssh/id_rsa:

    ssh -i ~/.ssh/id_rsa USERNAME@CLUSTERNAME-ssh.azurehdinsight.net

  3. Use the following commands to start the topologies:

     storm jar EventHubExample-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /writer.yaml --filter dev.properties
     storm jar EventHubExample-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /reader.yaml --filter dev.properties
    
    Tip
    • --remote: Submits the topology to the Nimbus service, which starts it on the worker nodes in the cluster.
  4. To view the logged data, go to https://CLUSTERNAME.azurehdinsight.net/stormui, where CLUSTERNAME is the name of your HDInsight cluster. Select the topologies and drill down to the components. Select the port entry for an instance of a component to view logged information.

  5. Use the following commands to stop the topologies. The topology names come from the name field in each YAML file:

     storm kill eventhubreader
     storm kill eventhubwriter
    

Delete your cluster

Warning

Billing for HDInsight clusters is prorated per minute, whether you are using them or not. Be sure to delete your cluster after you have finished using it. For more information, see How to delete an HDInsight cluster.

Next steps