Integrare il supporto di Apache Kafka Connect in Hub eventi di Azure
Apache Kafka Connessione è un framework per connettersi ed importare/esportare dati da/a qualsiasi sistema esterno, ad esempio MySQL, HDFS e file system tramite un cluster Kafka. Questa esercitazione illustra in dettaglio come usare il framework di Kafka Connect con Hub eventi.
L'esercitazione mostra come integrare Kafka Connect con un hub eventi di Azure e come distribuire i connettori di base FileStreamSource e FileStreamSink. Anche se questi connettori non sono destinati all'uso in produzione, dimostrano uno scenario kafka end-to-end Connessione in cui Hub eventi di Azure funge da broker Kafka.
Nota
Questo esempio è disponibile in GitHub.
In questa esercitazione vengono completati i passaggi seguenti:
- Creare uno spazio dei nomi di Hub eventi
- Clonare il progetto di esempio
- Configurare Kafka Connect per Hub eventi
- Eseguire Kafka Connect
- Creare i connettori
Prerequisiti
Per completare questa procedura dettagliata, verificare di disporre dei prerequisiti seguenti:
- Abbonamento di Azure. Se non se ne ha una, creare un account gratuito.
- Git
- Linux/MacOS
- Versione più recente di Kafka disponibile da kafka.apache.org
- Leggere con attenzione l'articolo introduttivo Hub eventi per Apache Kafka
Creare uno spazio dei nomi di Hub eventi
Per l'invio e la ricezione da qualsiasi servizio Hub eventi è richiesto uno spazio dei nomi di Hub eventi. Per istruzioni su come creare uno spazio dei nomi e un hub eventi, vedere Creazione di un hub eventi. Ottenere la stringa di connessione di Hub eventi e il nome di dominio completo (FQDN) da usare successivamente. Per istruzioni, vedere Ottenere una stringa di connessione ad Hub eventi.
Clonare il progetto di esempio
Clonare il repository di Hub eventi di Azure e passare alla sottocartella tutorials/connect:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Configurare Kafka Connect per Hub eventi
Quando si reindirizza la velocità effettiva di Kafka Connect da Kafka a Hub eventi, è necessaria una riconfigurazione minima. Il codice di esempio connect-distributed.properties
seguente illustra come configurare Connect per autenticare e comunicare con l'endpoint Kafka in Hub eventi:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
Importante
Sostituire {YOUR.EVENTHUBS.CONNECTION.STRING}
con la stringa di connessione per lo spazio dei nomi di Hub eventi. Per istruzioni su come ottenere la stringa di connessione, vedere Ottenere una stringa di connessione ad Hub eventi. Ecco un esempio di configurazione: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Eseguire Kafka Connect
In questo passaggio un ruolo di lavoro Kafka Connect viene avviato localmente in modalità distribuita, usando Hub eventi per gestire lo stato del cluster.
- Salvare il file
connect-distributed.properties
in locale. Assicurarsi di sostituire tutti i valori racchiusi tra parentesi graffe. - Passare alla posizione della versione di Kafka nel computer.
- Eseguire
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
. Quando viene visualizzato l'avviso'INFO Finished starting connectors and tasks'
, l'API REST del ruolo di lavoro Connect è pronta per l'interazione.
Nota
Kafka Connect usa l'API Kafka AdminClient per creare automaticamente argomenti con le configurazioni consigliate, inclusa la compattazione. Un rapido controllo dello spazio dei nomi nel portale di Azure conforma che gli argomenti interni del ruolo di lavoro Connect sono stati creati automaticamente.
Gli argomenti interni di Kafka Connect devono usare la compattazione. Il team di Hub eventi non è responsabile della correzione di configurazioni non corrette se gli argomenti interni di Kafka Connect non sono configurati correttamente.
Creare i connettori
Questa sezione illustra come configurare i connettori FileStreamSource e FileStreamSink.
Creare una directory per i file di dati di input e output.
mkdir ~/connect-quickstart
Creare due file: uno con i dati di inizializzazione che vengono letti dal connettore FileStreamSource e l'altro in cui il connettore FileStreamSink scrive.
seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
Creare un connettore FileStreamSource. Assicurarsi di sostituire i valori nelle parentesi graffe con il percorso della home directory.
curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
L'hub
connect-quickstart
eventi dovrebbe essere visualizzato nell'istanza di Hub eventi dopo aver eseguito il comando precedente.Controllare lo stato del connettore di origine.
curl -s http://localhost:8083/connectors/file-source/status
Facoltativamente, è possibile usare Service Bus Explorer per verificare che gli eventi abbiamo raggiunto l'argomento
connect-quickstart
.Creare un connettore FileStreamSink. Anche in questo caso, assicurarsi di sostituire i valori nelle parentesi graffe con il percorso della home directory.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Controllare lo stato del connettore sink.
curl -s http://localhost:8083/connectors/file-sink/status
Verificare che i dati siano stati replicati tra i file e che siano identici in entrambi i file.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Pulizia
Kafka Connessione crea argomenti di Hub eventi per archiviare configurazioni, offset e stato persistenti anche dopo che il cluster Connessione è stato arrestato. A meno che non si desideri questa persistenza, è consigliabile eliminare questi argomenti. È anche possibile eliminare gli connect-quickstart
hub eventi creati durante questa procedura dettagliata.
Passaggi successivi
Per altre informazioni su Hub eventi per Kafka, vedere gli articoli seguenti: