تكامل دعم Apache Kafka Connect في مراكز الأحداث
Apache Kafka Connect هو إطار عمل للاتصال واستيراد/تصدير البيانات من/إلى أي نظام خارجي مثل MySQL وHDFS ونظام الملفات من خلال نظام مجموعة Kafka. يرشدك هذا البرنامج التعليمي إلى استخدام إطار عمل Kafka Connect مع Event Hubs.
يرشدك هذا البرنامج التعليمي خلال دمج Kafka Connect مع مركز أحداث ونشر موصلات FileStreamSource الأساسية وFileStreamSink. على الرغم من أن هذه الموصلات ليست مخصصة للاستخدام الإنتاجي، فإنها توضح سيناريو Kafka الاتصال الشامل حيث تعمل مراكز أحداث Azure كوسيط Kafka.
إشعار
هذا النموذج متاح على GitHub.
في هذا البرنامج التعليمي، نُفذت الخطوات التالية:
- إنشاء مساحة اسم مراكز الأحداث
- استنساخ مثال للمشروع
- تكوين Kafka Connect لمراكز الأحداث
- تشغيل Kafka Connect
- أنشئ موصلات
المتطلبات الأساسية
لإكمال هذه الإرشادات التفصيلية، تأكد من توفر المتطلبات الأساسية التالية لديك:
- اشتراك Azure. في حال لم يكن لديك اشتراك Azure، أنشئ حسابًا مجّانيًّا.
- بوابه
- Linux/MacOS
- أحدث إصدار من Kafka متوفر من kafka.apache.org
- اقرأ مقالة مقدمة مراكز أحداث Apache Kafka
إنشاء مساحة اسم مراكز الأحداث
مطلوب مساحة اسم مراكز الأحداث للإرسال والتلقي من أي خدمة مراكز أحداث. راجع إنشاء مركز أحداث للحصول على إرشادات لإنشاء مساحة اسم ومركز أحداث. الحصول على سلسلة اتصال مراكز الأحداث واسم المجال المؤهل بالكامل (FQDN) لاستخدامها لاحقاً. للحصول على مفتاح الوصول، راجع الحصول على سلسلة اتصال مراكز الأحداث.
استنساخ مثال للمشروع
انسخ مستودع مراكز الأحداث وانتقل إلى البرامج التعليمية / ووصل المجلد الفرعي:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
تكوين Kafka Connect لمراكز الأحداث
إعادة التشكيل الأدنى ضروري عند إعادة توجيه معدل نقل Kafka Connect من Kafka إلى مراكز الأحداث. يوضح النموذج التالي connect-distributed.properties
كيفية تكوين الاتصال للمصادقة والاتصال مع نقطة نهاية Kafka على مراكز الأحداث:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
هام
استبدل {YOUR.EVENTHUBS.CONNECTION.STRING}
بسلسلة الاتصال لمساحة اسم مراكز أحداث. لإرشادات حول الحصول على سلسلة الاتصال، راجع الحصول على سلسلة اتصال مراكز أحداث. هنا مثال على التكوين: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
تشغيل Kafka Connect
في هذه الخطوة، يتم بدء عامل Kafka Connect محليا في وضع الموزع، باستخدام "مراكز الأحداث" للحفاظ على حالة المجموعة.
- حفظ الملف أعلاه
connect-distributed.properties
محلياً. تأكد من استبدال كافة القيم في الأقواس. - انتقل إلى موقع إصدار Kafka على جهازك.
- شغّل
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
. يكون Connect worker REST API جاهزاً للتفاعل عندما ترى'INFO Finished starting connectors and tasks'
.
إشعار
يستخدم Kafka Connect واجهة برمجة تطبيقات Kafka AdminClient لإنشاء مواضيع ذات تكوينات موصى بها تلقائيا، بما في ذلك الضغط. يكشف فحص سريع لمساحة الاسم في مدخل Microsoft Azure أنه تم إنشاء الموضوعات الداخلية الاتصال للعامل تلقائياً.
المواضيع الداخلية لـ Kafka Connectيجب استخدام الضغط. فريق "مراكز الأحداث" غير مسؤول عن إصلاح تكوينات غير صحيحة إذا تم تكوين مواضيع الاتصال الداخلية بشكل غير صحيح.
أنشئ موصلات
يرشدك هذا القسم خلال تدوير موصلات FileStreamSource وFileStreamSink.
أنشئ دليلاً لملفات بيانات الإدخال والإخراج.
mkdir ~/connect-quickstart
أنشئ ملفين: ملفاً يحتوي على بيانات أولية يقرأ منها موصل FileStreamSource، والآخر يكتب عليه موصل FileStreamSink.
seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
أنشئ موصل FileStreamSource. تأكد من استبدال الأقواس المتعرجة بمسار الدليل الرئيسي.
curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
يجب أن تشاهد مركز
connect-quickstart
الأحداث على مثيل Event Hubs بعد تشغيل الأمر أعلاه.تحقق من حالة موصل المصدر.
curl -s http://localhost:8083/connectors/file-source/status
اختيارياً، يمكنك استخدام Service Bus Explorer للتحقق من وصول الأحداث إلى موضوع
connect-quickstart
.أنشئ رابط FileStreamSink. مرة أخرى، تأكد من استبدال الأقواس المتعرجة بمسار الدليل الرئيسي.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
تحقق من حالة موصل الجهة.
curl -s http://localhost:8083/connectors/file-sink/status
تحقق من أن البيانات نُسخت بشكل متماثل بين الملفات وأن البيانات متطابقة عبر كلا الملفين.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
التنظيف
ينشئ Kafka الاتصال مواضيع مراكز الأحداث لتخزين التكوينات والإزاحات والحالة التي تستمر حتى بعد إيقاف نظام مجموعة الاتصال. ما لم يكن هذا الثبات مرغوباً، فمن المستحسن حذف هذه الموضوعات. قد تحتاج أيضا إلى حذف connect-quickstart
مراكز الأحداث التي تم إنشاؤها أثناء هذه المعاينة.
الخطوات التالية
لمعرفة المزيد حول Event Hubs for Kafka، راجع المقالات التالية: