Självstudie: Använda Apache Kafka-producenten och konsument-API:er
Lär dig att använda Apache Kafka-producenten och konsument-API:er med Kafka i HDInsight.
Kafka-producentens API tillåter att program skickar dataströmmar till Kafka-klustret. Kafka-konsumentens API tillåter att program läser dataströmmar från klustret.
I den här guiden får du lära dig att:
- Krav
- Förstå koden
- Skapa och distribuera programmet
- Köra programmet på klustret
Mer information om API:er finns i Apache-dokumentationen i Producent-API och Konsument-API.
Förutsättningar
- Apache Kafka hdinsight-kluster. Information om hur du skapar klustret finns i Börja med Apache Kafka på HDInsight.
- Java Developer Kit (JDK) version 8 eller motsvarande, till exempel OpenJDK.
- Apache Maven är korrekt installerat enligt Apache. Maven är ett projektbyggsystem för Java-projekt.
- En SSH-klient som Putty. Mer information finns i Ansluta till HDInsight (Apache Hadoop) med hjälp av SSH.
Förstå koden
Exempelprogrammet finns på https://github.com/Azure-Samples/hdinsight-kafka-java-get-started i Producer-Consumer underkatalogen . Om du använder Enterprise Security Package (ESP) aktiverat Kafka-kluster bör du använda den programversion som finns i DomainJoined-Producer-Consumer underkatalogen.
Programmet består i huvudsak av fyra filer:
pom.xml: Den här filen definierar projektberoenden, Java-version och paketeringsmetoder.Producer.java: Den här filen skickar slumpmässiga meningar till Kafka med producent-API:et.Consumer.java: Den här filen använder konsument-API:n till att läsa data från Kafka och generera den till STDOUT.AdminClientWrapper.java: Den här filen använder admin-API:et för att skapa, beskriva och ta bort Kafka-ämnen.Run.java: Kommandoradsgränssnittet används för att köra producent- och konsumentkoden.
Pom.xml
Viktiga saker att förstå i pom.xml-filen är:
Beroenden: Det här projektet använder producent- och konsument-API:er i Kafka, som tillhandahålls av
kafka-clients-paketet. Följande XML-kod definierar detta beroende:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>${kafka.version}-posten har deklarerats i<properties>..</properties>-avsnittet ipom.xmloch är konfigurerad till Kafka-versionen av HDInsight-klustret.Plugin-program: Plugin-programmet Maven innehåller olika funktioner. I det här projektet används följande plugin-program:
maven-compiler-plugin: Används för att ange den Java-version som används av projektet till 8. Detta är den version av Java som används av HDInsight 3.6.maven-shade-plugin: Används för att generera en Uber-jar som innehåller det här programmet, samt eventuella beroenden. Det används också att ange startpunkt för programmet, så att du kan köra Jar-filen direkt utan att behöva ange huvudklassen.
Producer.java
Producenten kommunicerar med värdar för Kafka-meddelandeköer (arbetarnoder) och skickar data till ett Kafka-ämne. Följande kodfragment kommer från filen Producer.java från GitHub och visar hur du anger producentegenskaperna. För Enterprise Security Enabled-kluster måste ytterligare en egenskap läggas till "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Consumer.java
Konsumenten kommunicerar med värdar för Kafka-meddelandeköer (arbetarnoder) och läser posterna i en loop. Följande kodfragment från filen Consumer.java anger konsumentegenskaperna. För Enterprise Security Enabled-kluster måste ytterligare en egenskap läggas till "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
I den här koden är konsumenten konfigurerad att läsa från början av ämnet (auto.offset.reset är inställd på earliest.)
Run.java
Filen Run.java innehåller ett kommandoradsgränssnitt som kör antingen producent- eller konsumentkoden. Du måste ange värdinformationen om Kafka-meddelandeköerna som en parameter. Du kan också inkludera ett grupp-ID-värde som används av konsumentprocessen. Om du skapar flera konsumentinstanser med samma grupp-ID belastningsutjämnar de läsningen från ämnet.
Skapa och distribuera exemplet
Använda förbyggda JAR-filer
Ladda ned jar-filer från Kafka Kom igång Azure-exempel. Om klustret är Enterprise Security Package (ESP) aktiverat använder du kafka-producer-consumer-esp.jar. Använd kommandot nedan för att kopiera jar-filer till klustret.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Skapa JAR-filerna från kod
Om du vill hoppa över det här steget kan fördefinierade jar-filer laddas ned från Prebuilt-Jars underkatalogen. Ladda ned kafka-producer-consumer.jar. Om klustret är Enterprise Security Package (ESP) aktiverat använder du kafka-producer-consumer-esp.jar. Kör steg 3 för att kopiera jar-filen till ditt HDInsight-kluster.
Ladda ned och extrahera exemplen från https://github.com/Azure-Samples/hdinsight-kafka-java-get-started .
Ange katalogens plats som din
hdinsight-kafka-java-get-started\Producer-Consumeraktuella katalog. Om du använder Enterprise Security Package (ESP) aktiverat Kafka-kluster bör du ange platsen tillDomainJoined-Producer-Consumerunderkatalog. Använd följande kommando för att skapa programmet:mvn clean packageDet här kommandot skapar en katalog med namnet
target, som innehåller en fil med namnetkafka-producer-consumer-1.0-SNAPSHOT.jar. För ESP-kluster blir filenkafka-producer-consumer-esp-1.0-SNAPSHOT.jarErsätt
sshusermed SSH-användare för klustret och ersättCLUSTERNAMEmed namnet på klustret. Ange följande kommando för att kopiera filenkafka-producer-consumer-1.0-SNAPSHOT.jartill ditt HDInsight-kluster. Ange lösenordet för SSH-användaren när du uppmanas till det.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Kör exemplet
Ersätt
sshusermed SSH-användare för klustret och ersättCLUSTERNAMEmed namnet på klustret. Öppna en SSH-anslutning till klustret genom att ange följande kommando. Ange lösenordet för SSH-användarkontot om du uppmanas till det.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.netOm du vill hämta Värdar för Kafka-meddelandeköer ersätter du värdena
<clustername>för och i följande kommando och kör<password>det. Använd samma hölje för<clustername>som i Azure Portal. Ersätt<password>med lösenordet för klusterinloggning och kör sedan:sudo apt -y install jq export clusterName='<clustername>' export password='<password>' export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);Anteckning
Det här kommandot kräver Ambari-åtkomst. Om klustret finns bakom en NSG kör du det här kommandot från en dator som har åtkomst till Ambari.
Skapa Kafka-ämne,
myTest, genom att ange följande kommando:java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERSOm du vill köra producenten och skriva data till ämnet, använder du följande kommando:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERSNär producenten är klar kan du använda följande kommando för att läsa från ämnet:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jarDe lästa posterna, tillsammans med antalet poster, visas.
Använd Ctrl + C om du vill avsluta konsumenten.
Flera konsumenter
Kafka-konsumenter använder en konsumentgrupp vid läsning av poster. Att använda samma grupp med flera konsumenter resulterar i belastningsutjämnaravläsningar från ett ämne. Varje konsument i gruppen tar emot en del av posterna.
Konsumentprogrammet accepterar en parameter som används som grupp-ID. Exempelvis startar följande kommando en konsument med hjälp av ett grupp-ID i myGroup:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Använd Ctrl + C om du vill avsluta konsumenten.
Använd följande kommando om du vill se hur det fungerar:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
Detta kommando använder tmux till att dela terminalen i två kolumner. En konsument startas i varje kolumn med samma ID-värde för gruppen. När konsumenterna har läst färdigt kan du se att de bara läst en del av posterna. Använd Ctrl + C två gånger för att avsluta tmux .
Förbrukning av klienter i samma grupp hanteras via partitionerna för ämnet. I det här kodexemplet har test-ämnet som skapades tidigare åtta partitioner. Om du startar åtta konsumenter läser varje konsument poster från en enda partition i ämnet.
Viktigt
Det får inte finnas flera instanser av konsumenten i en konsumentgrupp än partitioner. I det här exemplet kan en konsumentgrupp innehålla upp till åtta konsumenter, eftersom det är antalet partitioner i ämnet. Du kan även ha flera konsumentgrupper med högst åtta konsumenter vardera.
Poster som lagras i Kafka lagras i den ordning de tas emot inom en partition. För att uppnå sorterad leverans av poster inom en partition skapar du en konsumentgrupp där antalet konsumentinstanser matchar antalet partitioner. För att uppnå sorterad leverans av poster i ämnet skapar du en konsumentgrupp med bara en konsumentinstans.
Vanliga problem
Det går inte att skapa ämnet Om ditt kluster är Enterprise Security Pack aktiverat använder du de förbyggda JAR-filerna för producent och konsument. ESP-jar-koden kan skapas från koden i
DomainJoined-Producer-Consumerunderkatalogen. Producent- och konsumentegenskaperna har ytterligare enCommonClientConfigs.SECURITY_PROTOCOL_CONFIGegenskap för ESP-aktiverade kluster.Fel i ESP-aktiverade kluster: Om det inte går att skapa och använda åtgärder och du använder ett ESP-aktiverat kluster kontrollerar du att användaren finns i
kafkaalla Ranger-principer. Om den inte finns lägger du till den i alla Ranger-principer.
Rensa resurser
Om du vill rensa resurserna som har skapats med den här självstudien kan du ta bort resursgruppen. När du tar bort resursgruppen raderas även det kopplade HDInsight-klustret och eventuella andra resurser som är associerade med resursgruppen.
Ta bort en resursgrupp med Azure Portal:
- I Azure Portal expanderar du menyn på vänster sida för att öppna tjänstemenyn och väljer sedan Resursgrupper för att visa listan med dina resursgrupper.
- Leta reda på den resursgrupp du vill ta bort och högerklicka på knappen Mer (...) till höger om listan.
- Välj Ta bort resursgrupp och bekräfta.
Nästa steg
I det här dokumentet har du lärt dig att använda Apache Kafka-producent- och konsument-API:et med Kafka i HDInsight. Använd följande för att lära dig mer om att arbeta med Kafka: