Zelfstudie: Werken met de Producer- en Consumer-API's van Apache KafkaTutorial: Use the Apache Kafka Producer and Consumer APIs

Leer hoe u de Producer- en Consumer-API's van Apache Kafka gebruikt met Kafka in HDInsight.Learn how to use the Apache Kafka Producer and Consumer APIs with Kafka on HDInsight.

Met de Producer-API van Kafka kunnen toepassingen gegevensstromen naar het Kafka-cluster verzenden.The Kafka Producer API allows applications to send streams of data to the Kafka cluster. Met de Consumer-API van Kafka kunnen toepassingen gegevensstromen uit het cluster lezen.The Kafka Consumer API allows applications to read streams of data from the cluster.

In deze zelfstudie leert u het volgende:In this tutorial, you learn how to:

  • VereistenPrerequisites
  • De code begrijpenUnderstand the code
  • De toepassing compileren en implementerenBuild and deploy the application
  • De toepassing uitvoeren in het clusterRun the application on the cluster

Meer informatie over de Producer-API en de Consumer-API kunt u lezen in de Apache-documentatie (Engelstalig).For more information on the APIs, see Apache documentation on the Producer API and Consumer API.

VereistenPrerequisites

De code begrijpenUnderstand the code

De voorbeeldtoepassing bevindt zich op https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, in de submap Producer-Consumer.The example application is located at https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, in the Producer-Consumer subdirectory. De toepassing bestaat hoofdzakelijk uit vier bestanden:The application consists primarily of four files:

  • pom.xml: met dit bestand worden de projectafhankelijkheden, de Java-versie en de pakketmethoden gedefinieerd.pom.xml: This file defines the project dependencies, Java version, and packaging methods.
  • Producer.java: met dit bestand worden willekeurige zinnen naar Kafka verzonden met behulp van de Producer-API.Producer.java: This file sends random sentences to Kafka using the producer API.
  • Consumer.java: dit bestand gebruikt de Consumer-API om gegevens te lezen uit Kafka en deze te verzenden naar STDOUT.Consumer.java: This file uses the consumer API to read data from Kafka and emit it to STDOUT.
  • Run.java: de opdrachtregelinterface die wordt gebruikt voor het uitvoeren van de Producer- en Consumer-code.Run.java: The command-line interface used to run the producer and consumer code.

Pom.xmlPom.xml

Belangrijke aandachtspunten voor het bestand pom.xml:The important things to understand in the pom.xml file are:

  • Afhankelijkheden: dit project is afhankelijk van de Kafka-API's Producer en Consumer, die worden geleverd door het pakket kafka-clients.Dependencies: This project relies on the Kafka producer and consumer APIs, which are provided by the kafka-clients package. Deze afhankelijkheid wordt gedefinieerd met de volgende XML-code:The following XML code defines this dependency:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    

    De vermelding ${kafka.version} wordt gedeclareerd in de sectie <properties>..</properties> van pom.xml, en wordt geconfigureerd voor de Kafka-versie van het HDInsight-cluster.The ${kafka.version} entry is declared in the <properties>..</properties> section of pom.xml, and is configured to the Kafka version of the HDInsight cluster.

  • Invoegtoepassingen: de Maven-invoegtoepassingen bieden diverse mogelijkheden.Plugins: Maven plugins provide various capabilities. In dit project worden de volgende plugins of invoegtoepassingen gebruikt:In this project, the following plugins are used:

    • maven-compiler-plugin: wordt gebruikt om de Java-versie die wordt gebruikt door het project in te stellen op 8.maven-compiler-plugin: Used to set the Java version used by the project to 8. Dit is de versie van Java die door HDInsight 3.6 wordt gebruikt.This is the version of Java used by HDInsight 3.6.
    • maven-shade-plugin: wordt gebruikt voor het genereren van een uber jar die deze toepassing bevat, evenals eventuele afhankelijkheden.maven-shade-plugin: Used to generate an uber jar that contains this application as well as any dependencies. Dit bestand wordt ook gebruikt om het toegangspunt van de toepassing in te stellen, zodat u het Jar-bestand rechtstreeks kunt uitvoeren, dus zonder de hoofdklasse op te geven.It is also used to set the entry point of the application, so that you can directly run the Jar file without having to specify the main class.

Producer.javaProducer.java

De producer communiceert met de Kafka-brokerhosts (werkknooppunten) en verzendt gegevens naar een Kafka-onderwerp.The producer communicates with the Kafka broker hosts (worker nodes) and sends data to a Kafka topic. Het volgende codefragment is afkomstig uit de Producer.java -bestand uit de GitHub-opslagplaats en laat zien hoe de producent-eigenschappen in te stellen:The following code snippet is from the Producer.java file from the GitHub repository and shows how to set the producer properties:

Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

Consumer.javaConsumer.java

De Consumer-API communiceert met de Kafka-brokerhosts (werkknooppunten) en leest records in een lus.The consumer communicates with the Kafka broker hosts (worker nodes), and reads records in a loop. Met het volgende codefragment van het bestand Consumer.java worden de Consumer-eigenschappen ingesteld:The following code snippet from the Consumer.java file sets the consumer properties:

KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");

consumer = new KafkaConsumer<>(properties);

In deze code is de Consumer-API geconfigureerd voor het lezen vanaf het begin van het onderwerp (auto.offset.reset is ingesteld op earliest.)In this code, the consumer is configured to read from the start of the topic (auto.offset.reset is set to earliest.)

Run.javaRun.java

De Run.java bestand biedt een opdrachtregelinterface die wordt uitgevoerd op de producent of de consument code.The Run.java file provides a command-line interface that runs either the producer or consumer code. U moet de gegevens van de Kafka-brokerhost opgeven als een parameter.You must provide the Kafka broker host information as a parameter. U kunt eventueel een groep-ID-waarde, die wordt gebruikt door het consumerproces opnemen.You can optionally include a group ID value, which is used by the consumer process. Als u meerdere consumentexemplaren met behulp van dezelfde groep-ID maakt, worden ze lezen van het onderwerp Verdeel de belasting.If you create multiple consumer instances using the same group ID, they will load balance reading from the topic.

Het voorbeeld compileren en implementerenBuild and deploy the example

  1. Downloaden en uitpakken van de voorbeelden van https://github.com/Azure-Samples/hdinsight-kafka-java-get-started .Download and extract the examples from https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.

  2. Instellen van uw huidige map naar de locatie van de hdinsight-kafka-java-get-started\Producer-Consumer directory en gebruik de volgende opdracht uit:Set your current directory to the location of the hdinsight-kafka-java-get-started\Producer-Consumer directory and use the following command:

    mvn clean package
    

    Met deze opdracht maakt u een directory met de naam target, die een bestand met de naam kafka-producer-consumer-1.0-SNAPSHOT.jar bevat.This command creates a directory named target, that contains a file named kafka-producer-consumer-1.0-SNAPSHOT.jar.

  3. Vervang sshuser door de SSH-gebruiker voor uw cluster en CLUSTERNAME door de naam van het cluster.Replace sshuser with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. Voer de volgende opdracht om te kopiëren de kafka-producer-consumer-1.0-SNAPSHOT.jar bestand met uw HDInsight-cluster.Enter the following command to copy the kafka-producer-consumer-1.0-SNAPSHOT.jar file to your HDInsight cluster. Voer het wachtwoord van de SSH-gebruiker in wanneer hierom wordt gevraagd.When prompted enter the password for the SSH user.

    scp ./target/kafka-producer-consumer-1.0-SNAPSHOT.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

Het voorbeeld uitvoerenRun the example

  1. Vervang sshuser door de SSH-gebruiker voor uw cluster en CLUSTERNAME door de naam van het cluster.Replace sshuser with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. Open een SSH-verbinding met het cluster, met de volgende opdracht.Open an SSH connection to the cluster, by entering the following command. Voer het wachtwoord voor het SSH-gebruikersaccount in wanneer hierom wordt gevraagd.If prompted, enter the password for the SSH user account.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Installeer jq, een opdrachtregelprogramma JSON-processor.Install jq, a command-line JSON processor. Van de SSH-verbinding openen, voert u de volgende opdracht uit om te installeren jq:From the open SSH connection, enter following command to install jq:

    sudo apt -y install jq
    
  3. Omgevingsvariabelen instellen.Set up environment variables. Vervang PASSWORD en CLUSTERNAME naam respectievelijk met het wachtwoord voor clusteraanmelding en cluster, voert u de opdracht:Replace PASSWORD and CLUSTERNAME with the cluster login password and cluster name respectively, then enter the command:

    export password='PASSWORD'
    export clusterNameA='CLUSTERNAME'
    
  4. De naam van cluster extract correct-indeling.Extract correctly cased cluster name. De werkelijke behuizing van de naam van het cluster is mogelijk anders dan u verwacht, afhankelijk van hoe het cluster is gemaakt.The actual casing of the cluster name may be different than you expect, depending on how the cluster was created. Met deze opdracht wordt het werkelijke hoofdlettergebruik verkrijgen, opslaan in een variabele en vervolgens de naam van de juiste omhuld en de naam die u eerder hebt opgegeven, worden weergegeven.This command will obtain the actual casing, store it in a variable, and then display the correctly cased name, and the name you provided earlier. Voer de volgende opdracht in:Enter the following command:

    export clusterName=$(curl -u admin:$password -sS -G "https://$clusterNameA.azurehdinsight.net/api/v1/clusters" \
     | jq -r '.items[].Clusters.cluster_name')
    echo $clusterName, $clusterNameA
    
  5. Als u de Kafka-broker-hosts en de Apache Zookeeper-hosts, gebruikt u de volgende opdracht uit:To get the Kafka broker hosts and the Apache Zookeeper hosts, use the following command:

    export KAFKABROKERS=`curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER \
     | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  6. Kafka-onderwerp maken myTest, met de volgende opdracht:Create Kafka topic, myTest, by entering the following command:

    java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
    
  7. Gebruik de volgende opdracht om de Producer-API uit te voeren en gegevens te schrijven naar het onderwerp:To run the producer and write data to the topic, use the following command:

    java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
    
  8. Als dit proces is voltooid, gebruikt u de volgende opdracht om gegevens uit het onderwerp te lezen:Once the producer has finished, use the following command to read from the topic:

    java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS
    

    De gelezen records worden weergegeven, samen met een telling van de records.The records read, along with a count of records, is displayed.

  9. Gebruik Ctrl + C om de consument af te sluiten.Use Ctrl + C to exit the consumer.

Meerdere consumentenMultiple consumers

Kafka-consumenten gebruiken een consumentengroep bij het lezen van records.Kafka consumers use a consumer group when reading records. Door dezelfde groep voor meerdere consumenten te gebruiken, worden leestaken voor onderwerpen gelijk verdeeld.Using the same group with multiple consumers results in load balanced reads from a topic. Elke consument in de groep ontvangt een deel van de records.Each consumer in the group receives a portion of the records.

De Consumer-toepassing accepteert een parameter die wordt gebruikt als de groeps-id.The consumer application accepts a parameter that is used as the group ID. Met de volgende opdracht start u bijvoorbeeld een Consumer met behulp van de groeps-id myGroup:For example, the following command starts a consumer using a group ID of myGroup:

java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup

Gebruik Ctrl + C om de consument af te sluiten.Use Ctrl + C to exit the consumer.

Om dit proces in actie te zien, gebruikt u de volgende opdracht:To see this process in action, use the following command:

tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach

Deze opdracht gebruikt tmux om de terminal op te splitsen in twee kolommen.This command uses tmux to split the terminal into two columns. In elke kolom wordt een Consumer gestart, met dezelfde waarde voor de groeps-id.A consumer is started in each column, with the same group ID value. Als de Consumers klaar zijn met lezen, ziet u dat ieder Consumer slechts een deel van de records heeft gelezen.Once the consumers finish reading, notice that each read only a portion of the records. Gebruik Ctrl + C twee keer om af te sluiten tmux.Use Ctrl + C twice to exit tmux.

Gebruik door clients binnen dezelfde groep wordt verwerkt door de partities voor het onderwerp.Consumption by clients within the same group is handled through the partitions for the topic. Het eerder gemaakte onderwerp test uit dit codevoorbeeld heeft acht partities.In this code sample, the test topic created earlier has eight partitions. Als u acht Consumers start, leest elke Consumer records uit één partitie van het onderwerp.If you start eight consumers, each consumer reads records from a single partition for the topic.

Belangrijk

Een consumentengroep kan niet meer consumentexemplaren dan partities bevatten.There cannot be more consumer instances in a consumer group than partitions. In dit voorbeeld kan één consumentengroep maximaal acht consumenten bevatten, omdat het onderwerp dit aantal partities heeft.In this example, one consumer group can contain up to eight consumers since that is the number of partitions in the topic. U kunt ook meerdere consumentengroepen hebben, waarvan elke groep niet meer dan acht consumenten bevat.Or you can have multiple consumer groups, each with no more than eight consumers.

Records worden in Kafka opgeslagen in de volgorde waarin deze worden ontvangen binnen een partitie.Records stored in Kafka are stored in the order they are received within a partition. Als u records binnen een partitie op volgorde wilt leveren, maakt u een consumentengroep waarvan het aantal consumentexemplaren gelijk is aan het aantal partities.To achieve in-ordered delivery for records within a partition, create a consumer group where the number of consumer instances matches the number of partitions. Als u records binnen het onderwerp op volgorde wilt leveren, maakt u een consumentengroep met slechts één consumentexemplaar.To achieve in-ordered delivery for records within the topic, create a consumer group with only one consumer instance.

Resources opschonenClean up resources

Als u de in deze zelfstudie gemaakte resources wilt opschonen, kunt u de resourcegroep verwijderen.To clean up the resources created by this tutorial, you can delete the resource group. Als u de resourcegroep verwijdert, worden ook het bijbehorende HDInsight-cluster en eventuele andere resources die aan de resourcegroep zijn gekoppeld, verwijderd.Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.

Ga als volgt te werk om de resourcegroep te verwijderen in Azure Portal:To remove the resource group using the Azure portal:

  1. Vouw het menu aan de linkerkant in Azure Portal uit om het menu met services te openen en kies Resourcegroepen om de lijst met resourcegroepen weer te geven.In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource Groups to display the list of your resource groups.
  2. Zoek de resourcegroep die u wilt verwijderen en klik met de rechtermuisknop op de knop Meer (... ) aan de rechterkant van de vermelding.Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
  3. Selecteer Resourcegroep verwijderen en bevestig dit.Select Delete resource group, and then confirm.

Volgende stappenNext steps

In dit document hebt u geleerd hoe u de Producer- en Consumer-API's van Apache Kafka gebruikt met Kafka in HDInsight.In this document, you learned how to use the Apache Kafka Producer and Consumer API with Kafka on HDInsight. Gebruik de volgende documenten voor meer informatie over het werken met Kafka:Use the following to learn more about working with Kafka: