Použití Apache Kafka ve službě HDInsight s Azure IoT Hub

Zjistěte, jak pomocí konektoru Apache Kafka Připojení Azure IoT Hub přesouvat data mezi Apache Kafka ve službě HDInsight a Azure IoT Hub. V tomto dokumentu se dozvíte, jak spustit konektor IoT Hub z hraničního uzlu v clusteru.

Rozhraní Kafka Připojení API umožňuje implementovat konektory, které průběžně načítá data do Kafka, nebo odesílat data ze systému Kafka do jiného systému. Připojení Azure IoT Hub Apache Kafka je konektor, který načítá data z Azure IoT Hub do Kafka. Může také odesílat data ze systému Kafka do IoT Hub.

Při načítání z IoT Hub použijete zdrojový konektor. Při nasdílení do IoT Hub použijete konektor jímky. Konektor IoT Hub poskytuje konektory zdroje i jímky.

Následující diagram znázorňuje tok dat mezi Azure IoT Hub a Kafka ve službě HDInsight při použití konektoru.

Image showing data flowing from IoT Hub to Kafka through the connector

Další informace o rozhraní API Připojení naleznete v tématu https://kafka.apache.org/documentation/#connect.

Požadavky

Sestavení konektoru

  1. Stáhněte zdroj konektoru z https://github.com/Azure/toketi-kafka-connect-iothub/ místního prostředí.

  2. Z příkazového řádku přejděte do toketi-kafka-connect-iothub-master adresáře. Pak pomocí následujícího příkazu sestavte a zabalte projekt:

    sbt assembly
    

    Dokončení sestavení bude trvat několik minut. Příkaz vytvoří soubor pojmenovaný kafka-connect-iothub-assembly_2.11-0.7.0.jartoketi-kafka-connect-iothub-master\target\scala-2.11 v adresáři projektu.

Instalace konektoru

  1. Upload soubor .jar k hraničnímu uzlu vašeho Kafka v clusteru HDInsight. Upravte následující příkaz nahrazením CLUSTERNAME skutečného názvu clusteru. Výchozí hodnoty uživatelského účtu SSH a názvu hraničního uzlu se používají níže, podle potřeby je upravte.

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. Po dokončení kopírování souboru se připojte k hraničnímu uzlu pomocí SSH:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Pokud chcete konektor nainstalovat do adresáře Kafka libs , použijte následující příkaz:

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

Ponechte připojení SSH aktivní pro zbývající kroky.

Konfigurace Apache Kafka

Z připojení SSH k hraničnímu uzlu pomocí následujícího postupu nakonfigurujte Kafka tak, aby spouštět konektor v samostatném režimu:

  1. Nastavte proměnnou hesla. Nahraďte HESLO přihlašovacím heslem clusteru a zadejte příkaz:

    export password='PASSWORD'
    
  2. Nainstalujte nástroj jq . Jq usnadňuje zpracování dokumentů JSON vrácených z dotazů Ambari. Zadejte následující příkaz:

    sudo apt -y install jq
    
  3. Získejte adresu zprostředkovatelů Kafka. V clusteru může být mnoho zprostředkovatelů, ale stačí odkazovat pouze na jednu nebo dvě. Adresu dvou hostitelů zprostředkovatele získáte pomocí následujícího příkazu:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    Zkopírujte hodnoty pro pozdější použití. Vrácená hodnota je obdobná následujícímu textu:

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. Získejte adresu uzlů Apache Zookeeper. V clusteru je několik uzlů Zookeeper, ale stačí odkazovat pouze na jeden nebo dva uzly. Pomocí následujícího příkazu uložte adresy do proměnné KAFKAZKHOSTS:

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. Při spuštění konektoru v samostatném režimu /usr/hdp/current/kafka-broker/config/connect-standalone.properties se soubor používá ke komunikaci s zprostředkovateli Kafka. Pokud chcete soubor upravit connect-standalone.properties , použijte následující příkaz:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. Proveďte následující úpravy:

    Aktuální hodnota Nová hodnota Komentář
    bootstrap.servers=localhost:9092 localhost:9092 Nahraďte hodnotu hostiteli zprostředkovatele z předchozího kroku. Nakonfiguruje samostatnou konfiguraci hraničního uzlu pro vyhledání zprostředkovatelů Kafka.
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter Tato změna umožňuje testovat pomocí producenta konzoly, který je součástí kafka. Možná budete potřebovat různé převaděče pro jiné producenty a spotřebitele. Informace o použití jiných hodnot převaděče naleznete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter Platí to samé jako výše.
    consumer.max.poll.records=10 Přidejte na konec souboru. Touto změnou zabráníte vypršení časových limitů konektoru jímky tím, že ho omezíte na 10 záznamů najednou. Další informace naleznete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
  7. Pokud chcete soubor uložit, použijte ctrl + X, Y a pak Enter.

  8. Pokud chcete vytvořit témata používaná konektorem, použijte následující příkazy:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    K ověření existence těchto iotin témat iotout použijte následující příkaz:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    Téma iotin slouží k příjmu zpráv z IoT Hub. Téma iotout slouží k odesílání zpráv do IoT Hub.

Získání informací o připojení IoT Hub

Pokud chcete načíst informace ioT Hubu používané konektorem, postupujte následovně:

  1. Získejte koncový bod kompatibilní s centrem událostí a název koncového bodu kompatibilního s centrem událostí pro vaše centrum IoT. K získání těchto informací použijte jednu z následujících metod:

    • V Azure Portal použijte následující kroky:

      1. Přejděte na IoT Hub a vyberte Koncové body.

      2. V předdefinovaných koncových bodech vyberte Události.

      3. Z vlastnosti zkopírujte hodnotu následujících polí:

        • Název kompatibilní s centrem událostí
        • Koncový bod kompatibilní s centrem událostí
        • Oddíly

        Důležité

        Hodnota koncového bodu z portálu může obsahovat další text, který není v tomto příkladu potřeba. Extrahujte text, který odpovídá tomuto vzoru sb://<randomnamespace>.servicebus.windows.net/.

    • V Azure CLI použijte následující příkaz:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      Nahraďte myhubname názvem vašeho centra IoT. Odpověď je podobná následujícímu textu:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. Získejte zásady a klíčsdíleného přístupu. V tomto příkladu použijte klíč služby . K získání těchto informací použijte jednu z následujících metod:

    • V Azure Portal použijte následující kroky:

      1. Vyberte zásady sdíleného přístupu a pak vyberte službu.
      2. Zkopírujte hodnotu primárního klíče .
      3. Zkopírujte hodnotu připojovacího řetězce – primární klíč .
    • V Azure CLI použijte následující příkaz:

      1. Pokud chcete získat hodnotu primárního klíče, použijte následující příkaz:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        Nahraďte myhubname názvem vašeho centra IoT. Odpověď je primárním klíčem k zásadám service pro toto centrum.

      2. Pokud chcete získat připojovací řetězec pro zásadu service , použijte následující příkaz:

        az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
        

        Nahraďte myhubname názvem vašeho centra IoT. Odpověď je připojovací řetězec pro zásadu service .

Konfigurace zdrojového připojení

Pokud chcete zdroj nakonfigurovat tak, aby fungoval s vaším IoT Hub, proveďte následující akce z připojení SSH k hraničnímu uzlu:

  1. Vytvořte kopii connect-iot-source.properties souboru v /usr/hdp/current/kafka-broker/config/ adresáři. Pokud chcete stáhnout soubor z projektu toketi-kafka-connect-iothub, použijte následující příkaz:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. Pokud chcete soubor upravit connect-iot-source.properties a přidat informace služby IoT Hub, použijte následující příkaz:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. V editoru vyhledejte a změňte následující položky:

    Aktuální hodnota Upravit
    Kafka.Topic=PLACEHOLDER Nahraďte PLACEHOLDER za iotin (Jak velká může být moje znalostní báze?). Zprávy přijaté ze služby IoT Hub se umístí do iotin tématu.
    IotHub.EventHubCompatibleName=PLACEHOLDER Nahraďte PLACEHOLDER názvem kompatibilním s centrem událostí.
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER Nahraďte PLACEHOLDER koncovým bodem kompatibilním s centrem událostí.
    IotHub.AccessKeyName=PLACEHOLDER Nahraďte PLACEHOLDER za service (Jak velká může být moje znalostní báze?).
    IotHub.AccessKeyValue=PLACEHOLDER Nahraďte PLACEHOLDER primárním klíčem service zásady.
    IotHub.Partitions=PLACEHOLDER Nahraďte PLACEHOLDER počtem oddílů z předchozích kroků.
    IotHub.StartTime=PLACEHOLDER Nahraďte PLACEHOLDER datem UTC. Toto datum je, když konektor začne kontrolovat zprávy. Formát data je yyyy-mm-ddThh:mm:ssZ.
    BatchSize=100 Nahraďte 100 za 5 (Jak velká může být moje znalostní báze?). Tato změna způsobí, že konektor bude číst zprávy do Kafka, jakmile bude v IoT Hubu pět nových zpráv.

    Příklad konfigurace najdete v tématu Kafka Připojení Source Connector pro Azure IoT Hub.

  4. Pokud chcete změny uložit, použijte kombinaci kláves Ctrl+ X, Y a pak Enter.

Další informace o konfiguraci zdroje konektoru najdete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.

Konfigurace připojení jímky

Pokud chcete nakonfigurovat připojení jímky tak, aby fungovalo s vaším IoT Hub, proveďte následující akce z připojení SSH k hraničnímu uzlu:

  1. Vytvořte kopii connect-iothub-sink.properties souboru v /usr/hdp/current/kafka-broker/config/ adresáři. Pokud chcete stáhnout soubor z projektu toketi-kafka-connect-iothub, použijte následující příkaz:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. Pokud chcete soubor upravit connect-iothub-sink.properties a přidat informace služby IoT Hub, použijte následující příkaz:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. V editoru vyhledejte a změňte následující položky:

    Aktuální hodnota Upravit
    topics=PLACEHOLDER Nahraďte PLACEHOLDER za iotout (Jak velká může být moje znalostní báze?). Zprávy napsané do iotout tématu se přeposílají do centra IoT.
    IotHub.ConnectionString=PLACEHOLDER Nahraďte PLACEHOLDER připojovacím řetězcem pro zásadu service .

    Příklad konfigurace najdete v tématu Kafka Připojení Sink Connector pro Azure IoT Hub.

  4. Pokud chcete změny uložit, použijte kombinaci kláves Ctrl+ X, Y a pak Enter.

Další informace o konfiguraci jímky konektoru najdete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Spuštění zdrojového konektoru

  1. Pokud chcete spustit zdrojový konektor, použijte následující příkaz z připojení SSH k hraničnímu uzlu:

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    Po spuštění konektoru odešlete zprávy do ioT Hubu z vašich zařízení. Protože konektor čte zprávy ze služby IoT Hub a ukládá je do tématu Kafka, protokoluje informace do konzoly:

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    Poznámka

    Při spuštění konektoru se může zobrazit několik upozornění. Tato upozornění nezpůsobují problémy s příjmem zpráv ze služby IoT Hub.

  2. Zastavte konektor po několika minutách pomocí kombinace kláves Ctrl + C dvakrát. Zastavení konektoru bude trvat několik minut.

Spuštění konektoru jímky

Z připojení SSH k hraničnímu uzlu pomocí následujícího příkazu spusťte konektor jímky v samostatném režimu:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

Při spuštění konektoru se zobrazí informace podobné následujícímu textu:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

Poznámka

Při spuštění konektoru si můžete všimnout několika upozornění. Ta můžete bezpečně ignorovat.

Odesílání zpráv

Pokud chcete odesílat zprávy prostřednictvím konektoru, postupujte následovně:

  1. Otevřete druhou relaci SSH pro cluster Kafka:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Získejte adresu zprostředkovatelů Kafka pro novou relaci SSH. Heslo nahraďte heslem pro přihlášení ke clusteru a zadejte příkaz:

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. Pokud chcete odesílat zprávy do iotout tématu, použijte následující příkaz:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    Tento příkaz vás nevrátí do normálního příkazového řádku Bash. Místo toho odesílá vstup klávesnice do iotout tématu.

  4. Pokud chcete odeslat zprávu do zařízení, vložte dokument JSON do relace SSH pro dané kafka-console-producerzařízení .

    Důležité

    Je nutné nastavit hodnotu "deviceId" položky na ID vašeho zařízení. V následujícím příkladu má zařízení název myDeviceId:

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    Schéma tohoto dokumentu JSON je podrobněji popsáno na https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.mdadrese .

    Pokud používáte simulované zařízení Raspberry Pi a je spuštěné, zařízení zaprotokoluje následující zprávu:

    Receive message: Turn On
    

    Znovu odešlete dokument JSON, ale změňte hodnotu "message" položky. Nová hodnota se zaprotokoluje zařízením.

Další informace o používání konektoru jímky najdete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Další kroky

V tomto dokumentu jste zjistili, jak pomocí rozhraní Apache Kafka Připojení API spustit konektor IoT Kafka ve službě HDInsight. Pomocí následujících odkazů můžete zjistit další způsoby práce se systémem Kafka: