استخدم Apache Flink مع Azure Event Hubs لـ Apache Kafka

يوضح لك هذا البرنامج التعليمي كيفية توصيل Apache Flink بمركز أحداث دون تغيير عملاء البروتوكول أو تشغيل مجموعاتك الخاصة. لمزيد من المعلومات حول دعم Event Hubs لبروتوكول المستهلك Apache Kafka، راجع Event Hubs الخاص بـ Apache Kafka .

في هذا البرنامج التعليمي، تتعلم كيفية:

  • إنشاء مساحة اسم لـ Event Hubs
  • استنساخ مثال للمشروع
  • تشغيل منتج Flink
  • تشغيل مستهلك Flink

ملاحظة

تلك العينة متاحة على GitHub

المتطلبات الأساسية

لإكمال هذا البرنامج التعليمي، تأكد من توفر المتطلبات الأساسية التالية لديك:

إنشاء مساحة اسم مراكز الأحداث

مطلوب مساحة اسم Event Hubs للإرسال أو الاستلام من أي خدمة من خدمات Event Hubs. راجع إنشاء مركز أحداث للحصول على إرشادات لإنشاء مساحة اسم ومركز أحداث. تأكد من نسخ سلسلة اتصال Event Hubs لاستخدامها لاحقاً.

استنساخ مثال للمشروع

الآن بعد أن أصبحت لديك سلسلة اتصال Event Hubs، قم باستنساخ Azure Event Hubs لمستودع Kafka وانتقل إلى المجلد الفرعي flink:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/flink

باستخدام مثال منتج Flink المتوفر، أرسل رسائل إلى خدمة Event Hubs.

قم بتوفير نقطة نهاية Event Hubs Kafka

producer.config

قم بتحديث قيمتَي bootstrap.servers وsasl.jaas.config في producer/src/main/resources/producer.config لتوجيه المنتج إلى نقطة نهاية Event Hubs Kafka باستخدام المصادقة الصحيحة.

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="$ConnectionString" \
   password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

هام

استبدل {YOUR.EVENTHUBS.CONNECTION.STRING} بسلسلة الاتصال لمساحة اسم مراكز أحداث. لإرشادات حول الحصول على سلسلة الاتصال، راجع الحصول على سلسلة اتصال مراكز أحداث. هنا مثال على التكوين: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

قم بتشغيل المنتج من سطر الأوامر

لتشغيل المنتج من سطر الأوامر، قم بإنشاء JAR ثم قم بتشغيله من داخل Maven (أو قم بإنشاء JAR باستخدام Maven، ثم قم بتشغيل Java عن طريق إضافة Kafka JAR (s) الضرورية إلى classpath):

mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestProducer"

سيبدأ المنتج الآن في إرسال الأحداث إلى مركز الحدث في الموضوع testوطباعة الأحداث إلى stdout.

باستخدام مثال المستهلك المقدم، استقبل الرسائل من مركز الحدث.

قم بتوفير نقطة نهاية Event Hubs Kafka

consumer.config

حدِّث قيمتَي bootstrap.servers وsasl.jaas.config في consumer/src/main/resources/consumer.config لتوجيه المستهلك إلى نقطة نهاية Event Hubs Kafka باستخدام المصادقة الصحيحة.

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="$ConnectionString" \
   password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

هام

استبدل {YOUR.EVENTHUBS.CONNECTION.STRING} بسلسلة الاتصال لمساحة اسم مراكز أحداث. لإرشادات حول الحصول على سلسلة الاتصال، راجع الحصول على سلسلة اتصال مراكز أحداث. هنا مثال على التكوين: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

قم بتشغيل المستهلك من سطر الأوامر

لتشغيل المستهلك من سطر الأوامر، قم بإنشاء JAR ثم قم بتشغيله من داخل Maven (أو قم بإنشاء JAR باستخدام Maven، ثم قم بتشغيل Java عن طريق إضافة Kafka JAR (s) الضرورية إلى classpath):

mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestConsumer"

إذا كان مركز الأحداث يحتوي على أحداث (على سبيل المثال، إذا كان منتجك يعمل أيضاً)، فسيبدأ المستهلك الآن في تلقي الأحداث من الموضوع test.

تحقق من دليل موصل Kafka Flink للحصول على مزيد من المعلومات التفصيلية حول ربط Flink إلى Kafka.

الخطوات التالية

لمعرفة المزيد حول Event Hubs لـ Kafka، راجع المقالات التالية: