Share via


在 HDInsight 上使用 Apache Kafka 搭配 Azure IoT 中樞

瞭解如何使用 Apache Kafka 連線 Azure IoT 中樞 連接器,在 HDInsight 上的 Apache Kafka 與 Azure IoT 中樞 之間行動數據。 在本檔中,您將瞭解如何從叢集中的邊緣節點執行 IoT 中樞 連接器。

Kafka 連線 API 可讓您實作連接器,以持續將數據提取至 Kafka,或將數據從 Kafka 推送至另一個系統。 Apache Kafka 連線 Azure IoT 中樞 是一種連接器,會將數據從 Azure IoT 中樞 提取到 Kafka。 它也可以將數據從 Kafka 推送至 IoT 中樞。

從 IoT 中樞 提取時,您可以使用來源連接器。 推送至 IoT 中樞 時,您會使用接收連接器。 IoT 中樞 連接器同時提供來源和接收連接器。

下圖顯示使用連接器時,Azure IoT 中樞 與 HDInsight 上的 Kafka 之間的數據流。

Image showing data flowing from IoT Hub to Kafka through the connector.

如需 連線 API 的詳細資訊,請參閱 https://kafka.apache.org/documentation/#connect

必要條件

建置連接器

  1. 從 下載連接器 https://github.com/Azure/toketi-kafka-connect-iothub/ 的來源到您的本機環境。

  2. 從命令提示字元,瀏覽至 toketi-kafka-connect-iothub-master 目錄。 然後使用下列命令來建置和封裝專案:

    sbt assembly
    

    建置需要幾分鐘的時間才能完成。 命令會在項目的目錄中建立名為 kafka-connect-iothub-assembly_2.11-0.7.0.jartoketi-kafka-connect-iothub-master\target\scala-2.11 檔案。

安裝連接器

  1. 將.jar檔案上傳至 HDInsight 叢集上 Kafka 的邊緣節點。 以您叢集的實際名稱取代 CLUSTERNAME ,以編輯下列命令。 SSH 使用者帳戶和邊緣節點名稱的預設值會視需要使用和修改。

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. 檔案複製完成後,請使用 SSH 連線到邊緣節點:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. 若要將連接器安裝到 Kafka libs 目錄,請使用下列命令:

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

讓 SSH 連線保持作用中,以取得其餘步驟。

設定 Apache Kafka

從 SSH 連線到邊緣節點,使用下列步驟設定 Kafka 以獨立模式執行連接器:

  1. 設定密碼變數。 將PASSWORD取代為叢集登入密碼,然後輸入命令:

    export password='PASSWORD'
    
  2. 安裝 jq 公用程式。 jq 可讓您更輕鬆地處理從 Ambari 查詢傳回的 JSON 檔。 輸入下列命令:

    sudo apt -y install jq
    
  3. 取得 Kafka 訊息代理程式位址。 叢集中可能會有許多訊息代理程式,但您只需要參考一或兩個訊息代理程式。 若要取得兩個訊息代理程式主機的位址,請使用下列命令:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    複製值以供日後使用。 傳回的值類似於下列文字:

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. 取得 Apache Zookeeper 節點的位址。 叢集中有數個 Zookeeper 節點,但您只需要參考一或兩個。 使用下列命令將位址儲存在變數 KAFKAZKHOSTS中:

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. 在獨立模式中執行連接器時, /usr/hdp/current/kafka-broker/config/connect-standalone.properties 檔案會用來與 Kafka 訊息代理程式通訊。 若要編輯檔案 connect-standalone.properties ,請使用下列命令:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. 進行下列編輯:

    目前的值 新值 註解
    bootstrap.servers=localhost:9092 localhost:9092 值取代為上一個步驟中的訊息代理程式主機 設定邊緣節點的獨立設定,以尋找 Kafka 訊息代理程式。
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter 這項變更可讓您使用 Kafka 隨附的主控台產生者進行測試。 對於其他生產者和取用者,您可能需要不同的轉換器。 如需使用其他轉換器值的詳細資訊,請參閱 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter 與指定相同。
    N/A consumer.max.poll.records=10 新增至檔尾。 這項變更是防止接收連接器中的逾時,一次限制為10筆記錄。 如需詳細資訊,請參閱https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md
  7. 若要儲存盤案,請使用 Ctrl + XY,然後 輸入

  8. 若要建立連接器所使用的主題,請使用下列命令:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    若要確認 iotiniotout 主題存在,請使用下列命令:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    本主題iotin用來接收來自 IoT 中樞 的訊息。 本主題iotout用來將訊息傳送至 IoT 中樞。

取得 IoT 中樞 連線資訊

若要擷取連接器所使用的IoT中樞資訊,請使用下列步驟:

  1. 取得IoT中樞的事件中樞相容端點和事件中樞相容端點名稱。 若要取得這項資訊,請使用下列其中一種方法:

    • 從 Azure 入口網站,使用下列步驟:

      1. 流覽至您的 IoT 中樞,然後選取 [端點]。

      2. 內建端點選取 [事件]。

      3. [屬性] 複製下列欄位的值:

        • 事件中樞相容名稱
        • 事件中樞相容端點
        • 資料分割

        重要

        來自入口網站的端點值可能包含此範例中不需要的額外文字。 擷取符合此模式 sb://<randomnamespace>.servicebus.windows.net/的文字。

    • 從 Azure CLI 使用下列命令:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      將取代 myhubname 為您的IoT中樞名稱。 回應類似下列文字:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. 取得共用存取原則金鑰。 在此範例中 ,請使用服務 密鑰。 若要取得這項資訊,請使用下列其中一種方法:

    • 從 Azure 入口網站,使用下列步驟:

      1. 選取 [共用存取原則],然後選取 [服務]。
      2. 複製 [ 主鍵 ] 值。
      3. 複製 連線 ion string--primary key 值。
    • 從 Azure CLI 使用下列命令:

      1. 若要取得主鍵值,請使用下列命令:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        將取代 myhubname 為您的IoT中樞名稱。 回應是此中樞原則 service 的主要密鑰。

      2. 若要取得原則的 連接字串service,請使用下列命令:

        az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
        

        將取代 myhubname 為您的IoT中樞名稱。 回應是原則service的 連接字串。

設定來源連線

若要設定來源以使用您的 IoT 中樞,請從 SSH 連線到邊緣節點執行下列動作:

  1. 在目錄中建立檔案/usr/hdp/current/kafka-broker/config/connect-iot-source.properties複本。 若要從 toketi-kafka-connect-iothub 專案下載檔案,請使用下列命令:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. 若要編輯 connect-iot-source.properties 檔案並新增IoT中樞資訊,請使用下列命令:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. 在編輯器中,尋找並變更下列專案:

    目前的值 編輯
    Kafka.Topic=PLACEHOLDER PLACEHOLDER 替換為 iotin。 從IoT中樞收到的訊息會放在主題中 iotin
    IotHub.EventHubCompatibleName=PLACEHOLDER 將取代 PLACEHOLDER 為事件中樞相容名稱。
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER 將取代 PLACEHOLDER 為事件中樞相容的端點。
    IotHub.AccessKeyName=PLACEHOLDER PLACEHOLDER 替換為 service
    IotHub.AccessKeyValue=PLACEHOLDER 將取代 PLACEHOLDERservice 原則的主鍵。
    IotHub.Partitions=PLACEHOLDER 將取代 PLACEHOLDER 為先前步驟中的分割區數目。
    IotHub.StartTime=PLACEHOLDER 將取代 PLACEHOLDER 為 UTC 日期。 此日期是連接器開始檢查訊息的時機。 日期格式為 yyyy-mm-ddThh:mm:ssZ
    BatchSize=100 100 替換為 5。 這項變更會導致連接器在IoT中樞有五個新訊息後,將訊息讀入Kafka。

    如需範例組態,請參閱 kafka 連線 source 連線 or for Azure IoT 中樞

  4. 若要儲存變更,請使用 Ctrl + XY,然後 輸入

如需設定連接器來源的詳細資訊,請參閱 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md

設定接收連線

若要設定接收連線以使用您的 IoT 中樞,請從 SSH 連線到邊緣節點執行下列動作:

  1. 在目錄中建立檔案/usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties複本。 若要從 toketi-kafka-connect-iothub 專案下載檔案,請使用下列命令:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. 若要編輯 connect-iothub-sink.properties 檔案並新增IoT中樞資訊,請使用下列命令:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. 在編輯器中,尋找並變更下列專案:

    目前的值 編輯
    topics=PLACEHOLDER PLACEHOLDER 替換為 iotout。 寫入 iotout 主題的訊息會轉送至IoT中樞。
    IotHub.ConnectionString=PLACEHOLDER 將取代PLACEHOLDER為原則service的 連接字串。

    如需範例組態,請參閱 Azure IoT 中樞的 Kafka 連線 接收 連線 or

  4. 若要儲存變更,請使用 Ctrl + XY,然後 輸入

如需設定連接器接收的詳細資訊,請參閱 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md

啟動來源連接器

  1. 若要啟動來源連接器,請使用從 SSH 連線到邊緣節點的下列命令:

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    連接器啟動時,請從您的裝置將訊息傳送至IoT中樞。 當連接器從IoT中樞讀取訊息,並將其儲存在Kafka主題中時,它會將資訊記錄到主控台:

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    注意

    當連接器啟動時,您可能會看到數個警告。 這些警告不會造成從IoT中樞接收訊息的問題。

  2. 使用 Ctrl + C 幾分鐘後停止連接器兩次。 連接器需要幾分鐘的時間才能停止。

啟動接收連接器

從 SSH 連線到邊緣節點,使用下列命令以獨立模式啟動接收連接器:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

當連接器執行時,會顯示類似下列文字的資訊:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

注意

當連接器啟動時,您可能會注意到數個警告。 您可以放心地忽略這些。

傳送訊息

若要透過連接器傳送訊息,請使用下列步驟:

  1. 開啟 Kafka 叢集的第二 個 SSH 工作階段:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. 取得新 ssh 工作階段的 Kafka 訊息代理程式位址。 將PASSWORD取代為叢集登入密碼,然後輸入命令:

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. 若要將訊息傳送至 iotout 主題,請使用下列命令:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    此命令不會將您傳回一般Bash提示字元。 相反地,它會將鍵盤輸入傳送至 iotout 主題。

  4. 若要將訊息傳送至您的裝置,請將 JSON 檔貼到 的 SSH 工作階段中 kafka-console-producer

    重要

    您必須將專案的值 "deviceId" 設定為裝置的識別碼。 在下列範例中,裝置名為 myDeviceId

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    此 JSON 檔的架構會在 中詳細說明 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md

如果您使用仿真的 Raspberry Pi 裝置,且其正在執行中,裝置會記錄下列訊息:

Receive message: Turn On


Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.

如需使用接收連接器的詳細資訊,請參閱 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md

下一步

在本檔中,您已瞭解如何使用 Apache Kafka 連線 API 在 HDInsight 上啟動 IoT Kafka 連線 or。 使用下列連結來探索使用 Kafka 的其他方式: