Verwenden von MirrorMaker zum Replizieren von Apache Kafka-Themen mit Kafka in HDInsight

Erfahren Sie, wie Sie die Spiegelungsfunktion von Apache Kafka verwenden, um Themen in einen sekundären Cluster zu replizieren. Sie können die Spiegelung als kontinuierlichen Prozess oder intermittierend ausführen, um Daten von einem Cluster in einen anderen zu migrieren.

In diesem Artikel werden Sie die Spiegelung verwenden, um Themen zwischen zwei HDInsight-Clustern zu replizieren. Diese Cluster befinden sich in verschiedenen virtuellen Netzen in unterschiedlichen Rechenzentren.

Warnung

Verwenden Sie die Spiegelung nicht als Mittel, um Fehlertoleranz zu erreichen. Der Versatz zu Elementen innerhalb eines Themas ist zwischen dem primären und dem sekundären Cluster unterschiedlich, so dass die Kunden die beiden nicht austauschbar verwenden können. Falls Sie Bedenken wegen der Fehlertoleranz haben, sollten Sie die Replikation für die Themen in Ihrem Cluster festlegen. Weitere Informationen finden Sie unter Schnellstart: Erstellen eines Apache Kafka-Clusters in HDInsight.

Funktionsweise der Apache Kafka-Spiegelung

Das Mirroring funktioniert mit dem Tool MirrorMaker, das Teil von Apache Kafka ist. MirrorMaker konsumiert Datensätze von Themen auf dem primären Cluster und erstellt dann eine lokale Kopie auf dem sekundären Cluster. MirrorMaker nutzt einen (oder mehrere) Consumer zum Lesen von Daten aus dem primären Cluster und einen Producer, der in den lokalen Cluster (sekundären Cluster) schreibt.

Das nützlichste Mirroring-Setup für die Notfallwiederherstellung verwendet Kafka-Cluster in verschiedenen Azure-Regionen. Zu diesem Zweck werden die virtuellen Netzwerke, in denen sich die Cluster befinden, zu einem Peeringnetzwerk zusammengeschlossen.

Das folgende Diagramm veranschaulicht den Spiegelungsprozess und den Kommunikationsfluss zwischen den Clustern:

Diagramm des Spiegelungsprozesses.

Der primäre und der sekundäre Cluster können sich in Bezug auf die Anzahl von Knoten und Partitionen unterscheiden, und auch der Versatz in den Themen ist unterschiedlich. Beim Spiegeln wird der Schlüsselwert beibehalten, der für die Partitionierung verwendet wird, sodass die Datensatzreihenfolge pro Schlüssel beibehalten wird.

Spiegelung über Netzwerkgrenzen hinweg

Wenn Sie eine Spiegelung zwischen Kafka-Clustern in unterschiedlichen Netzwerken durchführen müssen, sollten Sie außerdem Folgendes beachten:

  • Gateways: Die Netzwerke müssen auf TCP/IP-Ebene kommunizieren können.

  • Server-Adressierung: Sie können Ihre Clusterknoten über ihre IP-Adressen oder vollständig qualifizierte Domänennamen adressieren.

    • IP-Adressen: Wenn Sie Ihre Kafka-Cluster für die Verwendung von IP-Adresswerbung konfigurieren, können Sie mit der Einrichtung der Spiegelung fortfahren, indem Sie die IP-Adressen der Broker-Knoten und ZooKeeper-Knoten verwenden.

    • Domänennamen: Wenn Sie Ihre Kafka-Cluster nicht für die Bekanntgabe von IP-Adressen konfigurieren, müssen die Cluster in der Lage sein, über vollständig qualifizierte Domänennamen (FQDNs) eine Verbindung zueinander herzustellen. Dazu ist in jedem Netz ein DNS-Server (Domain Name System) erforderlich, der so konfiguriert ist, dass er Anfragen an die anderen Netze weiterleitet. Wenn Sie ein virtuelles Azure-Netzwerk erstellen, müssen Sie anstelle des automatischen DNS, das mit dem Netzwerk bereitgestellt wird, einen benutzerdefinierten DNS-Server und die IP-Adresse für den Server angeben. Nachdem Sie das virtuelle Netzwerk erstellt haben, müssen Sie anschließend eine virtuelle Azure-Maschine erstellen, die diese IP-Adresse verwendet. Dann installieren und konfigurieren Sie die DNS-Software auf dem Gerät.

    Wichtig

    Erstellen und konfigurieren Sie den benutzerdefinierten DNS-Server, bevor Sie HDInsight im virtuellen Netzwerk installieren. Es ist keine zusätzliche Konfiguration erforderlich, damit HDInsight den für das virtuelle Netzwerk konfigurierten DNS-Server verwendet.

Weitere Informationen zur Verbindung zweier virtueller Azure-Netzwerke finden Sie unter Konfigurieren einer Verbindung.

Architektur der Spiegelung

Diese Architektur umfasst zwei Cluster in verschiedenen Ressourcengruppen und virtuellen Netzen: einen primären und einen sekundären.

Schritte zur Erstellung

  1. Erstellen Sie zwei neue Ressourcengruppen:

    Resource group Standort
    kafka-primary-rg USA, Mitte
    kafka-secondary-rg USA Nord Mitte
  2. Erstellen Sie in kafka-primary-rg ein neues virtuelles Netzwerk namens kafka-primary-vnet. Übernehmen Sie die Standardeinstellungen.

  3. Erstellen Sie – ebenfalls mit den Standardeinstellungen – in kafka-secondary-rg ein neues virtuelles Netzwerk namens kafka-secondary-vnet.

  4. Erstellen Sie zwei neue Kafka-Cluster:

    Clustername Resource group Virtuelles Netzwerk Speicherkonto
    kafka-primary-cluster kafka-primary-rg kafka-primary-vnet kafkaprimarystorage
    kafka-secondary-cluster kafka-secondary-rg kafka-secondary-vnet kafkasecondarystorage
  5. Erstellen Sie virtuelle Netzwerkpeerings. Dieser Schritt erzeugt zwei Peerings: eines von kafka-primär-vnet zu kafka-secondary-vnet und eines zurück von kafka-secondary-vnet zu kafka-primär-vnet.

    1. Wählen Sie das virtuelle Netzwerk kafka-primary-vnet aus.

    2. Klicken Sie unter Einstellungen auf Peerings.

    3. Wählen Sie Hinzufügen.

    4. Geben Sie auf dem Bildschirm Peering hinzufügen die Details ein, wie in der folgenden Abbildung gezeigt.

      Screenshot, der zeigt, dass H D Insight Kafka ein virtuelles Netzwerk-Peering hinzufügt.

Konfigurieren der Ankündigung der IP-Adresse

Konfigurieren Sie die IP-Werbung, damit ein Client eine Verbindung über Broker-IP-Adressen anstelle von Domänennamen herstellen kann.

  1. Wechseln Sie zum Ambari-Dashboard für den primären Cluster: https://PRIMARYCLUSTERNAME.azurehdinsight.net.

  2. Wählen Sie Dienste>Kafka aus. Wählen Sie die Registerkarte Configs aus.

  3. Fügen Sie dem unteren Abschnitt kafka-env template folgende Konfigurationszeilen hinzu. Wählen Sie Speichern aus.

    # Configure Kafka to advertise IP addresses instead of FQDN
    IP_ADDRESS=$(hostname -i)
    echo advertised.listeners=$IP_ADDRESS
    sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
    echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
    
  4. Geben Sie auf dem Bildschirm Konfiguration speichern einen Hinweis ein, und wählen Sie Speichern.

  5. Wenn Sie eine Konfigurationswarnung erhalten, wählen Sie Beständig fortfahren.

  6. Wählen Sie auf Konfigurationsänderungen speichern die Option Ok.

  7. Wählen Sie in der Meldung Neustart erforderlich die Optionen Neustart>Neustart für alle Betroffenen. Wählen Sie dann Bestätigen Sie den Neustart aller.

    Screenshot, der die Apache Ambari-Option zum Neustart aller Betroffenen zeigt.

Konfigurieren Sie Kafka so, dass es auf allen Netzwerkschnittstellen lauscht

  1. Bleiben Sie auf der Registerkarte Configs (Konfigurationen) unter Services>Kafka (Dienste > Kafka). Setzen Sie im Abschnitt Kafka Broker die Eigenschaft listeners auf PLAINTEXT://0.0.0.0:9092.
  2. Wählen Sie Speichern aus.
  3. Wählen Sie Neustart>Bestätigen Sie Neustart alle.

Aufzeichnung der Broker-IP-Adressen und ZooKeeper-Adressen für den primären Cluster

  1. Wählen Sie auf dem Ambari-Dashboard Hosts aus.

  2. Notieren Sie sich die IP-Adressen für die Broker und ZooKeeper. Die Broker-Knoten haben wn als die ersten beiden Buchstaben des Host-Namens, und die ZooKeeper-Knoten haben zk als die ersten beiden Buchstaben des Host-Namens.

    Screenshot, der die Apache Ambari-Ansicht node i p addresses zeigt.

  3. Wiederholen Sie die vorherigen drei Schritte für den zweiten Cluster, kafka-secondary-cluster: Konfigurieren Sie die IP-Werbung, legen Sie Listener fest und notieren Sie sich die IP-Adressen von Broker und ZooKeeper.

Erstellen von Themen

  1. Verbinden Sie sich mit dem primären Cluster über SSH:

    ssh sshuser@PRIMARYCLUSTER-ssh.azurehdinsight.net
    

    Ersetzen Sie sshuser durch den SSH-Benutzernamen, den Sie bei der Erstellung des Clusters verwendet haben. Ersetzen Sie PRIMARYCLUSTER durch den Basisnamen, den Sie bei der Erstellung des Clusters verwendet haben.

    Weitere Informationen finden Sie unter Verwenden von SSH mit Linux-basiertem Hadoop in HDInsight unter Linux, Unix oder OS X.

  2. Verwenden Sie den folgenden Befehl, um zwei Umgebungsvariablen mit den Apache ZooKeeper-Hosts und Broker-Hosts für den primären Cluster zu erstellen. Ersetzen Sie Zeichenfolgen wie ZOOKEEPER_IP_ADDRESS1 durch die tatsächlichen IP-Adressen, die zuvor aufgezeichnet wurden, z. B. 10.23.0.11 und 10.23.0.7. Dasselbe gilt für BROKER_IP_ADDRESS1. Wenn Sie die FQDN-Auflösung mit einem benutzerdefinierten DNS-Server verwenden, führen Sie die folgenden Schritte aus, um Broker- und ZooKeeper-Namen zu erhalten.

    # get the ZooKeeper hosts for the primary cluster
    export PRIMARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181, ZOOKEEPER_IP_ADDRESS2:2181, ZOOKEEPER_IP_ADDRESS3:2181'
    
    # get the broker hosts for the primary cluster
    export PRIMARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS2:9092'
    
  3. Um ein Thema mit Namen testtopic zu erstellen, nutzen Sie den folgenden Befehl:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $PRIMARY_ZKHOSTS
    
  4. Verwenden Sie den folgenden Befehl, um zu bestätigen, dass das Thema erstellt wurde:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $PRIMARY_ZKHOSTS
    

    Die Antwort enthält testtopic.

  5. Gehen Sie wie folgt vor, um die Broker-Host-Informationen für diesen (den primären) Cluster anzuzeigen:

    echo $PRIMARY_BROKERHOSTS
    

    Die Ausgabe sieht in etwa wie folgt aus:

    10.23.0.11:9092,10.23.0.7:9092,10.23.0.9:9092

    Speichern Sie diese Informationen. Sie werden im nächsten Abschnitt verwendet.

Konfigurieren der Spiegelung

  1. Verbinden Sie sich mit dem sekundären Cluster, indem Sie eine andere SSH-Sitzung verwenden:

    ssh sshuser@SECONDARYCLUSTER-ssh.azurehdinsight.net
    

    Ersetzen Sie sshuser durch den SSH-Benutzernamen, den Sie bei der Erstellung des Clusters verwendet haben. Ersetzen Sie SECONDARYCLUSTER durch den Namen, den Sie bei der Erstellung des Clusters verwendet haben.

    Weitere Informationen finden Sie unter Verwenden von SSH mit Linux-basiertem Hadoop in HDInsight unter Linux, Unix oder OS X.

  2. Verwenden Sie eine consumer.properties-Datei, um die Kommunikation mit dem Primärcluster zu konfigurieren. Verwenden Sie zum Erstellen der Datei den folgenden Befehl:

    nano consumer.properties
    

    Verwenden Sie als Inhalt der Datei consumer.properties den folgenden Text:

    bootstrap.servers=PRIMARY_BROKERHOSTS
    group.id=mirrorgroup
    

    Ersetzen Sie PRIMARY_BROKERHOSTS durch die IP-Adressen der Broker-Hosts des primären Clusters.

    In dieser Datei werden die Consumerinformationen beschrieben, die beim Lesen aus dem primären Kafka-Cluster verwendet werden sollten. Weitere Informationen finden Sie unter Verbraucherkonfigurationen auf kafka.apache.org.

    Um die Datei zu speichern, drücken Sie Strg+X, drücken Sie Y und dann die Eingabetaste.

  3. Bevor Sie den Producer konfigurieren, der mit dem sekundären Cluster kommuniziert, richten Sie eine Variable für die IP-Adressen der Broker des sekundären Clusters ein. Verwenden Sie die folgenden Befehle, um diese Variable zu erstellen:

    export SECONDARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS2:9092'
    

    Der Befehl echo $SECONDARY_BROKERHOSTS gibt Informationen ähnlich dem folgenden Text zurück:

    10.23.0.14:9092,10.23.0.4:9092,10.23.0.12:9092

  4. Verwenden Sie eine producer.properties-Datei, um den sekundären Cluster zu kommunizieren. Verwenden Sie zum Erstellen der Datei den folgenden Befehl:

    nano producer.properties
    

    Verwenden Sie als Inhalt der Datei producer.properties den folgenden Text:

    bootstrap.servers=SECONDARY_BROKERHOSTS
    compression.type=none
    

    Ersetzen Sie SECONDARY_BROKERHOSTS durch die im vorherigen Schritt verwendeten Broker-IP-Adressen.

    Weitere Informationen finden Sie unter Producer Configs auf kafka.apache.org.

  5. Verwenden Sie die folgenden Befehle, um eine Umgebungsvariable mit den IP-Adressen der ZooKeeper-Hosts für den sekundären Cluster zu erstellen:

    # get the ZooKeeper hosts for the secondary cluster
    export SECONDARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181,ZOOKEEPER_IP_ADDRESS2:2181,ZOOKEEPER_IP_ADDRESS3:2181'
    
  6. Die Standardkonfiguration für Kafka auf HDInsight erlaubt keine automatische Erstellung von Themen. Sie müssen eine der folgenden Optionen verwenden, bevor Sie den Spiegelungsprozess starten:

    • Erstellen der Themen im sekundären Cluster: Bei dieser Option haben Sie auch die Möglichkeit, die Anzahl von Partitionen und den Replikationsfaktor festzulegen.

      Mit folgendem Befehl können Sie Themen vorab erstellen:

      /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $SECONDARY_ZKHOSTS
      

      Ersetzen Sie testtopic durch den Namen des zu erstellenden Themas.

    • Konfigurieren Sie den Cluster für die automatische Themenerstellung: Diese Option ermöglicht es MirrorMaker, automatisch Themen zu erstellen. Beachten Sie, dass es diese mit einer anderen Anzahl von Partitionen oder einem anderen Replikationsfaktor als das primäre Thema erstellen kann.

      Um den sekundären Cluster für das automatische Erstellen von Themen zu konfigurieren, führen Sie die folgenden Schritte aus:

      1. Wechseln Sie zum Ambari-Dashboard für den sekundären Cluster: https://SECONDARYCLUSTERNAME.azurehdinsight.net.
      2. Wählen Sie Dienste>Kafka aus. Wählen Sie dann die Registerkarte Konfigurationen.
      3. Geben Sie in das Feld Filter den Wert auto.create ein. Dies filtert die Liste der Eigenschaften und zeigt die Einstellung auto.create.topics.enable.
      4. Ändern Sie den Wert von auto.create.topics.enable zu true, und klicken Sie dann auf Speichern. Fügen Sie einen Hinweis hinzu, und wählen Sie dann erneut Speichern.
      5. Wählen Sie den Dienst Kafka, dann die Option Neu starten und abschließend die Option Neustart aller betroffenen. Klicken Sie bei entsprechender Aufforderung auf Neustart aller Dienste bestätigen.

      Screenshot, der zeigt, wie man die automatische Erstellung von Themen im Kafka-Dienst aktiviert.

Starten von MirrorMaker

Hinweis

In diesem Artikel wird ein Begriff verwendet, der von Microsoft nicht mehr genutzt wird. Sobald der Begriff aus der Software entfernt wurde, wird er auch aus diesem Artikel entfernt.

  1. Verwenden Sie den folgenden Befehl über die SSH-Verbindung mit dem sekundären Cluster, um den MirrorMaker-Prozess zu starten:

    /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.properties --producer.config producer.properties --whitelist testtopic --num.streams 4
    

    Die in diesem Beispiel verwendeten Parameter sind:

    Parameter BESCHREIBUNG
    --consumer.config Gibt die Datei an, in der die Consumereigenschaften enthalten sind. Sie verwenden diese Eigenschaften, um einen Verbraucher zu erstellen, der aus dem primären Kafka-Cluster liest.
    --producer.config Gibt die Datei an, in der die Producereigenschaften enthalten sind. Sie verwenden diese Eigenschaften, um einen Produzenten zu erstellen, der in den sekundären Kafka-Cluster schreibt.
    --whitelist Eine Liste mit Themen, die von MirrorMaker aus dem primären in den sekundären Cluster repliziert werden.
    --num.streams Die Anzahl von Consumerthreads, die erstellt werden sollen.

    Der Consumer auf dem sekundären Knoten wartet jetzt auf Nachrichten.

  2. Verwenden Sie den folgenden Befehl über die SSH-Verbindung mit dem primären Cluster, um einen Producer zu starten und Nachrichten an das Thema zu senden:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $PRIMARY_BROKERHOSTS --topic testtopic
    

    Sobald der Cursor an einer leeren Zeile steht, geben Sie einige Textnachrichten ein. Diese Nachrichten werden an das Thema im primären Cluster gesendet. Wenn Sie fertig sind, drücken Sie Strg+C, um den Erzeugungsprozess zu beenden.

  3. Drücken Sie bei der SSH-Verbindung zum sekundären Cluster die Tastenkombination Strg+C, um den MirrorMaker-Prozess zu beenden. Es kann einige Sekunden dauern, bis der Vorgang abgeschlossen ist. Um sicherzustellen, dass die Nachrichten im sekundären Cluster repliziert wurden, verwenden Sie den folgenden Befehl:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $SECONDARY_BROKERHOSTS --topic testtopic --from-beginning
    

    Die Themenliste enthält nun auch das Thema testtopic, das erstellt wurde, als MirrorMaker das Thema vom primären an den sekundären Cluster gespiegelt hat. Die aus dem Thema abgerufenen Nachrichten sind identisch mit den Nachrichten, die Sie im primären Cluster eingegeben haben.

Löschen des Clusters

Warnung

Die Abrechnung für die HDInsight-Cluster erfolgt anteilsmäßig auf Minutenbasis und ist unabhängig von der Verwendung. Daher sollten Sie Ihren Cluster nach der Verwendung unbedingt wieder löschen. Sehen Sie sich die Informationen zum Löschen eines HDInsight-Clusters an.

Mit den Schritten in diesem Artikel wurden Cluster in verschiedenen Azure-Ressourcengruppen erstellt. Um alle erstellten Ressourcen zu löschen, können Sie die beiden erstellten Ressourcengruppen löschen: kafka-primary-rg und kafka-secondary-rg. Durch das Löschen der Ressourcengruppen werden alle Ressourcen entfernt, die nach diesem Artikel erstellt wurden, einschließlich Cluster, virtuelle Netzwerke und Speicherkonten.

Nächste Schritte

In diesem Artikel haben Sie gelernt, wie man MirrorMaker verwendet, um ein Replikat eines Apache Kafka-Clusters zu erstellen. Verwenden Sie die folgenden Links, um weitere Möglichkeiten zur Arbeit mit Kafka kennenzulernen: