Samouczek: korzystanie z interfejsów API producentów i odbiorców platformy Apache Kafka

Informacje o sposobie korzystania z interfejsów API producentów i odbiorców platformy Apache Kafka w usłudze HDInsight.

Interfejs API producenta platformy Kafka umożliwia aplikacjom wysyłanie strumieni danych do klastra Kafka. Interfejs API odbiorcy platformy Kafka umożliwia aplikacjom odczytywanie strumieni danych z klastra.

Ten samouczek zawiera informacje na temat wykonywania następujących czynności:

  • Wymagania wstępne
  • Zrozumienie kodu
  • Kompilowanie i wdrażanie aplikacji
  • Uruchamianie aplikacji w klastrze

Aby uzyskać więcej informacji o tych interfejsach API, zobacz dokumentację platformy Apache dotyczącą interfejsu API producenta i interfejsu API odbiorcy.

Wymagania wstępne

Zrozumienie kodu

Przykładowa aplikacja znajduje się pod adresem https://github.com/Azure-Samples/hdinsight-kafka-java-get-started w podkatalogu Producer-Consumer. Jeśli używasz klastra kafka z włączonym pakietem Enterprise Security (ESP ), należy użyć wersji aplikacji znajdującej się w podkatalogu DomainJoined-Producer-Consumer .

Ta aplikacja składa się zasadniczo z czterech plików:

  • pom.xml: w tym pliku są definiowane zależności projektu, wersja języka Java i metody pakowania.
  • Producer.java: ten plik wysyła losowe zdania do platformy Kafka przy użyciu interfejsu API producenta.
  • Consumer.java: ten plik korzysta z interfejsu API odbiorcy do odczytywania danych z platformy Kafka i przekazywania ich do wyjścia STDOUT.
  • AdminClientWrapper.java: ten plik używa interfejsu API administratora do tworzenia, opisywania i usuwania tematów platformy Kafka.
  • Run.java: interfejs wiersza polecenia używany do uruchamiania kodu producenta i odbiorcy.

Pom.xml

Należy zrozumieć następujące ważne kwestie dotyczące pliku pom.xml:

  • Zależności: ten projekt bazuje na interfejsach API producenta i odbiorcy platformy Kafka, które są udostępniane w pakiecie kafka-clients. Ta zależność jest definiowana przez następujący kod XML:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    Wpis ${kafka.version} jest zadeklarowany w sekcji <properties>..</properties> pliku pom.xml i jest skonfigurowany zgodnie z wersją platformy Kafka znajdującą się w klastrze usługi HDInsight.

  • Wtyczki: wtyczki Maven zapewniają różne możliwości. W tym projekcie są używane następujące wtyczki:

    • maven-compiler-plugin: służy do ustawiania wersji 8 języka Java używanej przez projekt. Jest to wersja języka Java używana przez usługę HDInsight 3.6.
    • maven-shade-plugin: służy do generowania pełnego pliku jar zawierającego tę aplikację, a także wszelkie zależności. Jest ona również używana do ustawiania punktu wejścia aplikacji, dzięki czemu można bezpośrednio uruchamiać plik Jar bez konieczności określania klasy głównej.

Producer.java

Producent komunikuje się z hostami brokera platformy Kafka (węzłami procesu roboczego) i wysyła dane do tematu platformy Kafka. Poniższy fragment kodu pochodzi z pliku Producer.java z repozytorium GitHub i pokazuje, jak ustawić właściwości producenta. W przypadku klastrów z obsługą zabezpieczeń przedsiębiorstwa należy dodać dodatkową właściwość "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"

Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

Consumer.java

Odbiorca komunikuje się z hostami brokera platformy Kafka (węzłami procesu roboczego) i odczytuje rekordy w pętli. Poniższy fragment kodu z pliku Consumer.java ustawia właściwości odbiorcy. W przypadku klastrów z obsługą zabezpieczeń przedsiębiorstwa należy dodać dodatkową właściwość "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"

KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");

consumer = new KafkaConsumer<>(properties);

W tym kodzie odbiorca jest skonfigurowany do odczytywania od początku tematu (właściwość auto.offset.reset jest ustawiona na wartość earliest).

Run.java

Plik Run.java udostępnia interfejs wiersza polecenia, który uruchamia kod producenta lub odbiorcy. Jako parametr należy podać informacje o hoście brokera platformy Kafka. Opcjonalnie możesz uwzględnić wartość identyfikatora grupy, która jest używana przez proces odbiorcy. Jeśli utworzysz wiele wystąpień konsumentów przy użyciu tego samego identyfikatora grupy, będą one równoważyć obciążenie odczytu z tematu.

Kompilowanie i wdrażanie przykładu

Używanie wstępnie utworzonych plików JAR

Pobierz pliki jar z przykładu platformy Azure Wprowadzenie do platformy Kafka. Jeśli klaster jest włączony pakiet Enterprise Security (ESP), użyj pliku kafka-producer-consumer-esp.jar. Użyj poniższego polecenia, aby skopiować pliki jar do klastra.

scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar

Kompilowanie plików JAR na podstawie kodu

Jeśli chcesz pominąć ten krok, wstępnie utworzone pliki jar można pobrać z podkatalogu Prebuilt-Jars . Pobierz plik kafka-producer-consumer.jar. Jeśli klaster jest włączony pakiet Enterprise Security (ESP), użyj pliku kafka-producer-consumer-esp.jar. Wykonaj krok 3, aby skopiować plik jar do klastra usługi HDInsight.

  1. Pobierz i wyodrębnij przykłady z pliku https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.

  2. Ustaw bieżący katalog na lokalizację hdinsight-kafka-java-get-started\Producer-Consumer katalogu. Jeśli używasz klastra kafka z włączonym pakietem Enterprise Security (ESP ), ustaw lokalizację na DomainJoined-Producer-Consumerpodkatalog. Użyj następującego polecenia, aby skompilować aplikację:

    mvn clean package
    

    To polecenie tworzy katalog o nazwie target, który zawiera plik o nazwie kafka-producer-consumer-1.0-SNAPSHOT.jar. W przypadku klastrów ESP plik będzie kafka-producer-consumer-esp-1.0-SNAPSHOT.jar

  3. Zamień ciąg sshuser na nazwę użytkownika SSH klastra i zamień ciąg CLUSTERNAME na nazwę klastra. Wprowadź następujące polecenie, aby skopiować plik do klastra kafka-producer-consumer-1.0-SNAPSHOT.jar usługi HDInsight. Po wyświetleniu monitu wprowadź hasło użytkownika SSH.

    scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

Uruchamianie przykładu

  1. Zamień ciąg sshuser na nazwę użytkownika SSH klastra i zamień ciąg CLUSTERNAME na nazwę klastra. Otwórz połączenie SSH z klastrem, wprowadzając następujące polecenie. Jeśli zostanie wyświetlony monit, wprowadź hasło konta użytkownika SSH.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Aby uzyskać hosty brokera platformy Kafka, zastąp wartości i <clustername><password> w poniższym poleceniu i wykonaj je. Użyj tej samej wielkości liter, <clustername> jak pokazano w Azure Portal. Zastąp <password> element hasłem logowania klastra, a następnie wykonaj następujące polecenie:

    sudo apt -y install jq
    export CLUSTER_NAME='<clustername>'
    export PASSWORD='<password>'
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Uwaga

    To polecenie wymaga dostępu ambari. Jeśli klaster znajduje się za sieciową grupą zabezpieczeń, uruchom to polecenie z maszyny, która może uzyskać dostęp do systemu Ambari.

  3. Utwórz temat platformy Kafka, myTest, wprowadzając następujące polecenie:

    java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
    
  4. Aby uruchomić producenta i zapisać dane w temacie, użyj następującego polecenia:

    java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
    
  5. Po zakończeniu procesu producenta odczytaj rekordy z tematu za pomocą następującego polecenia:

    java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS
    scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

    Zostanie wyświetlona liczba odczytanych rekordów wraz z liczbą rekordów.

  6. Użyj klawiszy Ctrl+C, aby zakończyć działanie odbiorcy.

Wielu odbiorców

Odbiorcy platformy Kafka używają grupy odbiorców podczas odczytywania rekordów. Korzystanie z tej samej grupy przez wielu odbiorców umożliwia równoważenie obciążenia podczas przeprowadzania odczytu z tematu. Każdy odbiorca w grupie odbiera część rekordów.

Aplikacja odbiorcy akceptuje parametr, który jest używany jako identyfikator grupy. Na przykład następujące polecenie uruchamia odbiorcę przy użyciu identyfikatora grupy myGroup:

java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup

Użyj klawiszy Ctrl+C, aby zakończyć działanie odbiorcy.

Aby zobaczyć, jak działa ten proces, użyj następującego polecenia:

tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach

To polecenie używa polecenia tmux, aby podzielić terminal na dwie kolumny. W każdej kolumnie jest uruchamiany odbiorca z tą samą wartością identyfikatora grupy. Kiedy odbiorcy zakończą odczytywanie, można zauważyć, że każdy z nich odczytał tylko część rekordów. Użyj dwukrotnie klawiszy Ctrl + C , aby zakończyć działanie tmuxpolecenia .

Użycie przez klientów w tej samej grupie jest obsługiwane przez partycje tematu. W tym przykładowym kodzie utworzony wcześniej temat test ma osiem partycji. Jeśli zostanie uruchomionych ośmiu odbiorców, każdy z nich będzie odczytywał rekordy z jednej partycji tematu.

Ważne

Grupa odbiorców nie może zawierać więcej wystąpień odbiorców niż partycji. W tym przykładzie jedna grupa odbiorców może zawierać maksymalnie ośmiu odbiorców, ponieważ tyle partycji znajduje się w temacie. Może też istnieć wiele grup odbiorców — każda z nich może zawierać maksymalnie ośmiu odbiorców.

Rekordy przechowywane na platformie Kafka są przechowywane w kolejności, w której są odbierane w ramach partycji. Aby dostarczać rekordy na partycji w określonej kolejności, utwórz grupę odbiorców, w której liczba wystąpień odbiorców jest zgodna z liczbą partycji. Aby dostarczać rekordy w temacie w określonej kolejności, utwórz grupę odbiorców z jednym wystąpieniem odbiorcy.

Typowe problemy, z którymi się borykają

  1. Tworzenie tematu kończy się niepowodzeniem Jeśli klaster jest włączony pakiet zabezpieczeń przedsiębiorstwa, użyj wstępnie utworzonych plików JAR dla producenta i odbiorcy. Plik JAR esp można skompilować na podstawie kodu w podkataloguDomainJoined-Producer-Consumer. Właściwości producenta i odbiorcy mają dodatkową właściwość CommonClientConfigs.SECURITY_PROTOCOL_CONFIG dla klastrów obsługujących protokół ESP.

  2. Niepowodzenie w klastrach z włączoną obsługą ze stanem rejestracji: jeśli operacje tworzenia i korzystania z nich kończą się niepowodzeniem i używasz klastra z włączoną obsługą ze stanem rejestracji, sprawdź, czy użytkownik kafka jest obecny we wszystkich zasadach platformy Ranger. Jeśli nie jest obecny, dodaj go do wszystkich zasad platformy Ranger.

Czyszczenie zasobów

Aby wyczyścić zasoby utworzone w tym samouczku, możesz usunąć grupę zasobów. Usunięcie grupy zasobów powoduje również usunięcie skojarzonego klastra usługi HDInsight i wszystkich innych zasobów skojarzonych z tą grupą zasobów.

Aby usunąć grupę zasobów za pomocą witryny Azure Portal:

  1. W witrynie Azure Portal rozwiń menu po lewej stronie, aby otworzyć menu usług, a następnie wybierz pozycję Grupy zasobów, aby wyświetlić listę grup zasobów.
  2. Znajdź grupę zasobów do usunięcia, a następnie kliknij prawym przyciskiem myszy przycisk Więcej (...) po prawej stronie listy.
  3. Wybierz pozycję Usuń grupę zasobów i potwierdź.

Następne kroki

W tym dokumencie zawarto informacje o sposobie korzystania z interfejsu API producenta i odbiorcy platformy Apache Kafka w usłudze HDInsight. Dowiedz się więcej o pracy z platformą Kafka, korzystając z następujących zasobów: