Integrace podpory připojení Apache Kafka ve službě Azure Event Hubs

Apache Kafka Připojení je architektura pro připojení a import a export dat z/do jakéhokoli externího systému, jako je MySQL, HDFS a systém souborů prostřednictvím clusteru Kafka. Tento kurz vás provede používáním architektury kafka Připojení se službou Event Hubs.

Tento kurz vás provede integrací Připojení Kafka s centrem událostí a nasazením základních konektorů FileStreamSource a FileStreamSink. I když tyto konektory nejsou určené pro produkční použití, předvádějí ucelený scénář Připojení Kafka, ve kterém azure Event Hubs funguje jako zprostředkovatel Kafka.

Poznámka:

Tato ukázka je k dispozici na GitHubu.

V tomto kurzu provedete následující kroky:

  • Vytvoření oboru názvů služby Event Hubs
  • Naklonování ukázkového projektu
  • Konfigurace připojení Kafka pro službu Event Hubs
  • Spuštění připojení Kafka
  • Vytvoření konektorů

Požadavky

Abyste mohli dokončit tento návod, ujistěte se, že máte následující:

Vytvoření oboru názvů služby Event Hubs

K odesílání do jakékoli služby Event Hubs a příjmu z ní se vyžaduje obor názvů služby Event Hubs. Pokyny k vytvoření oboru názvů a centra událostí najdete v tématu Vytvoření centra událostí. Získejte plně kvalifikovaný název domény a připojovací řetězec služby Event Hubs pro pozdější použití. Pokyny najdete v tématu Získání připojovacího řetězce služby Event Hubs.

Naklonování ukázkového projektu

Naklonujte úložiště Azure Event Hubs a přejděte do podsložky tutorials/connect:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Konfigurace připojení Kafka pro službu Event Hubs

Přesměrování propustnosti připojení Kafka z Kafka do služby Event Hubs vyžaduje minimální konfiguraci. Následující ukázka souboru connect-distributed.properties znázorňuje, jak pro připojení nakonfigurovat ověřování a komunikaci s koncovým bodem Kafka ve službě Event Hubs:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Důležité

Nahraďte {YOUR.EVENTHUBS.CONNECTION.STRING} připojovací řetězec pro obor názvů služby Event Hubs. Pokyny k získání připojovací řetězec najdete v tématu Získání připojovací řetězec služby Event Hubs. Tady je příklad konfigurace: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Spuštění připojení Kafka

V tomto kroku se místně spustí pracovní proces připojení Kafka v distribuovaném režimu a k zachování stavu clusteru s použije služba Event Hubs.

  1. Místně uložte výše uvedený soubor connect-distributed.properties. Nezapomeňte nahradit všechny hodnoty v závorkách.
  2. Na svém počítači přejděte do umístění verze Kafka.
  3. Spusťte ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. Jakmile se zobrazí 'INFO Finished starting connectors and tasks', rozhraní REST API pracovního procesu připojení je připravené k interakci.

Poznámka:

Kafka Připojení používá rozhraní Kafka Správa Client API k automatickému vytváření témat s doporučenými konfiguracemi, včetně komprimace. Rychlou kontrolou oboru názvů na webu Azure Portal zjistíte, že se interní témata pracovního procesu připojení vytvořila automaticky.

Kafka Připojení interní témata musí používat komprimace. Tým služby Event Hubs neodpovídá za opravu nesprávných konfigurací, pokud jsou nesprávně nakonfigurovaná interní Připojení témata.

Vytvoření konektorů

Tato část vás provede aktivací konektorů FileStreamSource a FileStreamSink.

  1. Vytvořte adresář pro vstupní a výstupní datové soubory.

    mkdir ~/connect-quickstart
    
  2. Vytvořte dva soubory: jeden soubor s počátečními hodnotami, ze kterého bude číst konektor FileStreamSource, a druhý soubor, do kterého bude zapisovat konektor FileStreamSink.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Vytvořte konektor FileStreamSource. Nezapomeňte nahradit složené závorky cestou k vašemu domovskému adresáři.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    Po spuštění výše uvedeného příkazu by se mělo zobrazit centrum connect-quickstart událostí ve vaší instanci služby Event Hubs.

  4. Zkontrolujte stav konektoru zdroje.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Volitelně můžete pomocí Service Bus Exploreru ověřit, že události dorazily do tématu connect-quickstart.

  5. Vytvořte konektor FileStreamSink. Nezapomeňte opět nahradit složené závorky cestou k vašemu domovskému adresáři.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Zkontrolujte stav konektoru jímky.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Ověřte, že se data replikovala mezi soubory a že jsou v obou souborech identická.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Vyčištění

Kafka Připojení vytváří témata služby Event Hubs pro ukládání konfigurací, posunů a stavu, které se uchovávají i po ukončení clusteru Připojení. Pokud toto trvalost není žádoucí, doporučujeme tato témata odstranit. Můžete také odstranit connect-quickstart službu Event Hubs, kterou jste vytvořili v tomto názorném postupu.

Další kroky

Další informace o službě Event Hubs pro Kafka najdete v následujících článcích: