Samouczek: używanie interfejsu API strumieni platformy Apache Kafka w usłudze Azure HDInsight

Dowiedz się, jak utworzyć aplikację, która używa interfejsu API strumieni platformy Apache Kafka, i uruchomić ją na platformie Kafka w usłudze HDInsight.

Aplikacja przedstawiona w tym samouczku zlicza przesyłane strumieniowo wyrazy. Odczytuje ona dane tekstowe z tematu platformy Kafka, wyodrębnia poszczególne wyrazy, a następnie zapisuje liczbę wyrazów w innym temacie platformy Kafka.

Przetwarzanie strumieni platformy Kafka jest często wykonywane przy użyciu platformy Apache Spark. Platforma Kafka w wersji 2.1.1 i 2.4.1 (w usługach HDInsight 4.0 i 5.0) obsługuje interfejs API platformy Kafka Strumienie. Ten interfejs API umożliwia przekształcanie strumieni danych między tematami wejściowymi i wyjściowymi.

Aby uzyskać więcej informacji o strumieniach platformy Kafka, zobacz dokumentację Intro to Streams (Wprowadzenie do strumieni) w serwisie Apache.org.

Z tego samouczka dowiesz się, jak wykonywać następujące czynności:

  • Zrozumienie kodu
  • Kompilowanie i wdrażanie aplikacji
  • Konfigurowanie tematów platformy Kafka
  • Uruchamianie kodu

Wymagania wstępne

Zrozumienie kodu

Przykładowa aplikacja znajduje się pod adresem https://github.com/Azure-Samples/hdinsight-kafka-java-get-started w podkatalogu Streaming. Aplikacja składa się z dwóch plików:

  • pom.xml: w tym pliku są definiowane zależności projektu, wersja języka Java i metody pakowania.
  • Stream.java: ten plik implementuje logikę przesyłania strumieniowego.

Pom.xml

Należy zrozumieć następujące ważne kwestie dotyczące pliku pom.xml:

  • Zależności: ten projekt bazuje na interfejsie API strumieni platformy Kafka, który jest udostępniany w pakiecie kafka-clients. Ta zależność jest definiowana przez następujący kod XML:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    Wpis ${kafka.version} jest zadeklarowany w sekcji <properties>..</properties> pliku pom.xml i jest skonfigurowany zgodnie z wersją platformy Kafka znajdującą się w klastrze usługi HDInsight.

  • Wtyczki: wtyczki Maven zapewniają różne możliwości. W tym projekcie są używane następujące wtyczki:

    • maven-compiler-plugin: służy do ustawiania wersji 8 języka Java używanej przez projekt. Usługi HDInsight 4.0 i 5.0 wymagają środowiska Java 8.
    • maven-shade-plugin: służy do generowania pliku jar uber zawierającego tę aplikację i wszelkich zależności. Służy również do ustawiania punktu wejścia aplikacji, dzięki czemu można bezpośrednio uruchomić plik Jar bez konieczności określania klasy głównej.

Stream.java

Plik Stream.java używa interfejsu API strumieni do zaimplementowania aplikacji liczącej wyrazy. Odczytuje on dane z tematu Kafka o nazwie test i zapisuje liczbę wyrazów w temacie o nazwie wordcounts.

Poniższy kod definiuje aplikację do zliczania wyrazów:

package com.microsoft.example;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;

public class Stream
{
    public static void main( String[] args ) {
        Properties streamsConfig = new Properties();
        // The name must be unique on the Kafka cluster
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        // Brokers
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // SerDes for key and values
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Serdes for the word and count
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
        KStream<String, Long> wordCounts = sentences
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, word) -> new KeyValue<>(word, word))
                .countByKey("Counts")
                .toStream();
        wordCounts.to(stringSerde, longSerde, "wordcounts");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Kompilowanie i wdrażanie przykładu

Aby skompilować i wdrożyć projekt na platformie Kafka w klastrze usługi HDInsight, wykonaj następujące kroki:

  1. Ustaw bieżący katalog na lokalizację hdinsight-kafka-java-get-started-master\Streaming katalogu, a następnie użyj następującego polecenia, aby utworzyć pakiet jar:

    mvn clean package
    

    To polecenie tworzy pakiet w lokalizacji target/kafka-streaming-1.0-SNAPSHOT.jar.

  2. Zamień ciąg sshuser na nazwę użytkownika SSH klastra i zamień ciąg clustername na nazwę klastra. Użyj następującego polecenia, aby skopiować plik do klastra kafka-streaming-1.0-SNAPSHOT.jar usługi HDInsight. Jeśli zostanie wyświetlony monit, wprowadź hasło konta użytkownika SSH.

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
    

Tworzenie tematów platformy Apache Kafka

  1. Zamień ciąg sshuser na nazwę użytkownika SSH klastra i zamień ciąg CLUSTERNAME na nazwę klastra. Otwórz połączenie SSH z klastrem, wprowadzając następujące polecenie. Jeśli zostanie wyświetlony monit, wprowadź hasło konta użytkownika SSH.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Zainstaluj procesor jq, wiersz polecenia JSON. W otwartym połączeniu SSH wprowadź następujące polecenie, aby zainstalować program jq:

    sudo apt -y install jq
    
  3. Konfigurowanie zmiennej hasła. Zastąp PASSWORD ciąg hasłem logowania klastra, a następnie wprowadź polecenie:

    export PASSWORD='PASSWORD'
    
  4. Wyodrębnij poprawnie przypadek nazwy klastra. Rzeczywista wielkość liter nazwy klastra może być inna niż oczekiwano, w zależności od sposobu utworzenia klastra. To polecenie uzyskuje rzeczywistą wielkość liter, a następnie przechowuje ją w zmiennej. Podaj następujące polecenie:

    export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Uwaga

    Jeśli wykonujesz ten proces spoza klastra, istnieje inna procedura przechowywania nazwy klastra. Pobierz nazwę klastra w małym przypadku z witryny Azure Portal. Następnie zastąp nazwę klastra w <clustername> następującym poleceniu i wykonaj ją: export clusterName='<clustername>'.

  5. Aby uzyskać hosty brokera platformy Kafka i hosty usługi Apache Zookeeper, użyj poniższych poleceń. Po wyświetleniu monitu wprowadź hasło dla konta logowania klastra (administratora).

    export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Uwaga

    Te polecenia wymagają dostępu systemu Ambari. Jeśli klaster znajduje się za sieciową grupą zabezpieczeń, uruchom te polecenia z maszyny, która może uzyskać dostęp do systemu Ambari.

  6. Aby utworzyć tematy używane przez operację przesyłania strumieniowego, użyj następujących poleceń:

    Uwaga

    Może zostać wyświetlony błąd z informacją, że temat test już istnieje. Nie stanowi to problemu, ponieważ ten temat mógł zostać utworzony w samouczku dotyczącym interfejsu API producenta i odbiorcy.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

    Tematy są używane do następujących celów:

    • test: w tym temacie są odbierane rekordy. Aplikacja do przesyłania strumieniowego odczytuje dane z tego tematu.
    • wordcounts: w tym temacie aplikacja do przesyłania strumieniowego przechowuje swoje dane wyjściowe.
    • RekeyedIntermediateTopic: w tym temacie zachodzi ponowne dzielenie danych, ponieważ liczba wyrazów jest aktualizowana za pomocą operatora countByKey.
    • wordcount-example-Counts-changelog: ten temat jest magazynem stanów używanym przez operację countByKey

    Platformę Kafka w usłudze HDInsight można również skonfigurować w taki sposób, aby automatycznie tworzyła tematy. Aby uzyskać więcej informacji, zobacz dokument Configure automatic topic creation (Konfigurowanie automatycznego tworzenia tematów).

Uruchamianie kodu

  1. Aby uruchomić aplikację do przesyłania strumieniowego jako proces w tle, użyj następującego polecenia:

    java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
    

    Może zostać wyświetlone ostrzeżenie dotyczące platformy Apache log4j. To ostrzeżenie można zignorować.

  2. Aby wysyłać rekordy do tematu test, użyj następującego polecenia w celu uruchomienia aplikacji producenta:

    java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
    
  3. Po zakończeniu działania producenta użyj następującego polecenia, aby wyświetlić informacje przechowywane w temacie wordcounts:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
    

    Dzięki użyciu parametrów --property odbiorca konsoli drukuje zarówno klucz (wyraz), jak i liczbę (wartość). Ten parametr konfiguruje również deserializatora do użycia podczas odczytu tych wartości z platformy Kafka.

    Dane wyjściowe będą podobne do następującego tekstu:

    dwarfs  13635
    ago     13664
    snow    13636
    dwarfs  13636
    ago     13665
    a       13803
    ago     13666
    a       13804
    ago     13667
    ago     13668
    jumped  13640
    jumped  13641
    

    Parametr --from-beginning konfiguruje odbiorcę, aby rozpoczął przetwarzanie od początku rekordów przechowywanych w temacie. Liczba wystąpień zwiększa się każdorazowo po napotkaniu wyrazu, dlatego temat zawiera wiele pozycji dla każdego wyrazu ze zwiększającą się liczbą wystąpień.

  4. Użyj klawiszy Ctrl + C, aby zakończyć działanie producenta. Podobnie użyj klawiszy Ctrl + C, aby zakończyć działanie aplikacji i odbiorcy.

  5. Aby usunąć tematy używane przez operację przesyłania strumieniowego, użyj następujących poleceń:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

Czyszczenie zasobów

Aby wyczyścić zasoby utworzone w tym samouczku, możesz usunąć grupę zasobów. Usunięcie grupy zasobów powoduje również usunięcie skojarzonego klastra usługi HDInsight i wszystkich innych zasobów skojarzonych z tą grupą zasobów.

Aby usunąć grupę zasobów za pomocą witryny Azure Portal:

  1. W witrynie Azure Portal rozwiń menu po lewej stronie, aby otworzyć menu usług, a następnie wybierz pozycję Grupy zasobów, aby wyświetlić listę grup zasobów.
  2. Znajdź grupę zasobów do usunięcia, a następnie kliknij prawym przyciskiem myszy przycisk Więcej (...) po prawej stronie listy.
  3. Wybierz pozycję Usuń grupę zasobów i potwierdź.

Następne kroki

W tym dokumencie zawarto informacje o sposobie korzystania z interfejsu API strumieni platformy Apache Kafka w usłudze HDInsight. Skorzystaj z poniższych informacji, aby dowiedzieć się więcej na temat pracy z platformą Kafka.