استخدم Apache Flink مع Azure Event Hubs لـ Apache Kafka
يوضح لك هذا البرنامج التعليمي كيفية توصيل Apache Flink بمركز أحداث دون تغيير عملاء البروتوكول أو تشغيل مجموعاتك الخاصة. لمزيد من المعلومات حول دعم Event Hubs لبروتوكول المستهلك Apache Kafka، راجع Event Hubs الخاص بـ Apache Kafka .
في هذا البرنامج التعليمي، تتعلم كيفية:
- إنشاء مساحة اسم لـ Event Hubs
- استنساخ مثال للمشروع
- تشغيل منتج Flink
- تشغيل مستهلك Flink
ملاحظة
تلك العينة متاحة على GitHub
المتطلبات الأساسية
لإكمال هذا البرنامج التعليمي، تأكد من توفر المتطلبات الأساسية التالية لديك:
- اقرأ مقالة مراكز الأحداث لمقالة Apache Kafka.
- اشتراك Azure. إذا لم يكن لديك حساب، فأنشئ حسابًا مجانيًّا قبل أن تبدأ.
- أداة تطوير Java (JDK) 1.7+
- فيما يتعلق بـ Ubuntu، قم بتشغيل
apt-get install default-jdk
لتثبيت JDK. - تأكد من ضبط متغير بيئة التشغيل JAVA_HOME ليشير إلى المجلد حيث تم تثبيت JDK.
- فيما يتعلق بـ Ubuntu، قم بتشغيل
- قم بتنزيل ثم تثبيت أرشيف Maven ثنائي
- على Ubuntu، يمكنك تشغيل
apt-get install maven
لتثبيت Maven.
- على Ubuntu، يمكنك تشغيل
- Git
- على Ubuntu، يمكنك تشغيل
sudo apt-get install git
لتثبيت Git.
- على Ubuntu، يمكنك تشغيل
إنشاء مساحة اسم مراكز الأحداث
مطلوب مساحة اسم Event Hubs للإرسال أو الاستلام من أي خدمة من خدمات Event Hubs. راجع إنشاء مركز أحداث للحصول على إرشادات لإنشاء مساحة اسم ومركز أحداث. تأكد من نسخ سلسلة اتصال Event Hubs لاستخدامها لاحقاً.
استنساخ مثال للمشروع
الآن بعد أن أصبحت لديك سلسلة اتصال Event Hubs، قم باستنساخ Azure Event Hubs لمستودع Kafka وانتقل إلى المجلد الفرعي flink
:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/flink
تشغيل منتج Flink
باستخدام مثال منتج Flink المتوفر، أرسل رسائل إلى خدمة Event Hubs.
قم بتوفير نقطة نهاية Event Hubs Kafka
producer.config
قم بتحديث قيمتَي bootstrap.servers
وsasl.jaas.config
في producer/src/main/resources/producer.config
لتوجيه المنتج إلى نقطة نهاية Event Hubs Kafka باستخدام المصادقة الصحيحة.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="$ConnectionString" \
password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
هام
استبدل {YOUR.EVENTHUBS.CONNECTION.STRING}
بسلسلة الاتصال لمساحة اسم مراكز أحداث. لإرشادات حول الحصول على سلسلة الاتصال، راجع الحصول على سلسلة اتصال مراكز أحداث. هنا مثال على التكوين: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
قم بتشغيل المنتج من سطر الأوامر
لتشغيل المنتج من سطر الأوامر، قم بإنشاء JAR ثم قم بتشغيله من داخل Maven (أو قم بإنشاء JAR باستخدام Maven، ثم قم بتشغيل Java عن طريق إضافة Kafka JAR (s) الضرورية إلى classpath):
mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestProducer"
سيبدأ المنتج الآن في إرسال الأحداث إلى مركز الحدث في الموضوع test
وطباعة الأحداث إلى stdout.
تشغيل مستهلك Flink
باستخدام مثال المستهلك المقدم، استقبل الرسائل من مركز الحدث.
قم بتوفير نقطة نهاية Event Hubs Kafka
consumer.config
حدِّث قيمتَي bootstrap.servers
وsasl.jaas.config
في consumer/src/main/resources/consumer.config
لتوجيه المستهلك إلى نقطة نهاية Event Hubs Kafka باستخدام المصادقة الصحيحة.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="$ConnectionString" \
password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
هام
استبدل {YOUR.EVENTHUBS.CONNECTION.STRING}
بسلسلة الاتصال لمساحة اسم مراكز أحداث. لإرشادات حول الحصول على سلسلة الاتصال، راجع الحصول على سلسلة اتصال مراكز أحداث. هنا مثال على التكوين: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
قم بتشغيل المستهلك من سطر الأوامر
لتشغيل المستهلك من سطر الأوامر، قم بإنشاء JAR ثم قم بتشغيله من داخل Maven (أو قم بإنشاء JAR باستخدام Maven، ثم قم بتشغيل Java عن طريق إضافة Kafka JAR (s) الضرورية إلى classpath):
mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestConsumer"
إذا كان مركز الأحداث يحتوي على أحداث (على سبيل المثال، إذا كان منتجك يعمل أيضاً)، فسيبدأ المستهلك الآن في تلقي الأحداث من الموضوع test
.
تحقق من دليل موصل Kafka Flink للحصول على مزيد من المعلومات التفصيلية حول ربط Flink إلى Kafka.
الخطوات التالية
لمعرفة المزيد حول Event Hubs لـ Kafka، راجع المقالات التالية: