使用 MirrorMaker,透過 HDInsight 上的 Kafka 來複寫 Apache Kafka 主題

了解如何使用 Apache Kafka 的鏡像功能,將主題複寫至次要叢集。 您可以執行鏡像作為連續程序,或間歇性執行,以將資料從一個叢集移轉至另一個叢集。

在本文中,您將使用鏡像來複寫兩個 HDInsight 叢集之間的主題。 這些叢集位於不同資料中心的不同虛擬網路中。

警告

請勿使用鏡像作為達成容錯的方法。 主題內項目的位移在主要叢集與次要叢集之間有所不同,所以用戶端無法交替使用這兩者。 如果您很擔心容錯,您應該為叢集內的主題設定複寫。 如需詳細資訊,請參閱 開始使用 HDInsight 上的 Apache Kafka

Apache Kafka 鏡像的運作方式

鏡像的運作方式是使用 MirrorMaker 工具,這是 Apache Kafka 的一部分。 MirrorMaker 會從主要叢集上的主題取用記錄,然後在次要叢集上建立本機複本。 MirrorMaker 會使用一個或多個從主要叢集讀取的取用者,以及一個寫入至本機 (次要) 叢集的生產者

災害復原的最有用鏡像設定會利用不同 Azure 區域中的 Kafka 叢集。 為了達成此目的,叢集所在的虛擬網路會對等互連。

下圖說明鏡像程序以及通訊在叢集之間的流動方式:

Diagram of the mirroring process.

主要與次要叢集的節點與磁碟分割數目可能有所不同,且主題內的位移也會不同。 鏡像會維護用於資料分割的機碼值,因此會根據每個機碼保留記錄順序。

跨網路界限鏡像

如果您需要在不同網路中的 Kafka 叢集之間進行鏡像,請注意下列其他考量:

  • 閘道:網路必須能夠在 TCP/IP 層級進行通訊。

  • 伺服器定址:您可以選擇使用 IP 位址或完整網域名稱來定址叢集節點。

    • IP 位址:如果您將 Kafka 叢集設定為使用 IP 位址公告,則可以使用訊息代理程式節點和 Zookeeper 節點的 IP 位址,繼續進行鏡像設定。

    • 網域名稱:如果您未設定 Kafka 叢集進行 IP 位址公告,叢集必須能夠使用完整網域名稱 (FQDN) 彼此連接。 這需要每個網路中的網域名稱系統 (DNS) 伺服器設定為將要求轉送到其他網路。 建立 Azure 虛擬網路時,您必須指定自訂 DNS 伺服器和伺服器的 IP 位址,而不是使用隨著網路提供的自動 DNS。 建立虛擬網路之後,您必須接著建立使用該 IP 位址的 Azure 虛擬機器。 然後您會在其上安裝和設定 DNS 軟體。

    重要

    先建立和設定自訂 DNS 伺服器,然後再將 HDInsight 安裝到虛擬網路中。 HDInsight 不需要進行其他設定,即可使用針對虛擬網路設定的 DNS 伺服器。

如需有關如何連接兩個 Azure 虛擬網路的詳細資訊,請參閱設定連線

鏡像結構

此結構會在不同的資源群組和虛擬網路中提供兩個叢集:主要和次要。

建立步驟

  1. 建立兩個新的資源群組:

    資源群組 Location
    kafka-primary-rg 美國中部
    kafka-secondary-rg 美國中北部
  2. kafka-primary-rg 中建立新的虛擬網路 kafka-primary-vnet。 保留預設設定。

  3. kafka-secondary-rg 中建立新的虛擬網路 kafka-secondary-vnet,也具有預設設定。

  4. 建立兩個新的 Kafka 叢集:

    叢集名稱 資源群組 虛擬網路 儲存體帳戶
    kafka-primary-cluster kafka-primary-rg kafka-primary-vnet kafkaprimarystorage
    kafka-secondary-cluster kafka-secondary-rg kafka-secondary-vnet kafkasecondarystorage
  5. 建立虛擬網路對等互連。 此步驟將建立兩個對等互連:一個從 kafka-primary-vnetkafka-secondary-vnet,另一個從 kafka-secondary-vnet 回到 kafka-primary-vnet

    1. 選取 kafka-primary-vnet 虛擬網路。

    2. 在 [設定] 底下,選取 [對等互連]

    3. 選取 [新增]。

    4. 在 [新增對等互連] 畫面上輸入詳細資料,如下列螢幕擷取畫面所示。

      Screenshot that shows H D Insight Kafka add virtual network peering.

設定 IP 通告

設定 IP 通告,讓用戶端能夠使用訊息代理程式 IP 位址 (而不是網域名稱) 連線。

  1. 移至主要叢集的 Ambari 儀表板:https://PRIMARYCLUSTERNAME.azurehdinsight.net

  2. 選取 [服務]> [Kafka]。 選取 [ 設定 ] 索引標籤。

  3. 將下列組態行新增至底部的 kafka-env 範本區段。 選取 [儲存]。

    # Configure Kafka to advertise IP addresses instead of FQDN
    IP_ADDRESS=$(hostname -i)
    echo advertised.listeners=$IP_ADDRESS
    sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
    echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
    
  4. 在 [儲存組態] 畫面上輸入備註,然後選取 [儲存]

  5. 如果您收到組態警告,請選取 [仍要繼續]

  6. 在 [儲存組態變更] 上,選取 [確定]

  7. 在 [需要重新啟動] 通知中,選取 [重新啟動]> [重新啟動所有受影響項目]。 然後選取 [確認重新啟動所有項目]

    Screenshot that shows the Apache Ambari option to restart all affected.

設定 Kafka 以在所有網路介面上接聽

  1. 停留在 [服務]> [Kafka] 下的 [設定] 索引標籤上。 在 [Kafka Broker] 區段中,將 [接聽程式] 屬性設定為 PLAINTEXT://0.0.0.0:9092
  2. 選取 [儲存]。
  3. 選取 [重新啟動]> [確認全部重新啟動]

記錄主要叢集的訊息代理程式 IP 位址和 ZooKeeper 位址

  1. 選取 Ambari 儀表板上的 [主機]

  2. 記下訊息代理程式和 ZooKeepers 的 IP 位址。 訊息代理程式節點主機名稱的前兩個字母為 wn,而 ZooKeeper 節點主機名稱的前兩個字母則為 zk

    Screenshot that shows the Apache Ambari view node i p addresses.

  3. 針對第二個叢集 kafka-secondary-cluster 重複上述三個步驟:設定 IP 通告、設定接聽程式,並記下訊息代理程式和 ZooKeeper IP 位址。

建立主題

  1. 使用 SSH 連線至主要叢集:

    ssh sshuser@PRIMARYCLUSTER-ssh.azurehdinsight.net
    

    sshuser 取代為建立叢集時所使用的 SSH 使用者名稱。 將 PRIMARYCLUSTER 取代為建立叢集時所使用的基礎名稱。

    如需詳細資訊,請參閱搭配 HDInsight 使用 SSH

  2. 使用下列命令來建立兩個環境變數,其中的主要叢集包含 Apache ZooKeeper 主機和訊息代理程式主機。 將 ZOOKEEPER_IP_ADDRESS1 之類的字串取代為先前記錄的實際 IP 位址,例如 10.23.0.1110.23.0.7BROKER_IP_ADDRESS1 也是如此。 如果您使用 FQDN 解析搭配自訂 DNS 伺服器,請遵循這些步驟來取得訊息代理程式和 ZooKeeper 名稱。

    # get the ZooKeeper hosts for the primary cluster
    export PRIMARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181, ZOOKEEPER_IP_ADDRESS2:2181, ZOOKEEPER_IP_ADDRESS3:2181'
    
    # get the broker hosts for the primary cluster
    export PRIMARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS2:9092'
    
  3. 若要建立名為 testtopic 的主題,請使用下列命令:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $PRIMARY_ZKHOSTS
    
  4. 使用下列命令確認已建立主題:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $PRIMARY_ZKHOSTS
    

    回應包含 testtopic

  5. 使用下列命令來檢視這個 (主要) 叢集的訊息代理程式主機資訊:

    echo $PRIMARY_BROKERHOSTS
    

    此命令會傳回類似以下文字的資訊:

    10.23.0.11:9092,10.23.0.7:9092,10.23.0.9:9092

    請儲存此資訊。 此資訊用於下一節。

設定鏡像功能

  1. 使用不同的 SSH 工作階段連線至次要叢集:

    ssh sshuser@SECONDARYCLUSTER-ssh.azurehdinsight.net
    

    sshuser 取代為建立叢集時所使用的 SSH 使用者名稱。 將 SECONDARYCLUSTER 取代為建立叢集時所使用的名稱。

    如需詳細資訊,請參閱搭配 HDInsight 使用 SSH

  2. 使用 consumer.properties 檔案來設定與主要叢集的通訊。 若要建立檔案,請使用下列命令:

    nano consumer.properties
    

    使用下列文字做為 consumer.properties 檔案的內容:

    bootstrap.servers=PRIMARY_BROKERHOSTS
    group.id=mirrorgroup
    

    PRIMARY_BROKERHOSTS 取代為來自主要叢集的訊息代理程式主機 IP 位址。

    此檔案描述從主要 Kafka 叢集讀取資料時所要使用的取用者資訊。 如需詳細資訊,請參閱位於 kafka.apache.org取用者組態

    若要儲存檔案,請按 Ctrl+X、按 Y,然後按 Enter。

  3. 設定與次要叢集通訊的產生者之前,請先為次要叢集的訊息代理程式 IP 位址設定變數。 使用下列命令來建立此變數:

    export SECONDARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS2:9092'
    

    命令 echo $SECONDARY_BROKERHOSTS 應該會傳回類似下列文字的資訊:

    10.23.0.14:9092,10.23.0.4:9092,10.23.0.12:9092

  4. 使用 producer.properties 檔案來與次要叢集通訊。 若要建立檔案,請使用下列命令:

    nano producer.properties
    

    使用下列文字做為 producer.properties 檔案的內容:

    bootstrap.servers=SECONDARY_BROKERHOSTS
    compression.type=none
    

    SECONDARY_BROKERHOSTS 取代為上一個步驟中使用的訊息代理程式 IP 位址。

    如需詳細資訊,請參閱位於 kafka.apache.org生產者組態

  5. 使用下列命令來建立環境變數,其中包含次要叢集 ZooKeeper 主機的 IP 位址:

    # get the ZooKeeper hosts for the secondary cluster
    export SECONDARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181,ZOOKEEPER_IP_ADDRESS2:2181,ZOOKEEPER_IP_ADDRESS3:2181'
    
  6. HDInsight 上 Kafka 的預設組態不允許自動建立主題。 您必須先使用下列其中一個選項,才能啟動鏡像程序:

    • 在次要叢集上建立主題:此選項也可讓您設定分割數目和複寫因子。

      您可以使用下列命令提前建立主題:

      /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $SECONDARY_ZKHOSTS
      

      testtopic 取代為要建立的主題名稱。

    • 設定可供自動建立主題的叢集:此選項可讓 MirrorMaker 自動建立主題。 請注意,可能會以與主要主題不同的分割區數目或不同的複寫因數來建立它們。

      若要設定次要叢集來自動建立主題,請執行下列步驟:

      1. 移至次要叢集的 Ambari 儀表板:https://SECONDARYCLUSTERNAME.azurehdinsight.net
      2. 選取 [服務]> [Kafka]。 然後選取 [設定] 索引標籤。
      3. 在 [篩選] 欄位中,輸入 auto.create 的值。 這會篩選屬性清單並顯示 auto.create.topics.enable 設定。
      4. auto.create.topics.enable 的值變更為 true,然後選取 [儲存]。 新增附註,然後再次選取 [儲存]
      5. 依序選取 [Kafka] 服務、[重新啟動] 和 [重新啟動所有受影響的]。 出現提示時,選取 [確認全部重新啟動]

      Screenshot that shows how to enable auto create topics in the kafka service.

啟動 MirrorMaker

注意

本文會參考 Microsoft 不再使用的術語。 從軟體中移除該字詞時,我們也會將其從本文中移除。

  1. 在連往次要叢集的 SSH 連線中,使用下列命令來啟動 MirrorMaker 程序:

    /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.properties --producer.config producer.properties --whitelist testtopic --num.streams 4
    

    此範例中使用的參數:

    參數 描述
    --consumer.config 指定包含取用者屬性的檔案。 您可以使用這些屬性來建立會從主要 Kafka 叢集讀取的取用者。
    --producer.config 指定包含產生者屬性的檔案。 您可以使用這些屬性來建立會寫入次要 Kafka 叢集的產生者。
    --whitelist MirrorMaker 從主要叢集複寫至次要叢集的主題清單。
    --num.streams 要建立的取用者執行緒數目。

    次要節點上的取用者現在正在等候接收訊息。

  2. 從連往主要叢集的 SSH 連線中,使用下列命令來啟動產生者,然後傳送訊息到主題:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $PRIMARY_BROKERHOSTS --topic testtopic
    

    當您抵達有游標的空白行時,請輸入一些文字訊息。 訊息會傳送到主要叢集上的主題。 完成後,使用 Ctrl + C 來結束產生者程序。

  3. 從連往次要叢集的 SSH 連線,按 Ctrl+C 來結束 MirrorMaker 程序。 這可能需要數秒鐘的時間才能完成程序。 若要驗證訊息已複寫到次要叢集,請使用下列命令:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $SECONDARY_BROKERHOSTS --topic testtopic --from-beginning
    

    主題清單現在包含 testtopic,它是在 MirrorMaster 將主題從主要叢集鏡像至次要叢集時所建立。 從主題中擷取的訊息與在主要叢集上所輸入的內容相同。

選取叢集

警告

不論使用與否,HDInsight 叢集都是按分鐘計費。 請務必在使用完叢集後將它刪除。 請參閱如何刪除 HDInsight 叢集

本文中的步驟會在不同的 Azure 資源群組中建立叢集。 若要刪除所有建立的資源,您可以刪除建立的兩個資源群組:kafka-primary-rgkafka-secondary-rg。 刪除資源群組會移除遵循本文所建立的所有資源,包括叢集、虛擬網路和儲存體帳戶。

下一步

在本文中,您已學會如何使用 MirrorMaker 來建立 Apache Kafka 叢集的複本。 使用下列連結來探索使用 Kafka 的其他方式︰