Tutorial: Verwenden von Apache Storm mit Apache Kafka in HDInsightTutorial: Use Apache Storm with Apache Kafka on HDInsight

Dieses Tutorial veranschaulicht die Verwendung einer Apache Storm-Topologie zum Lesen und Schreiben von Daten mit Apache Kafka in HDInsight.This tutorial demonstrates how to use an Apache Storm topology to read and write data with Apache Kafka on HDInsight. In diesem Tutorial wird außerdem gezeigt, wie Sie Daten im Storm-Cluster in dem Speicher dauerhaft speichern, der mit dem HDFS von Apache Hadoop kompatibel ist.This tutorial also demonstrates how to persist data to the Apache Hadoop HDFS compatible storage on the Storm cluster.

In diesem Tutorial lernen Sie Folgendes:In this tutorial, you learn how to:

  • Storm und KafkaStorm and Kafka
  • Grundlegendes zum CodeUnderstanding the code
  • Erstellen von Kafka- und Storm-ClusternCreate Kafka and Storm clusters
  • Erstellen der TopologieBuild the topology
  • Konfigurieren der TopologieConfigure the topology
  • Erstellen des Kafka-ThemasCreate the Kafka topic
  • Starten der TopologienStart the topologies
  • Beenden der TopologienStop the topologies
  • Bereinigen von RessourcenClean up resources

VoraussetzungenPrerequisites

Bei der Installation von Java und dem JDK auf Ihrer Entwicklungsworkstation können die folgenden Umgebungsvariablen festgelegt werden.The following environment variables may be set when you install Java and the JDK on your development workstation. Sie sollten dennoch prüfen, ob die Variablen vorhanden sind und korrekte Werte für Ihr System enthalten.However, you should check that they exist and that they contain the correct values for your system.

  • JAVA_HOME – sollte auf das Verzeichnis verweisen, in dem das JDK installiert ist.JAVA_HOME - should point to the directory where the JDK is installed.

  • PATH – sollte die folgenden Pfade enthalten:PATH - should contain the following paths:

    • JAVA_HOME (oder entsprechender Pfad)JAVA_HOME (or the equivalent path).
    • JAVA_HOME\bin (oder entsprechender Pfad)JAVA_HOME\bin (or the equivalent path).
    • Das Verzeichnis, in dem Maven installiert istThe directory where Maven is installed.

Wichtig

Für die in diesem Dokument beschriebenen Schritte wird eine Azure-Ressourcengruppe benötigt, die sowohl ein Cluster vom Typ „Storm in HDInsight“ als auch vom Typ „Kafka in HDInsight“ enthält.The steps in this document require an Azure resource group that contains both a Storm on HDInsight and a Kafka on HDInsight cluster. Diese Cluster befinden sich beide in einem virtuellen Azure-Netzwerk, damit der Storm-Cluster direkt mit dem Kafka-Cluster kommunizieren kann.These clusters are both located within an Azure Virtual Network, which allows the Storm cluster to directly communicate with the Kafka cluster.

Der Einfachheit halber enthält dieses Dokument Links zu einer Vorlage, mit der die erforderlichen Azure-Ressourcen erstellt werden können.For your convenience, this document links to a template that can create all the required Azure resources.

Weitere Informationen zur Verwendung von HDInsight in einem virtuellen Netzwerk finden Sie im Dokument Erweitern von HDInsight mit einem virtuellen Netzwerk.For more information on using HDInsight in a virtual network, see the Extend HDInsight using a virtual network document.

Storm und KafkaStorm and Kafka

Apache Storm stellt mehrere Komponenten für die Arbeit mit Apache Kafka bereit.Apache Storm provides the several components for working with Apache Kafka. In diesem Tutorial werden die folgenden Komponenten verwendet:The following components are used in this tutorial:

  • org.apache.storm.kafka.KafkaSpout: Diese Komponente liest Daten aus Kafka.org.apache.storm.kafka.KafkaSpout: This component reads data from Kafka. Diese Komponente benötigt die folgenden Komponenten:This component relies on the following components:

    • org.apache.storm.kafka.SpoutConfig: Stellt die Konfiguration für die Spout-Komponente bereit.org.apache.storm.kafka.SpoutConfig: Provides configuration for the spout component.

    • org.apache.storm.spout.SchemeAsMultiScheme und org.apache.storm.kafka.StringScheme: Legen fest, wie die Daten aus Kafka in ein Storm-Tupel transformiert werden.org.apache.storm.spout.SchemeAsMultiScheme and org.apache.storm.kafka.StringScheme: How the data from Kafka is transformed into a Storm tuple.

  • org.apache.storm.kafka.bolt.KafkaBolt: Diese Komponente schreibt Daten in Kafka.org.apache.storm.kafka.bolt.KafkaBolt: This component writes data to Kafka. Diese Komponente benötigt die folgenden Komponenten:This component relies on the following components:

    • org.apache.storm.kafka.bolt.selector.DefaultTopicSelector: Beschreibt das Thema, in das geschrieben wird.org.apache.storm.kafka.bolt.selector.DefaultTopicSelector: Describes the topic that is written to.

    • org.apache.kafka.common.serialization.StringSerializer: Konfiguriert den Bolt zum Serialisieren von Daten als Zeichenfolge.org.apache.kafka.common.serialization.StringSerializer: Configures the bolt to serialize data as a string value.

    • org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper: Ordnet die Tupel-Datenstruktur, die in der Storm-Topologie verwendet wird, den in Kafka gespeicherten Feldern zu.org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper: Maps from the tuple data structure used inside the Storm topology to fields stored in Kafka.

Diese Komponenten sind im Paket org.apache.storm : storm-kafka enthalten.These components are available in the org.apache.storm : storm-kafka package. Verwenden Sie die Paketversion, die Ihrer Storm-Version entspricht.Use the package version that matches the Storm version. Für HDInsight 3.6 lautet die Storm-Version 1.1.0.For HDInsight 3.6, the Storm version is 1.1.0. Sie benötigen auch das Paket org.apache.kafka : kafka_2.10, das zusätzliche Kafka-Komponenten enthält.You also need the org.apache.kafka : kafka_2.10 package, which contains additional Kafka components. Verwenden Sie die Paketversion, die Ihrer Kafka-Version entspricht.Use the package version that matches the Kafka version. Für HDInsight 3.6 lautet die Kafka-Version 1.1.1.For HDInsight 3.6, the Kafka version is 1.1.1.

Der folgende XML-Code stellt die Abhängigkeitsdeklaration in pom.xml für ein Apache Maven-Projekt dar:The following XML is the dependency declaration in the pom.xml for an Apache Maven project:

<!-- Storm components for talking to Kafka -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.0</version>
</dependency>
<!-- needs to be the same Kafka version as used on your cluster -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>1.1.1</version>
    <!-- Exclude components that are loaded from the Storm cluster at runtime -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Grundlegendes zum CodeUnderstanding the code

Den in diesem Dokument verwendeten Code finden Sie unter https://github.com/Azure-Samples/hdinsight-storm-java-kafka.The code used in this document is available at https://github.com/Azure-Samples/hdinsight-storm-java-kafka.

In diesem Tutorial werden zwei Topologien bereitgestellt:There are two topologies provided with this tutorial:

  • Kafka-writer: Generiert zufällige Sätze und speichert sie in Kafka.Kafka-writer: Generates random sentences and stores them to Kafka.

  • Kafka-reader: Liest Daten aus Kafka und speichert sie im HDFS-kompatiblen Dateispeicher für den Storm-Cluster.Kafka-reader: Reads data from Kafka and then stores it to the HDFS compatible file store for the Storm cluster.

    Warnung

    Damit Storm mit dem von HDInsight verwendeten HDFS-kompatiblen Speicher arbeiten kann, ist eine Skriptaktion erforderlich.To enable the Storm to work with the HDFS compatible storage used by HDInsight, a script action is required. Das Skript installiert mehrere JAR-Dateien im Pfad extlib für Storm.The script installs several jar files to the extlib path for Storm. Die Vorlage in diesem Tutorial verwendet dieses Skript während der Clustererstellung automatisch.The template in this tutorial automatically uses the script during cluster creation.

    Wenn Sie nicht die Vorlage in diesem Dokument verwenden, um den Storm-Cluster zu erstellen, müssen Sie die Skriptaktion manuell in Ihrem Cluster anwenden.If you do not use the template in this document to create the Storm cluster, then you must manually apply the script action to your cluster.

    Die Skriptaktion befindet sich unter https://hdiconfigactions.blob.core.windows.net/linuxstormextlibv01/stormextlib.sh und wird auf die übergeordneten und Nimbus-Knoten des Storm-Clusters angewandt.The script action is located at https://hdiconfigactions.blob.core.windows.net/linuxstormextlibv01/stormextlib.sh and is applied to the supervisor and nimbus nodes of the Storm cluster. Weitere Informationen zur Verwendung von Skriptaktionen finden Sie im Dokument Anpassen von HDInsight für die Verwendung von Skriptaktionen.For more information on using script actions, see the Customize HDInsight using script actions document.

Die Topologien werden mit Flux definiert.The topologies are defined using Flux. Flux wurde mit Storm 0.10.x eingeführt und ermöglicht Ihnen die Trennung der Topologiekonfiguration vom Code.Flux was introduced in Storm 0.10.x and allows you to separate the topology configuration from the code. Für Topologien, die das Flux-Framework nutzen, wird die Topologie in einer YAML-Datei definiert.For Topologies that use the Flux framework, the topology is defined in a YAML file. Die YAML-Datei kann als Teil der Topologie aufgenommen werden.The YAML file can be included as part of the topology. Sie kann auch eine eigenständige Datei sein, die beim Übermitteln der Topologie verwendet wird.It can also be a standalone file used when you submit the topology. Außerdem unterstützt Flux die Variablenersetzung zur Laufzeit, die in diesem Beispiel verwendet wird.Flux also supports variable substitution at run-time, which is used in this example.

Zur Laufzeit werden die folgenden Parameter für diese Topologien festgelegt:The following parameters are set at run time for these topologies:

  • ${kafka.topic}: Der Name des Kafka-Themas, für das die Topologien die Lese-/Schreibvorgänge durchführen.${kafka.topic}: The name of the Kafka topic that the topologies read/write to.

  • ${kafka.broker.hosts}: Die Hosts, auf denen die Kafka-Broker ausgeführt werden.${kafka.broker.hosts}: The hosts that the Kafka brokers run on. Die Brokerinformationen werden von der KafkaBolt-Komponente verwendet, wenn auf Kafka geschrieben wird.The broker information is used by the KafkaBolt when writing to Kafka.

  • ${kafka.zookeeper.hosts}: Die Hosts, auf denen Zookeeper im Kafka-Cluster ausgeführt wird.${kafka.zookeeper.hosts}: The hosts that Zookeeper runs on in the Kafka cluster.

  • ${hdfs.url}: Die Dateisystem-URL für die HDFSBolt-Komponente.${hdfs.url}: The file system URL for the HDFSBolt component. Sie gibt an, ob die Daten in ein Azure Storage-Konto oder in Azure Data Lake Storage geschrieben werden.Indicates whether the data is written to an Azure Storage account or Azure Data Lake Storage.

  • ${hdfs.write.dir}: Das Verzeichnis, in das die Daten geschrieben werden.${hdfs.write.dir}: The directory that data is written to.

Weitere Informationen zu Flux-Topologien finden Sie unter https://storm.apache.org/releases/1.1.2/flux.html.For more information on Flux topologies, see https://storm.apache.org/releases/1.1.2/flux.html.

Kafka-writerKafka-writer

In der Topologie von Kafka-writer akzeptiert die Kafka-Bolt-Komponente zwei Zeichenfolgenwerte als Parameter.In the Kafka-writer topology, the Kafka bolt component takes two string values as parameters. Diese Parameter geben an, welche Tupelfelder der Bolt an Kafka als die Werte key und message sendet.These parameters indicate which tuple fields the bolt sends to Kafka as key and message values. Der Schlüssel („key“) wird zum Partitionieren von Daten in Kafka verwendet.The key is used to partition data in Kafka. Die Nachricht („message“) enthält die zu speichernden Daten.The message is the data being stored.

In diesem Beispiel gibt die com.microsoft.example.SentenceSpout-Komponente ein Tupel aus, das die beiden Felder key und message enthält.In this example, the com.microsoft.example.SentenceSpout component emits a tuple that contains two fields, key and message. Der Kafka-Bolt extrahiert diese Felder und sendet die darin enthaltenen Daten an Kafka.The Kafka bolt extracts these fields and sends the data in them to Kafka.

Die Felder müssen nicht die Namen key und message verwenden.The fields don't have to use the names key and message. Diese Namen werden in diesem Projekt verwendet, um die Zuordnung einfacher verständlich zu machen.These names are used in this project to make the mapping easier to understand.

Der folgende YAML-Code stellt die Definition für die Kafka-writer-Komponente dar:The following YAML is the definition for the Kafka-writer component:

# kafka-writer
---

# topology definition
# name to be used when submitting
name: "kafka-writer"

# Components - constructors, property setters, and builder arguments.
# Currently, components must be declared in the order they are referenced
components:
  # Topic selector for KafkaBolt
  - id: "topicSelector"
    className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector"
    constructorArgs:
      - "${kafka.topic}"

  # Mapper for KafkaBolt
  - id: "kafkaMapper"
    className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"
    constructorArgs:
      - "key"
      - "message"

  # Producer properties for KafkaBolt
  - id: "producerProperties"
    className: "java.util.Properties"
    configMethods:
      - name: "put"
        args:
          - "bootstrap.servers"
          - "${kafka.broker.hosts}"
      - name: "put"
        args:
          - "acks"
          - "1"
      - name: "put"
        args:
          - "key.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"
      - name: "put"
        args:
          - "value.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"
 

# Topology configuration
config:
  topology.workers: 2

# Spout definitions
spouts:
  - id: "sentence-spout"
    className: "com.microsoft.example.SentenceSpout"
    parallelism: 8

# Bolt definitions
bolts:
  - id: "kafka-bolt"
    className: "org.apache.storm.kafka.bolt.KafkaBolt"
    parallelism: 8
    configMethods:
    - name: "withProducerProperties"
      args: [ref: "producerProperties"]
    - name: "withTopicSelector"
      args: [ref: "topicSelector"]
    - name: "withTupleToKafkaMapper"
      args: [ref: "kafkaMapper"]

# Stream definitions

streams:
  - name: "spout --> kafka" # Streams data from the sentence spout to the Kafka bolt
    from: "sentence-spout"
    to: "kafka-bolt"
    grouping:
      type: SHUFFLE

Kafka-readerKafka-reader

In der Topologie von Kafka-reader liest die Spout-Komponente die Daten aus Kafka als Zeichenfolgenwerte.In the Kafka-reader topology, the spout component reads data from Kafka as string values. Die Daten werden dann von der Protokollierungskomponente in das Storm-Protokoll und von der HDFS-Bolt-Komponente in das HDFS-kompatible Dateisystem für den Storm-Cluster geschrieben.The data is then written the Storm log by the logging component and to the HDFS compatible file system for the Storm cluster by the HDFS bolt component.

# kafka-reader
---

# topology definition
# name to be used when submitting
name: "kafka-reader"

# Components - constructors, property setters, and builder arguments.
# Currently, components must be declared in the order they are referenced
components:
  # Convert data from Kafka into string tuples in storm
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"
  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "${kafka.zookeeper.hosts}"

  # Spout configuration
  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      # brokerHosts
      - ref: "zkHosts"
      # topic
      - "${kafka.topic}"
      # zkRoot
      - ""
      # id
      - "readerid"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"

    # How often to sync files to HDFS; every 1000 tuples.
  - id: "syncPolicy"
    className: "org.apache.storm.hdfs.bolt.sync.CountSyncPolicy"
    constructorArgs:
      - 1

  # Rotate files when they hit 5 MB
  - id: "rotationPolicy"
    className: "org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy"
    constructorArgs:
      - 5
      - "KB"

  # File format; read the directory from filters at run time, and use a .txt extension when writing.
  - id: "fileNameFormat"
    className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
    configMethods:
      - name: "withPath"
        args: ["${hdfs.write.dir}"]
      - name: "withExtension"
        args: [".txt"]

  # Internal file format; fields delimited by `|`.
  - id: "recordFormat"
    className: "org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat"
    configMethods:
      - name: "withFieldDelimiter"
        args: ["|"]

# Topology configuration
config:
  topology.workers: 2

# Spout definitions
spouts:
  - id: "kafka-spout"
    className: "org.apache.storm.kafka.KafkaSpout"
    constructorArgs:
      - ref: "spoutConfig"
    # Set to the number of partitions for the topic
    parallelism: 8

# Bolt definitions
bolts:
  - id: "logger-bolt"
    className: "com.microsoft.example.LoggerBolt"
    parallelism: 1
  
  - id: "hdfs-bolt"
    className: "org.apache.storm.hdfs.bolt.HdfsBolt"
    configMethods:
      - name: "withConfigKey"
        args: ["hdfs.config"]
      - name: "withFsUrl"
        args: ["${hdfs.url}"]
      - name: "withFileNameFormat"
        args: [ref: "fileNameFormat"]
      - name: "withRecordFormat"
        args: [ref: "recordFormat"]
      - name: "withRotationPolicy"
        args: [ref: "rotationPolicy"]
      - name: "withSyncPolicy"
        args: [ref: "syncPolicy"]
    parallelism: 1

# Stream definitions

streams:
  # Stream data to log
  - name: "kafka --> log" # name isn't used (placeholder for logging, UI, etc.)
    from: "kafka-spout"
    to: "logger-bolt"
    grouping:
      type: SHUFFLE
  
  # stream data to file
  - name: "kafka --> hdfs"
    from: "kafka-spout"
    to: "hdfs-bolt"
    grouping:
      type: SHUFFLE

EigenschaftenersetzungenProperty substitutions

Das Projekt enthält eine Datei namens dev.properties, die zum Übergeben der durch die Topologien verwendeten Parameter dient.The project contains a file named dev.properties that is used to pass parameters used by the topologies. In ihr werden die folgenden Eigenschaften definiert:It defines the following properties:

Datei „dev.properties“dev.properties file BESCHREIBUNGDescription
kafka.zookeeper.hosts Die Apache ZooKeeper-Hosts für den Kafka-ClusterThe Apache ZooKeeper hosts for the Kafka cluster.
kafka.broker.hosts Die Kafka-Brokerhosts (Workerknoten)The Kafka broker hosts (worker nodes).
kafka.topic Das von den Topologien verwendete Kafka-ThemaThe Kafka topic that the topologies use.
hdfs.write.dir Das Verzeichnis, in das die Kafka-reader-Topologie schreibtThe directory that the Kafka-reader topology writes to.
hdfs.url Das vom Storm-Cluster verwendete DateisystemThe file system used by the Storm cluster. Verwenden Sie für Azure Storage-Konten den Wert wasb:///.For Azure Storage accounts, use a value of wasb:///. Verwenden Sie für Azure Data Lake Storage Gen2 den Wert abfs:///.For Azure Data Lake Storage Gen2, use a value of abfs:///. Verwenden Sie für Azure Data Lake Storage Gen1 den Wert adl:///.For Azure Data Lake Storage Gen1, use a value of adl:///.

Erstellen von ClusternCreate the clusters

Apache Kafka in HDInsight ermöglicht keinen Zugriff auf die Kafka-Broker über das öffentliche Internet.Apache Kafka on HDInsight does not provide access to the Kafka brokers over the public internet. Alle von Kafka verwendeten Elemente müssen sich im gleichen virtuellen Azure-Netzwerk befinden.Anything that uses Kafka must be in the same Azure virtual network. In diesem Tutorial befinden sich der Kafka- und der Storm-Cluster im selben virtuellen Azure-Netzwerk.In this tutorial, both the Kafka and Storm clusters are located in the same Azure virtual network.

Das folgende Diagramm zeigt den Kommunikationsfluss zwischen Storm und Kafka:The following diagram shows how communication flows between Storm and Kafka:

Diagramm mit Storm- und Kafka-Clustern in einem virtuellen Azure-Netzwerk

Hinweis

Auf andere Dienste im Cluster, beispielsweise SSH und Apache Ambari, kann über das Internet zugegriffen werden.Other services on the cluster such as SSH and Apache Ambari can be accessed over the internet. Weitere Informationen zu den öffentlichen Ports, die für HDInsight verfügbar sind, finden Sie unter Von HDInsight verwendete Ports und URIs.For more information on the public ports available with HDInsight, see Ports and URIs used by HDInsight.

Führen Sie zum Erstellen eines virtuellen Azure-Netzwerks und zum anschließenden Erstellen der Kafka- und Storm-Cluster in diesem die folgenden Schritte aus:To create an Azure Virtual Network, and then create the Kafka and Storm clusters within it, use the following steps:

  1. Verwenden Sie die folgende Schaltfläche, um sich bei Azure anzumelden, und öffnen Sie die Vorlage im Azure-Portal.Use the following button to sign in to Azure and open the template in the Azure portal.

    Deploy to Azure

    Die Azure Resource Manager-Vorlage finden Sie unter https://github.com/Azure-Samples/hdinsight-storm-java-kafka/blob/master/create-kafka-storm-clusters-in-vnet.json .The Azure Resource Manager template is located at https://github.com/Azure-Samples/hdinsight-storm-java-kafka/blob/master/create-kafka-storm-clusters-in-vnet.json. Er erstellt die folgenden Ressourcen:It creates the following resources:

    • Azure-RessourcengruppeAzure resource group
    • Virtuelles Azure-NetzwerkAzure Virtual Network
    • Azure-SpeicherkontoAzure Storage account
    • Kafka in HDInsight Version 3.6 (drei Workerknoten)Kafka on HDInsight version 3.6 (three worker nodes)
    • Storm in HDInsight Version 3.6 (drei Workerknoten)Storm on HDInsight version 3.6 (three worker nodes)

    Warnung

    Um die Verfügbarkeit von Kafka in HDInsight zu gewährleisten, muss der Cluster mindestens drei Workerknoten enthalten.To guarantee availability of Kafka on HDInsight, your cluster must contain at least three worker nodes. Diese Vorlage erstellt einen Kafka-Cluster, der drei Workerknoten enthält.This template creates a Kafka cluster that contains three worker nodes.

  2. Befolgen Sie die nachstehende Anleitung, um die Einträge im Abschnitt Benutzerdefinierte Bereitstellung aufzufüllen:Use the following guidance to populate the entries on the Custom deployment section:

    1. Geben Sie im Abschnitt Benutzerdefinierte Vorlage folgende Informationen an:Use the following information to populate the entries on the Customized template section:

      EinstellungSetting WertValue
      AbonnementSubscription Ihr Azure-AbonnementYour Azure subscription
      RessourcengruppeResource group Die Ressourcengruppe mit den Ressourcen.The resource group that contains the resources.
      LocationLocation Die Azure-Region, in der die Ressourcen erstellt werden.The Azure region that the resources are created in.
      Kafka Cluster Name (Kafka-Clustername)Kafka Cluster Name Der Name des Kafka-Clusters.The name of the Kafka cluster.
      Storm Cluster Name (Storm-Clustername)Storm Cluster Name Der Name des Storm-Clusters.The name of the Storm cluster.
      Benutzername für ClusteranmeldungCluster Login User Name Der Administratorbenutzername für die Cluster.The admin user name for the clusters.
      Kennwort für ClusteranmeldungCluster Login Password Das Administratorbenutzerkennwort für die Cluster.The admin user password for the clusters.
      SSH-BenutzernameSSH User Name Der SSH-Benutzer, der für die Cluster erstellt werden soll.The SSH user to create for the clusters.
      SSH-KennwortSSH Password Das Kennwort für den SSH-Benutzer.The password for the SSH user.

      Darstellung der Vorlagenparameter

  3. Lesen Sie die Geschäftsbedingungen, und wählen Sie anschließend die Option Ich stimme den oben genannten Geschäftsbedingungen zu.Read the Terms and Conditions, and then select I agree to the terms and conditions stated above.

  4. Aktivieren Sie zum Schluss An Dashboard anheften, und wählen Sie dann Kaufen aus.Finally, check Pin to dashboard and then select Purchase.

Hinweis

Die Clustererstellung kann bis zu 20 Minuten dauern.It can take up to 20 minutes to create the clusters.

Erstellen der TopologieBuild the topology

  1. Laden Sie das Projekt in Ihrer Entwicklungsumgebung unter https://github.com/Azure-Samples/hdinsight-storm-java-kafka herunter, öffnen Sie eine Befehlszeile, und ändern Sie die Verzeichnisse am Speicherort, in den Sie das Projekt heruntergeladen haben.On your development environment, download the project from https://github.com/Azure-Samples/hdinsight-storm-java-kafka, open a command-line, and change directories to the location that you downloaded the project.

  2. Verwenden Sie im Verzeichnis hdinsight-storm-java-kafka den folgenden Befehl, um das Projekt zu kompilieren und ein Paket für die Bereitstellung zu erstellen:From the hdinsight-storm-java-kafka directory, use the following command to compile the project and create a package for deployment:

    mvn clean package
    

    Mit dem Paketprozess wird eine Datei mit dem Namen KafkaTopology-1.0-SNAPSHOT.jar im Verzeichnis target erstellt.The package process creates a file named KafkaTopology-1.0-SNAPSHOT.jar in the target directory.

  3. Verwenden Sie die folgenden Befehle, um das Paket in Ihren Cluster vom Typ „Storm in HDInsight“ zu kopieren.Use the following commands to copy the package to your Storm on HDInsight cluster. Ersetzen Sie sshuser durch den Namen des SSH-Benutzers für den Cluster.Replace sshuser with the SSH user name for the cluster. Ersetzen Sie stormclustername durch den Namen des Storm-Clusters.Replace stormclustername with the name of the Storm cluster.

    scp ./target/KafkaTopology-1.0-SNAPSHOT.jar sshuser@stormclustername-ssh.azurehdinsight.net:KafkaTopology-1.0-SNAPSHOT.jar
    

    Geben Sie nach der entsprechenden Aufforderung das Kennwort ein, das Sie beim Erstellen der Cluster verwendet haben.When prompted, enter the password you used when creating the clusters.

Konfigurieren der TopologieConfigure the topology

  1. Verwenden Sie eine der folgenden Methoden, um die Kafka-Brokerhosts für den Kafka-Cluster in HDInsight zu ermitteln:Use one of the following methods to discover the Kafka broker hosts for the Kafka on HDInsight cluster:

    $creds = Get-Credential -UserName "admin" -Message "Enter the HDInsight login"
    $clusterName = Read-Host -Prompt "Enter the Kafka cluster name"
    $resp = Invoke-WebRequest -Uri "https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER" `
        -Credential $creds `
        -UseBasicParsing
    $respObj = ConvertFrom-Json $resp.Content
    $brokerHosts = $respObj.host_components.HostRoles.host_name[0..1]
    ($brokerHosts -join ":9092,") + ":9092"
    

    Wichtig

    Im folgenden Bash-Beispiel wird angenommen, dass $CLUSTERNAME den Namen des Kafka-Clusters enthält.The following Bash example assumes that $CLUSTERNAME contains the name of the Kafka cluster name. Außerdem wird angenommen, dass mindestens Version 1.5 von jq installiert ist.It also assumes that jq version 1.5 or greater is installed. Geben Sie bei entsprechender Aufforderung das Kennwort des Anmeldekontos für den Cluster ein.When prompted, enter the password for the cluster login account.

    curl -su admin -G "https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER" | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2
    

    Der zurückgegebene Wert ähnelt dem folgenden Text:The value returned is similar to the following text:

     wn0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092,wn1-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092
    

    Wichtig

    Obwohl möglicherweise mehr als zwei Brokerhosts für den Cluster vorhanden sind, müssen Sie keine vollständige Liste aller Hosts für Clients angeben.While there may be more than two broker hosts for your cluster, you do not need to provide a full list of all hosts to clients. Ein oder zwei genügen.One or two is enough.

  2. Verwenden Sie eine der folgenden Methoden, um die Zookeeper-Brokerhosts für den Kafka-Cluster in HDInsight zu ermitteln:Use one of the following methods to discover the Zookeeper hosts for the Kafka on HDInsight cluster:

    $creds = Get-Credential -UserName "admin" -Message "Enter the HDInsight login"
    $clusterName = Read-Host -Prompt "Enter the Kafka cluster name"
    $resp = Invoke-WebRequest -Uri "https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" `
        -Credential $creds `
        -UseBasicParsing
    $respObj = ConvertFrom-Json $resp.Content
    $zookeeperHosts = $respObj.host_components.HostRoles.host_name[0..1]
    ($zookeeperHosts -join ":2181,") + ":2181"
    

    Wichtig

    Im folgenden Bash-Beispiel wird angenommen, dass $CLUSTERNAME den Namen des Kafka-Clusters enthält.The following Bash example assumes that $CLUSTERNAME contains the name of the Kafka cluster. Außerdem wird angenommen, das jq installiert ist.It also assumes that jq is installed. Geben Sie bei entsprechender Aufforderung das Kennwort des Anmeldekontos für den Cluster ein.When prompted, enter the password for the cluster login account.

    curl -su admin -G "https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2
    

    Der zurückgegebene Wert ähnelt dem folgenden Text:The value returned is similar to the following text:

     zk0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181,zk2-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181
    

    Wichtig

    Obwohl mehr als zwei Zookeeper-Knoten vorhanden sind, müssen Sie keine vollständige Liste aller Hosts für Clients angeben.While there are more than two Zookeeper nodes, you do not need to provide a full list of all hosts to clients. Ein oder zwei genügen.One or two is enough.

    Speichern Sie diesen Wert, da er später wieder verwendet wird.Save this value, as it is used later.

  3. Bearbeiten Sie die Datei dev.properties im Stammverzeichnis des Projekts.Edit the dev.properties file in the root of the project. Fügen Sie die Broker- und Zookeeper-Hostinformationen für den Kafka-Cluster in die entsprechenden Zeilen in dieser Datei ein.Add the Broker and Zookeeper hosts information for the Kafka cluster to the matching lines in this file. Das folgende Beispiel wird mithilfe der Beispielwerte aus den vorherigen Schritten konfiguriert:The following example is configured using the sample values from the previous steps:

     kafka.zookeeper.hosts: zk0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181,zk2-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181
     kafka.broker.hosts: wn0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092,wn1-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092
     kafka.topic: stormtopic
    

    Wichtig

    Der hdfs.url-Eintrag wurde für einen Cluster konfiguriert, der ein Azure Storage-Konto verwendet.The hdfs.url entry is configured for a cluster that uses an Azure Storage account. Um diese Topologie mit einem Storm-Cluster zu verwenden, der Data Lake Storage verwendet, ändern Sie diesen Wert von wasb in adl.To use this topology with a Storm cluster that uses Data Lake Storage, change this value from wasb to adl.

  4. Speichern Sie die Datei dev.properties, und laden Sie sie anschließend mit dem folgenden Befehl in den Storm-Cluster hoch:Save the dev.properties file and then use the following command to upload it to the Storm cluster:

    scp dev.properties USERNAME@BASENAME-ssh.azurehdinsight.net:dev.properties
    

    Ersetzen Sie USERNAME durch den SSH-Benutzernamen für den Cluster.Replace USERNAME with the SSH user name for the cluster. Ersetzen Sie BASENAME durch den Basisnamen, den Sie beim Erstellen des Clusters verwendet haben.Replace BASENAME with the base name you used when creating the cluster.

Erstellen des Kafka-ThemasCreate the Kafka topic

Kafka speichert die Daten in einem Thema.Kafka stores data into a topic. Sie müssen das Thema erstellen, bevor Sie die Storm-Topologien starten.You must create the topic before starting the Storm topologies. Führen Sie die folgenden Schritte aus, um die Topologie zu erstellen:To create the topology, use the following steps:

  1. Stellen Sie per SSH mithilfe des folgenden Befehls eine Verbindung mit dem Kafka-Cluster her.Connect to the Kafka cluster through SSH by using the following command. Ersetzen Sie sshuser durch den SSH-Benutzernamen, der beim Erstellen des Clusters verwendet wurde.Replace sshuser with the SSH user name used when creating the cluster. Ersetzen Sie kafkaclustername durch den Namen des Kafka-Clusters:Replace kafkaclustername with the name of the Kafka cluster:

    ssh sshuser@kafkaclustername-ssh.azurehdinsight.net
    

    Geben Sie nach der entsprechenden Aufforderung das Kennwort ein, das Sie beim Erstellen der Cluster verwendet haben.When prompted, enter the password you used when creating the clusters.

    Informationen hierzu finden Sie unter Verwenden von SSH mit Linux-basiertem Hadoop in HDInsight unter Linux, Unix oder OS X.For information, see Use SSH with HDInsight.

  2. Um ein Kafka-Thema zu erstellen, verwenden Sie den folgenden Befehl.To create the Kafka topic, use the following command. Ersetzen Sie $KAFKAZKHOSTS durch die Informationen des ZooKeeper-Hosts, den Sie beim Konfigurieren der Topologie verwendet haben:Replace $KAFKAZKHOSTS with the Zookeeper host information you used when configuring the topology:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stormtopic --zookeeper $KAFKAZKHOSTS
    

    Dieser Befehl verbindet ZooKeeper für den Cluster Kafka und erstellt das neue Thema stormtopic.This command connects to Zookeeper for the Kafka cluster and creates a new topic named stormtopic. Dieses Thema wird von den Storm-Topologien verwendet.This topic is used by the Storm topologies.

Starten der Writer-TopologieStart the writer

  1. Verwenden Sie Folgendes, um per SSH eine Verbindung mit dem Storm-Cluster herzustellen.Use the following to connect to the Storm cluster using SSH. Ersetzen Sie sshuser durch den SSH-Benutzernamen, der beim Erstellen des Clusters verwendet wurde.Replace sshuser with the SSH user name used when creating the cluster. Ersetzen Sie stormclustername durch den Namen des Storm-Clusters:Replace stormclustername with the name the Storm cluster:

    ssh sshuser@stormclustername-ssh.azurehdinsight.net
    

    Geben Sie nach der entsprechenden Aufforderung das Kennwort ein, das Sie beim Erstellen der Cluster verwendet haben.When prompted, enter the password you used when creating the clusters.

    Informationen hierzu finden Sie unter Verwenden von SSH mit Linux-basiertem Hadoop in HDInsight unter Linux, Unix oder OS X.For information, see Use SSH with HDInsight.

  2. Verwenden Sie für die SSH-Verbindung mit dem Storm-Cluster den folgenden Befehl, um die Writer-Topologie zu starten:From the SSH connection to the Storm cluster, use the following command to start the writer topology:

    storm jar KafkaTopology-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /writer.yaml --filter dev.properties
    

    Für diesen Befehl werden die folgenden Parameter verwendet:The parameters used with this command are:

    • org.apache.storm.flux.Flux: Flux wird verwendet, um diese Topologie zu konfigurieren und auszuführen.org.apache.storm.flux.Flux: Use Flux to configure and run this topology.

    • --remote: Die Topologie wird an Nimbus übermittelt.--remote: Submit the topology to Nimbus. Die Topologie wird auf die Workerknoten im Cluster verteilt.The topology is distributed across the worker nodes in the cluster.

    • -R /writer.yaml: Die Datei writer.yaml wird verwendet, um die Topologie zu konfigurieren.-R /writer.yaml: Use the writer.yaml file to configure the topology. -R gibt an, dass diese Ressource in der JAR-Datei enthalten ist.-R indicates that this resource is included in the jar file. Da sie im Stamm der JAR-Datei enthalten ist, lautet der Pfad /writer.yaml.It's in the root of the jar, so /writer.yaml is the path to it.

    • --filter: Einträge in der Topologie writer.yaml werden mit Werten in der Datei dev.properties aufgefüllt.--filter: Populate entries in the writer.yaml topology using values in the dev.properties file. Der Wert des Eintrags kafka.topic in der Datei wird beispielsweise verwendet, um den Eintrag ${kafka.topic} in der Topologiedefinition zu ersetzen.For example, the value of the kafka.topic entry in the file is used to replace the ${kafka.topic} entry in the topology definition.

Starten der Reader-TopologieStart the reader

  1. Verwenden Sie für die SSH-Sitzung mit dem Storm-Cluster den folgenden Befehl, um die Reader-Topologie zu starten:From the SSH session to the Storm cluster, use the following command to start the reader topology:

    storm jar KafkaTopology-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /reader.yaml --filter dev.properties
    
  2. Warten Sie eine Minute, und zeigen Sie dann mit dem folgenden Befehl die von der reader-Topologie erstellten Dateien an:Wait a minute and then use the following command to view the files created by the reader topology:

    hdfs dfs -ls /stormdata
    

    Die Ausgabe sieht in etwa wie folgender Text aus:The output is similar to the following text:

     Found 173 items
     -rw-r--r--   1 storm supergroup       5137 2018-04-09 19:00 /stormdata/hdfs-bolt-4-0-1523300453088.txt
     -rw-r--r--   1 storm supergroup       5128 2018-04-09 19:00 /stormdata/hdfs-bolt-4-1-1523300453624.txt
     -rw-r--r--   1 storm supergroup       5131 2018-04-09 19:00 /stormdata/hdfs-bolt-4-10-1523300455170.txt
     ...
    
  3. Sie können den Inhalt der Datei mit dem folgenden Befehl anzeigen.To view the contents of the file, use the following command. Ersetzen Sie filename.txt durch den Namen einer Datei:Replace filename.txt with the name of a file:

    hdfs dfs -cat /stormdata/filename.txt
    

    Der folgende Text ist ein Beispiel für den Dateiinhalt:The following text is an example of the file contents:

     four score and seven years ago
     snow white and the seven dwarfs
     i am at two with nature
     snow white and the seven dwarfs
     i am at two with nature
     four score and seven years ago
     an apple a day keeps the doctor away
    

Beenden der TopologienStop the topologies

Verwenden Sie in einer SSH-Sitzung für den Storm-Cluster die folgenden Befehle, um die Storm-Topologien zu beenden:From an SSH session to the Storm cluster, use the following commands to stop the Storm topologies:

storm kill kafka-writer
storm kill kafka-reader

Bereinigen von RessourcenClean up resources

Zum Bereinigen der im Rahmen dieses Tutorials erstellten Ressourcen können Sie die Ressourcengruppe löschen.To clean up the resources created by this tutorial, you can delete the resource group. Dadurch werden auch der zugeordnete HDInsight-Cluster sowie alle anderen Ressourcen gelöscht, die der Ressourcengruppe zugeordnet sind.Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.

So entfernen Sie die Ressourcengruppe über das Azure-Portal:To remove the resource group using the Azure portal:

  1. Erweitern Sie im Azure-Portal das Menü auf der linken Seite, um das Menü mit den Diensten zu öffnen, und klicken Sie auf Ressourcengruppen, um die Liste mit Ihren Ressourcengruppen anzuzeigen.In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource Groups to display the list of your resource groups.
  2. Suchen Sie die zu löschende Ressourcengruppe, und klicken Sie mit der rechten Maustaste rechts neben dem Eintrag auf die Schaltfläche Mehr (...).Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
  3. Klicken Sie auf Ressourcengruppe löschen, und bestätigen Sie den Vorgang.Select Delete resource group, and then confirm.

Warnung

Die Abrechnung für einen HDInsight-Cluster beginnt, sobald der Cluster erstellt wurde, und endet mit dem Löschen des Clusters.HDInsight cluster billing starts once a cluster is created and stops when the cluster is deleted. Die Gebühren werden anteilig nach Minuten erhoben. Daher sollten Sie Ihren Cluster immer löschen, wenn Sie ihn nicht mehr verwenden.Billing is pro-rated per minute, so you should always delete your cluster when it is no longer in use.

Wenn Sie einen Kafka-Cluster in HDInsight löschen, werden auch alle in Kafka gespeicherten Daten gelöscht.Deleting a Kafka on HDInsight cluster deletes any data stored in Kafka.

Nächste SchritteNext steps

In diesem Tutorial haben Sie gelernt, wie Sie eine Apache Storm-Topologie zum Schreiben und Lesen aus Apache Kafka in HDInsight verwenden.In this tutorial, you learned how to use an Apache Storm topology to write to and read from Apache Kafka on HDInsight. Außerdem haben Sie gelernt, Daten in dem von HDInsight verwendeten Speicher zu speichern, der mit dem HDFS von Apache Hadoop kompatibel ist.You also learned how to store data to the Apache Hadoop HDFS compatible storage used by HDInsight.

Weitere Informationen zum Verwenden von Kafka in HDInsight finden Sie unter Tutorial: Verwenden der Apache Kafka Producer- und Consumer-APIs.To learn more about using Kafka on HDInsight, see the Use Apache Kafka Producer and Consumer API document.

Informationen zur Bereitstellung und Überwachung von Topologien in Linux-basiertem HDInsight finden Sie unter Bereitstellen und Verwalten von Apache Storm-Topologien in Linux-basiertem HDInsight.For information on deploying and monitoring topologies on Linux-based HDInsight, see Deploy and manage Apache Storm topologies on Linux-based HDInsight