قم بتوصيل تطبيق Apache Spark الخاص بك مع Azure Event Hubs

يرشدك هذا البرنامج التعليمي خلال توصيل تطبيق Spark الخاص بك إلى Event Hubs للبث في الوقت الفعلي. يتيح هذا التكامل البث دون الحاجة إلى تغيير عملاء البروتوكول أو تشغيل مجموعات Kafka أو Zookeeper الخاصة بك. يتطلب هذا البرنامج التعليمي Apache Spark v2.4 + و Apache Kafka v2.0 +.

ملاحظة

تلك العينة متاحة على GitHub

في هذا البرنامج التعليمي، تتعلم كيفية:

  • إنشاء مساحة اسم لـ Event Hubs
  • استنساخ مثال للمشروع
  • تشغيل Spark
  • اقرأ من Event Hubs لـ Kafka
  • اكتب إلى Event Hubs لـ Kafka

المتطلبات الأساسية

قبل أن تبدأ هذا البرنامج التعليمي، تأكد من أن لديك:

ملاحظة

تم تحديث مهايئ Spark-Kafka لدعم Kafka v2.0 اعتبارًا من Spark v2.4. في الإصدارات السابقة من Spark، كان المهايئ يدعم Kafka v0.10 وما بعده لكنه اعتمد بشكل خاص على واجهات برمجة تطبيقات Kafka v0.10. نظرًا لأن Event Hubs for Kafka لا يدعم Kafka v0.10، فإن مهايئ Spark-Kafka من إصدارات Spark السابقة للإصدار v2.4 غير مدعومة من قبل Event Hubs لأنظمة Kafka البيئية.

إنشاء مساحة اسم مراكز الأحداث

مطلوب مساحة اسم Event Hubs للإرسال والاستلام من أي خدمة Event Hubs. راجع إنشاء مركز أحداث للحصول على إرشادات حول إنشاء مساحة اسم ومركز أحداث. احصل على سلسلة اتصال Event Hubs واسم المجال المؤهل بالكامل (FQDN) للاستخدام لاحقاً. للحصول على مفتاح الوصول، راجع الحصول على سلسلة اتصال مراكز الأحداث.

استنساخ مثال للمشروع

انسخ مستودع Azure Event Hubs وانتقل إلى المجلد الفرعي tutorials/spark:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

اقرأ من Event Hubs لـ Kafka

مع بعض التغييرات في التكوين، يمكنك البدء في القراءة من Event Hubs for Kafka. قم بتحديث BOOTSTRAP_SERVERS و EH_SASL بتفاصيل من مساحة الاسم الخاصة بك ويمكنك بدء البث باستخدام Event Hubs كما تفعل مع Kafka. للحصول على نموذج التعليمات البرمجية الكامل، راجع ملف sparkConsumer.scala على GitHub.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

إذا تلقيت خطأ مشابها للخطأ التالي، أضف .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") إلى spark.readStream المكالمة وحاول مرة أخرى.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

اكتب إلى Event Hubs لـ Kafka

يمكنك أيضًا الكتابة إلى Event Hubs بنفس الطريقة التي تكتب بها إلى Kafka. لا تنس تحديث التكوين لتغيير BOOTSTRAP_SERVERS و EH_SASL بمعلومات من مساحة اسم Event Hubs. للحصول على نموذج التعليمات البرمجية الكامل، راجع ملف sparkProducer.scala على GitHub.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

الخطوات التالية

لمعرفة المزيد حول مراكز الأحداث ومراكز الأحداث لـ Kafka، راجع المقالات التالية: