Verbinding maken uw Apache Spark toepassing met Azure Event Hubs

In deze zelfstudie wordt u door het verbinden van uw Spark-toepassing Event Hubs voor realtime streaming. Deze integratie maakt streaming mogelijk zonder dat u uw protocol-clients moet wijzigen of uw eigen Kafka- of Zookeeper-clusters moet uitvoeren. Voor deze zelfstudie Apache Spark v2.4+ en Apache Kafka v2.0+ vereist.

Notitie

Dit voorbeeld is beschikbaar op GitHub

In deze zelfstudie leert u het volgende:

  • Een Event Hubs-naamruimte maken
  • Het voorbeeldproject klonen
  • Spark uitvoeren
  • Lezen van Event Hubs voor Kafka
  • Schrijven naar Event Hubs voor Kafka

Vereisten

Zorg ervoor dat u het volgende hebt voordat u aan deze zelfstudie begint:

Notitie

De Spark-Kafka-adapter is vanaf Spark v2.4 bijgewerkt ter ondersteuning van Kafka v2.0. In vorige release van Spark werden Kafka v0.10 en nieuwere versies door de adapter ondersteund, maar werd specifiek vertrouwd op Kafka v0.10-API's. Omdat Event Hubs voor Kafka geen ondersteuning biedt voor Kafka v0.10, worden de Spark-Kafka-adapters van Spark-versies ouder dan v2.4 niet ondersteund door Event Hubs voor Kafka Ecosystems.

Een Event Hubs-naamruimte maken

Er is een Event Hubs-naamruimte vereist om gegevens te verzenden naar en te ontvangen van Event Hubs-services. Zie Een Event Hub maken voor instructies voor het maken van een naamruimte en een Event Hub. Haal de Event Hubs-verbindingsreeks en de Fully Qualified Domain Name (FQDN) op voor later gebruik. Zie Get an Event Hubs connection string. (Een Event Hubs-verbindingsreeks ophalen).

Het voorbeeldproject klonen

Kloon de Azure Event Hubs-opslagplaats en ga naar submap tutorials/spark:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Lezen van Event Hubs voor Kafka

Na enkele configuratiewijzigingen kunt u lezen van Event Hubs voor Kafka. Werk BOOTSTRAP_SERVERS en EH_SASL bij met details van uw naamruimte. Vervolgens kunt u met Event Hubs met streamen beginnen, net zoals u dat met Kafka zou doen. Zie het bestand sparkConsumer.scala op GitHub voor de volledige voorbeeldcode.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Schrijven naar Event Hubs voor Kafka

U kunt op dezelfde manier naar Event Hubs schrijven als u dat naar Kafka zou doen. Vergeet niet uw configuratie bij te werken door BOOTSTRAP_SERVERS en EH_SASL te wijzigen met gegevens uit uw Event Hubs-naamruimte. Zie het bestand sparkProducer.scala op GitHub voor de volledige voorbeeldcode.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

Volgende stappen

Zie de volgende artikelen Event Hubs meer Event Hubs en Event Hubs voor Kafka: