Esercitazione: Usare Apache Storm con Kafka in HDInsightTutorial: Use Apache Storm with Kafka on HDInsight

Questa esercitazione illustra come usare una topologia di Apache Storm per leggere e scrivere dati con Apache Kafka in HDInsight.This tutorial demonstrates how to use an Apache Storm topology to read and write data with Apache Kafka on HDInsight. Questa esercitazione illustra inoltre come rendere persistenti i dati nella risorsa di archiviazione compatibile con HDFS nel cluster Storm.This tutorial also demonstrates how to persist data to the HDFS-compatible storage on the Storm cluster.

In questa esercitazione si apprenderà come:In this tutorial, you learn how to:

  • Storm e KafkaStorm and Kafka
  • Informazioni sul codiceUnderstanding the code
  • Creare cluster Kafka e StormCreate Kafka and Storm clusters
  • Creare la topologiaBuild the topology
  • Configurare la topologiaConfigure the topology
  • Creare l'argomento KafkaCreate the Kafka topic
  • Avviare le topologieStart the topologies
  • Arrestare le topologieStop the topologies
  • Pulire le risorseClean up resources

prerequisitiPrerequisites

Le variabili di ambiente seguenti possono essere impostate quando si installa Java e l'JDK nella workstation di sviluppo.The following environment variables may be set when you install Java and the JDK on your development workstation. È tuttavia necessario verificare che esistano e che contengano i valori corretti per il sistema in uso.However, you should check that they exist and that they contain the correct values for your system.

  • JAVA_HOME - deve puntare alla directory dove è installato JDK.JAVA_HOME - should point to the directory where the JDK is installed.
  • PATH: deve contenere i percorsi seguenti:PATH - should contain the following paths:

    • JAVA_HOME o il percorso equivalente.JAVA_HOME (or the equivalent path).
    • JAVA_HOME\bin o il percorso equivalente.JAVA_HOME\bin (or the equivalent path).
    • Directory in cui è installato Maven.The directory where Maven is installed.

Importante

La procedura descritta in questo documento richiede l'uso di un gruppo di risorse di Azure che contiene sia un cluster Storm in HDInsight che un cluster Kafka in HDInsight.The steps in this document require an Azure resource group that contains both a Storm on HDInsight and a Kafka on HDInsight cluster. Entrambi questi cluster si trovano all'interno di una rete virtuale di Azure, che consente al cluster Storm di comunicare direttamente con il cluster Kafka.These clusters are both located within an Azure Virtual Network, which allows the Storm cluster to directly communicate with the Kafka cluster.

Per comodità, questo documento si collega a un modello in grado di creare tutte le risorse di Azure necessarie.For your convenience, this document links to a template that can create all the required Azure resources.

Per altre informazioni sull'uso di HDInsight in una rete virtuale, vedere il documento Estendere Azure HDInsight usando Rete virtuale di Azure.For more information on using HDInsight in a virtual network, see the Extend HDInsight using a virtual network document.

Storm e KafkaStorm and Kafka

Apache Storm include numerosi componenti da usare con Kafka.Apache Storm provides the several components for working with Kafka. In questa esercitazione vengono usati i componenti seguenti:The following components are used in this tutorial:

  • org.apache.storm.kafka.KafkaSpout: questo componente legge i dati da Kafka.org.apache.storm.kafka.KafkaSpout: This component reads data from Kafka. Il componente si basa sui componenti seguenti:This component relies on the following components:

    • org.apache.storm.kafka.SpoutConfig: include la configurazione per il componente spout.org.apache.storm.kafka.SpoutConfig: Provides configuration for the spout component.

    • org.apache.storm.spout.SchemeAsMultiScheme e org.apache.storm.kafka.StringScheme: modalità con cui i dati in Kafka vengono trasformati in una tupla Storm.org.apache.storm.spout.SchemeAsMultiScheme and org.apache.storm.kafka.StringScheme: How the data from Kafka is transformed into a Storm tuple.

  • org.apache.storm.kafka.bolt.KafkaBolt: questo componente scrive i dati in Kafka.org.apache.storm.kafka.bolt.KafkaBolt: This component writes data to Kafka. Il componente si basa sui componenti seguenti:This component relies on the following components:

    • org.apache.storm.kafka.bolt.selector.DefaultTopicSelector: descrive l'argomento in cui viene scritto.org.apache.storm.kafka.bolt.selector.DefaultTopicSelector: Describes the topic that is written to.

    • org.apache.kafka.common.serialization.StringSerializer: configura il bolt per serializzare i dati come valore stringa.org.apache.kafka.common.serialization.StringSerializer: Configures the bolt to serialize data as a string value.

    • org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper: esegue il mapping della struttura di dati della tupla usata all'interno della topologia Storm con i campi archiviati in Kafka.org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper: Maps from the tuple data structure used inside the Storm topology to fields stored in Kafka.

Questi componenti sono disponibili nel pacchetto di org.apache.storm : storm-kafka.These components are available in the org.apache.storm : storm-kafka package. Occorre usare la versione del pacchetto che corrisponde alla versione di Storm.Use the package version that matches the Storm version. Per HDInsight 3.6, la versione di Storm è 1.1.0.For HDInsight 3.6, the Storm version is 1.1.0. Occorre anche il pacchetto di org.apache.kafka : kafka_2.10, che contiene componenti Kafka aggiuntivi.You also need the org.apache.kafka : kafka_2.10 package, which contains additional Kafka components. Occorre usare la versione del pacchetto che corrisponde alla versione di Kafka.Use the package version that matches the Kafka version. Per HDInsight 3.6, la versione di Kafka è 0.10.0.0.For HDInsight 3.6, the Kafka version is 0.10.0.0.

Il codice XML seguente rappresenta la dichiarazione di dipendenza nel pom.xml per un progetto Maven:The following XML is the dependency declaration in the pom.xml for a Maven project:

<!-- Storm components for talking to Kafka -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.0</version>
</dependency>
<!-- needs to be the same Kafka version as used on your cluster -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.0</version>
    <!-- Exclude components that are loaded from the Storm cluster at runtime -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Informazioni sul codiceUnderstanding the code

Il codice usato in questo documento è disponibile all'indirizzo https://github.com/Azure-Samples/hdinsight-storm-java-kafka.The code used in this document is available at https://github.com/Azure-Samples/hdinsight-storm-java-kafka.

Questa esercitazione include due topologie:There are two topologies provided with this tutorial:

  • Kafka-writer: genera frasi casuali e le archivia in Kafka.Kafka-writer: Generates random sentences and stores them to Kafka.

  • Kafka-reader: legge i dati in Kafka e quindi li memorizza nell'archivio file compatibile con HDFS per il cluster Storm.Kafka-reader: Reads data from Kafka and then stores it to the HDFS compatible file store for the Storm cluster.

    Avviso

    Per consentire a Storm di usare la risorsa di archiviazione compatibile con HDFS usata da HDInsight, è necessaria un'azione script.To enable the Storm to work with the HDFS compatible storage used by HDInsight, a script action is required. Lo script installa vari file con estensione jar nel percorso extlib per Storm.The script installs several jar files to the extlib path for Storm. Il modello in questa esercitazione usa automaticamente lo script durante la creazione del cluster.The template in this tutorial automatically uses the script during cluster creation.

    Se non si usa il modello in questo documento per creare il cluster Storm, è necessario applicare manualmente l'azione script al cluster.If you do not use the template in this document to create the Storm cluster, then you must manually apply the script action to your cluster.

    L'azione script si trova in https://hdiconfigactions2.blob.core.windows.net/stormextlib/stormextlib.sh e viene applicata ai nodi supervisore e nimbus del cluster Storm.The script action is located at https://hdiconfigactions2.blob.core.windows.net/stormextlib/stormextlib.sh and is applied to the supervisor and nimbus nodes of the Storm cluster. Per altre informazioni sull'uso di azioni script, vedere il documento Customize HDInsight using script actions (Personalizzare HDInsight tramite azioni script).For more information on using script actions, see the Customize HDInsight using script actions document.

Le topologie vengono definite tramite Flux.The topologies are defined using Flux. Flux è stato introdotto in Storm 0.10.x e consente di separare la configurazione della topologia dal codice.Flux was introduced in Storm 0.10.x and allows you to separate the topology configuration from the code. Per le topologie che fanno uso del framework Flux, la topologia viene definita in un file YAML.For Topologies that use the Flux framework, the topology is defined in a YAML file. Il file YAML può essere incluso come parte della topologia.The YAML file can be included as part of the topology. Può essere anche un file autonomo usato quando si invia la topologia.It can also be a standalone file used when you submit the topology. Flux supporta anche la sostituzione delle variabili in fase di esecuzione, caratteristica che viene usata in questo esempio.Flux also supports variable substitution at run-time, which is used in this example.

I parametri seguenti vengono impostati in fase di esecuzione per queste topologie:The following parameters are set at run time for these topologies:

  • ${kafka.topic}: nome dell'argomento Kafka usato dalla topologie per la lettura/scrittura.${kafka.topic}: The name of the Kafka topic that the topologies read/write to.

  • ${kafka.broker.hosts}: host in cui vengono eseguiti i broker di Kafka.${kafka.broker.hosts}: The hosts that the Kafka brokers run on. Le informazioni sui broker vengono usate da KafkaBolt durante la scrittura in Kafka.The broker information is used by the KafkaBolt when writing to Kafka.

  • ${kafka.zookeeper.hosts}: host in cui viene eseguito Zookeeper nel cluster di Kafka.${kafka.zookeeper.hosts}: The hosts that Zookeeper runs on in the Kafka cluster.

  • ${hdfs.url}: URL del file system per il componente HDFSBolt.${hdfs.url}: The file system URL for the HDFSBolt component. Indica se i dati vengono scritti in un account di Archiviazione di Azure o in Azure Data Lake Store.Indicates whether the data is written to an Azure Storage account or Azure Data Lake Store.

  • ${hdfs.write.dir}: directory in cui vengono scritti i dati.${hdfs.write.dir}: The directory that data is written to.

Per altre informazioni sulle topologie di Flux, vedere https://storm.apache.org/releases/1.1.2/flux.html.For more information on Flux topologies, see https://storm.apache.org/releases/1.1.2/flux.html.

Kafka-writerKafka-writer

Nella topologia Kafka-writer il componente bolt di Kafka accetta due valori stringa come parametri.In the Kafka-writer topology, the Kafka bolt component takes two string values as parameters. Questi parametri indicano quali campi di tupla vengono inviati a Kafka dal bolt come valori di chiave e messaggio.These parameters indicate which tuple fields the bolt sends to Kafka as key and message values. La chiave viene usata per partizionare i dati in Kafka.The key is used to partition data in Kafka. Il messaggio corrisponde ai dati da archiviare.The message is the data being stored.

In questo esempio, il componente com.microsoft.example.SentenceSpout genera una tupla che contiene due campi, key e message.In this example, the com.microsoft.example.SentenceSpout component emits a tuple that contains two fields, key and message. Il bolt di Kafka estrae questi campi e ne invia i dati a Kafka.The Kafka bolt extracts these fields and sends the data in them to Kafka.

I campi non devono necessariamente chiamarsi key e message.The fields don't have to use the names key and message. Questi nomi vengono usati in questo progetto per facilitare la comprensione del mapping.These names are used in this project to make the mapping easier to understand.

Il codice YAML seguente è la definizione del componente Kafka-writer:The following YAML is the definition for the Kafka-writer component:

# kafka-writer
---

# topology definition
# name to be used when submitting
name: "kafka-writer"

# Components - constructors, property setters, and builder arguments.
# Currently, components must be declared in the order they are referenced
components:
  # Topic selector for KafkaBolt
  - id: "topicSelector"
    className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector"
    constructorArgs:
      - "${kafka.topic}"

  # Mapper for KafkaBolt
  - id: "kafkaMapper"
    className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"
    constructorArgs:
      - "key"
      - "message"

  # Producer properties for KafkaBolt
  - id: "producerProperties"
    className: "java.util.Properties"
    configMethods:
      - name: "put"
        args:
          - "bootstrap.servers"
          - "${kafka.broker.hosts}"
      - name: "put"
        args:
          - "acks"
          - "1"
      - name: "put"
        args:
          - "key.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"
      - name: "put"
        args:
          - "value.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"


# Topology configuration
config:
  topology.workers: 2

# Spout definitions
spouts:
  - id: "sentence-spout"
    className: "com.microsoft.example.SentenceSpout"
    parallelism: 8

# Bolt definitions
bolts:
  - id: "kafka-bolt"
    className: "org.apache.storm.kafka.bolt.KafkaBolt"
    parallelism: 8
    configMethods:
    - name: "withProducerProperties"
      args: [ref: "producerProperties"]
    - name: "withTopicSelector"
      args: [ref: "topicSelector"]
    - name: "withTupleToKafkaMapper"
      args: [ref: "kafkaMapper"]

# Stream definitions

streams:
  - name: "spout --> kafka" # Streams data from the sentence spout to the Kafka bolt
    from: "sentence-spout"
    to: "kafka-bolt"
    grouping:
      type: SHUFFLE

Kafka-readerKafka-reader

Nella topologia Kafka-reader il componente spout legge i dati di Kafka come valori stringa.In the Kafka-reader topology, the spout component reads data from Kafka as string values. I dati vengono quindi scritti nel log di Storm dal componente di registrazione e nel file system compatibile con HDFS per il cluster Storm dal componente bolt HDFS.The data is then written the Storm log by the logging component and to the HDFS compatible file system for the Storm cluster by the HDFS bolt component.

# kafka-reader
---

# topology definition
# name to be used when submitting
name: "kafka-reader"

# Components - constructors, property setters, and builder arguments.
# Currently, components must be declared in the order they are referenced
components:
  # Convert data from Kafka into string tuples in storm
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"
  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "${kafka.zookeeper.hosts}"

  # Spout configuration
  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      # brokerHosts
      - ref: "zkHosts"
      # topic
      - "${kafka.topic}"
      # zkRoot
      - ""
      # id
      - "readerid"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"

    # How often to sync files to HDFS; every 1000 tuples.
  - id: "syncPolicy"
    className: "org.apache.storm.hdfs.bolt.sync.CountSyncPolicy"
    constructorArgs:
      - 1

  # Rotate files when they hit 5 MB
  - id: "rotationPolicy"
    className: "org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy"
    constructorArgs:
      - 5
      - "KB"

  # File format; read the directory from filters at run time, and use a .txt extension when writing.
  - id: "fileNameFormat"
    className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
    configMethods:
      - name: "withPath"
        args: ["${hdfs.write.dir}"]
      - name: "withExtension"
        args: [".txt"]

  # Internal file format; fields delimited by `|`.
  - id: "recordFormat"
    className: "org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat"
    configMethods:
      - name: "withFieldDelimiter"
        args: ["|"]

# Topology configuration
config:
  topology.workers: 2

# Spout definitions
spouts:
  - id: "kafka-spout"
    className: "org.apache.storm.kafka.KafkaSpout"
    constructorArgs:
      - ref: "spoutConfig"
    # Set to the number of partitions for the topic
    parallelism: 8

# Bolt definitions
bolts:
  - id: "logger-bolt"
    className: "com.microsoft.example.LoggerBolt"
    parallelism: 1

  - id: "hdfs-bolt"
    className: "org.apache.storm.hdfs.bolt.HdfsBolt"
    configMethods:
      - name: "withConfigKey"
        args: ["hdfs.config"]
      - name: "withFsUrl"
        args: ["${hdfs.url}"]
      - name: "withFileNameFormat"
        args: [ref: "fileNameFormat"]
      - name: "withRecordFormat"
        args: [ref: "recordFormat"]
      - name: "withRotationPolicy"
        args: [ref: "rotationPolicy"]
      - name: "withSyncPolicy"
        args: [ref: "syncPolicy"]
    parallelism: 1

# Stream definitions

streams:
  # Stream data to log
  - name: "kafka --> log" # name isn't used (placeholder for logging, UI, etc.)
    from: "kafka-spout"
    to: "logger-bolt"
    grouping:
      type: SHUFFLE

  # stream data to file
  - name: "kafka --> hdfs"
    from: "kafka-spout"
    to: "hdfs-bolt"
    grouping:
      type: SHUFFLE

Sostituzioni di proprietàProperty substitutions

Il progetto contiene un file denominato dev.properties che viene usato per passare i parametri usati dalle topologie.The project contains a file named dev.properties that is used to pass parameters used by the topologies. Questo file definisce le proprietà seguenti:It defines the following properties:

File dev.propertiesdev.properties file DESCRIZIONEDescription
kafka.zookeeper.hosts Host Zookeeper per il cluster Kafka.The Zookeeper hosts for the Kafka cluster.
kafka.broker.hosts Host broker Kafka (nodi ruolo di lavoro).The Kafka broker hosts (worker nodes).
kafka.topic Argomento Kafka usato dalle topologie.The Kafka topic that the topologies use.
hdfs.write.dir Directory in cui scrive la topologia Kafka-reader.The directory that the Kafka-reader topology writes to.
hdfs.url File system usato dal cluster Storm.The file system used by the Storm cluster. Per gli account di Archiviazione di Azure, usare un valore di wasb:///.For Azure Storage accounts, use a value of wasb:///. Per Azure Data Lake Store, usare un valore di adl:///.For Azure Data Lake Store, use a value of adl:///.

Creare i clusterCreate the clusters

Apache Kafka in HDInsight non fornisce l'accesso ai broker Kafka tramite Internet pubblico.Apache Kafka on HDInsight does not provide access to the Kafka brokers over the public internet. Qualunque elemento che usa Kafka deve trovarsi nella stessa rete virtuale di Azure.Anything that uses Kafka must be in the same Azure virtual network. In questa esercitazione, entrambi i cluster Kafka e Storm si trovano nella stessa rete virtuale di Azure.In this tutorial, both the Kafka and Storm clusters are located in the same Azure virtual network.

Il diagramma seguente illustra il flusso delle comunicazioni tra Storm e Kafka:The following diagram shows how communication flows between Storm and Kafka:

Diagramma dei cluster Storm e Kafka in una rete virtuale di Azure

Nota

Altri servizi nel cluster, ad esempio SSH e Ambari, sono accessibili tramite Internet.Other services on the cluster such as SSH and Ambari can be accessed over the internet. Per altre informazioni sulle porte pubbliche disponibili con HDInsight, vedere Porte e URI usati da HDInsight.For more information on the public ports available with HDInsight, see Ports and URIs used by HDInsight.

Per creare una Rete virtuale di Microsoft Azure e quindi crearvi i cluster Kafka e Storm, procedere come segue:To create an Azure Virtual Network, and then create the Kafka and Storm clusters within it, use the following steps:

  1. Usare il pulsante seguente per accedere ad Azure e aprire il modello nel portale di Azure.Use the following button to sign in to Azure and open the template in the Azure portal.

    Deploy to Azure

    Il modello di Azure Resource Manager è disponibile in https://github.com/Azure-Samples/hdinsight-storm-java-kafka/blob/master/create-kafka-storm-clusters-in-vnet.json.The Azure Resource Manager template is located at https://github.com/Azure-Samples/hdinsight-storm-java-kafka/blob/master/create-kafka-storm-clusters-in-vnet.json. Crea le risorse seguenti:It creates the following resources:

    • Gruppo di risorse di AzureAzure resource group
    • Rete virtuale di AzureAzure Virtual Network
    • Account di archiviazione di AzureAzure Storage account
    • Kafka in HDInsight versione 3.6 con tre nodi di lavoroKafka on HDInsight version 3.6 (three worker nodes)
    • Storm in HDInsight versione 3.6 con tre nodi di lavoroStorm on HDInsight version 3.6 (three worker nodes)

    Avviso

    Per garantire la disponibilità di Kafka in HDInsight, il cluster deve contenere almeno tre nodi del ruolo di lavoro.To guarantee availability of Kafka on HDInsight, your cluster must contain at least three worker nodes. Questo modello crea un cluster Kafka contenente tre nodi di lavoro.This template creates a Kafka cluster that contains three worker nodes.

  2. Usare le linee guida seguenti per popolare le voci nella sezione Distribuzione personalizzata:Use the following guidance to populate the entries on the Custom deployment section:

    1. Usare le informazioni seguenti per popolare le voci nella sezione Modello personalizzato:Use the following information to populate the entries on the Customized template section:

      ImpostazioneSetting ValoreValue
      SottoscrizioneSubscription Sottoscrizione di AzureYour Azure subscription
      Gruppo di risorseResource group Gruppo di risorse che contiene le risorse.The resource group that contains the resources.
      LocalitàLocation Area di Azure in cui vengono create le risorse.The Azure region that the resources are created in.
      Nome del cluster KafkaKafka Cluster Name Nome del cluster Kafka.The name of the Kafka cluster.
      Nome del cluster StormStorm Cluster Name Nome del cluster Storm.The name of the Storm cluster.
      Nome utente dell'account di accesso del clusterCluster Login User Name Nome utente dell'amministratore per i cluster.The admin user name for the clusters.
      Password di accesso al clusterCluster Login Password Password dell'utente amministratore per i cluster.The admin user password for the clusters.
      Nome utente SSHSSH User Name Utente SSH da creare per i cluster.The SSH user to create for the clusters.
      Password SSHSSH Password Password per l'utente SSH.The password for the SSH user.

      Immagine dei parametri del modello

  3. Leggere le Condizioni e quindi selezionare Accetto le condizioni riportate sopra.Read the Terms and Conditions, and then select I agree to the terms and conditions stated above.

  4. Selezionare infine Aggiungi al dashboard e quindi Acquista.Finally, check Pin to dashboard and then select Purchase.

Nota

La creazione dei cluster può richiedere fino a 20 minuti.It can take up to 20 minutes to create the clusters.

Creare la topologiaBuild the topology

  1. Nell'ambiente di sviluppo scaricare il progetto dall'indirizzo https://github.com/Azure-Samples/hdinsight-storm-java-kafka, aprire una riga di comando e passare al percorso in cui è stato scaricato il progetto.On your development environment, download the project from https://github.com/Azure-Samples/hdinsight-storm-java-kafka, open a command-line, and change directories to the location that you downloaded the project.

  2. Dalla directory hdinsight-storm-java-kafka usare il comando seguente per compilare il progetto e creare un pacchetto per la distribuzione:From the hdinsight-storm-java-kafka directory, use the following command to compile the project and create a package for deployment:

    mvn clean package
    

    Il processo del pacchetto crea un file denominato KafkaTopology-1.0-SNAPSHOT.jar nella directory target.The package process creates a file named KafkaTopology-1.0-SNAPSHOT.jar in the target directory.

  3. Usare i comandi seguenti per copiare il pacchetto nel cluster Storm in HDInsight.Use the following commands to copy the package to your Storm on HDInsight cluster. Sostituire sshuser con il nome utente SSH per il cluster.Replace sshuser with the SSH user name for the cluster. Sostituire stormclustername con il nome del cluster Storm.Replace stormclustername with the name of the Storm cluster.

    scp ./target/KafkaTopology-1.0-SNAPSHOT.jar sshuser@stormclustername-ssh.azurehdinsight.net:KafkaTopology-1.0-SNAPSHOT.jar
    

    Quando richiesto, immettere la password usata durante la creazione del cluster.When prompted, enter the password you used when creating the clusters.

Configurare la topologiaConfigure the topology

  1. Usare uno dei metodi seguenti per individuare gli host del broker Kafka per il cluster Kafka in HDInsight:Use one of the following methods to discover the Kafka broker hosts for the Kafka on HDInsight cluster:

    $creds = Get-Credential -UserName "admin" -Message "Enter the HDInsight login"
    $clusterName = Read-Host -Prompt "Enter the Kafka cluster name"
    $resp = Invoke-WebRequest -Uri "https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER" `
        -Credential $creds `
        -UseBasicParsing
    $respObj = ConvertFrom-Json $resp.Content
    $brokerHosts = $respObj.host_components.HostRoles.host_name[0..1]
    ($brokerHosts -join ":9092,") + ":9092"
    

    Importante

    L'esempio di Bash seguente presuppone che $CLUSTERNAME contenga il nome del cluster Kafka.The following Bash example assumes that $CLUSTERNAME contains the name of the Kafka cluster name. Presuppone anche che sia installato jq versione 1.5 o una versione successiva.It also assumes that jq version 1.5 or greater is installed. Quando richiesto, immettere la password dell'account di accesso al cluster.When prompted, enter the password for the cluster login account.

    curl -su admin -G "https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER" | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2
    

    Il valore restituito è simile al testo seguente:The value returned is similar to the following text:

     wn0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092,wn1-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092
    

    Importante

    Anche se possono essere presenti più di due host broker per il cluster, non è necessario fornire un elenco completo di tutti gli host ai client.While there may be more than two broker hosts for your cluster, you do not need to provide a full list of all hosts to clients. È sufficiente specificarne uno o due.One or two is enough.

  2. Usare uno dei metodi seguenti per individuare gli host Zookeeper per il cluster Kafka in HDInsight:Use one of the following methods to discover the Zookeeper hosts for the Kafka on HDInsight cluster:

    $creds = Get-Credential -UserName "admin" -Message "Enter the HDInsight login"
    $clusterName = Read-Host -Prompt "Enter the Kafka cluster name"
    $resp = Invoke-WebRequest -Uri "https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" `
        -Credential $creds `
        -UseBasicParsing
    $respObj = ConvertFrom-Json $resp.Content
    $zookeeperHosts = $respObj.host_components.HostRoles.host_name[0..1]
    ($zookeeperHosts -join ":2181,") + ":2181"
    

    Importante

    L'esempio di Bash seguente presuppone che $CLUSTERNAME contenga il nome del cluster Kafka.The following Bash example assumes that $CLUSTERNAME contains the name of the Kafka cluster. Presuppone anche che jq sia installato.It also assumes that jq is installed. Quando richiesto, immettere la password dell'account di accesso al cluster.When prompted, enter the password for the cluster login account.

    curl -su admin -G "https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2
    

    Il valore restituito è simile al testo seguente:The value returned is similar to the following text:

     zk0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181,zk2-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181
    

    Importante

    Anche se sono presenti più di due nodi Zookeeper, non è necessario fornire un elenco completo di tutti gli host ai client.While there are more than two Zookeeper nodes, you do not need to provide a full list of all hosts to clients. È sufficiente specificarne uno o due.One or two is enough.

    Salvare questo valore, che verrà usato in un secondo momento.Save this value, as it is used later.

  3. Modificare il file dev.properties nella radice del progetto.Edit the dev.properties file in the root of the project. Aggiungere le informazioni degli host Broker e Zookeeper del cluster Kafka alle righe corrispondenti in questo file.Add the Broker and Zookeeper hosts information for the Kafka cluster to the matching lines in this file. Nell'esempio seguente viene configurato con i valori di esempio dei passaggi precedenti:The following example is configured using the sample values from the previous steps:

     kafka.zookeeper.hosts: zk0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181,zk2-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181
     kafka.broker.hosts: wn0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092,wn1-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092
     kafka.topic: stormtopic
    

    Importante

    La voce hdfs.url viene configurata per un cluster che usa un account di Archiviazione di Azure.The hdfs.url entry is configured for a cluster that uses an Azure Storage account. Per usare questa topologia con un cluster Storm che usa Data Lake Store, modificare questo valore da wasb in adl.To use this topology with a Storm cluster that uses Data Lake Store, change this value from wasb to adl.

  4. Salvare il file dev.properties e quindi usare il comando seguente per caricarlo nel cluster Storm:Save the dev.properties file and then use the following command to upload it to the Storm cluster:

    scp dev.properties USERNAME@storm-BASENAME-ssh.azurehdinsight.net:dev.properties
    

    Sostituire USERNAME con il nome utente SSH per il cluster.Replace USERNAME with the SSH user name for the cluster. Sostituire BASENAME con il nome di base usato durante la creazione del cluster.Replace BASENAME with the base name you used when creating the cluster.

Creare l'argomento KafkaCreate the Kafka topic

Kafka archivia i dati in un argomento.Kafka stores data into a topic. È necessario creare l'argomento prima di avviare le topologie Storm.You must create the topic before starting the Storm topologies. Per creare la topologia, seguire questa procedura:To create the topology, use the following steps:

  1. Connettersi al cluster Kafka tramite SSH usando il comando seguente.Connect to the Kafka cluster through SSH by using the following command. Sostituire sshuser con il nome utente SSH usato durante la creazione del cluster.Replace sshuser with the SSH user name used when creating the cluster. Sostituire kafkaclustername con il nome del cluster Kafka:Replace kafkaclustername with the name of the Kafka cluster:

    ssh sshuser@kafkaclustername-ssh.azurehdinsight.net
    

    Quando richiesto, immettere la password usata durante la creazione del cluster.When prompted, enter the password you used when creating the clusters.

    Per altre informazioni, vedere Usare SSH con HDInsight.For information, see Use SSH with HDInsight.

  2. Per creare l'argomento di Kafka, usare il comando seguente.To create the Kafka topic, use the following command. Sostituire $KAFKAZKHOSTS con le informazioni dell'host Zookeeper usate durante la configurazione della topologia:Replace $KAFKAZKHOSTS with the Zookeeper host information you used when configuring the topology:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stormtopic --zookeeper $KAFKAZKHOSTS
    

    Questo comando si connette a Zookeeper per il cluster Kafka e crea un nuovo argomento denominato stormtopic.This command connects to Zookeeper for the Kafka cluster and creates a new topic named stormtopic. Questo argomento viene usato dalle topologie Storm.This topic is used by the Storm topologies.

Avviare il writerStart the writer

  1. Usare il codice seguente per connettersi al cluster Storm tramite SSH.Use the following to connect to the Storm cluster using SSH. Sostituire sshuser con il nome utente SSH usato durante la creazione del cluster.Replace sshuser with the SSH user name used when creating the cluster. Sostituire stormclustername con il nome del cluster Storm:Replace stormclustername with the name the Storm cluster:

    ssh sshuser@stormclustername-ssh.azurehdinsight.net
    

    Quando richiesto, immettere la password usata durante la creazione del cluster.When prompted, enter the password you used when creating the clusters.

    Per altre informazioni, vedere Usare SSH con HDInsight.For information, see Use SSH with HDInsight.

  2. Dalla connessione SSH al cluster Storm usare il comando seguente per avviare la topologia del writer:From the SSH connection to the Storm cluster, use the following command to start the writer topology:

    storm jar KafkaTopology-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /writer.yaml --filter dev.properties
    

    I parametri usati con questo comando sono i seguenti:The parameters used with this command are:

    • org.apache.storm.flux.Flux: usa Flux per configurare ed eseguire questa topologia.org.apache.storm.flux.Flux: Use Flux to configure and run this topology.

    • --remote: invia la topologia a Nimbus.--remote: Submit the topology to Nimbus. La topologia viene distribuita ai nodi del ruolo di lavoro nel cluster.The topology is distributed across the worker nodes in the cluster.

    • -R /writer.yaml: usa il file writer.yaml per configurare la topologia.-R /writer.yaml: Use the writer.yaml file to configure the topology. -R indica che questa risorsa è inclusa nel file JAR.-R indicates that this resource is included in the jar file. È nella radice del file JAR, quindi /writer.yaml è il relativo percorso.It's in the root of the jar, so /writer.yaml is the path to it.

    • --filter: consente di compilare le voci nella topologia writer.yaml con i valori nel file dev.properties.--filter: Populate entries in the writer.yaml topology using values in the dev.properties file. Ad esempio, il valore della voce kafka.topic nel file viene usato per sostituire la voce ${kafka.topic} nella definizione della topologia.For example, the value of the kafka.topic entry in the file is used to replace the ${kafka.topic} entry in the topology definition.

Avviare il readerStart the reader

  1. Dalla sessione SSH nel cluster Storm usare il comando seguente per avviare la topologia del reader:From the SSH session to the Storm cluster, use the following command to start the reader topology:

    storm jar KafkaTopology-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /reader.yaml --filter dev.properties
    
  2. Attendere un minuto e quindi usare il comando seguente per visualizzare i file creati dalla topologia reader:Wait a minute and then use the following command to view the files created by the reader topology:

    hdfs dfs -ls /stormdata
    

    L'output è simile al testo seguente:The output is similar to the following text:

     Found 173 items
     -rw-r--r--   1 storm supergroup       5137 2018-04-09 19:00 /stormdata/hdfs-bolt-4-0-1523300453088.txt
     -rw-r--r--   1 storm supergroup       5128 2018-04-09 19:00 /stormdata/hdfs-bolt-4-1-1523300453624.txt
     -rw-r--r--   1 storm supergroup       5131 2018-04-09 19:00 /stormdata/hdfs-bolt-4-10-1523300455170.txt
     ...
    
  3. Per visualizzare il contenuto del file, usare il comando seguente.To view the contents of the file, use the following command. Sostituire filename.txt con il nome di un file:Replace filename.txt with the name of a file:

    hdfs dfs -cat /stormdata/filename.txt
    

    Il testo seguente è un esempio del contenuto del file:The following text is an example of the file contents:

     four score and seven years ago
     snow white and the seven dwarfs
     i am at two with nature
     snow white and the seven dwarfs
     i am at two with nature
     four score and seven years ago
     an apple a day keeps the doctor away
    

Arrestare le topologieStop the topologies

Dalla sessione SSH nel cluster Storm usare i comandi seguenti per arrestare le topologie di Storm:From an SSH session to the Storm cluster, use the following commands to stop the Storm topologies:

storm kill kafka-writer
storm kill kafka-reader

Pulire le risorseClean up resources

Per pulire le risorse create da questa esercitazione, eliminare il gruppo di risorse.To clean up the resources created by this tutorial, you can delete the resource group. Se si elimina il gruppo di risorse, vengono eliminati anche il cluster HDInsight associato e tutte le altre risorse correlate al gruppo di risorse.Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.

Per rimuovere il gruppo di risorse usando il portale di Azure:To remove the resource group using the Azure portal:

  1. Nel portale di Azure espandere il menu a sinistra per aprire il menu dei servizi e quindi scegliere Gruppi di risorse per visualizzare l'elenco dei gruppi di risorse.In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource Groups to display the list of your resource groups.
  2. Individuare il gruppo di risorse da eliminare e quindi fare clic con il pulsante destro del mouse su Altro (...) a destra dell'elenco.Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
  3. Scegliere Elimina gruppo di risorse e quindi confermare.Select Delete resource group, and then confirm.

Avviso

La fatturazione del cluster HDInsight inizia dopo la creazione del cluster e si interrompe solo quando questo viene eliminato.HDInsight cluster billing starts once a cluster is created and stops when the cluster is deleted. La fatturazione avviene con tariffa oraria, perciò si deve sempre eliminare il cluster in uso quando non lo si usa più.Billing is pro-rated per minute, so you should always delete your cluster when it is no longer in use.

Se si elimina un cluster Kafka su HDInsight vengono eliminati anche eventuali dati archiviati in Kafka.Deleting a Kafka on HDInsight cluster deletes any data stored in Kafka.

Passaggi successiviNext steps

In questa esercitazione è stato descritto come usare una topologia Storm per scrivere e leggere dati da Kafka in HDInsight.In this tutorial, you learned how to use a Storm topology to write to and read from Kafka on HDInsight. Si è inoltre appreso come archiviare dati nella risorsa di archiviazione compatibile con HDFS usata da HDInsight.You also learned how to store data to the HDFS compatible storage used by HDInsight.

Per altre informazioni sull'uso di Kafka in HDInsight, vedere il documento Use Kafka Producer and Consumer API (Usare la API Kafka Producer e Consumer).To learn more about using Kafka on HDInsight, see the Use Kafka Producer and Consumer API document.

Per informazioni sulla distribuzione e sul monitoraggio di topologie in HDInsight basato su Linux, vedere Distribuzione e gestione di topologie Apache Storm in HDInsight basato su Linux.For information on deploying and monitoring topologies on Linux-based HDInsight, see Deploy and manage Apache Storm topologies on Linux-based HDInsight