Esercitazione: Usare l'API dei flussi Apache Kafka in Azure HDInsight

Informazioni su come creare un'applicazione che usa l'API Apache Kafka Streams ed eseguirla con Kafka in HDInsight.

In questa esercitazione viene usata un'applicazione di conteggio delle parole. L'applicazione legge i dati di testo da un argomento Kafka, estrae singole parole e quindi archivia il conteggio delle parole in un altro argomento Kafka.

L'elaborazione dei flussi di Kafka viene spesso eseguita usando Apache Spark o Apache Storm. Con Kafka versione 1.1.0 (in HDInsight 3.5 e 3.6) è stata introdotta l'API Kafka Streams, che consente di trasformare i flussi di dati tra argomenti di input e argomenti di output. In alcuni casi, questa può essere una valida alternativa alla creazione di una soluzione di streaming Spark or Storm.

Per altre informazioni su Kafka Streams, vedere la documentazione di introduzione ai flussi su Apache.org.

In questa esercitazione verranno illustrate le procedure per:

  • Informazioni sul codice
  • Compilare e distribuire l'applicazione
  • Configurare gli argomenti Kafka
  • Eseguire il codice

Prerequisiti

Informazioni sul codice

L'applicazione di esempio si trova in https://github.com/Azure-Samples/hdinsight-kafka-java-get-started nella sottodirectory Streaming. L'applicazione è costituita da due file:

  • pom.xml: Questo file definisce le dipendenze progetto, la versione Java e i metodi di creazione pacchetti.
  • Stream.java: implementa la logica di flusso.

Pom.xml

Gli aspetti importanti da comprendere nel file pom.xml sono:

  • Dipendenze: questo progetto si basa sull'API Kafka Streams, che è disponibile nel pacchetto kafka-clients. Il codice XML seguente definisce questa dipendenza:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    La voce ${kafka.version} viene dichiarata nella sezione <properties>..</properties> di pom.xml ed è configurata per la versione Kafka del cluster HDInsight.

  • Plug-in: I plug-in Maven offrono varie funzionalità. In questo progetto vengono usati i plug-in seguenti:

    • maven-compiler-plugin: Usato per impostare su 8 la versione Java usata dal progetto. Java 8 è richiesto da HDInsight 3.6.
    • maven-shade-plugin: usato per generare un file jar uber che contiene questa applicazione e le eventuali dipendenze. Viene inoltre usato per impostare il punto di ingresso dell'applicazione, in modo che sia possibile eseguire il file Jar direttamente senza dover specificare la classe principale.

Stream.Java

Il file Stream.java usa l'API Streams per implementare un'applicazione di conteggio delle parole. Legge i dati da un argomento Kafka denominato test e scrive il conteggio delle parole in un argomento denominato wordcounts.

Il codice seguente descrive l'applicazione di conteggio delle parole:

package com.microsoft.example;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;

public class Stream
{
    public static void main( String[] args ) {
        Properties streamsConfig = new Properties();
        // The name must be unique on the Kafka cluster
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        // Brokers
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // SerDes for key and values
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Serdes for the word and count
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
        KStream<String, Long> wordCounts = sentences
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, word) -> new KeyValue<>(word, word))
                .countByKey("Counts")
                .toStream();
        wordCounts.to(stringSerde, longSerde, "wordcounts");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Creare e distribuire l'esempio

Per creare e distribuire il progetto in un cluster Kafka in HDInsight, seguire questa procedura:

  1. Impostare la directory corrente sul percorso della directory hdinsight-kafka-java-get-started-master\Streaming e quindi usare il comando seguente per creare un pacchetto JAR:

    mvn clean package
    

    Questo comando crea il pacchetto in target/kafka-streaming-1.0-SNAPSHOT.jar.

  2. Sostituire sshuser con il nome utente SSH del cluster e sostituire clustername con il nome del cluster. Usare il comando seguente per copiare il file kafka-streaming-1.0-SNAPSHOT.jar nel cluster HDInsight. Quando richiesto, immettere la password per l'account utente SSH.

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
    

Creare gli argomenti di Apache Kafka

  1. Sostituire sshuser con il nome utente SSH del cluster e sostituire CLUSTERNAME con il nome del cluster. Per aprire una connessione SSH al cluster, immettere il comando seguente. Quando richiesto, immettere la password per l'account utente SSH.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Installare jq, un processore JSON da riga di comando. Dalla connessione SSH aperta, immettere il comando seguente per installare jq:

    sudo apt -y install jq
    
  3. Configurare la variabile di password. Sostituire PASSWORD con la password di accesso al cluster e quindi immettere il comando:

    export password='PASSWORD'
    
  4. Estrarre il nome del cluster con l'uso corretto di maiuscole e minuscole. L'uso effettivo di maiuscole e minuscole nel nome del cluster può differire dal previsto, a seconda della modalità di creazione del cluster. Questo comando otterrà la combinazione di maiuscole e minuscole effettiva e quindi la archivierà in una variabile. Immettere il comando seguente:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Nota

    Se si esegue questo processo dall'esterno del cluster, è disponibile una procedura diversa per l'archiviazione del nome del cluster. Recuperare il nome del cluster in lettere minuscole dal portale di Azure. Sostituire quindi <clustername> con il nome del cluster nel comando seguente ed eseguire il comando: export clusterName='<clustername>'.

  5. Per ottenere gli host del broker Kafka e gli host Apache Zookeeper, usare i comandi seguenti. Quando richiesto, immettere la password dell'account (admin) di accesso al cluster.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    
    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Nota

    Questi comandi richiedono l'accesso a Ambari. Se il cluster è protetto da un gruppo NSG, eseguire questi comandi da un computer in grado di accedere ad Ambari.

  6. Per creare gli argomenti usati dall'operazione di streaming, usare i comandi seguenti:

    Nota

    È possibile che venga visualizzato un errore che indica che l'argomento test esiste già. Non è un problema poiché potrebbe essere stato creato nell'esercitazione dell'API Producer e Consumer.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

    Gli argomenti vengono usati per gli scopi seguenti:

    • test: in questo argomento vengono ricevuti i record. Applicazione di streaming legge i dati da questo argomento.
    • wordcounts: in questo argomento l'applicazione di streaming archivia l'output.
    • RekeyedIntermediateTopic: questo argomento viene usato per partizionare nuovamente i dati mentre il conteggio viene aggiornato dall'operatore countByKey.
    • wordcount-example-Counts-changelog: questo argomento è un archivio di stati usato dall'operazione countByKey.

    È possibile configurare Kafka in HDInsight anche in modo che gli argomenti vengano creati automaticamente. Per altre informazioni, vedere il documento Configure automatic topic creation (Configurare la creazione automatica degli argomenti).

Eseguire il codice

  1. Per avviare l'applicazione di streaming come processo in background, usare il comando seguente:

    java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
    

    È possibile che venga visualizzato un avviso relativo ad Apache log4j. Questo avviso può essere ignorato.

  2. Per inviare i record all'argomento test, usare il comando seguente per avviare l'applicazione producer:

    java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
    
  3. Al termine dell'elaborazione del producer, usare il comando seguente per visualizzare le informazioni archiviate nell'argomento wordcounts:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
    

    I parametri --property indicano al consumer di console di stampare sia la chiave (parola) sia il numero (valore). Questo parametro configura anche il deserializzatore da usare durante la lettura dei valori da Kafka.

    L'output è simile al testo seguente:

    dwarfs  13635
    ago     13664
    snow    13636
    dwarfs  13636
    ago     13665
    a       13803
    ago     13666
    a       13804
    ago     13667
    ago     13668
    jumped  13640
    jumped  13641
    

    Il parametro --from-beginning configura il consumer in modo che venga avviato all'inizio dei record archiviati nell'argomento. Il conteggio viene incrementato ogni volta che viene rilevata una parola, pertanto l'argomento contiene più voci per ogni parola, con un numero crescente.

  4. Usare Ctrl + C per chiudere il producer. Usare ancora Ctrl + C per chiudere l'applicazione e il consumer.

  5. Per eliminare gli argomenti usati dall'operazione di streaming, eseguire questi comandi:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

Pulire le risorse

Per pulire le risorse create da questa esercitazione, eliminare il gruppo di risorse. Se si elimina il gruppo di risorse, vengono eliminati anche il cluster HDInsight associato e tutte le altre risorse correlate al gruppo di risorse.

Per rimuovere il gruppo di risorse usando il portale di Azure:

  1. Nel portale di Azure espandere il menu a sinistra per aprire il menu dei servizi e quindi scegliere Gruppi di risorse per visualizzare l'elenco dei gruppi di risorse.
  2. Individuare il gruppo di risorse da eliminare e quindi fare clic con il pulsante destro del mouse su Altro (...) a destra dell'elenco.
  3. Scegliere Elimina gruppo di risorse e quindi confermare.

Passaggi successivi

In questo documento si è appreso come usare l'API Apache Kafka Streams con Kafka in HDInsight. Per altre informazioni sull'uso di Kafka, vedere: