你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

将 Apache Kafka on HDInsight 与 Azure IoT 中心配合使用

了解如何使用 Apache Kafka Connect Azure IoT 中心连接器在 Apache Kafka on HDInsight 与 Azure IoT 中心之间移动数据。 本文档介绍如何通过群集中的边缘节点运行 IoT 中心连接器。

使用 Kafka Connect API 可以实施所需的连接器,用于将数据连续提取到 Kafka,或者将数据从 Kafka 推送到另一个系统。 Apache Kafka Connect Azure IoT 中心是可将数据从 Azure IoT 中心提取到 Kafka 的连接器。 该连接器还能将数据从 Kafka 推送到 IoT 中心。

从 IoT 中心提取数据时,可以使用连接器。 将数据推送到 IoT 中心时,可以使用接收器连接器。 IoT 中心连接器同时提供源连接器和接收器连接器。

下图显示了在使用连接器时,Azure IoT 中心与 Kafka on HDInsight 之间的数据流。

Image showing data flowing from IoT Hub to Kafka through the connector.

有关 Connect API 的详细信息,请参阅 https://kafka.apache.org/documentation/#connect

必备条件

生成连接器

  1. 将连接器的源从 https://github.com/Azure/toketi-kafka-connect-iothub/ 下载到本地环境。

  2. 在命令提示符中导航到 toketi-kafka-connect-iothub-master 目录。 然后使用以下命令生成并打包项目:

    sbt assembly
    

    生成过程需要花费几分钟时间才能完成。 使用此命令在项目的 toketi-kafka-connect-iothub-master\target\scala-2.11 目录中创建名为 kafka-connect-iothub-assembly_2.11-0.7.0.jar 的文件。

安装连接器

  1. 将 .jar 文件上传到 Kafka on HDInsight 群集的边缘节点。 编辑以下命令,将 CLUSTERNAME 替换为群集的实际名称。 我们使用 SSH 用户帐户的默认值和边缘节点的名称,可根据需要进行修改。

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. 文件复制完成后,使用 SSH 连接到边缘节点:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. 若要在 Kafka libs 目录中安装连接器,请使用以下命令:

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

使 SSH 连接保持活动状态,以执行剩余步骤。

配置 Apache Kafka

与边缘节点建立 SSH 连接后,使用以下步骤将 Kafka 配置为在独立模式下运行连接器:

  1. 设置密码变量。 将 PASSWORD 替换为群集登录密码,然后输入以下命令:

    export password='PASSWORD'
    
  2. 安装 jq 实用工具。 使用 jq 可以更轻松地处理 Ambari 查询返回的 JSON 文档。 输入以下命令:

    sudo apt -y install jq
    
  3. 获取 Kafka 代理的地址。 群集中可能有许多的代理,但只需引用其中的一到两个。 若要获取两个代理主机的地址,请使用以下命令:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    复制这些值供稍后使用。 返回的值类似于下文:

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. 获取 Apache Zookeeper 节点的地址。 群集中有多个 Zookeeper 节点,但只需引用其中的一到两个。 使用以下变量存储变量 KAFKAZKHOSTS 中的地址:

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. 在独立模式下运行连接器时,将使用 /usr/hdp/current/kafka-broker/config/connect-standalone.properties 文件来与 Kafka 代理通信。 若要编辑 connect-standalone.properties 文件,请使用以下命令:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. 进行以下编辑:

    当前值 新值 注释
    bootstrap.servers=localhost:9092 localhost:9092 值替换为在上一步骤中获取的代理主机 将边缘节点的独立配置配置为查找 Kafka 代理。
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter 做出此项更改后,可以使用 Kafka 随附的控制台生成方执行测试。 对于其他生产方和使用方,可能需要不同的转换器。 有关使用其他转换器值的信息,请参阅 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter 与给定内容相同。
    不可用 consumer.max.poll.records=10 添加到文件末尾。 此项更改会将接收器连接器限制为每次处理 10 条记录,防止该连接器发生超时。 有关详细信息,请参阅 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md
  7. 要保存文件,请使用 Ctrl + XY,并按 Enter

  8. 若要创建连接器使用的主题,请使用以下命令:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    若要验证 iotiniotout 主题是否存在,请使用以下命令:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    iotin 主题用于从 IoT 中心接收消息。 iotout 主题用于向 IoT 中心发送消息。

获取 IoT 中心连接信息

若要检索连接器使用的 IoT 中心信息,请使用以下步骤:

  1. 获取事件中心兼容的终结点,以及 IoT 中心的事件中心兼容终结点名称。 若要获取此信息,请使用以下方法之一:

    • Azure 门户使用以下步骤:

      1. 导航到 IoT 中心并选择“终结点”。

      2. 在“内置终结点”中,选择“事件”。

      3. 在“属性”中,复制以下字段的值:

        • 与事件中心兼容的名称
        • 与事件中心兼容的终结点
        • 分区

        重要

        门户中的终结点值可能包含本示例不需要的额外文本。 提取与模式 sb://<randomnamespace>.servicebus.windows.net/ 匹配的文本。

    • Azure CLI 中使用以下命令:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      myhubname 替换为 IoT 中心的名称。 响应类似于以下文本:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. 获取共享访问策略密钥。 本示例使用服务密钥。 若要获取此信息,请使用以下方法之一:

    • Azure 门户使用以下步骤:

      1. 依次选择“共享访问策略”、“服务”。
      2. 复制“主密钥”值。
      3. 复制“连接字符串 - 主键” 值。
    • Azure CLI 中使用以下命令:

      1. 若要获取主密钥值,请使用以下命令:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        myhubname 替换为 IoT 中心的名称。 响应是发送到此中心的 service 策略的主密钥。

      2. 若要获取 service 策略的连接字符串,请使用以下命令:

        az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
        

        myhubname 替换为 IoT 中心的名称。 响应是 service 策略的连接字符串。

配置源连接

若要将源配置为使用 IoT 中心,请在与边缘节点建立 SSH 连接后执行以下操作:

  1. /usr/hdp/current/kafka-broker/config/ 目录中创建 connect-iot-source.properties 文件的副本。 若要从 toketi-kafka-connect-iothub 项目下载文件,请使用以下命令:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. 若要编辑 connect-iot-source.properties 文件并添加 IoT 中心信息,请使用以下命令:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. 在编辑器中,找到并更改以下条目:

    当前值 编辑
    Kafka.Topic=PLACEHOLDER PLACEHOLDER 替换为 iotin。 从 IoT 中心收到的消息将放入 iotin 主题中。
    IotHub.EventHubCompatibleName=PLACEHOLDER PLACEHOLDER 替换为与事件中心兼容的名称。
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER PLACEHOLDER 替换为与事件中心兼容的终结点。
    IotHub.AccessKeyName=PLACEHOLDER PLACEHOLDER 替换为 service
    IotHub.AccessKeyValue=PLACEHOLDER PLACEHOLDER 替换为 service 策略的主密钥。
    IotHub.Partitions=PLACEHOLDER PLACEHOLDER 替换为在上一步骤中获取的分区数。
    IotHub.StartTime=PLACEHOLDER PLACEHOLDER 替换为 UTC 日期。 此日期是连接器开始检查消息的时间。 日期格式为 yyyy-mm-ddThh:mm:ssZ
    BatchSize=100 100 替换为 5。 做出此项更改后,如果 IoT 中心出现五条新消息,则连接器会将消息读入 Kafka。

    有关示例配置,请参阅用于 Azure IoT 中心的 Kafka Connect 源连接器

  4. 若要保存更改,请依次按 Ctrl + XYEnter

有关配置连接器源的详细信息,请参阅 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md

配置接收器连接

若要将接收器连接配置为使用 IoT 中心,请在与边缘节点建立 SSH 连接后执行以下操作:

  1. /usr/hdp/current/kafka-broker/config/ 目录中创建 connect-iothub-sink.properties 文件的副本。 若要从 toketi-kafka-connect-iothub 项目下载文件,请使用以下命令:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. 若要编辑 connect-iothub-sink.properties 文件并添加 IoT 中心信息,请使用以下命令:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. 在编辑器中,找到并更改以下条目:

    当前值 编辑
    topics=PLACEHOLDER PLACEHOLDER 替换为 iotout。 写入 iotout 主题的消息将转发到 IoT 中心。
    IotHub.ConnectionString=PLACEHOLDER PLACEHOLDER 替换为 service 策略的连接字符串。

    有关示例配置,请参阅用于 Azure IoT 中心的 Kafka Connect 接收器连接器

  4. 若要保存更改,请依次按 Ctrl + XYEnter

有关配置连接器接收器的详细信息,请参阅 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md

启动源连接器

  1. 若要启动源连接器,请在与边缘节点建立 SSH 连接后使用以下命令:

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    启动连接器后,将消息从设备发送到 IoT 中心。 当连接器从 IoT 中心读取消息并将其存储在 Kafka 主题时,会将信息记录到控制台:

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    注意

    连接器启动时,可能会出现几条警告。 这些警告不会影响从 IoT 中心接收消息。

  2. 请在几分钟后按两次 Ctrl + C 停止连接器 。 停止连接器需要几分钟时间。

启动接收器连接器

与边缘节点建立 SSH 连接后,使用以下命令在独立模式下启动接收器连接器:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

连接器运行时,会显示类似于以下文本的信息:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

注意

连接器启动时,可能会出现几条警告。 可以放心地忽略这些警告。

发送消息

若要通过连接器发送消息,请使用以下步骤:

  1. 打开第二个 SSH 会话,连接到 Kafka 群集 :

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. 获取新 SSH 会话的 Kafka 代理的地址。 将 PASSWORD 替换为群集登录密码,然后输入以下命令:

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. 若要将消息发送到 iotout 主题,请使用以下命令:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    此命令不会将你返回到正常的 Bash 提示符, 而是将键盘输入发送到 iotout 主题。

  4. 若要将消息发送到设备,请将一个 JSON 文档粘贴到 kafka-console-producer 的 SSH 会话中。

    重要

    必须将 "deviceId" 条目的值设置为设备 ID。 在以下示例中,设备名为 myDeviceId

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md 中更详细地介绍了此 JSON 文档的架构。

如果使用模拟的 Raspberry Pi 设备,并且该设备正在运行,则设备会记录以下消息:

Receive message: Turn On


Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.

有关使用接收器连接器的详细信息,请参阅 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md

后续步骤

本文档已介绍如何使用 Apache Kafka Connect API 在 HDInsight 上启动 IoT Kafka 连接器。 使用以下链接来发现与 Kafka 配合使用的其他方式: