Bağlan Apache Spark ile Azure Event Hubs
Bu öğretici, gerçek zamanlı akış için Spark Event Hubs bağlamada size yol sağlar. Bu tümleştirme, protokol istemcilerinizi değiştirmek veya kendi Kafka ya da Zookeeper kümelerinizi çalıştırmak zorunda kalmadan akışa olanak sağlar. Bu öğretici, Apache Spark v2.4+ ve Apache Kafka v2.0+ gerektirir.
Bu öğreticide şunların nasıl yapıldığını öğreneceksiniz:
- Event Hubs ad alanı oluşturma
- Örnek projeyi kopyalama
- Spark'ı çalıştırma
- Kafka için Event Hubs'dan okuma
- Kafka için Event Hubs'a yazma
Önkoşullar
Bu öğreticiye başlamadan önce şunlara sahip olduğunuzdan emin olun:
- Azure aboneliği. Aboneliğiniz yoksa ücretsiz bir hesap oluşturun.
- Apache Spark v2.4
- Apache Kafka v2.0
- Git
Not
Spark-Kafka bağdaştırıcısı Spark v2.4'ten itibaren Kafka v2.0'ı destekleyecek şekilde güncelleştirildi. Spark'ın önceki sürümlerinde, bağdaştırıcı Kafka v0.10 ve üstünü destekliyordu ama özel olarak Kafka v0.10 API'lerine dayanıyordu. Kafka için Event Hubs Kafka v0.10'u desteklemediğinden, Spark'ın v2.4'ten önceki sürümlerinden Spark-Kafka bağdaştırıcıları Kafka için Event Hubs Ekosistemlerinde desteklenmez.
Event Hubs ad alanı oluşturma
Herhangi bir Event Hubs hizmetinden göndermek ve almak için Event Hubs ad alanı gereklidir. Ad alanı ve olay hub'ı oluşturma yönergeleri için bkz. Olay hub'ı oluşturma. Daha sonra kullanmak üzere Event Hubs bağlantı dizesini ve tam etki alanı adını (FQDN) alın. Yönergeler için bkz. Event Hubs bağlantı dizesi alma.
Örnek projeyi kopyalama
Azure Event Hubs deposunu kopyalayın ve tutorials/spark alt klasörüne gidin:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark
Kafka için Event Hubs'dan okuma
Birkaç yapılandırma değişikliğiyle, Kafka için Event Hubs'dan okumaya başlayabilirsiniz. BOOTSTRAP_SERVERS ve EH_SASL öğelerini ad alanınızdan gelen ayrıntılarla güncelleştirin. Bundan sonra aynı Kafka'yla yaptığınız gibi Event Hubs ile akışı başlatabilirsiniz. Örnek kodun tamamı için GitHub'da sparkConsumer.scala dosyasına bakın.
//Read from your Event Hub!
val df = spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("kafka.group.id", GROUP_ID)
.option("failOnDataLoss", "true")
.load()
//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
.outputMode("append")
.format("console")
.start()
Kafka için Event Hubs'a yazma
Kafka'ya Event Hubs aynı şekilde bu Event Hubs da yazabilir. Yapılandırmanızı güncelleştirip BOOTSTRAP_SERVERS ve EH_SASL öğelerini Event Hubs ad alanınızdan gelen bilgilerle değiştirmeyi unutmayın. Örnek kodun tamamı için GitHub'da sparkProducer.scala dosyasına bakın.
df = /**Dataframe**/
//Write to your Event Hub!
df.writeStream
.format("kafka")
.option("topic", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("checkpointLocation", "./checkpoint")
.start()
Sonraki adımlar
Kafka'ya Event Hubs ve Event Hubs daha fazla bilgi edinmek için aşağıdaki makalelere bakın: