Kurz: Použití Apache Storm s Apache Kafka v HDInsightTutorial: Use Apache Storm with Apache Kafka on HDInsight

Tento kurz ukazuje použití topologie Apache Storm ke čtení a zápisu dat pomocí Apache Kafka v HDInsight.This tutorial demonstrates how to use an Apache Storm topology to read and write data with Apache Kafka on HDInsight. Tento kurz také ukazuje, jak zachovat data do úložiště s nezaručeným úložištěm Apache HADOOP HDFS kompatibilní s clustery.This tutorial also demonstrates how to persist data to the Apache Hadoop HDFS compatible storage on the Storm cluster.

V tomto kurzu se naučíte:In this tutorial, you learn how to:

  • Storm a KafkaStorm and Kafka
  • Vysvětlení kóduUnderstanding the code
  • Vytvoření clusterů Kafka a StormCreate Kafka and Storm clusters
  • Vytvoření topologieBuild the topology
  • Konfigurace topologieConfigure the topology
  • Vytvoření tématu KafkaCreate the Kafka topic
  • Spuštění topologieStart the topologies
  • Zastavení topologiíStop the topologies
  • Vyčištění prostředkůClean up resources

PožadavkyPrerequisites

Když na svoji vývojářskou pracovní stanici nainstalujete Javu a JDK, mohou být nastaveny následující proměnné prostředí.The following environment variables may be set when you install Java and the JDK on your development workstation. Nicméně byste měli zkontrolovat, že existují a že obsahují hodnoty správné pro váš systém.However, you should check that they exist and that they contain the correct values for your system.

  • JAVA_HOME – instalační adresář sady JDK.JAVA_HOME - should point to the directory where the JDK is installed.

  • PATH – měla by obsahovat následující cesty:PATH - should contain the following paths:

    • JAVA_HOME (nebo odpovídající cestu).JAVA_HOME (or the equivalent path).
    • JAVA_HOME\bin (nebo odpovídající cestu).JAVA_HOME\bin (or the equivalent path).
    • Adresář, kde je nainstalovaný Maven.The directory where Maven is installed.

Důležité

Kroky v tomto dokumentu vyžadují skupinu prostředků Azure obsahující cluster Storm ve službě HDInsight i cluster Kafka ve službě HDInsight.The steps in this document require an Azure resource group that contains both a Storm on HDInsight and a Kafka on HDInsight cluster. Oba tyto clustery se nacházejí ve službě Azure Virtual Network, což umožňuje přímou komunikaci clusteru Storm s clusterem Kafka.These clusters are both located within an Azure Virtual Network, which allows the Storm cluster to directly communicate with the Kafka cluster.

Pro usnadnění práce tento dokument odkazuje na šablonu, která může vytvořit všechny požadované prostředky Azure.For your convenience, this document links to a template that can create all the required Azure resources.

Další informace o používání služby HDInsight ve virtuální síti najdete v tématu Naplánování služby Virtual Network for HDInsight .For more information on using HDInsight in a virtual network, see the Plan a virtual network for HDInsight document.

Storm a KafkaStorm and Kafka

Apache Storm poskytuje několik komponent pro práci s Apache Kafka.Apache Storm provides the several components for working with Apache Kafka. V tomto kurzu jsou použité následující komponenty:The following components are used in this tutorial:

  • org.apache.storm.kafka.KafkaSpout: Tato součást čte data z Kafka.org.apache.storm.kafka.KafkaSpout: This component reads data from Kafka. Závisí na následujících komponentách:This component relies on the following components:

    • org.apache.storm.kafka.SpoutConfig: Poskytuje konfiguraci pro komponentu Spout.org.apache.storm.kafka.SpoutConfig: Provides configuration for the spout component.

    • org.apache.storm.spout.SchemeAsMultiSchemea org.apache.storm.kafka.StringScheme: Jak jsou data z Kafka transformovaná na řazenou kolekci členů.org.apache.storm.spout.SchemeAsMultiScheme and org.apache.storm.kafka.StringScheme: How the data from Kafka is transformed into a Storm tuple.

  • org.apache.storm.kafka.bolt.KafkaBolt: Tato součást zapisuje data do Kafka.org.apache.storm.kafka.bolt.KafkaBolt: This component writes data to Kafka. Závisí na následujících komponentách:This component relies on the following components:

    • org.apache.storm.kafka.bolt.selector.DefaultTopicSelector: Popisuje téma, které je zapsáno do.org.apache.storm.kafka.bolt.selector.DefaultTopicSelector: Describes the topic that is written to.

    • org.apache.kafka.common.serialization.StringSerializer: Nakonfiguruje šroub na serializaci dat jako řetězcové hodnoty.org.apache.kafka.common.serialization.StringSerializer: Configures the bolt to serialize data as a string value.

    • org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper: Mapuje ze struktury dat řazené kolekce členů použité v topologii překrytí do polí uložených v Kafka.org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper: Maps from the tuple data structure used inside the Storm topology to fields stored in Kafka.

Tyto komponenty jsou dostupné v balíčku org.apache.storm : storm-kafka.These components are available in the org.apache.storm : storm-kafka package. Použijte verzi balíčku, která odpovídá verzi Stormu.Use the package version that matches the Storm version. Pro HDInsight 3.6 je verze Stormu 1.1.0.For HDInsight 3.6, the Storm version is 1.1.0. Budete také potřebovat balíček org.apache.kafka : kafka_2.10 s dalšími komponentami systému Kafka.You also need the org.apache.kafka : kafka_2.10 package, which contains additional Kafka components. Použijte verzi balíčku, která odpovídá verzi systému Kafka.Use the package version that matches the Kafka version. V případě HDInsight 3,6 je Kafka verze 1.1.1.For HDInsight 3.6, the Kafka version is 1.1.1.

Následující kód XML je deklarace závislosti v pom.xml projektu Apache Maven :The following XML is the dependency declaration in the pom.xml for an Apache Maven project:

<!-- Storm components for talking to Kafka -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.0</version>
</dependency>
<!-- needs to be the same Kafka version as used on your cluster -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>1.1.1</version>
    <!-- Exclude components that are loaded from the Storm cluster at runtime -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Vysvětlení kóduUnderstanding the code

Kód použitý v tomto dokumentu je k dispozici na adrese https://github.com/Azure-Samples/hdinsight-storm-java-kafka.The code used in this document is available at https://github.com/Azure-Samples/hdinsight-storm-java-kafka.

V tomto kurzu se využívají dvě topologie:There are two topologies provided with this tutorial:

  • Kafka-Writer: Generuje náhodné věty a ukládá je do Kafka.Kafka-writer: Generates random sentences and stores them to Kafka.

  • Kafka – čtecí modul: Přečte data z Kafka a pak je uloží do úložiště souborů kompatibilního se systémem HDFS pro cluster ne.Kafka-reader: Reads data from Kafka and then stores it to the HDFS compatible file store for the Storm cluster.

    Varování

    Pokud chcete umožnit Stormu práci s úložištěm kompatibilním s HDFS, které používá HDInsight, je nutná akce skriptu.To enable the Storm to work with the HDFS compatible storage used by HDInsight, a script action is required. Skript nainstaluje pro Storm několik souborů jar do cesty extlib.The script installs several jar files to the extlib path for Storm. Šablona v tomto kurzu při vytváření clusteru tento skript použije automaticky.The template in this tutorial automatically uses the script during cluster creation.

    Pokud nepoužijete k vytvoření clusteru Storm šablonu v tomto dokumentu, pak musíte akci skriptu použít na cluster ručně.If you do not use the template in this document to create the Storm cluster, then you must manually apply the script action to your cluster.

    Akce skriptu je umístěna v https://hdiconfigactions.blob.core.windows.net/linuxstormextlibv01/stormextlib.sh umístění a je použita na uzel správce a Nimbus v clusteru s více clustery.The script action is located at https://hdiconfigactions.blob.core.windows.net/linuxstormextlibv01/stormextlib.sh and is applied to the supervisor and nimbus nodes of the Storm cluster. Další informace o použití akcí skriptu najdete v dokumentu Přizpůsobení HDInsightu pomocí akcí skriptu.For more information on using script actions, see the Customize HDInsight using script actions document.

Topologie se definují pomocí komponenty Flux.The topologies are defined using Flux. Flux byl uveden ve Stormu 0.10.x a umožňuje oddělení konfigurace topologie od kódu.Flux was introduced in Storm 0.10.x and allows you to separate the topology configuration from the code. Pro topologie používající architekturu Flux se topologie definuje v souboru YAML.For Topologies that use the Flux framework, the topology is defined in a YAML file. Soubor YAML je možné zahrnout jako součást topologie.The YAML file can be included as part of the topology. Může to ale být také samostatný soubor, který použijete při odesílání topologie.It can also be a standalone file used when you submit the topology. Flux také podporuje různé substituce za běhu, což se využívá i v tomto příkladu.Flux also supports variable substitution at run-time, which is used in this example.

Pro tyto topologie jsou za běhu nastaveny následující parametry:The following parameters are set at run time for these topologies:

  • ${kafka.topic}: Název Kafka tématu, do kterého topologie čtou/zapisují.${kafka.topic}: The name of the Kafka topic that the topologies read/write to.

  • ${kafka.broker.hosts}: Hostitelé, na kterých se spouští zprostředkovatelé Kafka.${kafka.broker.hosts}: The hosts that the Kafka brokers run on. Informace o zprostředkovatelích využívá KafkaBolt při zápisu do systému Kafka.The broker information is used by the KafkaBolt when writing to Kafka.

  • ${kafka.zookeeper.hosts}: Hostitelé, na kterých Zookeeper běží v clusteru Kafka.${kafka.zookeeper.hosts}: The hosts that Zookeeper runs on in the Kafka cluster.

  • ${hdfs.url}: Adresa URL systému souborů pro komponentu HDFSBolt${hdfs.url}: The file system URL for the HDFSBolt component. Určuje, zda jsou data zapsána do účtu Azure Storage nebo Azure Data Lake Storage.Indicates whether the data is written to an Azure Storage account or Azure Data Lake Storage.

  • ${hdfs.write.dir}: Adresář, do kterého se zapisují data${hdfs.write.dir}: The directory that data is written to.

Další informace o topologiích Flux najdete zde: https://storm.apache.org/releases/current/flux.htmlFor more information on Flux topologies, see https://storm.apache.org/releases/current/flux.html.

Kafka-writerKafka-writer

V topologii Kafka-writer přebírá komponenta Kafka bolt dva řetězcové parametry.In the Kafka-writer topology, the Kafka bolt component takes two string values as parameters. Tyto parametry určují, která pole řazené kolekce členů se odesílají do systému Kafka jako klíč a zpráva.These parameters indicate which tuple fields the bolt sends to Kafka as key and message values. Klíč se používá k třídění dat v systému Kafka.The key is used to partition data in Kafka. Zpráva jsou vlastní ukládaná data.The message is the data being stored.

V tomto příkladu komponenta com.microsoft.example.SentenceSpout vydává řazenou kolekci členů obsahující dvě pole, key a message.In this example, the com.microsoft.example.SentenceSpout component emits a tuple that contains two fields, key and message. Kafka bolt extrahuje tato pole a odešle data z nich do systému Kafka.The Kafka bolt extracts these fields and sends the data in them to Kafka.

Samotná pole nemusí mít názvy key a message.The fields don't have to use the names key and message. Tyto názvy se v tomto projektu používají jen pro lepší srozumitelnost mapování.These names are used in this project to make the mapping easier to understand.

Následující YAML obsahuje definici komponenty Kafka-writer:The following YAML is the definition for the Kafka-writer component:

# kafka-writer
---

# topology definition
# name to be used when submitting
name: "kafka-writer"

# Components - constructors, property setters, and builder arguments.
# Currently, components must be declared in the order they are referenced
components:
  # Topic selector for KafkaBolt
  - id: "topicSelector"
    className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector"
    constructorArgs:
      - "${kafka.topic}"

  # Mapper for KafkaBolt
  - id: "kafkaMapper"
    className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"
    constructorArgs:
      - "key"
      - "message"

  # Producer properties for KafkaBolt
  - id: "producerProperties"
    className: "java.util.Properties"
    configMethods:
      - name: "put"
        args:
          - "bootstrap.servers"
          - "${kafka.broker.hosts}"
      - name: "put"
        args:
          - "acks"
          - "1"
      - name: "put"
        args:
          - "key.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"
      - name: "put"
        args:
          - "value.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"
 

# Topology configuration
config:
  topology.workers: 2

# Spout definitions
spouts:
  - id: "sentence-spout"
    className: "com.microsoft.example.SentenceSpout"
    parallelism: 8

# Bolt definitions
bolts:
  - id: "kafka-bolt"
    className: "org.apache.storm.kafka.bolt.KafkaBolt"
    parallelism: 8
    configMethods:
    - name: "withProducerProperties"
      args: [ref: "producerProperties"]
    - name: "withTopicSelector"
      args: [ref: "topicSelector"]
    - name: "withTupleToKafkaMapper"
      args: [ref: "kafkaMapper"]

# Stream definitions

streams:
  - name: "spout --> kafka" # Streams data from the sentence spout to the Kafka bolt
    from: "sentence-spout"
    to: "kafka-bolt"
    grouping:
      type: SHUFFLE

Kafka-readerKafka-reader

V topologii Kafka-reader čte komponenta spout data ze systému Kafka jako řetězcové hodnoty.In the Kafka-reader topology, the spout component reads data from Kafka as string values. Data se pak zapíšou do protokolu Storm pomocí protokolovací komponenty a do systému souborů kompatibilního s HDFS pro cluster Storm komponentou HDFS bolt.The data is then written the Storm log by the logging component and to the HDFS compatible file system for the Storm cluster by the HDFS bolt component.

# kafka-reader
---

# topology definition
# name to be used when submitting
name: "kafka-reader"

# Components - constructors, property setters, and builder arguments.
# Currently, components must be declared in the order they are referenced
components:
  # Convert data from Kafka into string tuples in storm
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"
  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "${kafka.zookeeper.hosts}"

  # Spout configuration
  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      # brokerHosts
      - ref: "zkHosts"
      # topic
      - "${kafka.topic}"
      # zkRoot
      - ""
      # id
      - "readerid"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"

    # How often to sync files to HDFS; every 1000 tuples.
  - id: "syncPolicy"
    className: "org.apache.storm.hdfs.bolt.sync.CountSyncPolicy"
    constructorArgs:
      - 1

  # Rotate files when they hit 5 MB
  - id: "rotationPolicy"
    className: "org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy"
    constructorArgs:
      - 5
      - "KB"

  # File format; read the directory from filters at run time, and use a .txt extension when writing.
  - id: "fileNameFormat"
    className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
    configMethods:
      - name: "withPath"
        args: ["${hdfs.write.dir}"]
      - name: "withExtension"
        args: [".txt"]

  # Internal file format; fields delimited by `|`.
  - id: "recordFormat"
    className: "org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat"
    configMethods:
      - name: "withFieldDelimiter"
        args: ["|"]

# Topology configuration
config:
  topology.workers: 2

# Spout definitions
spouts:
  - id: "kafka-spout"
    className: "org.apache.storm.kafka.KafkaSpout"
    constructorArgs:
      - ref: "spoutConfig"
    # Set to the number of partitions for the topic
    parallelism: 8

# Bolt definitions
bolts:
  - id: "logger-bolt"
    className: "com.microsoft.example.LoggerBolt"
    parallelism: 1
  
  - id: "hdfs-bolt"
    className: "org.apache.storm.hdfs.bolt.HdfsBolt"
    configMethods:
      - name: "withConfigKey"
        args: ["hdfs.config"]
      - name: "withFsUrl"
        args: ["${hdfs.url}"]
      - name: "withFileNameFormat"
        args: [ref: "fileNameFormat"]
      - name: "withRecordFormat"
        args: [ref: "recordFormat"]
      - name: "withRotationPolicy"
        args: [ref: "rotationPolicy"]
      - name: "withSyncPolicy"
        args: [ref: "syncPolicy"]
    parallelism: 1

# Stream definitions

streams:
  # Stream data to log
  - name: "kafka --> log" # name isn't used (placeholder for logging, UI, etc.)
    from: "kafka-spout"
    to: "logger-bolt"
    grouping:
      type: SHUFFLE
  
  # stream data to file
  - name: "kafka --> hdfs"
    from: "kafka-spout"
    to: "hdfs-bolt"
    grouping:
      type: SHUFFLE

Náhrady vlastnostíProperty substitutions

Projekt obsahuje soubor s názvem dev.properties, který se používá pro předávání parametrů využívaných topologiemi.The project contains a file named dev.properties that is used to pass parameters used by the topologies. Definuje následující vlastnosti:It defines the following properties:

Soubor dev.propertiesdev.properties file PopisDescription
kafka.zookeeper.hosts Apache Zookeeper hostitelé pro cluster Kafka.The Apache ZooKeeper hosts for the Kafka cluster.
kafka.broker.hosts Hostitelé zprostředkovatelů Kafka (pracovní uzly).The Kafka broker hosts (worker nodes).
kafka.topic Téma Kafka, které topologie používají.The Kafka topic that the topologies use.
hdfs.write.dir Adresář, do kterého topologie Kafka-reader zapisuje.The directory that the Kafka-reader topology writes to.
hdfs.url Systém souborů používaný clusterem Storm.The file system used by the Storm cluster. Pro účty úložiště Azure Storage použijte hodnotu wasb:///.For Azure Storage accounts, use a value of wasb:///. Pro Azure Data Lake Storage Gen2 použijte hodnotu abfs:///.For Azure Data Lake Storage Gen2, use a value of abfs:///. Pro Azure Data Lake Storage Gen1 použijte hodnotu adl:///.For Azure Data Lake Storage Gen1, use a value of adl:///.

Vytvoření clusterůCreate the clusters

Apache Kafka ve službě HDInsight neposkytuje přístup ke zprostředkovatelům systému Kafka přes veřejný internet.Apache Kafka on HDInsight does not provide access to the Kafka brokers over the public internet. Cokoli, co využívá systém Kafka, musí být ve stejné virtuální síti Azure.Anything that uses Kafka must be in the same Azure virtual network. V tomto kurzu se clustery Kafka i Storm nacházejí ve stejné virtuální síti Azure.In this tutorial, both the Kafka and Storm clusters are located in the same Azure virtual network.

Následující diagram znázorňuje tok komunikace mezi Stormem a systémem Kafka:The following diagram shows how communication flows between Storm and Kafka:

Diagram clusterů Storm a Kafka ve virtuální síti Azure

Poznámka

K dalším službám v clusteru, jako jsou SSH a Apache Ambari , se dá dostat přes Internet.Other services on the cluster such as SSH and Apache Ambari can be accessed over the internet. Další informace o veřejných portech dostupných ve službě HDInsight najdete v tématu Porty a identifikátory URI používané službou HDInsight.For more information on the public ports available with HDInsight, see Ports and URIs used by HDInsight.

K vytvoření virtuální sítě Azure a následnému vytvoření clusterů Kafka a Storm v rámci této sítě použijte následující postup:To create an Azure Virtual Network, and then create the Kafka and Storm clusters within it, use the following steps:

  1. Pomocí následujícího tlačítka se přihlaste do Azure a otevřete šablonu na webu Azure Portal.Use the following button to sign in to Azure and open the template in the Azure portal.

    Deploy to Azure

    Šablona Azure Resource Manageru se nachází na adrese https://github.com/Azure-Samples/hdinsight-storm-java-kafka/blob/master/create-kafka-storm-clusters-in-vnet.json .The Azure Resource Manager template is located at https://github.com/Azure-Samples/hdinsight-storm-java-kafka/blob/master/create-kafka-storm-clusters-in-vnet.json. Vytvoří následující prostředky:It creates the following resources:

    • Skupina prostředků AzureAzure resource group
    • Azure Virtual NetworkAzure Virtual Network
    • Účet služby Azure StorageAzure Storage account
    • Kafka v HDInsight verze 3.6 (tři pracovní uzly)Kafka on HDInsight version 3.6 (three worker nodes)
    • Storm v HDInsight verze 3.6 (tři pracovní uzly)Storm on HDInsight version 3.6 (three worker nodes)

    Varování

    Pokud chcete zajistit dostupnost Kafka v HDInsightu, musí cluster obsahovat aspoň tři pracovní uzly.To guarantee availability of Kafka on HDInsight, your cluster must contain at least three worker nodes. Tato šablona vytvoří cluster Kafka se třemi pracovními uzly.This template creates a Kafka cluster that contains three worker nodes.

  2. Podle následujícího návodu vyplňte položky v části Vlastní nasazení:Use the following guidance to populate the entries on the Custom deployment section:

    1. Pomocí následujících informací vyplňte položky v části Přizpůsobená šablona:Use the following information to populate the entries on the Customized template section:

      NastaveníSetting ValueValue
      SubscriptionSubscription Vaše předplatné AzureYour Azure subscription
      Resource groupResource group Skupina prostředků obsahující prostředky.The resource group that contains the resources.
      LocationLocation Oblast Azure, ve které se prostředky vytvoří.The Azure region that the resources are created in.
      Název clusteru KafkaKafka Cluster Name Název clusteru Kafka.The name of the Kafka cluster.
      Název clusteru StormStorm Cluster Name Název clusteru Storm.The name of the Storm cluster.
      Uživatelské jméno přihlášení clusteruCluster Login User Name Uživatelské jméno správce clusterů.The admin user name for the clusters.
      Heslo přihlášení clusteruCluster Login Password Heslo správce clusterů.The admin user password for the clusters.
      Uživatelské jméno SSHSSH User Name Uživatel SSH, který se má pro clustery vytvořit.The SSH user to create for the clusters.
      Heslo SSHSSH Password Heslo uživatele SSH.The password for the SSH user.

      Obrázek parametrů šablony

  3. Přečtěte si Podmínky a ujednání a pak vyberte Souhlasím s podmínkami a ujednáními uvedenými nahoře.Read the Terms and Conditions, and then select I agree to the terms and conditions stated above.

  4. Nakonec zaškrtněte políčko Připnout na řídicí panel a vyberte Koupit.Finally, check Pin to dashboard and then select Purchase.

Poznámka

Vytvoření clusterů může trvat až 20 minut.It can take up to 20 minutes to create the clusters.

Vytvoření topologieBuild the topology

  1. Ve svém vývojovém prostředí si stáhněte projekt z adresy https://github.com/Azure-Samples/hdinsight-storm-java-kafka, otevřete příkazový řádek a přejděte do adresáře, kam jste projekt stáhli.On your development environment, download the project from https://github.com/Azure-Samples/hdinsight-storm-java-kafka, open a command-line, and change directories to the location that you downloaded the project.

  2. V adresáři hdinsight-storm-java-kafka následujícím příkazem zkompilujte projekt a vytvořte balíček pro nasazení:From the hdinsight-storm-java-kafka directory, use the following command to compile the project and create a package for deployment:

    mvn clean package
    

    Proces vytvoří v adresáři target soubor balíčku s názvem KafkaTopology-1.0-SNAPSHOT.jar.The package process creates a file named KafkaTopology-1.0-SNAPSHOT.jar in the target directory.

  3. Pomocí následujících příkazů zkopírujte balíček do vašeho Stormu v clusteru HDInsight.Use the following commands to copy the package to your Storm on HDInsight cluster. sshuser nahraďte uživatelským jménem SSH pro cluster.Replace sshuser with the SSH user name for the cluster. stormclustername nahraďte názvem clusteru Storm.Replace stormclustername with the name of the Storm cluster.

    scp ./target/KafkaTopology-1.0-SNAPSHOT.jar sshuser@stormclustername-ssh.azurehdinsight.net:KafkaTopology-1.0-SNAPSHOT.jar
    

    Po zobrazení výzvy zadejte heslo, které jste při vytváření clusteru nastavili.When prompted, enter the password you used when creating the clusters.

Konfigurace topologieConfigure the topology

  1. Ke zjišťování zprostředkovatelských hostitelů Kafka pro systém Kafka na clusteru HDInsight použijte jednu z následujících metod:Use one of the following methods to discover the Kafka broker hosts for the Kafka on HDInsight cluster:

    $creds = Get-Credential -UserName "admin" -Message "Enter the HDInsight login"
    $clusterName = Read-Host -Prompt "Enter the Kafka cluster name"
    $resp = Invoke-WebRequest -Uri "https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER" `
        -Credential $creds `
        -UseBasicParsing
    $respObj = ConvertFrom-Json $resp.Content
    $brokerHosts = $respObj.host_components.HostRoles.host_name[0..1]
    ($brokerHosts -join ":9092,") + ":9092"
    

    Důležité

    Následující příklad Bash předpokládá, že $CLUSTERNAME obsahuje název clusteru Kafka.The following Bash example assumes that $CLUSTERNAME contains the name of the Kafka cluster name. Dále předpokládá, že nainstalovaná verze jq je 1.5 nebo vyšší.It also assumes that jq version 1.5 or greater is installed. Po zobrazení výzvy zadejte heslo účtu pro přihlášení ke clusteru.When prompted, enter the password for the cluster login account.

    curl -su admin -G "https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER" | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2
    

    Vrácená hodnota je obdobná následujícímu textu:The value returned is similar to the following text:

     ```output
     wn0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092,wn1-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092
     ```
    

    Důležité

    V clusteru sice můžete mít víc než dva zprostředkovatelské hostitele, není však nutné poskytovat klientům jejich úplný seznam.While there may be more than two broker hosts for your cluster, you do not need to provide a full list of all hosts to clients. Jeden nebo dva jsou postačující.One or two is enough.

  2. Pomocí jedné z následujících metod zjistěte hostitele Zookeeper pro systém Kafka v clusteru HDInsight:Use one of the following methods to discover the Zookeeper hosts for the Kafka on HDInsight cluster:

    $creds = Get-Credential -UserName "admin" -Message "Enter the HDInsight login"
    $clusterName = Read-Host -Prompt "Enter the Kafka cluster name"
    $resp = Invoke-WebRequest -Uri "https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" `
        -Credential $creds `
        -UseBasicParsing
    $respObj = ConvertFrom-Json $resp.Content
    $zookeeperHosts = $respObj.host_components.HostRoles.host_name[0..1]
    ($zookeeperHosts -join ":2181,") + ":2181"
    

    Důležité

    Následující příklad Bash předpokládá, že $CLUSTERNAME obsahuje název clusteru Kafka.The following Bash example assumes that $CLUSTERNAME contains the name of the Kafka cluster. Dále předpokládá instalaci jq.It also assumes that jq is installed. Po zobrazení výzvy zadejte heslo účtu pro přihlášení ke clusteru.When prompted, enter the password for the cluster login account.

    curl -su admin -G "https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2
    

    Vrácená hodnota je obdobná následujícímu textu:The value returned is similar to the following text:

     zk0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181,zk2-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181
    

    Důležité

    Ačkoli máte více než dva uzly Zookeeper, není nutné poskytovat klientům úplný seznam jejich hostitelů.While there are more than two Zookeeper nodes, you do not need to provide a full list of all hosts to clients. Jeden nebo dva jsou postačující.One or two is enough.

    Uložte tuto hodnotu, bude se hodit později.Save this value, as it is used later.

  3. Upravte soubor dev.properties umístěný v kořenovém adresáři projektu.Edit the dev.properties file in the root of the project. Přidejte informace o hostitelích zprostředkovatelů a Zookeeper pro cluster Kafka do odpovídajících řádků v tomto souboru.Add the Broker and Zookeeper hosts information for the Kafka cluster to the matching lines in this file. Následující příklad je konfigurovaný s použitím hodnot z předchozích kroků:The following example is configured using the sample values from the previous steps:

     kafka.zookeeper.hosts: zk0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181,zk2-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181
     kafka.broker.hosts: wn0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092,wn1-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092
     kafka.topic: stormtopic
    

    Důležité

    Položka hdfs.url je konfigurována pro cluster, který používá účet úložiště Azure Storage.The hdfs.url entry is configured for a cluster that uses an Azure Storage account. Pokud chcete tuto topologii použít s clusterem s více podmnožinami, který wasb používá adlData Lake Storage, změňte tuto hodnotu z na.To use this topology with a Storm cluster that uses Data Lake Storage, change this value from wasb to adl.

  4. Uložte soubor dev.properties a následujícím příkazem ho nahrajte do clusteru Storm:Save the dev.properties file and then use the following command to upload it to the Storm cluster:

    scp dev.properties USERNAME@BASENAME-ssh.azurehdinsight.net:dev.properties
    

    USERNAME nahraďte uživatelským jménem SSH pro cluster.Replace USERNAME with the SSH user name for the cluster. BASENAME nahraďte základním názvem použitým při vytváření clusteru.Replace BASENAME with the base name you used when creating the cluster.

Vytvoření tématu KafkaCreate the Kafka topic

Kafka ukládá data do tématu.Kafka stores data into a topic. Téma je třeba vytvořit před spuštěním topologií Storm.You must create the topic before starting the Storm topologies. Téma vytvořte následujícím postupem:To create the topology, use the following steps:

  1. Připojte se ke clusteru Kafka prostřednictvím SSH pomocí následujícího příkazu.Connect to the Kafka cluster through SSH by using the following command. Nahraďte sshuser uživatelským jménem SSH použitým při vytváření clusteru.Replace sshuser with the SSH user name used when creating the cluster. Nahraďte kafkaclustername názvem clusteru Kafka:Replace kafkaclustername with the name of the Kafka cluster:

    ssh sshuser@kafkaclustername-ssh.azurehdinsight.net
    

    Po zobrazení výzvy zadejte heslo, které jste při vytváření clusteru nastavili.When prompted, enter the password you used when creating the clusters.

    Další informace najdete v tématu Použití SSH se službou HDInsight.For information, see Use SSH with HDInsight.

  2. K vytvoření tématu Kafka použijte následující příkaz.To create the Kafka topic, use the following command. Nahraďte $KAFKAZKHOSTS informací o hostiteli Zookeeper, kterou jste použili při konfiguraci topologie:Replace $KAFKAZKHOSTS with the Zookeeper host information you used when configuring the topology:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stormtopic --zookeeper $KAFKAZKHOSTS
    

    Tento příkaz se připojí k hostiteli Zookeeper pro cluster Kafka a vytvoří nové téma s názvem stormtopic.This command connects to Zookeeper for the Kafka cluster and creates a new topic named stormtopic. Toto téma budou využívat topologie Storm.This topic is used by the Storm topologies.

Spuštění topologie writerStart the writer

  1. Následujícím příkazem se připojte ke clusteru Storm přes SSH.Use the following to connect to the Storm cluster using SSH. sshuser nahraďte uživatelským jménem SSH použitým při vytváření clusteru.Replace sshuser with the SSH user name used when creating the cluster. stormclustername nahraďte názvem clusteru Storm:Replace stormclustername with the name the Storm cluster:

    ssh sshuser@stormclustername-ssh.azurehdinsight.net
    

    Po zobrazení výzvy zadejte heslo, které jste při vytváření clusteru nastavili.When prompted, enter the password you used when creating the clusters.

    Další informace najdete v tématu Použití SSH se službou HDInsight.For information, see Use SSH with HDInsight.

  2. V okně připojení SSK ke clusteru zadáním následujícího příkazu spusťte topologii writer:From the SSH connection to the Storm cluster, use the following command to start the writer topology:

    storm jar KafkaTopology-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /writer.yaml --filter dev.properties
    

    Parametry použité v tomto příkazu jsou následující:The parameters used with this command are:

    • org.apache.storm.flux.Flux: Ke konfiguraci a spuštění této topologie použijte tok.org.apache.storm.flux.Flux: Use Flux to configure and run this topology.

    • --remote: Odešlete topologii do Nimbus.--remote: Submit the topology to Nimbus. Topologie bude distribuována do pracovních uzlů v clusteru.The topology is distributed across the worker nodes in the cluster.

    • -R /writer.yaml: writer.yaml Použijte soubor ke konfiguraci topologie.-R /writer.yaml: Use the writer.yaml file to configure the topology. -R označuje, že tento prostředek je zahrnutý v souboru jar.-R indicates that this resource is included in the jar file. Je v kořenové složce souboru jar, takže /writer.yaml je cesta k němu.It's in the root of the jar, so /writer.yaml is the path to it.

    • --filter: Naplňte položky v writer.yaml topologii pomocí hodnot dev.properties v souboru.--filter: Populate entries in the writer.yaml topology using values in the dev.properties file. Například hodnota položky kafka.topic v souboru se použije k nahrazení položky ${kafka.topic} v definici topologie.For example, the value of the kafka.topic entry in the file is used to replace the ${kafka.topic} entry in the topology definition.

Spuštění topologie readerStart the reader

  1. V okně připojení ke clusteru Storm přes SSH zadáním následujícího příkazu spusťte topologii reader:From the SSH session to the Storm cluster, use the following command to start the reader topology:

    storm jar KafkaTopology-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /reader.yaml --filter dev.properties
    
  2. Počkejte chvíli a pak následujícím příkazem zobrazte soubory vytvořené topologií reader:Wait a minute and then use the following command to view the files created by the reader topology:

    hdfs dfs -ls /stormdata
    

    Výstup se bude podobat následujícímu:The output is similar to the following text:

     Found 173 items
     -rw-r--r--   1 storm supergroup       5137 2018-04-09 19:00 /stormdata/hdfs-bolt-4-0-1523300453088.txt
     -rw-r--r--   1 storm supergroup       5128 2018-04-09 19:00 /stormdata/hdfs-bolt-4-1-1523300453624.txt
     -rw-r--r--   1 storm supergroup       5131 2018-04-09 19:00 /stormdata/hdfs-bolt-4-10-1523300455170.txt
     ...
    
  3. K zobrazení obsahu souboru použijte následující příkaz.To view the contents of the file, use the following command. Nahraďte filename.txt názvem souboru:Replace filename.txt with the name of a file:

    hdfs dfs -cat /stormdata/filename.txt
    

    Následující text uvádí příklad obsahu souboru:The following text is an example of the file contents:

     four score and seven years ago
     snow white and the seven dwarfs
     i am at two with nature
     snow white and the seven dwarfs
     i am at two with nature
     four score and seven years ago
     an apple a day keeps the doctor away
    

Zastavení topologiíStop the topologies

V okně připojení ke clusteru Storm přes SSH zadáním následujícího příkazu zastavte topologie Storm:From an SSH session to the Storm cluster, use the following commands to stop the Storm topologies:

storm kill kafka-writer
storm kill kafka-reader

Vyčištění prostředkůClean up resources

Pokud chcete vyčistit prostředky vytvořené v tomto kurzu, můžete odstranit skupinu prostředků.To clean up the resources created by this tutorial, you can delete the resource group. Odstraněním skupiny prostředků odstraníte také přidružený cluster HDInsight a všechny další prostředky, které jsou k příslušné skupině prostředků přidružené.Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.

Odebrání skupiny prostředků pomocí webu Azure Portal:To remove the resource group using the Azure portal:

  1. Na webu Azure Portal rozbalením nabídky na levé straně otevřete nabídku služeb a pak zvolte Skupiny prostředků. Zobrazí se seznam skupin prostředků.In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource Groups to display the list of your resource groups.
  2. Vyhledejte skupinu prostředků, kterou chcete odstranit, a klikněte pravým tlačítkem na tlačítko Další (...) na pravé straně seznamu.Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
  3. Vyberte Odstranit skupinu prostředků a potvrďte tuto akci.Select Delete resource group, and then confirm.

Další krokyNext steps

V tomto kurzu jste zjistili, jak používat topologii Apache Storm k zápisu do Apache Kafka ve službě HDInsight a čtení z nich.In this tutorial, you learned how to use an Apache Storm topology to write to and read from Apache Kafka on HDInsight. Zjistili jste také, jak ukládat data do úložiště kompatibilního s Apache HADOOP HDFS používaného službou HDInsight.You also learned how to store data to the Apache Hadoop HDFS compatible storage used by HDInsight.