Självstudie: Använda Apache Kafka-producenten och konsument-API:er

Lär dig att använda Apache Kafka-producenten och konsument-API:er med Kafka i HDInsight.

Kafka-producentens API tillåter att program skickar dataströmmar till Kafka-klustret. Kafka-konsumentens API tillåter att program läser dataströmmar från klustret.

I den här guiden får du lära dig att:

  • Krav
  • Förstå koden
  • Skapa och distribuera programmet
  • Köra programmet på klustret

Mer information om API:er finns i Apache-dokumentationen i Producent-API och Konsument-API.

Förutsättningar

Förstå koden

Exempelprogrammet finns på https://github.com/Azure-Samples/hdinsight-kafka-java-get-started i underkatalogen Producer-Consumer. Om du använder Ett Kafka-kluster (Enterprise Security Package) bör du använda programversionen som finns i underkatalogen DomainJoined-Producer-Consumer .

Programmet består i huvudsak av fyra filer:

  • pom.xml: Den här filen definierar projektberoenden, Java-version och paketeringsmetoder.
  • Producer.java: Den här filen skickar slumpmässiga meningar till Kafka med producent-API:et.
  • Consumer.java: Den här filen använder konsument-API:n till att läsa data från Kafka och generera den till STDOUT.
  • AdminClientWrapper.java: Den här filen använder administratörs-API:et för att skapa, beskriva och ta bort Kafka-ämnen.
  • Run.java: Kommandoradsgränssnittet används för att köra producent- och konsumentkoden.

Pom.xml

Viktiga saker att förstå i pom.xml-filen är:

  • Beroenden: Det här projektet använder producent- och konsument-API:er i Kafka, som tillhandahålls av kafka-clients-paketet. Följande XML-kod definierar detta beroende:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    ${kafka.version}-posten har deklarerats i <properties>..</properties>-avsnittet i pom.xml och är konfigurerad till Kafka-versionen av HDInsight-klustret.

  • Plugin-program: Plugin-programmet Maven innehåller olika funktioner. I det här projektet används följande plugin-program:

    • maven-compiler-plugin: Används för att ange den Java-version som används av projektet till 8. Detta är den version av Java som används av HDInsight 3.6.
    • maven-shade-plugin: Används för att generera en Uber-jar som innehåller det här programmet, samt eventuella beroenden. Det används också att ange startpunkt för programmet, så att du kan köra Jar-filen direkt utan att behöva ange huvudklassen.

Producer.java

Producenten kommunicerar med värdar för Kafka-meddelandeköer (arbetarnoder) och skickar data till ett Kafka-ämne. Följande kodfragment kommer från filen Producer.java från GitHub-lagringsplatsen och visar hur du anger producentegenskaperna. För Enterprise Security-aktiverade kluster måste ytterligare en egenskap läggas till "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"

Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

Consumer.java

Konsumenten kommunicerar med värdar för Kafka-meddelandeköer (arbetarnoder) och läser posterna i en loop. Följande kodfragment från filen Consumer.java anger konsumentegenskaperna. För Enterprise Security-aktiverade kluster måste ytterligare en egenskap läggas till "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"

KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");

consumer = new KafkaConsumer<>(properties);

I den här koden är konsumenten konfigurerad att läsa från början av ämnet (auto.offset.reset är inställd på earliest.)

Run.java

Filen Run.java innehåller ett kommandoradsgränssnitt som kör antingen producent- eller konsumentkoden. Du måste ange värdinformationen om Kafka-meddelandeköerna som en parameter. Du kan också inkludera ett grupp-ID-värde som används av konsumentprocessen. Om du skapar flera konsumentinstanser med samma grupp-ID belastningsutjämningsläsning från ämnet.

Skapa och distribuera exemplet

Använda färdiga JAR-filer

Ladda ned jar-filerna från Azure-exemplet Kafka Kom igång. Om ditt kluster är aktiverat för Enterprise Security Package (ESP) använder du kafka-producer-consumer-esp.jar. Använd kommandot nedan för att kopiera jar-filerna till klustret.

scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar

Skapa JAR-filerna från kod

Om du vill hoppa över det här steget kan fördefinierade jar-filer laddas ned från underkatalogen Prebuilt-Jars . Ladda ned kafka-producer-consumer.jar. Om ditt kluster är aktiverat för Enterprise Security Package (ESP) använder du kafka-producer-consumer-esp.jar. Kör steg 3 för att kopiera jar-filen till HDInsight-klustret.

  1. Ladda ned och extrahera exemplen från https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.

  2. Ange den aktuella katalogen till platsen för hdinsight-kafka-java-get-started\Producer-Consumer katalogen. Om du använder Ett Esp-aktiverat Kafka-kluster (Enterprise Security Package) bör du ange platsen som DomainJoined-Producer-Consumerunderkatalog. Använd följande kommando för att skapa programmet:

    mvn clean package
    

    Det här kommandot skapar en katalog med namnet target, som innehåller en fil med namnet kafka-producer-consumer-1.0-SNAPSHOT.jar. För ESP-kluster kommer filen att kafka-producer-consumer-esp-1.0-SNAPSHOT.jar

  3. Ersätt sshuser med SSH-användare för klustret och ersätt CLUSTERNAME med namnet på klustret. Ange följande kommando för att kopiera kafka-producer-consumer-1.0-SNAPSHOT.jar filen till HDInsight-klustret. Ange lösenordet för SSH-användaren när du uppmanas till det.

    scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

Kör exemplet

  1. Ersätt sshuser med SSH-användare för klustret och ersätt CLUSTERNAME med namnet på klustret. Öppna en SSH-anslutning till klustret genom att ange följande kommando. Ange lösenordet för SSH-användarkontot om du uppmanas till det.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Om du vill hämta Kafka Broker-värdarna ersätter du värdena för <clustername> och <password> i följande kommando och kör det. Använd samma hölje för <clustername> som i Azure Portal. Ersätt <password> med lösenordet för klusterinloggning och kör sedan:

    sudo apt -y install jq
    export CLUSTER_NAME='<clustername>'
    export PASSWORD='<password>'
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Anteckning

    Det här kommandot kräver Ambari-åtkomst. Om klustret ligger bakom en NSG kör du det här kommandot från en dator som har åtkomst till Ambari.

  3. Skapa Kafka-ämne, myTest, genom att ange följande kommando:

    java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
    
  4. Om du vill köra producenten och skriva data till ämnet, använder du följande kommando:

    java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
    
  5. När producenten är klar kan du använda följande kommando för att läsa från ämnet:

    java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS
    scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

    De lästa posterna, tillsammans med antalet poster, visas.

  6. Använd Ctrl + C om du vill avsluta konsumenten.

Flera konsumenter

Kafka-konsumenter använder en konsumentgrupp vid läsning av poster. Att använda samma grupp med flera konsumenter resulterar i belastningsutjämnaravläsningar från ett ämne. Varje konsument i gruppen tar emot en del av posterna.

Konsumentprogrammet accepterar en parameter som används som grupp-ID. Exempelvis startar följande kommando en konsument med hjälp av ett grupp-ID i myGroup:

java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup

Använd Ctrl + C om du vill avsluta konsumenten.

Använd följande kommando om du vill se hur det fungerar:

tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach

Detta kommando använder tmux till att dela terminalen i två kolumner. En konsument startas i varje kolumn med samma ID-värde för gruppen. När konsumenterna har läst färdigt kan du se att de bara läst en del av posterna. Använd Ctrl + C två gånger för att avsluta tmux.

Förbrukning av klienter i samma grupp hanteras via partitionerna för ämnet. I det här kodexemplet har test-ämnet som skapades tidigare åtta partitioner. Om du startar åtta konsumenter läser varje konsument poster från en enda partition i ämnet.

Viktigt

Det får inte finnas flera instanser av konsumenten i en konsumentgrupp än partitioner. I det här exemplet kan en konsumentgrupp innehålla upp till åtta konsumenter, eftersom det är antalet partitioner i ämnet. Du kan även ha flera konsumentgrupper med högst åtta konsumenter vardera.

Poster som lagras i Kafka lagras i den ordning de tas emot i en partition. För att uppnå sorterad leverans av poster inom en partition skapar du en konsumentgrupp där antalet konsumentinstanser matchar antalet partitioner. För att uppnå sorterad leverans av poster i ämnet skapar du en konsumentgrupp med bara en konsumentinstans.

Vanliga problem som uppstår

  1. Det går inte att skapa ämnet Om ditt kluster är Enterprise Security Pack aktiverat använder du de färdiga JAR-filerna för producent och konsument. ESP-jar-filen kan skapas från koden i underkatalogenDomainJoined-Producer-Consumer. Producent- och konsumentegenskaperna har ytterligare en egenskap CommonClientConfigs.SECURITY_PROTOCOL_CONFIG för ESP-aktiverade kluster.

  2. Fel i ESP-aktiverade kluster: Om genererar och förbrukar åtgärder misslyckas och du använder ett ESP-aktiverat kluster kontrollerar du att användaren kafka finns i alla Ranger-principer. Om den inte finns lägger du till den i alla Ranger-principer.

Rensa resurser

Om du vill rensa resurserna som har skapats med den här självstudien kan du ta bort resursgruppen. När du tar bort resursgruppen raderas även det kopplade HDInsight-klustret och eventuella andra resurser som är associerade med resursgruppen.

Ta bort en resursgrupp med Azure Portal:

  1. I Azure Portal expanderar du menyn på vänster sida för att öppna tjänstemenyn och väljer sedan Resursgrupper för att visa listan med dina resursgrupper.
  2. Leta reda på den resursgrupp du vill ta bort och högerklicka på knappen Mer (...) till höger om listan.
  3. Välj Ta bort resursgrupp och bekräfta.

Nästa steg

I det här dokumentet har du lärt dig att använda Apache Kafka-producent- och konsument-API:et med Kafka i HDInsight. Använd följande för att lära dig mer om att arbeta med Kafka: