Självstudie: Använda Apache Kafka-producenten och konsument-API:er

Lär dig att använda Apache Kafka-producenten och konsument-API:er med Kafka i HDInsight.

Kafka-producentens API tillåter att program skickar dataströmmar till Kafka-klustret. Kafka-konsumentens API tillåter att program läser dataströmmar från klustret.

I den här guiden får du lära dig att:

  • Krav
  • Förstå koden
  • Skapa och distribuera programmet
  • Köra programmet på klustret

Mer information om API:er finns i Apache-dokumentationen i Producent-API och Konsument-API.

Förutsättningar

Förstå koden

Exempelprogrammet finns på https://github.com/Azure-Samples/hdinsight-kafka-java-get-started i Producer-Consumer underkatalogen . Om du använder Enterprise Security Package (ESP) aktiverat Kafka-kluster bör du använda den programversion som finns i DomainJoined-Producer-Consumer underkatalogen.

Programmet består i huvudsak av fyra filer:

  • pom.xml: Den här filen definierar projektberoenden, Java-version och paketeringsmetoder.
  • Producer.java: Den här filen skickar slumpmässiga meningar till Kafka med producent-API:et.
  • Consumer.java: Den här filen använder konsument-API:n till att läsa data från Kafka och generera den till STDOUT.
  • AdminClientWrapper.java: Den här filen använder admin-API:et för att skapa, beskriva och ta bort Kafka-ämnen.
  • Run.java: Kommandoradsgränssnittet används för att köra producent- och konsumentkoden.

Pom.xml

Viktiga saker att förstå i pom.xml-filen är:

  • Beroenden: Det här projektet använder producent- och konsument-API:er i Kafka, som tillhandahålls av kafka-clients-paketet. Följande XML-kod definierar detta beroende:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    ${kafka.version}-posten har deklarerats i <properties>..</properties>-avsnittet i pom.xml och är konfigurerad till Kafka-versionen av HDInsight-klustret.

  • Plugin-program: Plugin-programmet Maven innehåller olika funktioner. I det här projektet används följande plugin-program:

    • maven-compiler-plugin: Används för att ange den Java-version som används av projektet till 8. Detta är den version av Java som används av HDInsight 3.6.
    • maven-shade-plugin: Används för att generera en Uber-jar som innehåller det här programmet, samt eventuella beroenden. Det används också att ange startpunkt för programmet, så att du kan köra Jar-filen direkt utan att behöva ange huvudklassen.

Producer.java

Producenten kommunicerar med värdar för Kafka-meddelandeköer (arbetarnoder) och skickar data till ett Kafka-ämne. Följande kodfragment kommer från filen Producer.java från GitHub och visar hur du anger producentegenskaperna. För Enterprise Security Enabled-kluster måste ytterligare en egenskap läggas till "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"

Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

Consumer.java

Konsumenten kommunicerar med värdar för Kafka-meddelandeköer (arbetarnoder) och läser posterna i en loop. Följande kodfragment från filen Consumer.java anger konsumentegenskaperna. För Enterprise Security Enabled-kluster måste ytterligare en egenskap läggas till "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"

KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");

consumer = new KafkaConsumer<>(properties);

I den här koden är konsumenten konfigurerad att läsa från början av ämnet (auto.offset.reset är inställd på earliest.)

Run.java

Filen Run.java innehåller ett kommandoradsgränssnitt som kör antingen producent- eller konsumentkoden. Du måste ange värdinformationen om Kafka-meddelandeköerna som en parameter. Du kan också inkludera ett grupp-ID-värde som används av konsumentprocessen. Om du skapar flera konsumentinstanser med samma grupp-ID belastningsutjämnar de läsningen från ämnet.

Skapa och distribuera exemplet

Använda förbyggda JAR-filer

Ladda ned jar-filer från Kafka Kom igång Azure-exempel. Om klustret är Enterprise Security Package (ESP) aktiverat använder du kafka-producer-consumer-esp.jar. Använd kommandot nedan för att kopiera jar-filer till klustret.

scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar

Skapa JAR-filerna från kod

Om du vill hoppa över det här steget kan fördefinierade jar-filer laddas ned från Prebuilt-Jars underkatalogen. Ladda ned kafka-producer-consumer.jar. Om klustret är Enterprise Security Package (ESP) aktiverat använder du kafka-producer-consumer-esp.jar. Kör steg 3 för att kopiera jar-filen till ditt HDInsight-kluster.

  1. Ladda ned och extrahera exemplen från https://github.com/Azure-Samples/hdinsight-kafka-java-get-started .

  2. Ange katalogens plats som din hdinsight-kafka-java-get-started\Producer-Consumer aktuella katalog. Om du använder Enterprise Security Package (ESP) aktiverat Kafka-kluster bör du ange platsen till DomainJoined-Producer-Consumer underkatalog. Använd följande kommando för att skapa programmet:

    mvn clean package
    

    Det här kommandot skapar en katalog med namnet target, som innehåller en fil med namnet kafka-producer-consumer-1.0-SNAPSHOT.jar. För ESP-kluster blir filen kafka-producer-consumer-esp-1.0-SNAPSHOT.jar

  3. Ersätt sshuser med SSH-användare för klustret och ersätt CLUSTERNAME med namnet på klustret. Ange följande kommando för att kopiera filen kafka-producer-consumer-1.0-SNAPSHOT.jar till ditt HDInsight-kluster. Ange lösenordet för SSH-användaren när du uppmanas till det.

    scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

Kör exemplet

  1. Ersätt sshuser med SSH-användare för klustret och ersätt CLUSTERNAME med namnet på klustret. Öppna en SSH-anslutning till klustret genom att ange följande kommando. Ange lösenordet för SSH-användarkontot om du uppmanas till det.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Om du vill hämta Värdar för Kafka-meddelandeköer ersätter du värdena <clustername> för och i följande kommando och kör <password> det. Använd samma hölje för <clustername> som i Azure Portal. Ersätt <password> med lösenordet för klusterinloggning och kör sedan:

    sudo apt -y install jq
    export clusterName='<clustername>'
    export password='<password>'
    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Anteckning

    Det här kommandot kräver Ambari-åtkomst. Om klustret finns bakom en NSG kör du det här kommandot från en dator som har åtkomst till Ambari.

  3. Skapa Kafka-ämne, myTest , genom att ange följande kommando:

    java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
    
  4. Om du vill köra producenten och skriva data till ämnet, använder du följande kommando:

    java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
    
  5. När producenten är klar kan du använda följande kommando för att läsa från ämnet:

    java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS
    scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

    De lästa posterna, tillsammans med antalet poster, visas.

  6. Använd Ctrl + C om du vill avsluta konsumenten.

Flera konsumenter

Kafka-konsumenter använder en konsumentgrupp vid läsning av poster. Att använda samma grupp med flera konsumenter resulterar i belastningsutjämnaravläsningar från ett ämne. Varje konsument i gruppen tar emot en del av posterna.

Konsumentprogrammet accepterar en parameter som används som grupp-ID. Exempelvis startar följande kommando en konsument med hjälp av ett grupp-ID i myGroup:

java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup

Använd Ctrl + C om du vill avsluta konsumenten.

Använd följande kommando om du vill se hur det fungerar:

tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach

Detta kommando använder tmux till att dela terminalen i två kolumner. En konsument startas i varje kolumn med samma ID-värde för gruppen. När konsumenterna har läst färdigt kan du se att de bara läst en del av posterna. Använd Ctrl + C två gånger för att avsluta tmux .

Förbrukning av klienter i samma grupp hanteras via partitionerna för ämnet. I det här kodexemplet har test-ämnet som skapades tidigare åtta partitioner. Om du startar åtta konsumenter läser varje konsument poster från en enda partition i ämnet.

Viktigt

Det får inte finnas flera instanser av konsumenten i en konsumentgrupp än partitioner. I det här exemplet kan en konsumentgrupp innehålla upp till åtta konsumenter, eftersom det är antalet partitioner i ämnet. Du kan även ha flera konsumentgrupper med högst åtta konsumenter vardera.

Poster som lagras i Kafka lagras i den ordning de tas emot inom en partition. För att uppnå sorterad leverans av poster inom en partition skapar du en konsumentgrupp där antalet konsumentinstanser matchar antalet partitioner. För att uppnå sorterad leverans av poster i ämnet skapar du en konsumentgrupp med bara en konsumentinstans.

Vanliga problem

  1. Det går inte att skapa ämnet Om ditt kluster är Enterprise Security Pack aktiverat använder du de förbyggda JAR-filerna för producent och konsument. ESP-jar-koden kan skapas från koden i DomainJoined-Producer-Consumer underkatalogen. Producent- och konsumentegenskaperna har ytterligare en CommonClientConfigs.SECURITY_PROTOCOL_CONFIG egenskap för ESP-aktiverade kluster.

  2. Fel i ESP-aktiverade kluster: Om det inte går att skapa och använda åtgärder och du använder ett ESP-aktiverat kluster kontrollerar du att användaren finns i kafka alla Ranger-principer. Om den inte finns lägger du till den i alla Ranger-principer.

Rensa resurser

Om du vill rensa resurserna som har skapats med den här självstudien kan du ta bort resursgruppen. När du tar bort resursgruppen raderas även det kopplade HDInsight-klustret och eventuella andra resurser som är associerade med resursgruppen.

Ta bort en resursgrupp med Azure Portal:

  1. I Azure Portal expanderar du menyn på vänster sida för att öppna tjänstemenyn och väljer sedan Resursgrupper för att visa listan med dina resursgrupper.
  2. Leta reda på den resursgrupp du vill ta bort och högerklicka på knappen Mer (...) till höger om listan.
  3. Välj Ta bort resursgrupp och bekräfta.

Nästa steg

I det här dokumentet har du lärt dig att använda Apache Kafka-producent- och konsument-API:et med Kafka i HDInsight. Använd följande för att lära dig mer om att arbeta med Kafka: