Integrare il supporto di Apache Kafka Connect in Hub eventi di Azure

Apache Kafka Connessione è un framework per connettersi ed importare/esportare dati da/a qualsiasi sistema esterno, ad esempio MySQL, HDFS e file system tramite un cluster Kafka. Questa esercitazione illustra in dettaglio come usare il framework di Kafka Connect con Hub eventi.

L'esercitazione mostra come integrare Kafka Connect con un hub eventi di Azure e come distribuire i connettori di base FileStreamSource e FileStreamSink. Anche se questi connettori non sono destinati all'uso in produzione, dimostrano uno scenario kafka end-to-end Connessione in cui Hub eventi di Azure funge da broker Kafka.

Nota

Questo esempio è disponibile in GitHub.

In questa esercitazione vengono completati i passaggi seguenti:

  • Creare uno spazio dei nomi di Hub eventi
  • Clonare il progetto di esempio
  • Configurare Kafka Connect per Hub eventi
  • Eseguire Kafka Connect
  • Creare i connettori

Prerequisiti

Per completare questa procedura dettagliata, verificare di disporre dei prerequisiti seguenti:

Creare uno spazio dei nomi di Hub eventi

Per l'invio e la ricezione da qualsiasi servizio Hub eventi è richiesto uno spazio dei nomi di Hub eventi. Per istruzioni su come creare uno spazio dei nomi e un hub eventi, vedere Creazione di un hub eventi. Ottenere la stringa di connessione di Hub eventi e il nome di dominio completo (FQDN) da usare successivamente. Per istruzioni, vedere Ottenere una stringa di connessione ad Hub eventi.

Clonare il progetto di esempio

Clonare il repository di Hub eventi di Azure e passare alla sottocartella tutorials/connect:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Configurare Kafka Connect per Hub eventi

Quando si reindirizza la velocità effettiva di Kafka Connect da Kafka a Hub eventi, è necessaria una riconfigurazione minima. Il codice di esempio connect-distributed.properties seguente illustra come configurare Connect per autenticare e comunicare con l'endpoint Kafka in Hub eventi:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Importante

Sostituire {YOUR.EVENTHUBS.CONNECTION.STRING} con la stringa di connessione per lo spazio dei nomi di Hub eventi. Per istruzioni su come ottenere la stringa di connessione, vedere Ottenere una stringa di connessione ad Hub eventi. Ecco un esempio di configurazione: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Eseguire Kafka Connect

In questo passaggio un ruolo di lavoro Kafka Connect viene avviato localmente in modalità distribuita, usando Hub eventi per gestire lo stato del cluster.

  1. Salvare il file connect-distributed.properties in locale. Assicurarsi di sostituire tutti i valori racchiusi tra parentesi graffe.
  2. Passare alla posizione della versione di Kafka nel computer.
  3. Eseguire ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. Quando viene visualizzato l'avviso 'INFO Finished starting connectors and tasks', l'API REST del ruolo di lavoro Connect è pronta per l'interazione.

Nota

Kafka Connect usa l'API Kafka AdminClient per creare automaticamente argomenti con le configurazioni consigliate, inclusa la compattazione. Un rapido controllo dello spazio dei nomi nel portale di Azure conforma che gli argomenti interni del ruolo di lavoro Connect sono stati creati automaticamente.

Gli argomenti interni di Kafka Connect devono usare la compattazione. Il team di Hub eventi non è responsabile della correzione di configurazioni non corrette se gli argomenti interni di Kafka Connect non sono configurati correttamente.

Creare i connettori

Questa sezione illustra come configurare i connettori FileStreamSource e FileStreamSink.

  1. Creare una directory per i file di dati di input e output.

    mkdir ~/connect-quickstart
    
  2. Creare due file: uno con i dati di inizializzazione che vengono letti dal connettore FileStreamSource e l'altro in cui il connettore FileStreamSink scrive.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Creare un connettore FileStreamSource. Assicurarsi di sostituire i valori nelle parentesi graffe con il percorso della home directory.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    L'hub connect-quickstart eventi dovrebbe essere visualizzato nell'istanza di Hub eventi dopo aver eseguito il comando precedente.

  4. Controllare lo stato del connettore di origine.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Facoltativamente, è possibile usare Service Bus Explorer per verificare che gli eventi abbiamo raggiunto l'argomento connect-quickstart.

  5. Creare un connettore FileStreamSink. Anche in questo caso, assicurarsi di sostituire i valori nelle parentesi graffe con il percorso della home directory.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Controllare lo stato del connettore sink.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Verificare che i dati siano stati replicati tra i file e che siano identici in entrambi i file.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Pulizia

Kafka Connessione crea argomenti di Hub eventi per archiviare configurazioni, offset e stato persistenti anche dopo che il cluster Connessione è stato arrestato. A meno che non si desideri questa persistenza, è consigliabile eliminare questi argomenti. È anche possibile eliminare gli connect-quickstart hub eventi creati durante questa procedura dettagliata.

Passaggi successivi

Per altre informazioni su Hub eventi per Kafka, vedere gli articoli seguenti: