Použití Apache Kafka ve službě HDInsight s Azure IoT Hub
Zjistěte, jak pomocí konektoru Apache Kafka Připojení Azure IoT Hub přesouvat data mezi Apache Kafka ve službě HDInsight a Azure IoT Hub. V tomto dokumentu se dozvíte, jak spustit konektor IoT Hub z hraničního uzlu v clusteru.
Rozhraní Kafka Připojení API umožňuje implementovat konektory, které průběžně načítá data do Kafka, nebo odesílat data ze systému Kafka do jiného systému. Připojení Azure IoT Hub Apache Kafka je konektor, který načítá data z Azure IoT Hub do Kafka. Může také odesílat data ze systému Kafka do IoT Hub.
Při načítání z IoT Hub použijete zdrojový konektor. Při nasdílení do IoT Hub použijete konektor jímky. Konektor IoT Hub poskytuje konektory zdroje i jímky.
Následující diagram znázorňuje tok dat mezi Azure IoT Hub a Kafka ve službě HDInsight při použití konektoru.
Další informace o rozhraní API Připojení naleznete v tématu https://kafka.apache.org/documentation/#connect.
Požadavky
Cluster Apache Kafka ve službě HDInsight. Další informace najdete v dokumentu Rychlý start k systému Kafka ve službě HDInsight.
Hraniční uzel v clusteru Kafka. Další informace najdete v dokumentu Použití hraničních uzlů se službou HDInsight .
Klient SSH. Další informace najdete v tématu Připojení ke službě HDInsight (Apache Hadoop) pomocí SSH.
Azure IoT Hub a zařízení Pro účely tohoto článku zvažte použití online simulátoru Raspberry Pi Připojení k Azure IoT Hub.
Sestavení konektoru
Stáhněte zdroj konektoru z https://github.com/Azure/toketi-kafka-connect-iothub/ místního prostředí.
Z příkazového řádku přejděte do
toketi-kafka-connect-iothub-master
adresáře. Pak pomocí následujícího příkazu sestavte a zabalte projekt:sbt assembly
Dokončení sestavení bude trvat několik minut. Příkaz vytvoří soubor pojmenovaný
kafka-connect-iothub-assembly_2.11-0.7.0.jar
toketi-kafka-connect-iothub-master\target\scala-2.11
v adresáři projektu.
Instalace konektoru
Upload soubor .jar k hraničnímu uzlu vašeho Kafka v clusteru HDInsight. Upravte následující příkaz nahrazením
CLUSTERNAME
skutečného názvu clusteru. Výchozí hodnoty uživatelského účtu SSH a názvu hraničního uzlu se používají níže, podle potřeby je upravte.scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
Po dokončení kopírování souboru se připojte k hraničnímu uzlu pomocí SSH:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Pokud chcete konektor nainstalovat do adresáře Kafka
libs
, použijte následující příkaz:sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
Ponechte připojení SSH aktivní pro zbývající kroky.
Konfigurace Apache Kafka
Z připojení SSH k hraničnímu uzlu pomocí následujícího postupu nakonfigurujte Kafka tak, aby spouštět konektor v samostatném režimu:
Nastavte proměnnou hesla. Nahraďte HESLO přihlašovacím heslem clusteru a zadejte příkaz:
export password='PASSWORD'
Nainstalujte nástroj jq . Jq usnadňuje zpracování dokumentů JSON vrácených z dotazů Ambari. Zadejte následující příkaz:
sudo apt -y install jq
Získejte adresu zprostředkovatelů Kafka. V clusteru může být mnoho zprostředkovatelů, ale stačí odkazovat pouze na jednu nebo dvě. Adresu dvou hostitelů zprostředkovatele získáte pomocí následujícího příkazu:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
Zkopírujte hodnoty pro pozdější použití. Vrácená hodnota je obdobná následujícímu textu:
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
Získejte adresu uzlů Apache Zookeeper. V clusteru je několik uzlů Zookeeper, ale stačí odkazovat pouze na jeden nebo dva uzly. Pomocí následujícího příkazu uložte adresy do proměnné
KAFKAZKHOSTS
:export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
Při spuštění konektoru v samostatném režimu
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
se soubor používá ke komunikaci s zprostředkovateli Kafka. Pokud chcete soubor upravitconnect-standalone.properties
, použijte následující příkaz:sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
Proveďte následující úpravy:
Aktuální hodnota Nová hodnota Komentář bootstrap.servers=localhost:9092
localhost:9092
Nahraďte hodnotu hostiteli zprostředkovatele z předchozího kroku.Nakonfiguruje samostatnou konfiguraci hraničního uzlu pro vyhledání zprostředkovatelů Kafka. key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Tato změna umožňuje testovat pomocí producenta konzoly, který je součástí kafka. Možná budete potřebovat různé převaděče pro jiné producenty a spotřebitele. Informace o použití jiných hodnot převaděče naleznete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Platí to samé jako výše. – consumer.max.poll.records=10
Přidejte na konec souboru. Touto změnou zabráníte vypršení časových limitů konektoru jímky tím, že ho omezíte na 10 záznamů najednou. Další informace naleznete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. Pokud chcete soubor uložit, použijte ctrl + X, Y a pak Enter.
Pokud chcete vytvořit témata používaná konektorem, použijte následující příkazy:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
K ověření existence těchto
iotin
tématiotout
použijte následující příkaz:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
Téma
iotin
slouží k příjmu zpráv z IoT Hub. Témaiotout
slouží k odesílání zpráv do IoT Hub.
Získání informací o připojení IoT Hub
Pokud chcete načíst informace ioT Hubu používané konektorem, postupujte následovně:
Získejte koncový bod kompatibilní s centrem událostí a název koncového bodu kompatibilního s centrem událostí pro vaše centrum IoT. K získání těchto informací použijte jednu z následujících metod:
V Azure Portal použijte následující kroky:
Přejděte na IoT Hub a vyberte Koncové body.
V předdefinovaných koncových bodech vyberte Události.
Z vlastnosti zkopírujte hodnotu následujících polí:
- Název kompatibilní s centrem událostí
- Koncový bod kompatibilní s centrem událostí
- Oddíly
Důležité
Hodnota koncového bodu z portálu může obsahovat další text, který není v tomto příkladu potřeba. Extrahujte text, který odpovídá tomuto vzoru
sb://<randomnamespace>.servicebus.windows.net/
.
V Azure CLI použijte následující příkaz:
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
Nahraďte
myhubname
názvem vašeho centra IoT. Odpověď je podobná následujícímu textu:"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
Získejte zásady a klíčsdíleného přístupu. V tomto příkladu použijte klíč služby . K získání těchto informací použijte jednu z následujících metod:
V Azure Portal použijte následující kroky:
- Vyberte zásady sdíleného přístupu a pak vyberte službu.
- Zkopírujte hodnotu primárního klíče .
- Zkopírujte hodnotu připojovacího řetězce – primární klíč .
V Azure CLI použijte následující příkaz:
Pokud chcete získat hodnotu primárního klíče, použijte následující příkaz:
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
Nahraďte
myhubname
názvem vašeho centra IoT. Odpověď je primárním klíčem k zásadámservice
pro toto centrum.Pokud chcete získat připojovací řetězec pro zásadu
service
, použijte následující příkaz:az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
Nahraďte
myhubname
názvem vašeho centra IoT. Odpověď je připojovací řetězec pro zásaduservice
.
Konfigurace zdrojového připojení
Pokud chcete zdroj nakonfigurovat tak, aby fungoval s vaším IoT Hub, proveďte následující akce z připojení SSH k hraničnímu uzlu:
Vytvořte kopii
connect-iot-source.properties
souboru v/usr/hdp/current/kafka-broker/config/
adresáři. Pokud chcete stáhnout soubor z projektu toketi-kafka-connect-iothub, použijte následující příkaz:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
Pokud chcete soubor upravit
connect-iot-source.properties
a přidat informace služby IoT Hub, použijte následující příkaz:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
V editoru vyhledejte a změňte následující položky:
Aktuální hodnota Upravit Kafka.Topic=PLACEHOLDER
Nahraďte PLACEHOLDER
zaiotin
(Jak velká může být moje znalostní báze?). Zprávy přijaté ze služby IoT Hub se umístí doiotin
tématu.IotHub.EventHubCompatibleName=PLACEHOLDER
Nahraďte PLACEHOLDER
názvem kompatibilním s centrem událostí.IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
Nahraďte PLACEHOLDER
koncovým bodem kompatibilním s centrem událostí.IotHub.AccessKeyName=PLACEHOLDER
Nahraďte PLACEHOLDER
zaservice
(Jak velká může být moje znalostní báze?).IotHub.AccessKeyValue=PLACEHOLDER
Nahraďte PLACEHOLDER
primárním klíčemservice
zásady.IotHub.Partitions=PLACEHOLDER
Nahraďte PLACEHOLDER
počtem oddílů z předchozích kroků.IotHub.StartTime=PLACEHOLDER
Nahraďte PLACEHOLDER
datem UTC. Toto datum je, když konektor začne kontrolovat zprávy. Formát data jeyyyy-mm-ddThh:mm:ssZ
.BatchSize=100
Nahraďte 100
za5
(Jak velká může být moje znalostní báze?). Tato změna způsobí, že konektor bude číst zprávy do Kafka, jakmile bude v IoT Hubu pět nových zpráv.Příklad konfigurace najdete v tématu Kafka Připojení Source Connector pro Azure IoT Hub.
Pokud chcete změny uložit, použijte kombinaci kláves Ctrl+ X, Y a pak Enter.
Další informace o konfiguraci zdroje konektoru najdete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.
Konfigurace připojení jímky
Pokud chcete nakonfigurovat připojení jímky tak, aby fungovalo s vaším IoT Hub, proveďte následující akce z připojení SSH k hraničnímu uzlu:
Vytvořte kopii
connect-iothub-sink.properties
souboru v/usr/hdp/current/kafka-broker/config/
adresáři. Pokud chcete stáhnout soubor z projektu toketi-kafka-connect-iothub, použijte následující příkaz:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
Pokud chcete soubor upravit
connect-iothub-sink.properties
a přidat informace služby IoT Hub, použijte následující příkaz:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
V editoru vyhledejte a změňte následující položky:
Aktuální hodnota Upravit topics=PLACEHOLDER
Nahraďte PLACEHOLDER
zaiotout
(Jak velká může být moje znalostní báze?). Zprávy napsané doiotout
tématu se přeposílají do centra IoT.IotHub.ConnectionString=PLACEHOLDER
Nahraďte PLACEHOLDER
připojovacím řetězcem pro zásaduservice
.Příklad konfigurace najdete v tématu Kafka Připojení Sink Connector pro Azure IoT Hub.
Pokud chcete změny uložit, použijte kombinaci kláves Ctrl+ X, Y a pak Enter.
Další informace o konfiguraci jímky konektoru najdete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Spuštění zdrojového konektoru
Pokud chcete spustit zdrojový konektor, použijte následující příkaz z připojení SSH k hraničnímu uzlu:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Po spuštění konektoru odešlete zprávy do ioT Hubu z vašich zařízení. Protože konektor čte zprávy ze služby IoT Hub a ukládá je do tématu Kafka, protokoluje informace do konzoly:
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
Poznámka
Při spuštění konektoru se může zobrazit několik upozornění. Tato upozornění nezpůsobují problémy s příjmem zpráv ze služby IoT Hub.
Zastavte konektor po několika minutách pomocí kombinace kláves Ctrl + C dvakrát. Zastavení konektoru bude trvat několik minut.
Spuštění konektoru jímky
Z připojení SSH k hraničnímu uzlu pomocí následujícího příkazu spusťte konektor jímky v samostatném režimu:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
Při spuštění konektoru se zobrazí informace podobné následujícímu textu:
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
Poznámka
Při spuštění konektoru si můžete všimnout několika upozornění. Ta můžete bezpečně ignorovat.
Odesílání zpráv
Pokud chcete odesílat zprávy prostřednictvím konektoru, postupujte následovně:
Otevřete druhou relaci SSH pro cluster Kafka:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Získejte adresu zprostředkovatelů Kafka pro novou relaci SSH. Heslo nahraďte heslem pro přihlášení ke clusteru a zadejte příkaz:
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
Pokud chcete odesílat zprávy do
iotout
tématu, použijte následující příkaz:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
Tento příkaz vás nevrátí do normálního příkazového řádku Bash. Místo toho odesílá vstup klávesnice do
iotout
tématu.Pokud chcete odeslat zprávu do zařízení, vložte dokument JSON do relace SSH pro dané
kafka-console-producer
zařízení .Důležité
Je nutné nastavit hodnotu
"deviceId"
položky na ID vašeho zařízení. V následujícím příkladu má zařízení názevmyDeviceId
:{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
Schéma tohoto dokumentu JSON je podrobněji popsáno na https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.mdadrese .
Pokud používáte simulované zařízení Raspberry Pi a je spuštěné, zařízení zaprotokoluje následující zprávu:
Receive message: Turn On
Znovu odešlete dokument JSON, ale změňte hodnotu
"message"
položky. Nová hodnota se zaprotokoluje zařízením.
Další informace o používání konektoru jímky najdete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Další kroky
V tomto dokumentu jste zjistili, jak pomocí rozhraní Apache Kafka Připojení API spustit konektor IoT Kafka ve službě HDInsight. Pomocí následujících odkazů můžete zjistit další způsoby práce se systémem Kafka: