Kurz: Použití Apache Kafka Producer and Consumer APITutorial: Use the Apache Kafka Producer and Consumer APIs

Zjistěte, jak používat rozhraní Apache Kafka Producer and Consumer API se systémem Kafka ve službě HDInsight.Learn how to use the Apache Kafka Producer and Consumer APIs with Kafka on HDInsight.

Rozhraní Kafka Producer API umožňuje aplikacím odesílat datové proudy do clusteru Kafka.The Kafka Producer API allows applications to send streams of data to the Kafka cluster. Rozhraní Kafka Consumer API umožňuje aplikacím číst datové proudy z clusteru.The Kafka Consumer API allows applications to read streams of data from the cluster.

V tomto kurzu se naučíte:In this tutorial, you learn how to:

  • PožadavkyPrerequisites
  • Vysvětlení kóduUnderstand the code
  • Sestavení a nasazení aplikaceBuild and deploy the application
  • Spuštění aplikace v clusteruRun the application on the cluster

Další informace o rozhraních API najdete v dokumentaci k rozhraní Producer API a Consumer API na webu Apache.For more information on the APIs, see Apache documentation on the Producer API and Consumer API.

PožadavkyPrerequisites

Vysvětlení kóduUnderstand the code

Ukázková aplikace se nachází na adrese https://github.com/Azure-Samples/hdinsight-kafka-java-get-started v podadresáři Producer-Consumer.The example application is located at https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, in the Producer-Consumer subdirectory. Aplikace se skládá primárně ze čtyř souborů:The application consists primarily of four files:

  • pom.xml: Tento soubor definuje závislosti projektu, verze Javy a balení metody.pom.xml: This file defines the project dependencies, Java version, and packaging methods.
  • Producer.java: Tento soubor odešle náhodné věty Kafka pomocí konzumenta rozhraní API.Producer.java: This file sends random sentences to Kafka using the producer API.
  • Consumer.java: Tento soubor používá příjemce rozhraní API pro čtení dat z Kafka a posílat do STDOUT.Consumer.java: This file uses the consumer API to read data from Kafka and emit it to STDOUT.
  • Run.java: Rozhraní příkazového řádku používají ke spouštění kódu producenta a konzumenta.Run.java: The command-line interface used to run the producer and consumer code.

Pom.xmlPom.xml

V souboru pom.xml je důležité porozumět následujícímu:The important things to understand in the pom.xml file are:

  • Závislosti: Tento projekt využívá Kafka producenta a konzumenta rozhraní API, které jsou k dispozici ve kafka-clients balíčku.Dependencies: This project relies on the Kafka producer and consumer APIs, which are provided by the kafka-clients package. Tuto závislost definuje následující kód XML:The following XML code defines this dependency:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    

    Položka ${kafka.version} se deklaruje v části <properties>..</properties> souboru pom.xml a je nakonfigurovaná na verzi systému Kafka v clusteru HDInsight.The ${kafka.version} entry is declared in the <properties>..</properties> section of pom.xml, and is configured to the Kafka version of the HDInsight cluster.

  • Moduly plug-in: Moduly plug-in maven poskytují různé možnosti.Plugins: Maven plugins provide various capabilities. V tomto projektu se používají následující moduly plug-in:In this project, the following plugins are used:

    • maven-compiler-plugin: Slouží k nastavení Java verze používané v projektu na 8.maven-compiler-plugin: Used to set the Java version used by the project to 8. To je verze Javy, kterou používá HDInsight 3.6.This is the version of Java used by HDInsight 3.6.
    • maven-shade-plugin: Použít ke generování uber jar, který obsahuje tato aplikace, stejně jako všechny závislosti.maven-shade-plugin: Used to generate an uber jar that contains this application as well as any dependencies. Používá se také k nastavení vstupního bodu aplikace, abyste mohli přímo spustit soubor JAR bez nutnosti zadávat hlavní třídu.It is also used to set the entry point of the application, so that you can directly run the Jar file without having to specify the main class.

Producer.javaProducer.java

Producent komunikuje s hostiteli zprostředkovatelů Kafka (pracovní uzly) a odesílá data do tématu Kafka.The producer communicates with the Kafka broker hosts (worker nodes) and sends data to a Kafka topic. Následující fragment kódu je z Producer.java soubor úložiště GitHub a ukazuje, jak nastavit vlastnosti výrobce:The following code snippet is from the Producer.java file from the GitHub repository and shows how to set the producer properties:

Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

Consumer.javaConsumer.java

Konzument komunikuje s hostiteli zprostředkovatelů Kafka (pracovní uzly) a ve smyčce čte záznamy.The consumer communicates with the Kafka broker hosts (worker nodes), and reads records in a loop. Následující fragment kódu ze souboru Consumer.java nastaví vlastnosti příjemce:The following code snippet from the Consumer.java file sets the consumer properties:

KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");

consumer = new KafkaConsumer<>(properties);

V tomto kódu je konzument nakonfigurovaný tak, aby četl od začátku tématu (hodnota auto.offset.reset je nastavená na earliest).In this code, the consumer is configured to read from the start of the topic (auto.offset.reset is set to earliest.)

Run.javaRun.java

Run.java soubor poskytuje rozhraní příkazového řádku, která spustí producenta nebo příjemce kód.The Run.java file provides a command-line interface that runs either the producer or consumer code. Jako parametr je potřeba zadat informace o hostiteli zprostředkovatele Kafka.You must provide the Kafka broker host information as a parameter. Hodnota ID skupiny, který je používán procesem příjemce může volitelně zahrnovat.You can optionally include a group ID value, which is used by the consumer process. Pokud vytvoříte více instancí příjemce pomocí stejné ID skupiny, se bude zatížení můžete vyrovnávat čtení z tématu.If you create multiple consumer instances using the same group ID, they will load balance reading from the topic.

Sestavení a nasazení příkladuBuild and deploy the example

  1. Stažení a extrakci příklady z https://github.com/Azure-Samples/hdinsight-kafka-java-get-started .Download and extract the examples from https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.

  2. Nastavit aktuální adresář na umístění hdinsight-kafka-java-get-started\Producer-Consumer adresáře a použijte následující příkaz:Set your current directory to the location of the hdinsight-kafka-java-get-started\Producer-Consumer directory and use the following command:

    mvn clean package
    

    Tento příkaz vytvoří adresář s názvem target, který bude obsahovat soubor s názvem kafka-producer-consumer-1.0-SNAPSHOT.jar.This command creates a directory named target, that contains a file named kafka-producer-consumer-1.0-SNAPSHOT.jar.

  3. Místo sshuser použijte jméno uživatele SSH pro váš cluster a místo CLUSTERNAME zadejte název clusteru.Replace sshuser with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. Zadejte následující příkaz pro kopírování kafka-producer-consumer-1.0-SNAPSHOT.jar soubor do vašeho clusteru HDInsight.Enter the following command to copy the kafka-producer-consumer-1.0-SNAPSHOT.jar file to your HDInsight cluster. Po zobrazení výzvy zadejte heslo uživatele SSH.When prompted enter the password for the SSH user.

    scp ./target/kafka-producer-consumer-1.0-SNAPSHOT.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

Spuštění příkladuRun the example

  1. Místo sshuser použijte jméno uživatele SSH pro váš cluster a místo CLUSTERNAME zadejte název clusteru.Replace sshuser with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. Otevřete připojení SSH ke clusteru, tak, že zadáte následující příkaz.Open an SSH connection to the cluster, by entering the following command. Pokud se zobrazí výzva, zadejte heslo uživatelského účtu SSH.If prompted, enter the password for the SSH user account.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Nainstalujte jq, příkazového řádku procesoru JSON.Install jq, a command-line JSON processor. V otevřené připojení SSH, zadejte následující příkaz k instalaci jq:From the open SSH connection, enter following command to install jq:

    sudo apt -y install jq
    
  3. Nastavte proměnné prostředí.Set up environment variables. Nahraďte PASSWORD a CLUSTERNAME přihlašovací heslo clusteru a clusteru název v uvedeném pořadí, a pak zadejte příkaz:Replace PASSWORD and CLUSTERNAME with the cluster login password and cluster name respectively, then enter the command:

    export password='PASSWORD'
    export clusterNameA='CLUSTERNAME'
    
  4. Rozbalte název clusteru správně notaci.Extract correctly cased cluster name. Skutečné malých a velkých písmen na název clusteru může být jiný než byste očekávali, v závislosti na způsobu vytvoření clusteru.The actual casing of the cluster name may be different than you expect, depending on how the cluster was created. Tento příkaz se získat skutečný malých a velkých písmen, uložte ho do proměnné a pak zobrazí správně cased název a název, který jste zadali dříve.This command will obtain the actual casing, store it in a variable, and then display the correctly cased name, and the name you provided earlier. Zadejte následující příkaz:Enter the following command:

    export clusterName=$(curl -u admin:$password -sS -G "https://$clusterNameA.azurehdinsight.net/api/v1/clusters" \
     | jq -r '.items[].Clusters.cluster_name')
    echo $clusterName, $clusterNameA
    
  5. Chcete-li získat zprostředkovatele Kafka, hostitele a hostitele Apache Zookeeper, použijte následující příkaz:To get the Kafka broker hosts and the Apache Zookeeper hosts, use the following command:

    export KAFKABROKERS=`curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER \
     | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  6. Vytvoří téma Kafka myTest, tak, že zadáte následující příkaz:Create Kafka topic, myTest, by entering the following command:

    java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
    
  7. Pokud chcete spustit producenta a zapsat data do tématu, použijte následující příkaz:To run the producer and write data to the topic, use the following command:

    java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
    
  8. Jakmile bude producent hotový, pomocí následujícího příkazu zahajte čtení z tématu:Once the producer has finished, use the following command to read from the topic:

    java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS
    

    Zobrazí se počet načtených záznamů spolu s celkovým počtem.The records read, along with a count of records, is displayed.

  9. Konzumenta ukončíte stisknutím Ctrl+C.Use Ctrl + C to exit the consumer.

Víc současných konzumentůMultiple consumers

Konzumenti Kafka při čtení záznamů používají skupiny konzumentů.Kafka consumers use a consumer group when reading records. Výsledkem použití skupiny s více konzumenty je vyvážení zatížení při čtení záznamů z tématu.Using the same group with multiple consumers results in load balanced reads from a topic. Každý konzument ze skupiny obdrží určitou část záznamů.Each consumer in the group receives a portion of the records.

Aplikace konzumenta přijímá parametr, který se použije jako ID skupiny.The consumer application accepts a parameter that is used as the group ID. Například následující příkaz spustí konzumenta s použitím ID skupiny myGroup:For example, the following command starts a consumer using a group ID of myGroup:

java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup

Konzumenta ukončíte stisknutím Ctrl+C.Use Ctrl + C to exit the consumer.

Pokud chcete vidět tento proces v akci, použijte následující příkaz:To see this process in action, use the following command:

tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach

Tento příkaz pomocí tmux rozdělí terminál do dvou sloupců.This command uses tmux to split the terminal into two columns. V obou sloupcích je spuštěný konzument se stejnou hodnotou ID skupiny.A consumer is started in each column, with the same group ID value. Jakmile konzumenti dokončí čtení, všimněte si, že oba přečetli pouze část záznamů.Once the consumers finish reading, notice that each read only a portion of the records. Použití Ctrl + C dvakrát ukončíte tmux.Use Ctrl + C twice to exit tmux.

Konzumace klienty ze stejné skupiny se realizuje rozdělením tématu na oddíly.Consumption by clients within the same group is handled through the partitions for the topic. V tomto vzorovém kódu má dříve vytvořené téma test osm oddílů.In this code sample, the test topic created earlier has eight partitions. Pokud spustíte osm konzumentů, každý z nich bude číst záznamy z jednoho oddílu tématu.If you start eight consumers, each consumer reads records from a single partition for the topic.

Důležité

Ve skupině příjemců nemůže být víc instancí konzumentů než má téma oddílů.There cannot be more consumer instances in a consumer group than partitions. V tomto příkladu může skupina konzumentů obsahovat až osm konzumentů, protože to je počet oddílů tématu.In this example, one consumer group can contain up to eight consumers since that is the number of partitions in the topic. Nebo můžete mít více skupin konzumentů, každou s maximálně osmi konzumenty.Or you can have multiple consumer groups, each with no more than eight consumers.

Záznamy se v systému Kafka ukládají v pořadí, ve kterém je oddíl přijme.Records stored in Kafka are stored in the order they are received within a partition. Pro dosažení doručování záznamů ve správném pořadí v rámci oddílu vytvořte skupinu příjemců, ve které bude počet instancí konzumentů odpovídat počtu oddílů.To achieve in-ordered delivery for records within a partition, create a consumer group where the number of consumer instances matches the number of partitions. Pro dosažení doručování záznamů ve správném pořadí v rámci tématu vytvořte skupinu obsahující pouze jednu instanci konzumenta.To achieve in-ordered delivery for records within the topic, create a consumer group with only one consumer instance.

Vyčištění prostředkůClean up resources

Pokud chcete vyčistit prostředky vytvořené v tomto kurzu, můžete odstranit skupinu prostředků.To clean up the resources created by this tutorial, you can delete the resource group. Odstraněním skupiny prostředků odstraníte také přidružený cluster HDInsight a všechny další prostředky, které jsou k příslušné skupině prostředků přidružené.Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.

Odebrání skupiny prostředků pomocí webu Azure Portal:To remove the resource group using the Azure portal:

  1. Na webu Azure Portal rozbalením nabídky na levé straně otevřete nabídku služeb a pak zvolte Skupiny prostředků. Zobrazí se seznam skupin prostředků.In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource Groups to display the list of your resource groups.
  2. Vyhledejte skupinu prostředků, kterou chcete odstranit, a klikněte pravým tlačítkem na tlačítko Další (...) na pravé straně seznamu.Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
  3. Vyberte Odstranit skupinu prostředků a potvrďte tuto akci.Select Delete resource group, and then confirm.

Další postupNext steps

V tomto dokumentu jste zjistili, jak používat Apache Kafka Producer and Consumer API s využitím Kafka v HDInsight.In this document, you learned how to use the Apache Kafka Producer and Consumer API with Kafka on HDInsight. Další informace o práci s platformou Kafka najdete v těchto zdrojích:Use the following to learn more about working with Kafka: