
Use MirrorMaker to replicate Apache Kafka topics with Kafka on HDInsight

Learn how to use Apache Kafka's mirroring feature to replicate topics to a secondary cluster. Mirroring can be run as a continuous process, or used intermittently as a method of migrating data from one cluster to another.

In this example, mirroring is used to replicate topics between two HDInsight clusters. The two clusters are in different virtual networks in different datacenters.

Warning

Mirroring should not be considered a means to achieve fault tolerance. The offsets of items within a topic differ between the primary and secondary clusters, so clients cannot use the two interchangeably.

If you are concerned about fault tolerance, set replication for the topics within your cluster. For more information, see Get started with Apache Kafka on HDInsight.

How Apache Kafka mirroring works

Mirroring works by using the MirrorMaker tool (part of Apache Kafka) to consume records from topics on the primary cluster and then create a local copy on the secondary cluster. MirrorMaker uses one or more consumers that read from the primary cluster, and a producer that writes to the local (secondary) cluster.

The most useful mirroring setup for disaster recovery uses Kafka clusters in different Azure regions. To achieve this, the virtual networks where the clusters reside are peered together.

The following diagram illustrates the mirroring process and how communication flows between the clusters:

Diagram of the mirroring process

The primary and secondary clusters can differ in the number of nodes and partitions, and offsets within the topics also differ. Mirroring maintains the key value that is used for partitioning, so record order is preserved on a per-key basis.

Mirroring across network boundaries

If you need to mirror between Kafka clusters in different networks, there are the following additional considerations:

  • Gateways: The networks must be able to communicate at the TCP/IP level.

  • Server addressing: You can choose to address your cluster nodes using their IP addresses or fully qualified domain names.

    • IP addresses: If you configure your Kafka clusters to use IP address advertising, you can proceed with the mirroring setup using the IP addresses of the broker nodes and Zookeeper nodes.

    • Domain names: If you don't configure your Kafka clusters for IP address advertising, the clusters must be able to connect to each other by using fully qualified domain names (FQDNs). This requires a Domain Name System (DNS) server in each network that is configured to forward requests to the other network. When creating an Azure virtual network, instead of using the automatic DNS provided with the network, you must specify a custom DNS server and the IP address for the server. After the virtual network has been created, create an Azure virtual machine that uses that IP address, then install and configure DNS software on it.

    Warning

    Create and configure the custom DNS server before installing HDInsight into the virtual network. No additional configuration is required for HDInsight to use the DNS server configured for the virtual network.

For more information on connecting two Azure virtual networks, see Configure a VNet-to-VNet connection.
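Once the networks are peered and DNS forwarding is configured, it's worth confirming from a cluster node that names in the peer network actually resolve before starting mirroring. A minimal sketch; the FQDN below is a placeholder (the reserved .invalid TLD never resolves), so substitute a real broker host name from the remote cluster:

```shell
# Check that a broker FQDN from the peer network resolves from this node.
# PEER_BROKER_FQDN is a placeholder -- substitute a real wn* host name.
PEER_BROKER_FQDN=wn0-kafka.peer-cluster.invalid

if getent hosts "$PEER_BROKER_FQDN" > /dev/null; then
  echo "resolves: $PEER_BROKER_FQDN"
else
  echo "not resolvable: $PEER_BROKER_FQDN - check custom DNS forwarding"
fi
```

If the name does not resolve, revisit the custom DNS server's forwarding rules before troubleshooting Kafka itself.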

Mirroring architecture

This architecture features two clusters in different resource groups and virtual networks: a primary and a secondary.

Creation steps

  1. Create two new resource groups:

    Resource group Location
    kafka-primary-rg Central US
    kafka-secondary-rg North Central US
  2. Create a new virtual network kafka-primary-vnet in kafka-primary-rg. Leave the default settings.

  3. Create a new virtual network kafka-secondary-vnet in kafka-secondary-rg, also with default settings.

  4. Create two new Kafka clusters:

    Cluster name Resource group Virtual network Storage account
    kafka-primary-cluster kafka-primary-rg kafka-primary-vnet kafkaprimarystorage
    kafka-secondary-cluster kafka-secondary-rg kafka-secondary-vnet kafkasecondarystorage
  5. Create virtual network peerings. This step creates two peerings: one from kafka-primary-vnet to kafka-secondary-vnet and one back from kafka-secondary-vnet to kafka-primary-vnet.

    1. Select the kafka-primary-vnet virtual network.

    2. Under Settings, select Peerings.

    3. Select Add.

    4. On the Add peering screen, enter the details as shown in the screenshot below.

      Screenshot: add vnet peering for HDInsight Kafka

Configure IP advertising

Configure IP advertising to enable clients to connect using broker IP addresses instead of domain names.

  1. Go to the Ambari dashboard for the primary cluster: https://PRIMARYCLUSTERNAME.azurehdinsight.net.

  2. Select Services > Kafka. Select the Configs tab.

  3. Add the following configuration lines to the bottom of the kafka-env template section, then select Save.

    # Configure Kafka to advertise IP addresses instead of FQDN
    IP_ADDRESS=$(hostname -i)
    echo advertised.listeners=$IP_ADDRESS
    sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
    echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
    
  4. Enter a note on the Save Configuration screen, and then select Save.

  5. If you're prompted with a configuration warning, select Proceed Anyway.

  6. Select OK on the Save Configuration Changes screen.

  7. In the Restart Required notification, select Restart > Restart All Affected, and then select Confirm Restart All.

    Screenshot: Apache Ambari restart all affected services
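To see what the kafka-env snippet above does to server.properties, you can simulate it locally against a scratch file. The IP address and the simplified sed expression below are stand-ins; nothing on a real cluster is touched:

```shell
# Simulate the kafka-env snippet against a scratch copy instead of the real
# /usr/hdp/current/kafka-broker/conf/server.properties.
DEMO=/tmp/server.properties.demo
printf 'advertised.listeners=OLD_VALUE\nlisteners=PLAINTEXT://0.0.0.0:9092\n' > "$DEMO"

IP_ADDRESS=10.23.0.11                  # stand-in for $(hostname -i)
sed -i.bak -e '/advertised/d' "$DEMO"  # drop any existing advertised.* line
echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> "$DEMO"

cat "$DEMO"
```

The real snippet runs once per broker as part of kafka-env, so each broker ends up advertising its own IP address.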

Configure Kafka to listen on all network interfaces:

  1. Stay on the Configs tab under Services > Kafka. In the Kafka Broker section, set the listeners property to PLAINTEXT://0.0.0.0:9092.
  2. Select Save.
  3. Select Restart, and then Confirm Restart All.

Record the broker IP addresses and Zookeeper addresses for the primary cluster:

  1. Select Hosts on the Ambari dashboard.

  2. Make a note of the IP addresses for the brokers and Zookeeper nodes. The broker node host names begin with wn, and the Zookeeper node host names begin with zk.

    Screenshot: Apache Ambari view of node IP addresses

  3. Repeat the previous three steps for the second cluster, kafka-secondary-cluster: configure IP advertising, set the listeners, and make a note of the broker and Zookeeper IP addresses.
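If you prefer the command line to the Ambari UI, the same host list is available from the Ambari REST API. A sketch, printed as a dry run; CLUSTERNAME and the admin password are placeholders, and the endpoint shown is the standard Ambari hosts resource:

```shell
# Build (but don't execute) the Ambari REST call that lists cluster hosts.
# Run the echoed command from a machine that can reach the cluster, and pipe
# the JSON response through a tool such as jq to extract the host entries.
CLUSTERNAME=kafka-primary-cluster
AMBARI_URL="https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/hosts"
echo "curl -sS -u admin:PASSWORD '$AMBARI_URL'"
```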

Create topics

  1. Connect to the primary cluster using SSH:

    ssh sshuser@PRIMARYCLUSTER-ssh.azurehdinsight.net
    

    Replace sshuser with the SSH user name used when creating the cluster. Replace PRIMARYCLUSTER with the base name used when creating the cluster.

    For information, see Use SSH with HDInsight.

  2. Use the following command to create a variable with the Apache Zookeeper hosts for the primary cluster. Replace strings like ZOOKEEPER_IP_ADDRESS1 with the actual IP addresses recorded earlier, such as 10.23.0.11 and 10.23.0.7. If you're using FQDN resolution with a custom DNS server, follow these steps to get broker and Zookeeper names:

    # get the zookeeper hosts for the primary cluster
    export PRIMARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181,ZOOKEEPER_IP_ADDRESS2:2181,ZOOKEEPER_IP_ADDRESS3:2181'
    
  3. To create a topic named testtopic, use the following command:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $PRIMARY_ZKHOSTS
    
  4. Use the following command to verify that the topic was created:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $PRIMARY_ZKHOSTS
    

    The response contains testtopic.

  5. Use the following command to view the Zookeeper host information for this (the primary) cluster:

    echo $PRIMARY_ZKHOSTS
    

    This returns information similar to the following text:

    10.23.0.11:2181,10.23.0.7:2181,10.23.0.9:2181

    Save this information. It's used in the next section.
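A stray space or a missing port in these connect strings is a common source of confusing Zookeeper connection errors. The sketch below sanity-checks the format of the variable before it's used; the IPs are the example values from above:

```shell
# Verify every entry in a Zookeeper connect string has the form host:2181.
PRIMARY_ZKHOSTS='10.23.0.11:2181,10.23.0.7:2181,10.23.0.9:2181'   # example values

bad=0
for entry in $(echo "$PRIMARY_ZKHOSTS" | tr ',' ' '); do
  case "$entry" in
    *:2181) ;;                                   # expected form
    *) echo "suspicious entry: '$entry'"; bad=1 ;;
  esac
done
[ "$bad" -eq 0 ] && echo "connect string looks OK"
```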

Configure mirroring

  1. Connect to the secondary cluster using a different SSH session:

    ssh sshuser@SECONDARYCLUSTER-ssh.azurehdinsight.net
    

    Replace sshuser with the SSH user name used when creating the cluster. Replace SECONDARYCLUSTER with the name used when creating the cluster.

    For information, see Use SSH with HDInsight.

  2. A consumer.properties file is used to configure communication with the primary cluster. To create the file, use the following command:

    nano consumer.properties
    

    Use the following text as the contents of the consumer.properties file:

    zookeeper.connect=PRIMARY_ZKHOSTS
    group.id=mirrorgroup
    

    PRIMARY_ZKHOSTS 替换为主要群集中的 Zookeeper IP 地址。Replace PRIMARY_ZKHOSTS with the Zookeeper IP Addresses from the primary cluster.

    此文件说明从 Kafka 主要群集读取记录时要使用的使用者信息。This file describes the consumer information to use when reading from the primary Kafka cluster. 有关使用者配置的详细信息,请参阅 kafka.apache.org 中的使用者配置For more information consumer configuration, see Consumer Configs at kafka.apache.org.

    要保存文件,请使用 Ctrl + XY,并按 EnterTo save the file, use Ctrl + X, Y, and then Enter.

  3. 在配置与辅助群集通信的生产者之前,为辅助群集的代理 IP 地址设置变量。Before configuring the producer that communicates with the secondary cluster, set up a variable for the broker IP addresses of the secondary cluster. 使用以下命令创建此变量:Use the following commands to create this variable:

    export SECONDARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS3:9092'
    

    The command echo $SECONDARY_BROKERHOSTS should return information similar to the following text:

    10.23.0.14:9092,10.23.0.4:9092,10.23.0.12:9092

  4. A producer.properties file is used to configure communication with the secondary cluster. To create the file, use the following command:

    nano producer.properties
    

    Use the following text as the contents of the producer.properties file:

    bootstrap.servers=SECONDARY_BROKERHOSTS
    compression.type=none
    

    SECONDARY_BROKERHOSTS 替换为在上一步骤中使用的代理 IP 地址。Replace SECONDARY_BROKERHOSTS with the broker IP addresses used in the previous step.

    有关创建器配置的详细信息,请参阅 kafka.apache.org 中的创建器配置For more information producer configuration, see Producer Configs at kafka.apache.org.

  5. 使用以下命令创建一个环境变量,其中包含辅助群集的 Zookeeper 主机 IP 地址:Use the following commands to create an environment variable with the IP addresses of the Zookeeper hosts for the secondary cluster:

    # get the zookeeper hosts for the secondary cluster
    export SECONDARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181,ZOOKEEPER_IP_ADDRESS2:2181,ZOOKEEPER_IP_ADDRESS3:2181'
    
  6. The default configuration for Kafka on HDInsight doesn't allow the automatic creation of topics. Use one of the following options before starting the mirroring process:

    • Create the topics on the secondary cluster: This option also allows you to set the number of partitions and the replication factor.

      You can create topics ahead of time by using the following command:

      /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $SECONDARY_ZKHOSTS
      

      testtopic 替换为要创建的主题的名称。Replace testtopic with the name of the topic to create.

    • 为自动主题创建配置群集:此选项允许 MirrorMaker 自动创建主题,但它可能会使用与主主题不同的分区或复制因子创建主题。Configure the cluster for automatic topic creation: This option allows MirrorMaker to automatically create topics, however it may create them with a different number of partitions or replication factor than the primary topic.

      若要配置辅助群集以自动创建主题,请执行以下步骤:To configure the secondary cluster to automatically create topics, perform these steps:

      1. Go to the Ambari dashboard for the secondary cluster: https://SECONDARYCLUSTERNAME.azurehdinsight.net.
      2. Select Services > Kafka. Select the Configs tab.
      3. In the Filter field, enter a value of auto.create. This filters the list of properties and displays the auto.create.topics.enable setting.
      4. Change the value of auto.create.topics.enable to true, and then select Save. Add a note, and then select Save again.
      5. Select the Kafka service, select Restart, and then select Restart all affected. When prompted, select Confirm restart all.

      Screenshot: Kafka enable automatic topic creation

Start MirrorMaker

  1. From the SSH connection to the secondary cluster, use the following command to start the MirrorMaker process:

    /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.properties --producer.config producer.properties --whitelist testtopic --num.streams 4
    

    The parameters used in this example are:

    Parameter Description
    --consumer.config Specifies the file that contains consumer properties. These properties are used to create a consumer that reads from the primary Kafka cluster.
    --producer.config Specifies the file that contains producer properties. These properties are used to create a producer that writes to the secondary Kafka cluster.
    --whitelist A list of topics that MirrorMaker replicates from the primary cluster to the secondary cluster.
    --num.streams The number of consumer threads to create.

    The consumer on the secondary node is now waiting to receive messages.

  2. From the SSH connection to the primary cluster, use the following command to start a producer and send messages to the topic:

    export PRIMARY_BROKERHOSTS=BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS3:9092
    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $PRIMARY_BROKERHOSTS --topic testtopic
    

    When you arrive at a blank line with a cursor, type a few text messages. The messages are sent to the topic on the primary cluster. When done, use Ctrl + C to end the producer process.

  3. From the SSH connection to the secondary cluster, use Ctrl + C to end the MirrorMaker process. It may take several seconds to end. To verify that the messages were replicated to the secondary cluster, use the following command:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $SECONDARY_BROKERHOSTS --topic testtopic --from-beginning
    

    The list of topics now includes testtopic, which was created when MirrorMaker mirrored the topic from the primary cluster to the secondary. The messages retrieved from the topic are the same as the ones you entered on the primary cluster.

Delete the cluster

Warning

Billing for HDInsight clusters is prorated per minute, whether you use them or not. Be sure to delete your cluster after you finish using it. See How to delete an HDInsight cluster.

The steps in this document created clusters in different Azure resource groups. To delete all of the resources created, delete the two resource groups: kafka-primary-rg and kafka-secondary-rg. Deleting the resource groups removes all of the resources created by following this document, including clusters, virtual networks, and storage accounts.
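If you use the Azure CLI, both resource groups can be removed with az group delete. The sketch below prints the commands as a dry run so you can review them first; run the echoed commands yourself after signing in with az login:

```shell
# Print (but don't execute) the Azure CLI commands that delete both groups.
# --yes skips the confirmation prompt; --no-wait returns immediately while
# deletion continues in the background.
for rg in kafka-primary-rg kafka-secondary-rg; do
  echo "az group delete --name $rg --yes --no-wait"
done
```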

Next steps

In this document, you learned how to use MirrorMaker to create a replica of an Apache Kafka cluster. Use the following links to discover other ways to work with Kafka: