Integrace podpory připojení Apache Kafka ve službě Azure Event Hubs
Apache Kafka Připojení je architektura pro připojení a import a export dat z/do jakéhokoli externího systému, jako je MySQL, HDFS a systém souborů prostřednictvím clusteru Kafka. Tento kurz vás provede používáním architektury kafka Připojení se službou Event Hubs.
Tento kurz vás provede integrací Připojení Kafka s centrem událostí a nasazením základních konektorů FileStreamSource a FileStreamSink. I když tyto konektory nejsou určené pro produkční použití, předvádějí ucelený scénář Připojení Kafka, ve kterém azure Event Hubs funguje jako zprostředkovatel Kafka.
Poznámka:
Tato ukázka je k dispozici na GitHubu.
V tomto kurzu provedete následující kroky:
- Vytvoření oboru názvů služby Event Hubs
- Naklonování ukázkového projektu
- Konfigurace připojení Kafka pro službu Event Hubs
- Spuštění připojení Kafka
- Vytvoření konektorů
Požadavky
Abyste mohli dokončit tento návod, ujistěte se, že máte následující:
- Předplatné Azure. Pokud žádné nemáte, vytvořte si bezplatný účet.
- Git
- Linux/MacOS
- Nejnovější vydaná verze Kafka z kafka.apache.org
- Přečtěte si úvodní článek Event Hubs pro Apache Kafka.
Vytvoření oboru názvů služby Event Hubs
K odesílání do jakékoli služby Event Hubs a příjmu z ní se vyžaduje obor názvů služby Event Hubs. Pokyny k vytvoření oboru názvů a centra událostí najdete v tématu Vytvoření centra událostí. Získejte plně kvalifikovaný název domény a připojovací řetězec služby Event Hubs pro pozdější použití. Pokyny najdete v tématu Získání připojovacího řetězce služby Event Hubs.
Naklonování ukázkového projektu
Naklonujte úložiště Azure Event Hubs a přejděte do podsložky tutorials/connect:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Konfigurace připojení Kafka pro službu Event Hubs
Přesměrování propustnosti připojení Kafka z Kafka do služby Event Hubs vyžaduje minimální konfiguraci. Následující ukázka souboru connect-distributed.properties
znázorňuje, jak pro připojení nakonfigurovat ověřování a komunikaci s koncovým bodem Kafka ve službě Event Hubs:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
Důležité
Nahraďte {YOUR.EVENTHUBS.CONNECTION.STRING}
připojovací řetězec pro obor názvů služby Event Hubs. Pokyny k získání připojovací řetězec najdete v tématu Získání připojovací řetězec služby Event Hubs. Tady je příklad konfigurace: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Spuštění připojení Kafka
V tomto kroku se místně spustí pracovní proces připojení Kafka v distribuovaném režimu a k zachování stavu clusteru s použije služba Event Hubs.
- Místně uložte výše uvedený soubor
connect-distributed.properties
. Nezapomeňte nahradit všechny hodnoty v závorkách. - Na svém počítači přejděte do umístění verze Kafka.
- Spusťte
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
. Jakmile se zobrazí'INFO Finished starting connectors and tasks'
, rozhraní REST API pracovního procesu připojení je připravené k interakci.
Poznámka:
Kafka Připojení používá rozhraní Kafka Správa Client API k automatickému vytváření témat s doporučenými konfiguracemi, včetně komprimace. Rychlou kontrolou oboru názvů na webu Azure Portal zjistíte, že se interní témata pracovního procesu připojení vytvořila automaticky.
Kafka Připojení interní témata musí používat komprimace. Tým služby Event Hubs neodpovídá za opravu nesprávných konfigurací, pokud jsou nesprávně nakonfigurovaná interní Připojení témata.
Vytvoření konektorů
Tato část vás provede aktivací konektorů FileStreamSource a FileStreamSink.
Vytvořte adresář pro vstupní a výstupní datové soubory.
mkdir ~/connect-quickstart
Vytvořte dva soubory: jeden soubor s počátečními hodnotami, ze kterého bude číst konektor FileStreamSource, a druhý soubor, do kterého bude zapisovat konektor FileStreamSink.
seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
Vytvořte konektor FileStreamSource. Nezapomeňte nahradit složené závorky cestou k vašemu domovskému adresáři.
curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
Po spuštění výše uvedeného příkazu by se mělo zobrazit centrum
connect-quickstart
událostí ve vaší instanci služby Event Hubs.Zkontrolujte stav konektoru zdroje.
curl -s http://localhost:8083/connectors/file-source/status
Volitelně můžete pomocí Service Bus Exploreru ověřit, že události dorazily do tématu
connect-quickstart
.Vytvořte konektor FileStreamSink. Nezapomeňte opět nahradit složené závorky cestou k vašemu domovskému adresáři.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Zkontrolujte stav konektoru jímky.
curl -s http://localhost:8083/connectors/file-sink/status
Ověřte, že se data replikovala mezi soubory a že jsou v obou souborech identická.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Vyčištění
Kafka Připojení vytváří témata služby Event Hubs pro ukládání konfigurací, posunů a stavu, které se uchovávají i po ukončení clusteru Připojení. Pokud toto trvalost není žádoucí, doporučujeme tato témata odstranit. Můžete také odstranit connect-quickstart
službu Event Hubs, kterou jste vytvořili v tomto názorném postupu.
Další kroky
Další informace o službě Event Hubs pro Kafka najdete v následujících článcích: