Share via


Zelfstudie: Apache Kafka streams-API gebruiken in Azure HDInsight

Leer hoe u een toepassing maakt die gebruikmaakt van de Streams-API van Apache Kafka en hoe u deze uitvoert met Kafka in HDInsight.

De voorbeeldtoepassing die wordt gebruikt in deze zelfstudie, is een app voor het tellen van woorden die via een stream worden aangeboden. Eerst worden er tekstgegevens gelezen uit een onderwerp van Kafka, vervolgens worden afzonderlijke woorden uitgepakt en ten slotte worden de woorden en het aantal woorden opgeslagen in een ander Kafka-onderwerp.

Kafka-stroomverwerking wordt vaak uitgevoerd met Behulp van Apache Spark. Kafka versie 2.1.1 en 2.4.1 (in HDInsight 4.0 en 5.0) ondersteunt de Kafka Streams-API. Met deze API kunt u gegevensstromen transformeren tussen invoer- en uitvoeronderwerpen.

Meer informatie over de Streams-API van Kafka vindt u in het Engelstalige artikel Intro to Streams op Apache.org.

In deze zelfstudie leert u het volgende:

  • De code begrijpen
  • De toepassing compileren en implementeren
  • Kafka onderwerpen configureren
  • De code uitvoeren

Vereisten

De code begrijpen

De voorbeeldtoepassing bevindt zich op https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, in de submap Streaming. De toepassing bestaat uit twee bestanden:

  • pom.xml: dit bestand definieert de projectafhankelijkheden, de Java-versie en de pakketmethoden.
  • Stream.java: dit bestand implementeert de streaming-logica.

Pom.xml

Belangrijke aandachtspunten voor het bestand pom.xml:

  • Afhankelijkheden: dit project is afhankelijk van de Kafka-API Streams, die wordt geleverd door het pakket kafka-clients. Deze afhankelijkheid wordt gedefinieerd met de volgende XML-code:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    De vermelding ${kafka.version} wordt gedeclareerd in de sectie <properties>..</properties> van pom.xml, en wordt geconfigureerd voor de Kafka-versie van het HDInsight-cluster.

  • Plugins: de Maven-plugins bieden diverse mogelijkheden. In dit project worden de volgende plugins of invoegtoepassingen gebruikt:

    • maven-compiler-plugin: wordt gebruikt om de Java-versie die wordt gebruikt door het project in te stellen op 8. HDInsight 4.0 en 5.0 vereist Java 8.
    • maven-shade-plugin: Wordt gebruikt voor het genereren van een uber jar die deze toepassing en eventuele afhankelijkheden bevat. Dit bestand wordt ook gebruikt om het toegangspunt van de toepassing in te stellen, zodat u het Jar-bestand rechtstreeks kunt uitvoeren, dus zonder de hoofdklasse op te geven.

Stream.java

Het bestand Stream.java gebruikt de Streams-API voor het implementeren van een toepassing voor het tellen van woorden. De toepassing leest gegevens uit een Kafka-onderwerp met de naam test en schrijft het gelezen aantal woorden naar een onderwerp met de naam wordcounts.

Met de volgende code wordt de toepassing gedefinieerd:

package com.microsoft.example;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;

public class Stream
{
    public static void main( String[] args ) {
        Properties streamsConfig = new Properties();
        // The name must be unique on the Kafka cluster
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        // Brokers
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // SerDes for key and values
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Serdes for the word and count
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
        KStream<String, Long> wordCounts = sentences
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, word) -> new KeyValue<>(word, word))
                .countByKey("Counts")
                .toStream();
        wordCounts.to(stringSerde, longSerde, "wordcounts");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Het voorbeeld compileren en implementeren

Als u het project wilt implementeren in het Kafka-cluster in HDInsight, voert u de volgende stappen uit:

  1. Stel uw huidige mappen in op de locatie van de map hdinsight-kafka-java-get-started-master\Streaming en gebruik vervolgens deze opdracht om een JAR-pakket te maken:

    mvn clean package
    

    Met deze opdracht maakt u het pakket op target/kafka-streaming-1.0-SNAPSHOT.jar.

  2. Vervang sshuser door de SSH-gebruiker voor uw cluster en clustername door de naam van het cluster. Gebruik de volgende opdracht om het bestand kafka-streaming-1.0-SNAPSHOT.jar naar uw HDInsight-cluster te kopiëren. Voer het wachtwoord voor het SSH-gebruikersaccount in wanneer hierom wordt gevraagd.

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
    

Apache Kafka-onderwerpen maken

  1. Vervang sshuser door de SSH-gebruiker voor uw cluster en CLUSTERNAME door de naam van het cluster. Gebruik de volgende opdracht om een SSH-verbinding naar het cluster te openen. Voer het wachtwoord voor het SSH-gebruikersaccount in wanneer hierom wordt gevraagd.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Installeer jq, een opdrachtregel-JSON-processor. Voer in de open SSH-verbinding de volgende opdracht in om jq te installeren:

    sudo apt -y install jq
    
  3. wachtwoordvariabele instellen. Vervang PASSWORD door het aanmeldwachtwoord voor het cluster en voer de volgende opdracht in:

    export PASSWORD='PASSWORD'
    
  4. extraheer de clusternaam met de juiste letters. De daadwerkelijke lettergrootte van de clusternaam kan anders zijn dan verwacht, afhankelijk van hoe het cluster is gemaakt. Met deze opdracht wordt de werkelijke behuizing verkregen en vervolgens opgeslagen in een variabele. Voer de volgende opdracht in:

    export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Notitie

    Als u dit proces van buiten het cluster uitvoert, is er een andere procedure voor het opslaan van de clusternaam. Haal de clusternaam op in kleine letters uit de Azure-portal. Vervang vervolgens de clusternaam voor <clustername> in de volgende opdracht en voer deze uit: export clusterName='<clustername>'.

  5. Gebruik de volgende opdrachten als u de Kafka-brokerhosts en de Apache Zookeeper-hosts wilt opvragen. Voer desgevraagd het wachtwoord voor het account voor clusteraanmelding (admin).

    export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Notitie

    Voor deze opdrachten is toegang tot Ambari vereist. Als uw cluster zich achter een NSG bevindt, voert u deze opdrachten uit vanaf een computer die toegang heeft tot Ambari.

  6. Gebruik de volgende opdrachten om de onderwerpen te maken die worden gebruikt door de streaming-bewerking:

    Notitie

    Er kan een foutbericht worden weergegeven dat het onderwerp test al bestaat. Dit is geen probleem omdat het onderwerp mogelijk in de zelfstudie over de Producer- en Consumer-API's van Apache Kafka is gemaakt.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

    De onderwerpen worden gebruikt voor de volgende doeleinden:

    • test: dit is het onderwerp waarin records worden ontvangen. De streaming-toepassing leest uit dit onderwerp.
    • wordcounts: dit is het onderwerp waarin de uitvoer van de streaming-toepassing wordt opgeslagen.
    • RekeyedIntermediateTopic: dit onderwerp wordt gebruikt voor het opnieuw partitioneren van gegevens wanneer het aantal woorden wordt bijgewerkt door de operator countByKey.
    • wordcount-example-Counts-changelog: in dit onderwerp worden statuswaarden opgeslagen die worden gebruikt door de bewerking countByKey.

    Kafka in HDInsight kan ook worden geconfigureerd voor het automatisch maken van onderwerpen. Zie How to configure Apache Kafka on HDInsight to automatically create topics (Apache Kafka in HDInsight configureren voor het automatisch maken van onderwerpen) voor meer informatie.

De code uitvoeren

  1. Gebruik de volgende opdracht om de streaming-toepassing te starten als een achtergrondproces:

    java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
    

    Mogelijk krijgt u een waarschuwing over Apache log4j. U kunt deze waarschuwing negeren.

  2. Als u records wilt verzenden naar het onderwerp test, gebruikt u de volgende opdracht gebruiken om de Producer-toepassing te starten:

    java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
    
  3. Zodra de toepassing is voltooid, gebruikt u de volgende opdracht om de informatie weer te geven die is opgeslagen in het onderwerp wordcounts:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
    

    De parameters --property geven de Consumer-API opdracht om de key (het woord) samen met de count (waarde) weer te geven. Deze parameter configureert ook welke deserializer moet worden gebruikt bij het lezen van deze waarden uit Kafka.

    De uitvoer lijkt op het volgende:

    dwarfs  13635
    ago     13664
    snow    13636
    dwarfs  13636
    ago     13665
    a       13803
    ago     13666
    a       13804
    ago     13667
    ago     13668
    jumped  13640
    jumped  13641
    

    De parameter --from-beginning configureert de Consumer om te beginnen bij het begin van de records die zijn opgeslagen in het onderwerp. Het aantal wordt met elk ontvangen woord opgehoogd, zodat het onderwerp meerdere vermeldingen voor elk woord bevat, met een oplopend aantal.

  4. Gebruik Ctrl+C om de Producer af te sluiten. Blijf op Ctrl+C drukken om de toepassing en de Consumer af te sluiten.

  5. Gebruik de volgende opdrachten om de onderwerpen te verwijderen die worden gebruikt door de streaming-bewerking:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

Resources opschonen

Als u de in deze zelfstudie gemaakte resources wilt opschonen, kunt u de resourcegroep verwijderen. Als u de resourcegroep verwijdert, worden ook het bijbehorende HDInsight-cluster en eventuele andere resources die aan de resourcegroep zijn gekoppeld, verwijderd.

Ga als volgt te werk om de resourcegroep te verwijderen in Azure Portal:

  1. Vouw het menu aan de linkerkant in Azure Portal uit om het menu met services te openen en kies Resourcegroepen om de lijst met resourcegroepen weer te geven.
  2. Zoek de resourcegroep die u wilt verwijderen en klik met de rechtermuisknop op de knop Meer (... ) aan de rechterkant van de vermelding.
  3. Selecteer Resourcegroep verwijderen en bevestig dit.

Volgende stappen

In dit document hebt u geleerd hoe u de Streams-API van Apache Kafka gebruikt met Kafka in HDInsight. Gebruik de volgende documenten voor meer informatie over het werken met Kafka.