チュートリアル: HDInsight 上の Apache Kafka で Apache Storm を使用するTutorial: Use Apache Storm with Apache Kafka on HDInsight

このチュートリアルでは、Apache Storm トポロジを使用して、HDInsight の Apache Kafka でデータを読み書きする方法について説明します。This tutorial demonstrates how to use an Apache Storm topology to read and write data with Apache Kafka on HDInsight. また、Storm クラスター上の Apache Hadoop HDFS 互換ストレージにデータを保持する方法についても説明します。This tutorial also demonstrates how to persist data to the Apache Hadoop HDFS compatible storage on the Storm cluster.

このチュートリアルでは、以下の内容を学習します。In this tutorial, you learn how to:

  • Storm と KafkaStorm and Kafka
  • コードについてUnderstanding the code
  • Kafka クラスターと Storm クラスターの作成Create Kafka and Storm clusters
  • トポロジの作成Build the topology
  • トポロジの構成Configure the topology
  • Kafka トピックの作成Create the Kafka topic
  • トポロジの開始Start the topologies
  • トポロジの停止Stop the topologies
  • リソースのクリーンアップClean up resources

前提条件Prerequisites

開発用ワークステーションに Java と JDK をインストールするときに、次のような環境変数が設定される場合があります。The following environment variables may be set when you install Java and the JDK on your development workstation. ただし、これらが存在するかどうかや、システムに対して適切な値が含まれているかを確認する必要があります。However, you should check that they exist and that they contain the correct values for your system.

  • JAVA_HOME - JDK がインストールされているディレクトリを指している必要があります。JAVA_HOME - should point to the directory where the JDK is installed.

  • PATH - 次のパスを含む必要があります。PATH - should contain the following paths:

    • JAVA_HOME (または同等のパス)。JAVA_HOME (or the equivalent path).
    • JAVA_HOME\bin (または同等のパス)。JAVA_HOME\bin (or the equivalent path).
    • Maven がインストールされているディレクトリ。The directory where Maven is installed.

重要

このドキュメントの手順には、HDInsight の Storm クラスターと HDInsight の Kafka クラスターの両方を含む Azure リソース グループが必要です。The steps in this document require an Azure resource group that contains both a Storm on HDInsight and a Kafka on HDInsight cluster. これらのクラスターは両方とも、Strom クラスターが Kafka クラスターと直接通信できるように、Azure 仮想ネットワーク内に配置します。These clusters are both located within an Azure Virtual Network, which allows the Storm cluster to directly communicate with the Kafka cluster.

利便性のために、このドキュメントは、必要なすべての Azure リソースを作成できるテンプレートにリンクしています。For your convenience, this document links to a template that can create all the required Azure resources.

仮想ネットワークでの HDInsight の使用方法の詳細については、HDInsight 用の仮想ネットワークの計画に関するドキュメントを参照してください。For more information on using HDInsight in a virtual network, see the Plan a virtual network for HDInsight document.

Storm と KafkaStorm and Kafka

Apache Storm には、Apache Kafka を操作するためのコンポーネントがいくつか用意されています。Apache Storm provides the several components for working with Apache Kafka. このチュートリアルでは、次のコンポーネントを使用します。The following components are used in this tutorial:

  • org.apache.storm.kafka.KafkaSpout:このコンポーネントは Kafka からデータを読み取ります。org.apache.storm.kafka.KafkaSpout: This component reads data from Kafka. このコンポーネントは、次のコンポーネントに依存します。This component relies on the following components:

    • org.apache.storm.kafka.SpoutConfig:スパウト コンポーネントの構成を提供します。org.apache.storm.kafka.SpoutConfig: Provides configuration for the spout component.

    • org.apache.storm.spout.SchemeAsMultiSchemeorg.apache.storm.kafka.StringScheme: Kafka のデータを Storm のタプルに変換します。org.apache.storm.spout.SchemeAsMultiScheme and org.apache.storm.kafka.StringScheme: How the data from Kafka is transformed into a Storm tuple.

  • org.apache.storm.kafka.bolt.KafkaBolt:このコンポーネントはデータを Kafka に書き込みます。org.apache.storm.kafka.bolt.KafkaBolt: This component writes data to Kafka. このコンポーネントは、次のコンポーネントに依存します。This component relies on the following components:

    • org.apache.storm.kafka.bolt.selector.DefaultTopicSelector:書き込みの対象となるトピックを記述します。org.apache.storm.kafka.bolt.selector.DefaultTopicSelector: Describes the topic that is written to.

    • org.apache.kafka.common.serialization.StringSerializer:データを文字列値としてシリアル化するようにボルトを構成します。org.apache.kafka.common.serialization.StringSerializer: Configures the bolt to serialize data as a string value.

    • org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper:Storm トポロジ内で使用されるタプル データ構造から、Kafka に格納されたフィールドにマップします。org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper: Maps from the tuple data structure used inside the Storm topology to fields stored in Kafka.

これらのコンポーネントは、org.apache.storm : storm-kafka パッケージで提供されます。These components are available in the org.apache.storm : storm-kafka package. Storm のバージョンに適合するパッケージ バージョンを使用してください。Use the package version that matches the Storm version. HDInsight 3.6 の場合、Storm のバージョンは 1.1.0 です。For HDInsight 3.6, the Storm version is 1.1.0. その他の Kafka コンポーネントが含まれた org.apache.kafka : kafka_2.10 パッケージも必要です。You also need the org.apache.kafka : kafka_2.10 package, which contains additional Kafka components. Kafka のバージョンに適合するパッケージ バージョンを使用してください。Use the package version that matches the Kafka version. HDInsight 3.6 の場合、Kafka のバージョンは 1.1.1 です。For HDInsight 3.6, the Kafka version is 1.1.1.

次の XML は、Apache Maven プロジェクトの pom.xml 内の依存関係宣言を示しています。The following XML is the dependency declaration in the pom.xml for an Apache Maven project:

<!-- Storm components for talking to Kafka -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.0</version>
</dependency>
<!-- needs to be the same Kafka version as used on your cluster -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>1.1.1</version>
    <!-- Exclude components that are loaded from the Storm cluster at runtime -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

コードについてUnderstanding the code

このドキュメントで使用するコードは、https://github.com/Azure-Samples/hdinsight-storm-java-kafka で入手できます。The code used in this document is available at https://github.com/Azure-Samples/hdinsight-storm-java-kafka.

このチュートリアルでは、次の 2 つのトポロジが提供されます。There are two topologies provided with this tutorial:

  • Kafka-writer: ランダムな文を生成し、それらを Kafka に格納します。Kafka-writer: Generates random sentences and stores them to Kafka.

  • Kafka-reader: Kafka からデータを読み取り、Storm クラスターの HDFS 互換ファイル ストアに格納します。Kafka-reader: Reads data from Kafka and then stores it to the HDFS compatible file store for the Storm cluster.

    警告

    HDInsight で使用される HDFS 互換ストレージを Storm で操作できるようにするには、スクリプト アクションが必要です。To enable the Storm to work with the HDFS compatible storage used by HDInsight, a script action is required. このスクリプトでは、Storm の extlib パスに複数の jar ファイルをインストールします。The script installs several jar files to the extlib path for Storm. このチュートリアルのテンプレートでは、クラスターの作成時に、このスクリプトが自動的に使用されます。The template in this tutorial automatically uses the script during cluster creation.

    Storm クラスターの作成にこのドキュメントのテンプレートを使用しない場合は、スクリプト アクションをクラスターに手動で適用する必要があります。If you do not use the template in this document to create the Storm cluster, then you must manually apply the script action to your cluster.

    このスクリプト アクションは https://hdiconfigactions.blob.core.windows.net/linuxstormextlibv01/stormextlib.sh にあり、Storm クラスターのスーパーバイザー ノードと Nimbus ノードに適用されます。The script action is located at https://hdiconfigactions.blob.core.windows.net/linuxstormextlibv01/stormextlib.sh and is applied to the supervisor and nimbus nodes of the Storm cluster. スクリプト アクションの使用方法の詳細については、スクリプト アクションを使用した HDInsight のカスタマイズに関するドキュメントをご覧ください。For more information on using script actions, see the Customize HDInsight using script actions document.

これらのトポロジは、Flux で定義されています。The topologies are defined using Flux. Flux は Storm 0.10.x で導入されており、これによりトポロジ構成とコードを切り離すことができます。Flux was introduced in Storm 0.10.x and allows you to separate the topology configuration from the code. Flux フレームワークを使用するトポロジの場合、YAML ファイルでトポロジを定義します。For Topologies that use the Flux framework, the topology is defined in a YAML file. YAML ファイルはトポロジの一部として含めることができるほか、The YAML file can be included as part of the topology. トポロジを送信するときに使用するスタンドアロン ファイルにもなります。It can also be a standalone file used when you submit the topology. Flux では実行時の変数代入もサポートされており、以下の例ではこれを使用しています。Flux also supports variable substitution at run-time, which is used in this example.

これらのトポロジの実行時に、次のパラメーターが設定されます。The following parameters are set at run time for these topologies:

  • ${kafka.topic}:トポロジの読み取り/書き込みの対象になる Kafka トピックの名前。${kafka.topic}: The name of the Kafka topic that the topologies read/write to.

  • ${kafka.broker.hosts}:Kafka ブローカーが実行されるホスト。${kafka.broker.hosts}: The hosts that the Kafka brokers run on. ブローカー情報は、KafkaBolt が Kafka への書き込み時に使用します。The broker information is used by the KafkaBolt when writing to Kafka.

  • ${kafka.zookeeper.hosts}:Kafka クラスター上で Zookeeper が実行されるホスト。${kafka.zookeeper.hosts}: The hosts that Zookeeper runs on in the Kafka cluster.

  • ${hdfs.url}:HDFSBolt コンポーネントのファイル システム URL。${hdfs.url}: The file system URL for the HDFSBolt component. Azure Storage アカウントと Azure Data Lake Storage のどちらにデータを書き込むかを示します。Indicates whether the data is written to an Azure Storage account or Azure Data Lake Storage.

  • ${hdfs.write.dir}:データの書き込み先のディレクトリ。${hdfs.write.dir}: The directory that data is written to.

Flux トポロジの詳細については、https://storm.apache.org/releases/current/flux.html を参照してください。For more information on Flux topologies, see https://storm.apache.org/releases/current/flux.html.

Kafka-writerKafka-writer

Kafka-writer トポロジでは、Kafka ボルト コンポーネントがパラメーターとして 2 つの文字列値を受け取ります。In the Kafka-writer topology, the Kafka bolt component takes two string values as parameters. これらのパラメーターは、ボルトから キー 値および メッセージ 値として Kafka に送信するタプル フィールドを示します。These parameters indicate which tuple fields the bolt sends to Kafka as key and message values. キーは、Kafka でデータをパーティション分割するために使用されます。The key is used to partition data in Kafka. メッセージは格納されるデータです。The message is the data being stored.

この例では、com.microsoft.example.SentenceSpout コンポーネントは、keymessage の 2 つのフィールドを含むタプルを生成します。In this example, the com.microsoft.example.SentenceSpout component emits a tuple that contains two fields, key and message. Kafka ボルトはこれらのフィールドを抽出し、フィールド内のデータを Kafka に送信します。The Kafka bolt extracts these fields and sends the data in them to Kafka.

これらのフィールドに、key および message という名前を使用する必要はありません。The fields don't have to use the names key and message. マッピングをわかりやすくするために、このプロジェクトではこれらの名前を使用しています。These names are used in this project to make the mapping easier to understand.

次の YAML は、Kafka-writer コンポーネントの定義を示しています。The following YAML is the definition for the Kafka-writer component:

# kafka-writer
---

# topology definition
# name to be used when submitting
name: "kafka-writer"

# Components - constructors, property setters, and builder arguments.
# Currently, components must be declared in the order they are referenced
components:
  # Topic selector for KafkaBolt
  - id: "topicSelector"
    className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector"
    constructorArgs:
      - "${kafka.topic}"

  # Mapper for KafkaBolt
  - id: "kafkaMapper"
    className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"
    constructorArgs:
      - "key"
      - "message"

  # Producer properties for KafkaBolt
  - id: "producerProperties"
    className: "java.util.Properties"
    configMethods:
      - name: "put"
        args:
          - "bootstrap.servers"
          - "${kafka.broker.hosts}"
      - name: "put"
        args:
          - "acks"
          - "1"
      - name: "put"
        args:
          - "key.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"
      - name: "put"
        args:
          - "value.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"
 

# Topology configuration
config:
  topology.workers: 2

# Spout definitions
spouts:
  - id: "sentence-spout"
    className: "com.microsoft.example.SentenceSpout"
    parallelism: 8

# Bolt definitions
bolts:
  - id: "kafka-bolt"
    className: "org.apache.storm.kafka.bolt.KafkaBolt"
    parallelism: 8
    configMethods:
    - name: "withProducerProperties"
      args: [ref: "producerProperties"]
    - name: "withTopicSelector"
      args: [ref: "topicSelector"]
    - name: "withTupleToKafkaMapper"
      args: [ref: "kafkaMapper"]

# Stream definitions

streams:
  - name: "spout --> kafka" # Streams data from the sentence spout to the Kafka bolt
    from: "sentence-spout"
    to: "kafka-bolt"
    grouping:
      type: SHUFFLE

Kafka-readerKafka-reader

Kafka-reader トポロジでは、スパウト コンポーネントが文字列値として Kafka からデータを読み取ります。In the Kafka-reader topology, the spout component reads data from Kafka as string values. その後、データは、ログ コンポーネントによって Storm ログに書き込まれ、HDFS ボルト コンポーネントによって Storm クラスターの HDFS 互換ファイル システムに書き込まれます。The data is then written the Storm log by the logging component and to the HDFS compatible file system for the Storm cluster by the HDFS bolt component.

# kafka-reader
---

# topology definition
# name to be used when submitting
name: "kafka-reader"

# Components - constructors, property setters, and builder arguments.
# Currently, components must be declared in the order they are referenced
components:
  # Convert data from Kafka into string tuples in storm
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"
  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "${kafka.zookeeper.hosts}"

  # Spout configuration
  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      # brokerHosts
      - ref: "zkHosts"
      # topic
      - "${kafka.topic}"
      # zkRoot
      - ""
      # id
      - "readerid"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"

    # How often to sync files to HDFS; every 1000 tuples.
  - id: "syncPolicy"
    className: "org.apache.storm.hdfs.bolt.sync.CountSyncPolicy"
    constructorArgs:
      - 1

  # Rotate files when they hit 5 MB
  - id: "rotationPolicy"
    className: "org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy"
    constructorArgs:
      - 5
      - "KB"

  # File format; read the directory from filters at run time, and use a .txt extension when writing.
  - id: "fileNameFormat"
    className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
    configMethods:
      - name: "withPath"
        args: ["${hdfs.write.dir}"]
      - name: "withExtension"
        args: [".txt"]

  # Internal file format; fields delimited by `|`.
  - id: "recordFormat"
    className: "org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat"
    configMethods:
      - name: "withFieldDelimiter"
        args: ["|"]

# Topology configuration
config:
  topology.workers: 2

# Spout definitions
spouts:
  - id: "kafka-spout"
    className: "org.apache.storm.kafka.KafkaSpout"
    constructorArgs:
      - ref: "spoutConfig"
    # Set to the number of partitions for the topic
    parallelism: 8

# Bolt definitions
bolts:
  - id: "logger-bolt"
    className: "com.microsoft.example.LoggerBolt"
    parallelism: 1
  
  - id: "hdfs-bolt"
    className: "org.apache.storm.hdfs.bolt.HdfsBolt"
    configMethods:
      - name: "withConfigKey"
        args: ["hdfs.config"]
      - name: "withFsUrl"
        args: ["${hdfs.url}"]
      - name: "withFileNameFormat"
        args: [ref: "fileNameFormat"]
      - name: "withRecordFormat"
        args: [ref: "recordFormat"]
      - name: "withRotationPolicy"
        args: [ref: "rotationPolicy"]
      - name: "withSyncPolicy"
        args: [ref: "syncPolicy"]
    parallelism: 1

# Stream definitions

streams:
  # Stream data to log
  - name: "kafka --> log" # name isn't used (placeholder for logging, UI, etc.)
    from: "kafka-spout"
    to: "logger-bolt"
    grouping:
      type: SHUFFLE
  
  # stream data to file
  - name: "kafka --> hdfs"
    from: "kafka-spout"
    to: "hdfs-bolt"
    grouping:
      type: SHUFFLE

プロパティ置換Property substitutions

プロジェクトには、トポロジで使用されるパラメーターを渡す際に使用する dev.properties という名前のファイルが含まれています。The project contains a file named dev.properties that is used to pass parameters used by the topologies. 次のプロパティが定義されます。It defines the following properties:

dev.properties ファイルdev.properties file 説明Description
kafka.zookeeper.hosts Kafka クラスターの Apache ZooKeeper ホスト。The Apache ZooKeeper hosts for the Kafka cluster.
kafka.broker.hosts Kafka ブローカー ホスト (ワーカー ノード)。The Kafka broker hosts (worker nodes).
kafka.topic トポロジで使用される Kafka トピック。The Kafka topic that the topologies use.
hdfs.write.dir Kafka-reader トポロジによる書き込み先のディレクトリ。The directory that the Kafka-reader topology writes to.
hdfs.url Storm クラスターで使用されるファイル システム。The file system used by the Storm cluster. Azure ストレージ アカウントの場合、値として wasb:// を使用します。For Azure Storage accounts, use a value of wasb://. Azure Data Lake Storage Gen2 の場合、値として abfs:// を使用します。For Azure Data Lake Storage Gen2, use a value of abfs://. Azure Data Lake Storage Gen1 の場合、値として adl:// を使用します。For Azure Data Lake Storage Gen1, use a value of adl://.

クラスターの作成Create the clusters

HDInsight の Apache Kafka では、パブリック インターネットを介した Kafka ブローカーへのアクセスは提供されていません。Apache Kafka on HDInsight does not provide access to the Kafka brokers over the public internet. Kafka を使用するものは、すべて同じ Azure 仮想ネットワークに属している必要があります。Anything that uses Kafka must be in the same Azure virtual network. このチュートリアルでは、Kafka クラスターと Storm クラスターを同じ Azure 仮想ネットワーク内に配置します。In this tutorial, both the Kafka and Storm clusters are located in the same Azure virtual network.

次の図に、Storm と Kafka 間の通信フローを示します。The following diagram shows how communication flows between Storm and Kafka:

Azure 仮想ネットワークにおける Strom クラスターと Kafka クラスターの図

注意

SSH や Apache Ambari など、クラスター上の他のサービスは、インターネット経由でアクセスできます。Other services on the cluster such as SSH and Apache Ambari can be accessed over the internet. HDInsight で使用できるパブリック ポートの詳細については、「HDInsight で使用されるポートと URI」を参照してください。For more information on the public ports available with HDInsight, see Ports and URIs used by HDInsight.

Azure 仮想ネットワークを作成し、その仮想ネットワーク内に Kafka クラスターと Storm クラスターを作成するには、次の手順に従います。To create an Azure Virtual Network, and then create the Kafka and Storm clusters within it, use the following steps:

  1. 次のボタンを使用して Azure にサインインし、Azure Portal でテンプレートを開きます。Use the following button to sign in to Azure and open the template in the Azure portal.

    Deploy to Azure button for new cluster

    Azure Resource Manager テンプレートは、 https://github.com/Azure-Samples/hdinsight-storm-java-kafka/blob/master/create-kafka-storm-clusters-in-vnet.json にあります。The Azure Resource Manager template is located at https://github.com/Azure-Samples/hdinsight-storm-java-kafka/blob/master/create-kafka-storm-clusters-in-vnet.json. このテンプレートは次のリソースを作成します。It creates the following resources:

    • Azure リソース グループAzure resource group
    • Azure Virtual NetworkAzure Virtual Network
    • Azure ストレージ アカウントAzure Storage account
    • HDInsight バージョン 3.6 上の Kafka (3 ワーカー ノード)Kafka on HDInsight version 3.6 (three worker nodes)
    • HDInsight バージョン 3.6 上の Storm (3 ワーカー ノード)Storm on HDInsight version 3.6 (three worker nodes)

    警告

    HDInsight で Kafka の可用性を保証するには、クラスターに少なくとも 3 つのワーカー ノードが必要です。To guarantee availability of Kafka on HDInsight, your cluster must contain at least three worker nodes. このテンプレートは、3 つのワーカー ノードが含まれる Kafka クラスターを作成します。This template creates a Kafka cluster that contains three worker nodes.

  2. 次のガイダンスに従って、 [カスタム デプロイ] セクションの各エントリに入力します。Use the following guidance to populate the entries on the Custom deployment section:

    1. 次の情報に従って、 [カスタマイズされたテンプレート] セクションの各エントリに入力します。Use the following information to populate the entries on the Customized template section:

      SettingSetting Value
      SubscriptionSubscription お使いの Azure サブスクリプションYour Azure subscription
      Resource groupResource group リソースが含まれるリソース グループ。The resource group that contains the resources.
      LocationLocation リソースが作成される Azure リージョン。The Azure region that the resources are created in.
      [Kafka Cluster Name](Kafka クラスター名)Kafka Cluster Name Kafka クラスターの名前。The name of the Kafka cluster.
      [Storm Cluster Name](Storm クラスター名)Storm Cluster Name Storm クラスターの名前。The name of the Storm cluster.
      [Cluster Login User Name](クラスター ログイン ユーザー名)Cluster Login User Name クラスターの管理者ユーザー名。The admin user name for the clusters.
      [クラスター ログイン パスワード]Cluster Login Password クラスターの管理者ユーザー パスワード。The admin user password for the clusters.
      [SSH ユーザー名]SSH User Name クラスター用に作成する SSH ユーザー。The SSH user to create for the clusters.
      [SSH パスワード]SSH Password SSH ユーザーのパスワード。The password for the SSH user.

      テンプレート パラメーターの画像

  3. 使用条件を読み、 [上記の使用条件に同意する] をオンにします。Read the Terms and Conditions, and then select I agree to the terms and conditions stated above.

  4. 最後に、 [ダッシュボードにピン留めする] をオンにし、 [購入] をクリックします。Finally, check Pin to dashboard and then select Purchase.

注意

クラスターの作成には最大で 20 分かかります。It can take up to 20 minutes to create the clusters.

トポロジの作成Build the topology

  1. 開発環境で、https://github.com/Azure-Samples/hdinsight-storm-java-kafka からプロジェクトをダウンロードし、コマンドラインを開いてプロジェクトをダウンロードした場所にディレクトリを変更します。On your development environment, download the project from https://github.com/Azure-Samples/hdinsight-storm-java-kafka, open a command-line, and change directories to the location that you downloaded the project.

  2. hdinsight-storm-java-kafka ディレクトリで次のコマンドを使用して、プロジェクトをコンパイルしデプロイ用のパッケージを作成します。From the hdinsight-storm-java-kafka directory, use the following command to compile the project and create a package for deployment:

    mvn clean package
    

    このパッケージ処理では、target ディレクトリに KafkaTopology-1.0-SNAPSHOT.jar という名前のファイルが作成されます。The package process creates a file named KafkaTopology-1.0-SNAPSHOT.jar in the target directory.

  3. 次のコマンドを使用して、HDInsight の Storm クラスターにパッケージをコピーします。Use the following commands to copy the package to your Storm on HDInsight cluster. sshuser をクラスターの SSH ユーザー名に置き換えます。Replace sshuser with the SSH user name for the cluster. stormclusternameStorm クラスターの名前に置き換えます。Replace stormclustername with the name of the Storm cluster.

    scp ./target/KafkaTopology-1.0-SNAPSHOT.jar sshuser@stormclustername-ssh.azurehdinsight.net:KafkaTopology-1.0-SNAPSHOT.jar
    

    メッセージが表示されたら、クラスターの作成時に使用したパスワードを入力します。When prompted, enter the password you used when creating the clusters.

トポロジの構成Configure the topology

  1. 次のいずれかの方法を使用して、HDInsight クラスター上の Kafka の Kafka Broker ホストを検出します。Use one of the following methods to discover the Kafka broker hosts for the Kafka on HDInsight cluster:

    $creds = Get-Credential -UserName "admin" -Message "Enter the HDInsight login"
    $clusterName = Read-Host -Prompt "Enter the Kafka cluster name"
    $resp = Invoke-WebRequest -Uri "https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER" `
        -Credential $creds `
        -UseBasicParsing
    $respObj = ConvertFrom-Json $resp.Content
    $brokerHosts = $respObj.host_components.HostRoles.host_name[0..1]
    ($brokerHosts -join ":9092,") + ":9092"
    

    重要

    次の Bash の例では、$CLUSTERNAMEKafka クラスターの名前が含まれていることを前提とします。The following Bash example assumes that $CLUSTERNAME contains the name of the Kafka cluster name. また、jq バージョン 1.5 以降がインストールされているものとします。It also assumes that jq version 1.5 or greater is installed. プロンプトが表示されたら、クラスターのログイン アカウントのパスワードを入力してください。When prompted, enter the password for the cluster login account.

    curl -su admin -G "https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER" | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2
    

    次のテキストのような値が返されます。The value returned is similar to the following text:

    wn0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092,wn1-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092
    

    重要

    クラスターに 2 つ以上のブローカー ホストがある場合でも、すべてのホストの完全な一覧をクライアントに提供する必要はありません。While there may be more than two broker hosts for your cluster, you do not need to provide a full list of all hosts to clients. 1 つまたは 2 つで十分です。One or two is enough.

  2. 次のいずれかのを使用して、HDInsight クラスタ上の Kafka の Kafka Broker ホストを検出します。Use one of the following methods to discover the Zookeeper hosts for the Kafka on HDInsight cluster:

    $creds = Get-Credential -UserName "admin" -Message "Enter the HDInsight login"
    $clusterName = Read-Host -Prompt "Enter the Kafka cluster name"
    $resp = Invoke-WebRequest -Uri "https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" `
        -Credential $creds `
        -UseBasicParsing
    $respObj = ConvertFrom-Json $resp.Content
    $zookeeperHosts = $respObj.host_components.HostRoles.host_name[0..1]
    ($zookeeperHosts -join ":2181,") + ":2181"
    

    重要

    次の Bash の例では、$CLUSTERNAMEKafka クラスターが含まれていることを前提とします。The following Bash example assumes that $CLUSTERNAME contains the name of the Kafka cluster. および jq がインストールされいていることを前提としています。It also assumes that jq is installed. プロンプトが表示されたら、クラスターのログイン アカウントのパスワードを入力してください。When prompted, enter the password for the cluster login account.

    curl -su admin -G "https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2
    

    次のテキストのような値が返されます。The value returned is similar to the following text:

     zk0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181,zk2-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181
    

    重要

    クラスターに 2 つ以上の Zookeeper ホストがある場合でも、すべてのホストの完全な一覧をクライアントに提供する必要はありません。While there are more than two Zookeeper nodes, you do not need to provide a full list of all hosts to clients. 1 つまたは 2 つで十分です。One or two is enough.

    この値を保存します。これは後で使用します。Save this value, as it is used later.

  3. プロジェクトのルートにある dev.properties ファイルを編集します。Edit the dev.properties file in the root of the project. このファイルの該当する行に、Kafka クラスターの Broker ホストと Zookeeper ホストの情報を追加します。Add the Broker and Zookeeper hosts information for the Kafka cluster to the matching lines in this file. 次の例は、前の手順のサンプルの値を使用して構成されています。The following example is configured using the sample values from the previous steps:

     kafka.zookeeper.hosts: zk0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181,zk2-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181
     kafka.broker.hosts: wn0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092,wn1-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092
     kafka.topic: stormtopic
    

    重要

    hdfs.url エントリは、Azure ストレージ アカウントを使用するクラスター用に構成されています。The hdfs.url entry is configured for a cluster that uses an Azure Storage account. Data Lake Storage を使用する Storm クラスターでこのトポロジを使用するには、この値を wasb から adl に変更します。To use this topology with a Storm cluster that uses Data Lake Storage, change this value from wasb to adl.

  4. dev.properties ファイルを保存し、次のコマンドを使用して Storm クラスターにアップロードします。Save the dev.properties file and then use the following command to upload it to the Storm cluster:

    scp dev.properties USERNAME@BASENAME-ssh.azurehdinsight.net:dev.properties
    

    USERNAME は、クラスターの SSH ユーザー名に置き換えます。Replace USERNAME with the SSH user name for the cluster. BASENAME は、クラスターの作成時に使用したベース名に置き換えます。Replace BASENAME with the base name you used when creating the cluster.

Kafka トピックの作成Create the Kafka topic

Kafka では、"トピック" にデータが格納されます。Kafka stores data into a topic. Storm トポロジを開始する前に、トピックを作成する必要があります。You must create the topic before starting the Storm topologies. トポロジを作成するには、次の手順に従います。To create the topology, use the following steps:

  1. 次のコマンドを使用して、SSH 経由で Kafka クラスターに接続します。Connect to the Kafka cluster through SSH by using the following command. sshuser は、クラスターの作成時に使用した SSH ユーザー名に置き換えます。Replace sshuser with the SSH user name used when creating the cluster. kafkaclustername を Kafka クラスターの名前に置き換えます。Replace kafkaclustername with the name of the Kafka cluster:

    ssh sshuser@kafkaclustername-ssh.azurehdinsight.net
    

    メッセージが表示されたら、クラスターの作成時に使用したパスワードを入力します。When prompted, enter the password you used when creating the clusters.

    詳細については、HDInsight での SSH の使用に関するページを参照してください。For information, see Use SSH with HDInsight.

  2. Kafka トピックを作成するには、次のコマンドを使用します。To create the Kafka topic, use the following command. $KAFKAZKHOSTS を、トポロジの構成時に使用した Zookeeper ホスト情報に置き換えます。Replace $KAFKAZKHOSTS with the Zookeeper host information you used when configuring the topology:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stormtopic --zookeeper $KAFKAZKHOSTS
    

    このコマンドでは、Kafka クラスターの Zookeeper に接続し、stormtopic という名前の新しいトピックを作成します。This command connects to Zookeeper for the Kafka cluster and creates a new topic named stormtopic. このトピックが Storm トポロジで使用されます。This topic is used by the Storm topologies.

ライターの起動Start the writer

  1. 次のコマンドを使用して、SSH 経由で Storm クラスターに接続します。Use the following to connect to the Storm cluster using SSH. sshuser は、クラスターの作成時に使用した SSH ユーザー名に置き換えます。Replace sshuser with the SSH user name used when creating the cluster. stormclustername を Storm クラスターの名前に置き換えます。Replace stormclustername with the name the Storm cluster:

    ssh sshuser@stormclustername-ssh.azurehdinsight.net
    

    メッセージが表示されたら、クラスターの作成時に使用したパスワードを入力します。When prompted, enter the password you used when creating the clusters.

    詳細については、HDInsight での SSH の使用に関するページを参照してください。For information, see Use SSH with HDInsight.

  2. Storm クラスターに SSH で接続してから、次のコマンドを使用してライター トポロジを起動します。From the SSH connection to the Storm cluster, use the following command to start the writer topology:

    storm jar KafkaTopology-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /writer.yaml --filter dev.properties
    

    このコマンドで使用されているパラメーターの意味は次のとおりです。The parameters used with this command are:

    • org.apache.storm.flux.Flux:Flux を使用してこのトポロジを構成および実行します。org.apache.storm.flux.Flux: Use Flux to configure and run this topology.

    • --remote:トポロジを Nimbus に送信します。--remote: Submit the topology to Nimbus. トポロジは、クラスター内のワーカー ノード全体に配布されます。The topology is distributed across the worker nodes in the cluster.

    • -R /writer.yaml:writer.yaml ファイルを使用して、トポロジを構成します。-R /writer.yaml: Use the writer.yaml file to configure the topology. -R は、このリソースが jar ファイル内に含まれていることを指示しています。-R indicates that this resource is included in the jar file. リソースは jar のルートにあるため、/writer.yaml がリソースへのパスになります。It's in the root of the jar, so /writer.yaml is the path to it.

    • --filter:dev.properties ファイルの値を使用して、writer.yaml トポロジのエントリを作成します。--filter: Populate entries in the writer.yaml topology using values in the dev.properties file. たとえば、ファイル内の kafka.topic エントリの値を使用して、トポロジの定義内の ${kafka.topic} エントリを置き換えます。For example, the value of the kafka.topic entry in the file is used to replace the ${kafka.topic} entry in the topology definition.

リーダーの起動Start the reader

  1. Storm クラスターへの SSH セッションで、次のコマンドを使用してリーダー トポロジを起動します。From the SSH session to the Storm cluster, use the following command to start the reader topology:

    storm jar KafkaTopology-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /reader.yaml --filter dev.properties
    
  2. 少し待ってから、次のコマンドを使用して、リーダー トポロジによって作成されたファイルを表示します。Wait a minute and then use the following command to view the files created by the reader topology:

    hdfs dfs -ls /stormdata
    

    出力は次のテキストのようになります。The output is similar to the following text:

     Found 173 items
     -rw-r--r--   1 storm supergroup       5137 2018-04-09 19:00 /stormdata/hdfs-bolt-4-0-1523300453088.txt
     -rw-r--r--   1 storm supergroup       5128 2018-04-09 19:00 /stormdata/hdfs-bolt-4-1-1523300453624.txt
     -rw-r--r--   1 storm supergroup       5131 2018-04-09 19:00 /stormdata/hdfs-bolt-4-10-1523300455170.txt
     ...
    
  3. ファイルの内容を表示するには、次のコマンドを使用します。To view the contents of the file, use the following command. filename.txt をファイル名に置き換えます。Replace filename.txt with the name of a file:

    hdfs dfs -cat /stormdata/filename.txt
    

    次のテキストはファイルの内容の一例です。The following text is an example of the file contents:

     four score and seven years ago
     snow white and the seven dwarfs
     i am at two with nature
     snow white and the seven dwarfs
     i am at two with nature
     four score and seven years ago
     an apple a day keeps the doctor away
    

トポロジの停止Stop the topologies

Storm クラスターへの SSH セッションで、次のコマンドを使用して Storm の各トポロジを停止します。From an SSH session to the Storm cluster, use the following commands to stop the Storm topologies:

storm kill kafka-writer
storm kill kafka-reader

リソースのクリーンアップClean up resources

このチュートリアルで作成したリソースをクリーンアップするために、リソース グループを削除することができます。To clean up the resources created by this tutorial, you can delete the resource group. リソース グループを削除すると、関連付けられている HDInsight クラスター、およびリソース グループに関連付けられているその他のリソースも削除されます。Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.

Azure Portal を使用してリソース グループを削除するには:To remove the resource group using the Azure portal:

  1. Azure Portal で左側のメニューを展開してサービスのメニューを開き、 [リソース グループ] を選択して、リソース グループの一覧を表示します。In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource Groups to display the list of your resource groups.
  2. 削除するリソース グループを見つけて、一覧の右側にある [詳細] ボタン ([...]) を右クリックします。Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
  3. [リソース グループの削除] を選択し、確認します。Select Delete resource group, and then confirm.

次の手順Next steps

このチュートリアルでは、Apache Storm トポロジを使用して、HDInsight の Apache Kafka に対して書き込み/読み取りを実行する方法を説明しました。In this tutorial, you learned how to use an Apache Storm topology to write to and read from Apache Kafka on HDInsight. また、HDInsight で使用される Apache Hadoop HDFS 互換ストレージにデータを格納する方法も説明しました。You also learned how to store data to the Apache Hadoop HDFS compatible storage used by HDInsight.