Kurz: Použití rozhraní Apache Kafka Producer and Consumer API
Zjistěte, jak používat rozhraní Apache Kafka Producer and Consumer API se systémem Kafka ve službě HDInsight.
Rozhraní Kafka Producer API umožňuje aplikacím odesílat datové proudy do clusteru Kafka. Rozhraní Kafka Consumer API umožňuje aplikacím číst datové proudy z clusteru.
V tomto kurzu se naučíte:
- Požadavky
- Vysvětlení kódu
- Sestavení a nasazení aplikace
- Spuštění aplikace v clusteru
Další informace o rozhraních API najdete v dokumentaci k rozhraní Producer API a Consumer API na webu Apache.
Požadavky
- Apache Kafka v clusteru HDInsight. Informace o tom, jak vytvořit cluster, najdete v tématu Začínáme s Apache Kafka v HDInsight.
- Java Developer Kit (JDK) verze 8 nebo ekvivalent, jako je například OpenJDK.
- Apache Maven správně nainstalované v souladu s Apache. Maven je systém sestavení projektu pro projekty v jazyce Java.
- Klient SSH, jako je například výstup. Další informace najdete v tématu Připojení ke službě HDInsight (Apache Hadoop) pomocí SSH.
Vysvětlení kódu
Ukázková aplikace je umístěna https://github.com/Azure-Samples/hdinsight-kafka-java-get-started v Producer-Consumer podadresáři v adresáři. pokud používáte cluster Kafka s povoleným Balíček zabezpečení podniku (ESP) , měli byste použít verzi aplikace, která se nachází v DomainJoined-Producer-Consumer podadresáři.
Aplikace se skládá primárně ze čtyř souborů:
pom.xml: Tento soubor definuje závislosti projektu, verzi Javy a metody balení.Producer.java: Tento soubor pomocí rozhraní Producer API odesílá do systému Kafka náhodné věty.Consumer.java: Tento soubor pomocí rozhraní Consumer API čte data ze systému Kafka a posílá je do výstupu STDOUT.AdminClientWrapper.java: Tento soubor používá rozhraní API pro správu k vytváření, popisu a odstraňování Kafkach témat.Run.java: Rozhraní příkazového řádku, které slouží ke spuštění kódu producenta a konzumenta.
Pom.xml
V souboru pom.xml je důležité porozumět následujícímu:
Závislosti: Tento projekt spoléhá na rozhraní Kafka Producer and Consumer API, která jsou součástí balíčku
kafka-clients. Tuto závislost definuje následující kód XML:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>Položka
${kafka.version}se deklaruje v části<properties>..</properties>souborupom.xmla je nakonfigurovaná na verzi systému Kafka v clusteru HDInsight.Moduly plug-in: Moduly plug-in Mavenu poskytují různé funkce. V tomto projektu se používají následující moduly plug-in:
maven-compiler-plugin: Slouží k nastavení verze Javy, kterou projekt používá, na 8. To je verze Javy, kterou používá HDInsight 3.6.maven-shade-plugin: Slouží k vygenerování souboru JAR, který obsahuje tuto aplikaci i všechny závislosti. Používá se také k nastavení vstupního bodu aplikace, abyste mohli přímo spustit soubor JAR bez nutnosti zadávat hlavní třídu.
Producer.java
Producent komunikuje s hostiteli zprostředkovatelů Kafka (pracovní uzly) a odesílá data do tématu Kafka. následující fragment kódu pochází ze souboru producent. java z GitHubového úložiště a ukazuje, jak nastavit vlastnosti producenta. u clusterů s podporou zabezpečení Enterprise musí být přidána další vlastnost "properties. setProperty (CommonClientConfigs.SECURITY_PROTOCOL_CONFIG," SASL_PLAINTEXT ");"
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Consumer.java
Konzument komunikuje s hostiteli zprostředkovatelů Kafka (pracovní uzly) a ve smyčce čte záznamy. Následující fragment kódu ze souboru Consumer. Java nastaví vlastnosti příjemce. u clusterů s podporou zabezpečení Enterprise musí být přidána další vlastnost "properties. setProperty (CommonClientConfigs.SECURITY_PROTOCOL_CONFIG," SASL_PLAINTEXT ");"
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
V tomto kódu je konzument nakonfigurovaný tak, aby četl od začátku tématu (hodnota auto.offset.reset je nastavená na earliest).
Run.java
Soubor Run. Java poskytuje rozhraní příkazového řádku, které spouští buď producenta, nebo kód příjemce. Jako parametr je potřeba zadat informace o hostiteli zprostředkovatele Kafka. Volitelně můžete zahrnout hodnotu ID skupiny, kterou používá proces příjemce. Pokud vytvoříte více instancí příjemce pomocí stejného ID skupiny, vyčtou se z tématu čtení z vyrovnávání zatížení.
Sestavení a nasazení příkladu
Použití předem připravených souborů JAR
stáhněte si jar z ukázky Azure pro Kafka Začínáme. pokud je váš cluster Balíček zabezpečení podniku (ESP) povolený, použijte kafka-producer-consumer-esp. jar. K zkopírování jar do clusteru použijte následující příkaz.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Sestavení souborů JAR z kódu
Chcete-li tento krok přeskočit, lze předem sestavené jar stáhnout z Prebuilt-Jars podadresáře. Stáhněte si Kafka-Producer-Consumer. jar. pokud je váš cluster Balíček zabezpečení podniku (ESP) povolený, použijte kafka-producer-consumer-esp. jar. Spusťte krok 3 ke zkopírování jar do clusteru HDInsight.
Stáhněte a extrahujte příklady z https://github.com/Azure-Samples/hdinsight-kafka-java-get-started .
Nastavte aktuální adresář na umístění
hdinsight-kafka-java-get-started\Producer-Consumeradresáře. pokud používáte cluster Kafka s povoleným Balíček zabezpečení podniku (ESP) , měli byste nastavit umístění naDomainJoined-Producer-Consumerpodadresář. Pomocí následujícího příkazu sestavte aplikaci:mvn clean packageTento příkaz vytvoří adresář s názvem
target, který bude obsahovat soubor s názvemkafka-producer-consumer-1.0-SNAPSHOT.jar. Pro clustery ESP bude souborkafka-producer-consumer-esp-1.0-SNAPSHOT.jarMísto
sshuserpoužijte jméno uživatele SSH pro váš cluster a místoCLUSTERNAMEzadejte název clusteru. Zadejte následující příkaz, který zkopírujekafka-producer-consumer-1.0-SNAPSHOT.jarsoubor do clusteru HDInsight. Po zobrazení výzvy zadejte heslo uživatele SSH.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Spustit příklad
Místo
sshuserpoužijte jméno uživatele SSH pro váš cluster a místoCLUSTERNAMEzadejte název clusteru. Zadáním následujícího příkazu otevřete připojení SSH ke clusteru. Pokud se zobrazí výzva, zadejte heslo uživatelského účtu SSH.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.netPokud chcete získat hostitele zprostředkovatele Kafka, nahraďte hodnoty pro
<clustername>a<password>v následujícím příkazu a spusťte ho. Použijte stejnou velikost písmen pro<clustername>, jak je znázorněno v Azure Portal. Nahraďte<password>heslem přihlášení clusteru a pak spusťte:sudo apt -y install jq export clusterName='<clustername>' export password='<password>' export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);Poznámka
Tento příkaz vyžaduje přístup Ambari. Pokud je váš cluster za NSG, spusťte tento příkaz z počítače, který má přístup k Ambari.
Vytvořte Kafka téma,
myTesta to tak, že zadáte následující příkaz:java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERSPokud chcete spustit producenta a zapsat data do tématu, použijte následující příkaz:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERSJakmile bude producent hotový, pomocí následujícího příkazu zahajte čtení z tématu:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jarZobrazí se počet načtených záznamů spolu s celkovým počtem.
Konzumenta ukončíte stisknutím Ctrl+C.
Víc současných konzumentů
Konzumenti Kafka při čtení záznamů používají skupiny konzumentů. Výsledkem použití skupiny s více konzumenty je vyvážení zatížení při čtení záznamů z tématu. Každý konzument ze skupiny obdrží určitou část záznamů.
Aplikace konzumenta přijímá parametr, který se použije jako ID skupiny. Například následující příkaz spustí konzumenta s použitím ID skupiny myGroup:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Konzumenta ukončíte stisknutím Ctrl+C.
Pokud chcete vidět tento proces v akci, použijte následující příkaz:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
Tento příkaz pomocí tmux rozdělí terminál do dvou sloupců. V obou sloupcích je spuštěný konzument se stejnou hodnotou ID skupiny. Jakmile konzumenti dokončí čtení, všimněte si, že oba přečetli pouze část záznamů. Pro ukončení použijte dvakrát kombinaci kláves CTRL + C tmux .
Konzumace klienty ze stejné skupiny se realizuje rozdělením tématu na oddíly. V tomto vzorovém kódu má dříve vytvořené téma test osm oddílů. Pokud spustíte osm konzumentů, každý z nich bude číst záznamy z jednoho oddílu tématu.
Důležité
Ve skupině příjemců nemůže být víc instancí konzumentů než má téma oddílů. V tomto příkladu může skupina konzumentů obsahovat až osm konzumentů, protože to je počet oddílů tématu. Nebo můžete mít více skupin konzumentů, každou s maximálně osmi konzumenty.
Záznamy uložené v Kafka jsou uloženy v pořadí, v jakém jsou přijímány v rámci oddílu. Pro dosažení doručování záznamů ve správném pořadí v rámci oddílu vytvořte skupinu příjemců, ve které bude počet instancí konzumentů odpovídat počtu oddílů. Pro dosažení doručování záznamů ve správném pořadí v rámci tématu vytvořte skupinu obsahující pouze jednu instanci konzumenta.
Běžné problémy
Nepovedlo se vytvořit téma pokud je váš cluster Enterprise povolený, použijte předem připravené soubory JAR pro producenta a příjemce. Sklenice ESP může být sestavena z kódu v
DomainJoined-Producer-Consumerpodadresáři. Vlastnosti producent a příjemce mají dodatečnou vlastnostCommonClientConfigs.SECURITY_PROTOCOL_CONFIGpro clustery s podporou protokolu ESP.Selhání v clusterech s povoleným protokolem ESP: Pokud dojde k selhání operací a používání clusteru s povoleným protokolem ESP, ověřte, jestli
kafkase uživatel nachází ve všech zásadách Ranger. Pokud není k dispozici, přidejte ji do všech zásad Ranger.
Vyčištění prostředků
Pokud chcete vyčistit prostředky vytvořené v tomto kurzu, můžete odstranit skupinu prostředků. Odstraněním skupiny prostředků odstraníte také přidružený cluster HDInsight a všechny další prostředky, které jsou k příslušné skupině prostředků přidružené.
Odebrání skupiny prostředků pomocí webu Azure Portal:
- Na webu Azure Portal rozbalením nabídky na levé straně otevřete nabídku služeb a pak zvolte Skupiny prostředků. Zobrazí se seznam skupin prostředků.
- Vyhledejte skupinu prostředků, kterou chcete odstranit, a klikněte pravým tlačítkem na tlačítko Další (...) na pravé straně seznamu.
- Vyberte Odstranit skupinu prostředků a potvrďte tuto akci.
Další kroky
V tomto dokumentu jste zjistili, jak používat Apache Kafkaho výrobce a zákaznického rozhraní API s Kafka ve službě HDInsight. Další informace o práci s platformou Kafka najdete v těchto zdrojích: