Použití Apache Kafka ve službě HDInsight se službou Azure IoT Hub

Zjistěte, jak pomocí konektoru Apache Kafka Připojení Azure IoT Hubu přesouvat data mezi Apache Kafka ve službě HDInsight a Azure IoT Hubem. V tomto dokumentu se dozvíte, jak spustit konektor IoT Hubu z hraničního uzlu v clusteru.

Rozhraní Kafka Připojení API umožňuje implementovat konektory, které průběžně načítá data do Systému Kafka, nebo odesílat data ze systému Kafka do jiného systému. Apache Kafka Připojení Azure IoT Hub je konektor, který načítá data z Azure IoT Hubu do Kafka. Může také odesílat data ze systému Kafka do IoT Hubu.

Při načítání ze služby IoT Hub použijete zdrojový konektor. Při odesílání do IoT Hubu použijete konektor jímky . Konektor IoT Hubu poskytuje konektory zdroje i jímky.

Následující diagram znázorňuje tok dat mezi Azure IoT Hubem a Kafka ve službě HDInsight při použití konektoru.

Image showing data flowing from IoT Hub to Kafka through the connector.

Další informace o rozhraní PŘIPOJENÍ API najdete v tématu https://kafka.apache.org/documentation/#connect.

Požadavky

Sestavení konektoru

  1. Stáhněte zdroj konektoru z https://github.com/Azure/toketi-kafka-connect-iothub/ místního prostředí.

  2. Z příkazového řádku přejděte do toketi-kafka-connect-iothub-master adresáře. Pak pomocí následujícího příkazu sestavte a zabalte projekt:

    sbt assembly
    

    Dokončení sestavení trvá několik minut. Příkaz vytvoří soubor pojmenovaný kafka-connect-iothub-assembly_2.11-0.7.0.jar v toketi-kafka-connect-iothub-master\target\scala-2.11 adresáři projektu.

Instalace konektoru

  1. Nahrajte soubor .jar do hraničního uzlu vašeho kafka v clusteru HDInsight. Upravte následující příkaz nahrazením CLUSTERNAME skutečného názvu clusteru. Výchozí hodnoty uživatelského účtu SSH a názvu hraničního uzlu se použijí a upraví podle potřeby.

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. Po dokončení kopírování souboru se připojte k hraničnímu uzlu pomocí SSH:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Pokud chcete konektor nainstalovat do adresáře Kafka libs , použijte následující příkaz:

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

Ponechte připojení SSH aktivní pro zbývající kroky.

Konfigurace Apache Kafka

Z připojení SSH k hraničnímu uzlu pomocí následujících kroků nakonfigurujte Kafka tak, aby spouštět konektor v samostatném režimu:

  1. Nastavte proměnnou hesla. Nahraďte heslo heslem pro přihlášení ke clusteru a pak zadejte příkaz:

    export password='PASSWORD'
    
  2. Nainstalujte nástroj jq. Jq usnadňuje zpracování dokumentů JSON vrácených z dotazů Ambari. Zadejte tento příkaz:

    sudo apt -y install jq
    
  3. Získejte adresu zprostředkovatelů Kafka. V clusteru může být mnoho zprostředkovatelů, ale stačí odkazovat jenom na jednu nebo dvě. Pokud chcete získat adresu dvou hostitelů zprostředkovatele, použijte následující příkaz:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    Zkopírujte hodnoty pro pozdější použití. Vrácená hodnota je obdobná následujícímu textu:

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. Získejte adresu uzlů Apache Zookeeper. V clusteru je několik uzlů Zookeeper, ale stačí odkazovat jenom na jeden nebo dva. Pomocí následujícího příkazu uložte adresy do proměnné KAFKAZKHOSTS:

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. Při spuštění konektoru v samostatném režimu /usr/hdp/current/kafka-broker/config/connect-standalone.properties se soubor používá ke komunikaci s zprostředkovateli Kafka. Pokud chcete soubor upravit connect-standalone.properties , použijte následující příkaz:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. Proveďte následující úpravy:

    Aktuální hodnota Nová hodnota Komentář
    bootstrap.servers=localhost:9092 localhost:9092 Nahraďte hodnotu hostiteli zprostředkovatele z předchozího kroku. Nakonfiguruje samostatnou konfiguraci hraničního uzlu pro vyhledání zprostředkovatelů Kafka.
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter Tato změna umožňuje testovat pomocí producenta konzoly, který je součástí systému Kafka. Možná budete potřebovat různé převaděče pro ostatní producenty a spotřebitele. Informace o použití jiných hodnot převaděče naleznete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter Stejné jako dané.
    consumer.max.poll.records=10 Přidat na konec souboru Touto změnou zabráníte vypršení časových limitů konektoru jímky tím, že ho omezíte na 10 záznamů najednou. Další informace najdete na webu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
  7. Pokud chcete soubor uložit, použijte ctrl + X, Y a pak Enter.

  8. Pokud chcete vytvořit témata používaná konektorem, použijte následující příkazy:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    K ověření iotin existence těchto témat iotout použijte následující příkaz:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    Toto iotin téma se používá k příjmu zpráv ze služby IoT Hub. Toto iotout téma se používá k odesílání zpráv do IoT Hubu.

Získání informací o připojení ke službě IoT Hub

Pokud chcete načíst informace o službě IoT Hub používané konektorem, postupujte následovně:

  1. Získejte koncový bod kompatibilní s centrem událostí a název koncového bodu kompatibilního s centrem událostí pro vaše centrum IoT. K získání těchto informací použijte jednu z následujících metod:

    • Na webu Azure Portal použijte následující kroky:

      1. Přejděte do služby IoT Hub a vyberte Koncové body.

      2. V předdefinovaných koncových bodech vyberte Události.

      3. Z vlastností zkopírujte hodnotu následujících polí:

        • Název kompatibilní s centrem událostí
        • Koncový bod kompatibilní s centrem událostí
        • Oddíly

        Důležité

        Hodnota koncového bodu z portálu může obsahovat další text, který není v tomto příkladu potřeba. Extrahujte text, který odpovídá tomuto vzoru sb://<randomnamespace>.servicebus.windows.net/.

    • V Azure CLI použijte následující příkaz:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      Nahraďte myhubname názvem vašeho centra IoT. Odpověď je podobná následujícímu textu:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. Získejte zásady a klíč sdíleného přístupu. V tomto příkladu použijte klíč služby . K získání těchto informací použijte jednu z následujících metod:

    • Na webu Azure Portal použijte následující kroky:

      1. Vyberte zásady sdíleného přístupu a pak vyberte službu.
      2. Zkopírujte hodnotu primárního klíče .
      3. Zkopírujte Připojení ion řetězec – hodnota primárního klíče.
    • V Azure CLI použijte následující příkaz:

      1. Pokud chcete získat hodnotu primárního klíče, použijte následující příkaz:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        Nahraďte myhubname názvem vašeho centra IoT. Odpověď je primárním klíčem k zásadám service pro toto centrum.

      2. K získání připojovací řetězec pro zásadu service použijte následující příkaz:

        az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
        

        Nahraďte myhubname názvem vašeho centra IoT. Odpověď je připojovací řetězec zásadyservice.

Konfigurace zdrojového připojení

Pokud chcete nakonfigurovat zdroj pro práci se službou IoT Hub, proveďte následující akce z připojení SSH k hraničnímu uzlu:

  1. Vytvořte kopii connect-iot-source.properties souboru v /usr/hdp/current/kafka-broker/config/ adresáři. Pokud chcete stáhnout soubor z projektu toketi-kafka-connect-iothub, použijte následující příkaz:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. Pokud chcete soubor upravit connect-iot-source.properties a přidat informace o službě IoT Hub, použijte následující příkaz:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. V editoru vyhledejte a změňte následující položky:

    Aktuální hodnota Upravit
    Kafka.Topic=PLACEHOLDER Nahraďte PLACEHOLDERiotin. Zprávy přijaté ze služby IoT Hub se umístí do iotin tématu.
    IotHub.EventHubCompatibleName=PLACEHOLDER Nahraďte PLACEHOLDER názvem kompatibilním s centrem událostí.
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER Nahraďte PLACEHOLDER koncovým bodem kompatibilním s centrem událostí.
    IotHub.AccessKeyName=PLACEHOLDER Nahraďte PLACEHOLDERservice.
    IotHub.AccessKeyValue=PLACEHOLDER Nahraďte PLACEHOLDER primárním klíčem service zásady.
    IotHub.Partitions=PLACEHOLDER Nahraďte PLACEHOLDER počtem oddílů z předchozích kroků.
    IotHub.StartTime=PLACEHOLDER Nahraďte PLACEHOLDER datem UTC. Toto datum je, když konektor začne kontrolovat zprávy. Formát data je yyyy-mm-ddThh:mm:ssZ.
    BatchSize=100 Nahraďte 1005. Tato změna způsobí, že konektor bude číst zprávy do Systému Kafka, jakmile bude v IoT Hubu pět nových zpráv.

    Příklad konfigurace najdete v tématu Kafka Připojení Source Připojení or pro Azure IoT Hub.

  4. Pokud chcete uložit změny, použijte ctrl + X, Y a pak Enter.

Další informace o konfiguraci zdroje konektoru najdete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.

Konfigurace připojení jímky

Pokud chcete nakonfigurovat připojení jímky tak, aby fungovalo se službou IoT Hub, proveďte následující akce z připojení SSH k hraničnímu uzlu:

  1. Vytvořte kopii connect-iothub-sink.properties souboru v /usr/hdp/current/kafka-broker/config/ adresáři. Pokud chcete stáhnout soubor z projektu toketi-kafka-connect-iothub, použijte následující příkaz:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. Pokud chcete soubor upravit connect-iothub-sink.properties a přidat informace o službě IoT Hub, použijte následující příkaz:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. V editoru vyhledejte a změňte následující položky:

    Aktuální hodnota Upravit
    topics=PLACEHOLDER Nahraďte PLACEHOLDERiotout. Zprávy zapsané do iotout tématu se přeposílají do centra IoT.
    IotHub.ConnectionString=PLACEHOLDER Nahraďte PLACEHOLDER připojovací řetězec zásadyservice.

    Příklad konfigurace najdete v tématu Kafka Připojení Sink Připojení or pro Azure IoT Hub.

  4. Pokud chcete uložit změny, použijte ctrl + X, Y a pak Enter.

Další informace o konfiguraci jímky konektoru najdete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Spuštění zdrojového konektoru

  1. Ke spuštění zdrojového konektoru použijte následující příkaz z připojení SSH k hraničnímu uzlu:

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    Jakmile se konektor spustí, odešlete zprávy do ioT Hubu z vašich zařízení. Když konektor čte zprávy ze služby IoT Hub a ukládá je do tématu Kafka, protokoluje informace do konzoly:

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    Poznámka:

    Při spuštění konektoru se může zobrazit několik upozornění. Tato upozornění nezpůsobují problémy s příjmem zpráv ze služby IoT Hub.

  2. Zastavte spojnici po několika minutách pomocí kombinace kláves Ctrl + C dvakrát. Zastavení konektoru trvá několik minut.

Spuštění konektoru jímky

Pomocí následujícího příkazu z připojení SSH ke hraničnímu uzlu spusťte konektor jímky v samostatném režimu:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

Při spuštění konektoru se zobrazí podobné informace jako v následujícím textu:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

Poznámka:

Při spuštění konektoru si můžete všimnout několika upozornění. Ta můžete bezpečně ignorovat.

Odesílání zpráv

Pokud chcete odesílat zprávy přes konektor, postupujte následovně:

  1. Otevřete druhou relaci SSH pro cluster Kafka:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Získejte adresu zprostředkovatelů Kafka pro novou relaci SSH. Nahraďte heslo heslem pro přihlášení ke clusteru a pak zadejte příkaz:

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. K odesílání zpráv do iotout tématu použijte následující příkaz:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    Tento příkaz vás nevrátí do normálního příkazového řádku Bash. Místo toho odesílá do tématu vstup iotout pomocí klávesnice.

  4. Pokud chcete odeslat zprávu do zařízení, vložte dokument JSON do relace SSH pro dané kafka-console-producerzařízení.

    Důležité

    Je nutné nastavit hodnotu "deviceId" položky na ID vašeho zařízení. V následujícím příkladu má zařízení název myDeviceId:

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    Schéma pro tento dokument JSON je podrobněji popsáno na adrese https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Pokud používáte simulované zařízení Raspberry Pi a je spuštěné, zařízení zaznamená následující zprávu:

Receive message: Turn On


Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.

Další informace o použití konektoru jímky naleznete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Další kroky

V tomto dokumentu jste zjistili, jak pomocí rozhraní Apache Kafka Připojení API spustit ioT Kafka Připojení or ve službě HDInsight. Pomocí následujících odkazů můžete zjistit další způsoby práce se systémem Kafka: