Självstudie: Använda Apache Kafka Streams API i Azure HDInsight
Lär dig hur du skapar ett program som använder Apache Kafka Streams-API:et och kör det med Kafka i HDInsight.
Programmet som används i den här självstudien är en strömmande ordräkning. Den läser textdata från ett Kafka-ämne, extraherar enskilda ord och lagrar sedan ordet och antalet till ett annat Kafka-ämne.
Kafka-dataströmbearbetning utförs ofta med Apache Spark. Kafka version 2.1.1 och 2.4.1 (i HDInsight 4.0 och 5.0) stöder Kafka Flöden API. Med detta API kan du transformera dataströmmar mellan indata- och utdata-ämnen.
Mer information om Kafka Streams finns i dokumentationen Intro to Streams (Introduktion till Streams) på Apache.org.
I den här självstudien lär du dig att:
- Förstå koden
- Skapa och distribuera programmet
- Konfigurera Kafka-ämnen
- Kör koden
Förutsättningar
Ett Kafka på HDInsight 4.0- eller 5.0-kluster. Information om hur du skapar en Kafka på ett HDInsight-kluster finns i dokumentet Starta med Apache Kafka i HDInsight.
Utför stegen i dokumentet Apache Kafka-konsument- och producent-API. Stegen i det här dokumentet använder exempelprogram och avsnitt som skapats i den här kursen.
Java Developer Kit (JDK) version 8 eller motsvarande, till exempel OpenJDK.
Apache Maven har installerats korrekt enligt Apache. Maven är ett projektbyggsystem för Java-projekt.
En SSH-klient. Mer information finns i Ansluta till HDInsight (Apache Hadoop) med hjälp av SSH.
Förstå koden
Exempelprogrammet finns på https://github.com/Azure-Samples/hdinsight-kafka-java-get-started i underkatalogen Streaming
. Programmet består av två filer:
pom.xml
: Den här filen definierar projektberoenden, Java-version och paketeringsmetoder.Stream.java
: Den här filen implementerar strömningslogiken.
Pom.xml
Viktiga saker att förstå i pom.xml
-filen är:
Beroenden: Det här projektet använder Kafka Streams-API:er, som tillhandahålls av
kafka-clients
-paketet. Följande XML-kod definierar detta beroende:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
${kafka.version}
-posten har deklarerats i<properties>..</properties>
-avsnittet ipom.xml
och är konfigurerad till Kafka-versionen av HDInsight-klustret.Plugin-program: Plugin-programmet Maven innehåller olika funktioner. I det här projektet används följande plugin-program:
maven-compiler-plugin
: Används för att ange att den Java-version som används av projektet är 8. HDInsight 4.0 och 5.0 kräver Java 8.maven-shade-plugin
: Används för att generera en uber-jar som innehåller det här programmet och eventuella beroenden. Den används också för att ange startpunkten för programmet, så att du kan köra Jar-filen direkt utan att behöva ange huvudklassen.
Stream.java
Filen Stream.java använder Streams-API för att implementera ett ordantalprogram. Den läser data från ett Kafka-ämne som heter test
och skriver ordräkningen till ett ämne som heter wordcounts
.
Följande kod definierar ordräkningsprogrammet:
package com.microsoft.example;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Arrays;
import java.util.Properties;
public class Stream
{
public static void main( String[] args ) {
Properties streamsConfig = new Properties();
// The name must be unique on the Kafka cluster
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
// Brokers
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
// SerDes for key and values
streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Serdes for the word and count
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
KStream<String, Long> wordCounts = sentences
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, "wordcounts");
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Skapa och distribuera exemplet
För att skapa och distribuera projektet till Kafka p HDInsight-klustret utför du följande steg:
Ange den aktuella katalogen till platsen för
hdinsight-kafka-java-get-started-master\Streaming
katalogen och använd sedan följande kommando för att skapa ett jar-paket:mvn clean package
Det här kommandot skapar paketet på
target/kafka-streaming-1.0-SNAPSHOT.jar
.Ersätt
sshuser
med SSH-användare för klustret och ersättclustername
med namnet på klustret. Använd följande kommando för att kopierakafka-streaming-1.0-SNAPSHOT.jar
filen till ditt HDInsight-kluster. Ange lösenordet för SSH-användarkontot om du uppmanas till det.scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
Skapa Apache Kafka-ämnen
Ersätt
sshuser
med SSH-användare för klustret och ersättCLUSTERNAME
med namnet på klustret. Öppna en SSH-anslutning till klustret genom att ange följande kommando. Ange lösenordet för SSH-användarkontot om du uppmanas till det.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Installera jq, en JSON-processor på kommandoraden. Från den öppna SSH-anslutningen anger du följande kommando för att installera
jq
:sudo apt -y install jq
Konfigurera lösenordsvariabel. Ersätt
PASSWORD
med lösenordet för klusterinloggning och ange sedan kommandot:export PASSWORD='PASSWORD'
Extrahera ett korrekt skiftlägesklusternamn. Det faktiska höljet för klusternamnet kan skilja sig från förväntat, beroende på hur klustret skapades. Det här kommandot hämtar det faktiska höljet och lagrar det sedan i en variabel. Ange följande kommando:
export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Kommentar
Om du gör den här processen utanför klustret finns det en annan procedur för att lagra klusternamnet. Hämta klusternamnet i gemener från Azure-portalen. Ersätt sedan klusternamnet med
<clustername>
i följande kommando och kör det:export clusterName='<clustername>'
.Använd följande kommandon för att hämta värdar för Apache Kafka-meddelandeköer och Apache Zookeeper. När du blir ombedd anger du lösenordet till klusterinloggningskontot (admin).
export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2); export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Kommentar
Dessa kommandon kräver Ambari-åtkomst. Om klustret ligger bakom en NSG kör du dessa kommandon från en dator som har åtkomst till Ambari.
Skapa ämnen som används av strömningen med följande kommandon:
Kommentar
Du kan få ett felmeddelande om att ämnet
test
redan finns. Det är OK eftersom det kan ha skapats i självstudien Producent- och konsument-API./usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Ämnena använda för följande ändamål:
test
: I det här ämnet tas poster emot. Strömningsprogrammet läses härifrån.wordcounts
: I det här ämnet lagrar strömningsprogrammet sina utdata.RekeyedIntermediateTopic
: Det här ämnet används för att partitionera om data när antalet uppdateras av operatorncountByKey
.wordcount-example-Counts-changelog
: Det är ämnet är ett tillståndslager som används av åtgärdencountByKey
Kafka på HDInsight kan också konfigureras för att automatiskt skapa ämnen. Mer information finns i dokumentet Configure automatic topic creation (Konfigurera automatiskt skapande av ämne).
Kör koden
Om du vill starta strömningsprogrammet som en bakgrundsprocess använder du följande kommando:
java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
Du kan få en varning om Apache
log4j
. Du kan ignorera den här varningen.För att skicka poster till ämnet
test
använder du följande kommando för att starta producentprogrammet:java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
När producenten är klar använder du följande kommando för att visa informationen som är lagrad i ämnet
wordcounts
:/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
Parametrarna
--property
uppmanar konsolkonsumenten att skriva ut nyckeln (ordet) tillsammans med antalet (värdet). Den här parametern konfigurerar även funktionen för avserialisering som ska användas vid läsning av dessa värden från Kafka.De utdata som genereras liknar följande text:
dwarfs 13635 ago 13664 snow 13636 dwarfs 13636 ago 13665 a 13803 ago 13666 a 13804 ago 13667 ago 13668 jumped 13640 jumped 13641
Parametern
--from-beginning
konfigurerar konsumenten för att börja vid början av posterna i ämnet. Antalet ökar varje gång ett ord påträffas, så det här ämnet innehåller flera poster för varje ord med ett ökande antal.Använd Ctrl + C om du vill avsluta producenten. Fortsätt att använda Ctrl + C för att avsluta programmet och konsumenten.
Om du vill ta bort de ämnen som används av strömningsåtgärden använder du följande kommandon:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Rensa resurser
Om du vill rensa resurserna som har skapats med den här självstudien kan du ta bort resursgruppen. När du tar bort resursgruppen raderas även det kopplade HDInsight-klustret och eventuella andra resurser som är associerade med resursgruppen.
Ta bort en resursgrupp med Azure Portal:
- I Azure Portal expanderar du menyn på vänster sida för att öppna tjänstemenyn och väljer sedan Resursgrupper för att visa listan med dina resursgrupper.
- Leta reda på den resursgrupp du vill ta bort och högerklicka på knappen Mer (...) till höger om listan.
- Välj Ta bort resursgrupp och bekräfta.
Nästa steg
I det här dokumentet har du lärt dig använda Apache Kafka Streams-API med Kafka i HDInsight. Använd följande för att lära dig mer om att arbeta med Kafka.