チュートリアル:HDInsight で Apache Kafka による Apache Spark 構造化ストリーミングを使用するTutorial: Use Apache Spark Structured Streaming with Apache Kafka on HDInsight

このチュートリアルでは、Azure HDInsight で Apache Kafka による Apache Spark 構造化ストリーミングを使用してデータを読み書きする方法について説明します。This tutorial demonstrates how to use Apache Spark Structured Streaming to read and write data with Apache Kafka on Azure HDInsight.

Spark 構造化ストリーミングは、Spark SQL を基盤とするストリーム処理エンジンです。Spark Structured Streaming is a stream processing engine built on Spark SQL. 静的データに対してバッチ計算と同様にストリーミング計算を表現できるようになります。It allows you to express streaming computations the same as batch computation on static data.

このチュートリアルでは、以下の内容を学習します。In this tutorial, you learn how to:

  • Azure Resource Manager テンプレートを使用してクラスターを作成するUse an Azure Resource Manager template to create clusters
  • Kafka で Spark 構造化ストリーミングを使用するUse Spark Structured Streaming with Kafka

このドキュメントの手順を完了したら、余分に課金されないようにするためにクラスターは削除してください。When you are done with the steps in this document, remember to delete the clusters to avoid excess charges.

前提条件Prerequisites

重要

このドキュメントの手順には、HDInsight の Spark クラスターと HDInsight の Kafka クラスターの両方を含む Azure リソース グループが必要です。The steps in this document require an Azure resource group that contains both a Spark on HDInsight and a Kafka on HDInsight cluster. これらのクラスターは両方とも、Spark クラスターが Kafka クラスターと直接通信できるように、Azure Virtual Network 内に配置します。These clusters are both located within an Azure Virtual Network, which allows the Spark cluster to directly communicate with the Kafka cluster.

利便性のために、このドキュメントは、必要なすべての Azure リソースを作成できるテンプレートにリンクしています。For your convenience, this document links to a template that can create all the required Azure resources.

仮想ネットワークでの HDInsight の使用方法の詳細については、HDInsight 用の仮想ネットワークの計画に関するドキュメントを参照してください。For more information on using HDInsight in a virtual network, see the Plan a virtual network for HDInsight document.

Apache Kafka での構造化ストリーミングStructured Streaming with Apache Kafka

Spark 構造化ストリーミングは、Spark SQL エンジンを基盤とするストリーム処理エンジンです。Spark Structured Streaming is a stream processing engine built on the Spark SQL engine. 構造化ストリーミングを使用すると、バッチ クエリと同様にストリーミング クエリを記述できます。When using Structured Streaming, you can write streaming queries the same way that you write batch queries.

次のコード スニペットは、Kafka からの読み取りとファイルへの保存を示しています。The following code snippets demonstrate reading from Kafka and storing to file. 最初のコード スニペットはバッチ操作であり、2 番目はストリーミング操作です。The first one is a batch operation, while the second one is a streaming operation:

// Read a batch from Kafka
val kafkaDF = spark.read.format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load()

// Select data and write to file
kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip")
                .write
                .format("parquet")
                .option("path","/example/batchtripdata")
                .option("checkpointLocation", "/batchcheckpoint")
                .save()
// Stream from Kafka
val kafkaStreamDF = spark.readStream.format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load()

// Select data from the stream and write to file
kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip")
                .writeStream
                .format("parquet")
                .option("path","/example/streamingtripdata")
                .option("checkpointLocation", "/streamcheckpoint")
                .start.awaitTermination(30000)

どちらのスニペットでも、データは Kafka から読み取られ、ファイルに書き込まれます。In both snippets, data is read from Kafka and written to file. これらの例の違いは次のとおりです。The differences between the examples are:

BatchBatch ストリーミングStreaming
read readStream
write writeStream
save start

ストリーミング操作では、30,000 ミリ秒後にストリーミングを停止する、awaitTermination(30000) も使用されています。The streaming operation also uses awaitTermination(30000), which stops the stream after 30,000 ms.

Kafka で構造化ストリーミングを使用するには、プロジェクトに org.apache.spark : spark-sql-kafka-0-10_2.11 パッケージの依存関係が必要です。To use Structured Streaming with Kafka, your project must have a dependency on the org.apache.spark : spark-sql-kafka-0-10_2.11 package. このパッケージのバージョンは、HDInsight の Spark のバージョンと一致する必要があります。The version of this package should match the version of Spark on HDInsight. Spark 2.2.0 (HDInsight 3.6 で使用可能) の場合、さまざまなプロジェクトの種類の依存関係情報を https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-sql-kafka-0-10_2.11%7C2.2.0%7Cjar で確認できます。For Spark 2.2.0 (available in HDInsight 3.6), you can find the dependency information for different project types at https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-sql-kafka-0-10_2.11%7C2.2.0%7Cjar.

このチュートリアルで使用される Jupyter Notebook では、次のセルでこのパッケージの依存関係を読み込みます。For the Jupyter Notebook used with this tutorial, the following cell loads this package dependency:

%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
    }
}

クラスターの作成Create the clusters

HDInsight の Apache Kafka では、パブリック インターネットを介した Kafka ブローカーへのアクセスは提供されていません。Apache Kafka on HDInsight does not provide access to the Kafka brokers over the public internet. Kafka を使用するものは、すべて同じ Azure 仮想ネットワークに属している必要があります。Anything that uses Kafka must be in the same Azure virtual network. このチュートリアルでは、Kafka クラスターと Spark クラスターの両方を同じ Azure 仮想ネットワーク内に配置します。In this tutorial, both the Kafka and Spark clusters are located in the same Azure virtual network.

次の図に、Spark と Kafka の間の通信フローを示します。The following diagram shows how communication flows between Spark and Kafka:

Azure 仮想ネットワークにおける Spark クラスターと Kafka クラスターの図

注意

Kafka サービスは、仮想ネットワーク内の通信に制限されます。The Kafka service is limited to communication within the virtual network. SSH や Ambari など、クラスター上の他のサービスは、インターネット経由でアクセスできます。Other services on the cluster, such as SSH and Ambari, can be accessed over the internet. HDInsight で使用できるパブリック ポートの詳細については、「HDInsight で使用されるポートと URI」を参照してください。For more information on the public ports available with HDInsight, see Ports and URIs used by HDInsight.

Azure 仮想ネットワークを作成し、その仮想ネットワーク内に Kafka クラスターと Spark クラスターを作成するには、次の手順に従います。To create an Azure Virtual Network, and then create the Kafka and Spark clusters within it, use the following steps:

  1. 次のボタンを使用して Azure にサインインし、Azure Portal でテンプレートを開きます。Use the following button to sign in to Azure and open the template in the Azure portal.

    Deploy to Azure button for new cluster

    Azure Resource Manager テンプレートは、 https://raw.githubusercontent.com/Azure-Samples/hdinsight-spark-kafka-structured-streaming/master/azuredeploy.json にあります。The Azure Resource Manager template is located at https://raw.githubusercontent.com/Azure-Samples/hdinsight-spark-kafka-structured-streaming/master/azuredeploy.json.

    このテンプレートでは、次のリソースを作成します。This template creates the following resources:

    • HDInsight 3.6 クラスター上の Kafka。A Kafka on HDInsight 3.6 cluster.

    • HDInsight 3.6 クラスター上の Spark 2.2.0。A Spark 2.2.0 on HDInsight 3.6 cluster.

    • Azure Virtual Network (HDInsight クラスターを含む)An Azure Virtual Network, which contains the HDInsight clusters.

      重要

      このチュートリアルで使用する構造化ストリーミング Notebook では、HDInsight 3.6 上に Spark 2.2.0 が必要です。The structured streaming notebook used in this tutorial requires Spark 2.2.0 on HDInsight 3.6. HDInsight 上で以前のバージョンの Spark を使用している場合は、ノートブックを使用するとエラーを受信します。If you use an earlier version of Spark on HDInsight, you receive errors when using the notebook.

  2. 次の情報に従って、 [カスタマイズされたテンプレート] セクションの各エントリに入力します。Use the following information to populate the entries on the Customized template section:

    SettingSetting Value
    SubscriptionSubscription お使いの Azure サブスクリプションYour Azure subscription
    Resource groupResource group リソースが含まれるリソース グループ。The resource group that contains the resources.
    LocationLocation リソースが作成される Azure リージョン。The Azure region that the resources are created in.
    [Spark Cluster Name](Spark クラスター名)Spark Cluster Name Spark クラスターの名前。The name of the Spark cluster. 最初の 6 文字は、Kafka クラスターの名前と異なるものにする必要があります。The first six characters must be different than the Kafka cluster name.
    [Kafka Cluster Name](Kafka クラスター名)Kafka Cluster Name Kafka クラスターの名前。The name of the Kafka cluster. 最初の 6 文字は、Spark クラスターの名前と異なるものにする必要があります。The first six characters must be different than the Spark cluster name.
    [Cluster Login User Name](クラスター ログイン ユーザー名)Cluster Login User Name クラスターの管理者ユーザー名。The admin user name for the clusters.
    [クラスター ログイン パスワード]Cluster Login Password クラスターの管理者ユーザー パスワード。The admin user password for the clusters.
    [SSH ユーザー名]SSH User Name クラスター用に作成する SSH ユーザー。The SSH user to create for the clusters.
    [SSH パスワード]SSH Password SSH ユーザーのパスワード。The password for the SSH user.

    カスタマイズされたテンプレートのスクリーンショット

  3. 使用条件を読み、 [上記の使用条件に同意する] をオンにします。Read the Terms and Conditions, and then select I agree to the terms and conditions stated above.

  4. [購入] を選択します。Select Purchase.

注意

クラスターの作成には最大で 20 分かかります。It can take up to 20 minutes to create the clusters.

Spark 構造化ストリーミングを使用するUse Spark Structured Streaming

この例では、HDInsight 上の Kafka で Spark 構造化ストリーミングを使用する方法を示します。This example demonstrates how to use Spark Structured Streaming with Kafka on HDInsight. ここでは、ニューヨーク市が提供しているタクシー乗車データを使用します。It uses data on taxi trips, which is provided by New York City. このノートブックで使用されるデータ セットは「2016 Green Taxi Trip Data (2016 年のグリーン タクシー乗車データ)」です。The data set used by this notebook is from 2016 Green Taxi Trip Data.

  1. ホスト情報を収集します。Gather host information. 以下の curl コマンドと jq コマンドを使用して、Kafka ZooKeeper とブローカー ホストの情報を取得します。Use the curl and jq commands below to obtain your Kafka ZooKeeper and broker hosts information. これらのコマンドは Windows コマンド プロンプト用に設計されているので、他の環境では若干の調整が必要となります。The commands are designed for a Windows command prompt, slight variations will be needed for other environments. KafkaCluster を Kafka クラスターの名前に、KafkaPassword をクラスターのログイン パスワードに置き換えます。Replace KafkaCluster with the name of your Kafka cluster, and KafkaPassword with the cluster login password. また、C:\HDI\jq-win64.exe を実際の jq インストールへのパスに置き換えます。Also, replace C:\HDI\jq-win64.exe with the actual path to your jq installation. Windows コマンド プロンプトでこれらのコマンドを入力し、後の手順で使用するために出力を保存します。Enter the commands in a Windows command prompt and save the output for use in later steps.

    set CLUSTERNAME=KafkaCluster
    set PASSWORD=KafkaPassword
    
    curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):2181"""] | join(""",""")"
    
    curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/KAFKA/components/KAFKA_BROKER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):9092"""] | join(""",""")"
    
  2. Web ブラウザーで、Spark クラスターの Jupyter Notebook に接続します。In your web browser, connect to the Jupyter notebook on your Spark cluster. 次の URL のCLUSTERNAME をお使いの Spark クラスターの名前に置き換えます。In the following URL, replace CLUSTERNAME with the name of your Spark cluster:

     https://CLUSTERNAME.azurehdinsight.net/jupyter
    

    プロンプトが表示されたら、クラスターの作成時に使用したログイン (管理者) パスワードを入力します。When prompted, enter the cluster login (admin) and password used when you created the cluster.

  3. [New](新規)、[Spark] の順に選択して、ノートブックを作成します。Select New > Spark to create a notebook.

  4. Spark ストリーミングには、マイクロバッチ処理があります。つまり、データをバッチとして受信し、データのバッチに対して Executor が実行されます。Spark streaming has microbatching, which means data comes as batches and executers run on the batches of data. Executor のアイドル タイムアウトがバッチの処理にかかる時間よりも少ない場合、Executor は常に追加され、削除されます。If the executor has idle timeout less than the time it takes to process the batch then the executors would be constantly added and removed. Executor のアイドル タイムアウトがバッチ期間より長い場合、Executor は削除されません。If the executors idle timeout is greater than the batch duration, the executor never gets removed. そのため、ストリーミング アプリケーションを実行する場合は、spark.dynamicAllocation.enabled を false に設定して、動的割り当てを無効にすることをお勧めします。Hence we recommend that you disable dynamic allocation by setting spark.dynamicAllocation.enabled to false when running streaming applications.

    次の情報を Notebook セルに入力して、Notebook で使用されるパッケージを読み込みます。Load packages used by the Notebook by entering the following information in a Notebook cell. Ctrl + Enter キーを使用してコマンドを実行します。Run the command by using CTRL + ENTER.

    %%configure -f
    {
        "conf": {
            "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
            "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11",
            "spark.dynamicAllocation.enabled": false
        }
    }
    
  5. Kafka トピックを作成します。Create the Kafka topic. 以下のコマンドに対して、YOUR_ZOOKEEPER_HOSTS を最初の手順で抽出した Zookeeper ホスト情報で置き換えるという編集を実施します。Edit the command below by replacing YOUR_ZOOKEEPER_HOSTS with the Zookeeper host information extracted in the first step. 編集したコマンドを Jupyter Notebook に入力して、tripdata トピックを作成します。Enter the edited command in your Jupyter Notebook to create the tripdata topic.

    %%bash
    export KafkaZookeepers="YOUR_ZOOKEEPER_HOSTS"
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic tripdata --zookeeper $KafkaZookeepers
    
  6. タクシー乗車データを取得します。Retrieve data on taxi trips. 次のセルにコマンドを入力して、ニューヨーク市のタクシー乗車データを読み込みます。Enter the command in the next cell to load data on taxi trips in New York City. データフレームにデータが読み込まれると、そのデータフレームはセルの出力として表示されます。The data is loaded into a dataframe and then the dataframe is displayed as the cell output.

    import spark.implicits._
    
    // Load the data from the New York City Taxi data REST API for 2016 Green Taxi Trip Data
    val url="https://data.cityofnewyork.us/resource/pqfs-mqru.json"
    val result = scala.io.Source.fromURL(url).mkString
    
    // Create a dataframe from the JSON data
    val taxiDF = spark.read.json(Seq(result).toDS)
    
    // Display the dataframe containing trip data
    taxiDF.show()
    
  7. Kafka ブローカー ホストの情報を設定します。Set the Kafka broker hosts information. YOUR_KAFKA_BROKER_HOSTS を手順 1 で抽出したブローカー ホスト情報に置き換えます。Replace YOUR_KAFKA_BROKER_HOSTS with the broker hosts information you extracted in step 1. 次の Jupyter Notebook セルに編集したコマンドを入力します。Enter the edited command in the next Jupyter Notebook cell.

    // The Kafka broker hosts and topic used to write to Kafka
    val kafkaBrokers="YOUR_KAFKA_BROKER_HOSTS"
    val kafkaTopic="tripdata"
    
    println("Finished setting Kafka broker and topic configuration.")
    
  8. Kafka にデータを送信します。Send the data to Kafka. 次のコマンドでは、vendorid フィールドが Kafka メッセージのキー値として使用されています。In the following command, the vendorid field is used as the key value for the Kafka message. このキーは、Kafka でデータをパーティション分割するときに使用されます。The key is used by Kafka when partitioning data. フィールドはすべて JSON 文字列値として Kafka メッセージに格納されます。All of the fields are stored in the Kafka message as a JSON string value. Jupyter で次のコマンドを入力し、バッチ クエリを使用して Kafka にデータを保存します。Enter the following command in Jupyter to save the data to Kafka using a batch query.

    // Select the vendorid as the key and save the JSON string as the value.
    val query = taxiDF.selectExpr("CAST(vendorid AS STRING) as key", "to_JSON(struct(*)) AS value").write.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("topic", kafkaTopic).save()
    
    println("Data sent to Kafka")
    
  9. スキーマを宣言します。Declare a schema. 次のコマンドは、Kafka から JSON データを読み取るときにスキーマを使用する方法を示しています。The following command demonstrates how to use a schema when reading JSON data from kafka. 次の Jupyter セルにコマンドを入力します。Enter the command in your next Jupyter cell.

    // Import bits useed for declaring schemas and working with JSON data
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    
    // Define a schema for the data
    val schema = (new StructType).add("dropoff_latitude", StringType).add("dropoff_longitude", StringType).add("extra", StringType).add("fare_amount", StringType).add("improvement_surcharge", StringType).add("lpep_dropoff_datetime", StringType).add("lpep_pickup_datetime", StringType).add("mta_tax", StringType).add("passenger_count", StringType).add("payment_type", StringType).add("pickup_latitude", StringType).add("pickup_longitude", StringType).add("ratecodeid", StringType).add("store_and_fwd_flag", StringType).add("tip_amount", StringType).add("tolls_amount", StringType).add("total_amount", StringType).add("trip_distance", StringType).add("trip_type", StringType).add("vendorid", StringType)
    // Reproduced here for readability
    //val schema = (new StructType)
    //   .add("dropoff_latitude", StringType)
    //   .add("dropoff_longitude", StringType)
    //   .add("extra", StringType)
    //   .add("fare_amount", StringType)
    //   .add("improvement_surcharge", StringType)
    //   .add("lpep_dropoff_datetime", StringType)
    //   .add("lpep_pickup_datetime", StringType)
    //   .add("mta_tax", StringType)
    //   .add("passenger_count", StringType)
    //   .add("payment_type", StringType)
    //   .add("pickup_latitude", StringType)
    //   .add("pickup_longitude", StringType)
    //   .add("ratecodeid", StringType)
    //   .add("store_and_fwd_flag", StringType)
    //   .add("tip_amount", StringType)
    //   .add("tolls_amount", StringType)
    //   .add("total_amount", StringType)
    //   .add("trip_distance", StringType)
    //   .add("trip_type", StringType)
    //   .add("vendorid", StringType)
    
    println("Schema declared")
    
  10. データを選択してストリーミングを開始します。Select data and start the stream. 次のコマンドは、バッチ クエリを使用して Kafka からデータを取得してから、その結果を Spark クラスター上の HDFS に書き出す方法を示しています。The following command demonstrates how to retrieve data from kafka using a batch query, and then write the results out to HDFS on the Spark cluster. この例では、select が Kafka からメッセージ (値フィールド) を取得し、そのメッセージにスキーマを適用します。In this example, the select retrieves the message (value field) from Kafka and applies the schema to it. その後、データは parquet 形式で HDFS (WASB または ADL) に書き込まれます。The data is then written to HDFS (WASB or ADL) in parquet format. 次の Jupyter セルにコマンドを入力します。Enter the command in your next Jupyter cell.

    // Read a batch from Kafka
    val kafkaDF = spark.read.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
    
    // Select data and write to file
    val query = kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip").write.format("parquet").option("path","/example/batchtripdata").option("checkpointLocation", "/batchcheckpoint").save()
    
    println("Wrote data to file")
    
  11. 次の Jupyter セルにコマンドを入力することで、ファイルが作成されたことを確認できます。You can verify that the files were created by entering the command in your next Jupyter cell. /example/batchtripdata ディレクトリ内のファイルが一覧表示されます。It lists the files in the /example/batchtripdata directory.

    %%bash
    hdfs dfs -ls /example/batchtripdata
    
  12. 前の例ではバッチ クエリが使用されていましたが、以下のコマンドでは、ストリーミング クエリを使用して同じ処理を行う方法を示します。While the previous example used a batch query, the following command demonstrates how to do the same thing using a streaming query. 次の Jupyter セルにコマンドを入力します。Enter the command in your next Jupyter cell.

    // Stream from Kafka
    val kafkaStreamDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
    
    // Select data from the stream and write to file
    kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip").writeStream.format("parquet").option("path","/example/streamingtripdata").option("checkpointLocation", "/streamcheckpoint").start.awaitTermination(30000)
    println("Wrote data to file")
    
  13. 次のセルを実行して、ファイルがストリーミング クエリによって書き込まれたことを確認します。Run the following cell to verify that the files were written by the streaming query.

    %%bash
    hdfs dfs -ls /example/streamingtripdata
    

リソースのクリーンアップClean up resources

このチュートリアルで作成したリソースをクリーンアップするために、リソース グループを削除することができます。To clean up the resources created by this tutorial, you can delete the resource group. リソース グループを削除すると、関連付けられている HDInsight クラスター、およびリソース グループに関連付けられているその他のリソースも削除されます。Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.

Azure Portal を使用してリソース グループを削除するには:To remove the resource group using the Azure portal:

  1. Azure Portal で左側のメニューを展開してサービスのメニューを開き、 [リソース グループ] を選択して、リソース グループの一覧を表示します。In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource Groups to display the list of your resource groups.
  2. 削除するリソース グループを見つけて、一覧の右側にある [詳細] ボタン ([...]) を右クリックします。Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
  3. [リソース グループの削除] を選択し、確認します。Select Delete resource group, and then confirm.

警告

HDInsight クラスターの課金は、クラスターが作成されると開始し、クラスターが削除されると停止します。HDInsight cluster billing starts once a cluster is created and stops when the cluster is deleted. 課金は分単位なので、クラスターを使わなくなったら必ず削除してください。Billing is pro-rated per minute, so you should always delete your cluster when it is no longer in use.

HDInsight クラスター上の Kafka を削除すると、Kafka に格納されているすべてのデータが削除されます。Deleting a Kafka on HDInsight cluster deletes any data stored in Kafka.

次の手順Next steps

このチュートリアルでは、Apache Spark 構造化ストリーミングを使用して、HDInsight の Apache Kafka からデータを読み書きする方法を説明しました。In this tutorial, you learned how to use Apache Spark Structured Streaming to write and read data from Apache Kafka on HDInsight. Kafka で Apache Storm を使用する方法については、次のリンクを参照してください。Use the following link to learn how to use Apache Storm with Kafka.