تكامل دعم Apache Kafka Connect في مراكز الأحداث

Apache Kafka Connect هو إطار عمل للاتصال واستيراد/تصدير البيانات من/إلى أي نظام خارجي مثل MySQL وHDFS ونظام الملفات من خلال نظام مجموعة Kafka. يرشدك هذا البرنامج التعليمي إلى استخدام إطار عمل Kafka Connect مع Event Hubs.

يرشدك هذا البرنامج التعليمي خلال دمج Kafka Connect مع مركز أحداث ونشر موصلات FileStreamSource الأساسية وFileStreamSink. على الرغم من أن هذه الموصلات ليست مخصصة للاستخدام الإنتاجي، فإنها توضح سيناريو Kafka الاتصال الشامل حيث تعمل مراكز أحداث Azure كوسيط Kafka.

إشعار

هذا النموذج متاح على GitHub.

في هذا البرنامج التعليمي، نُفذت الخطوات التالية:

  • إنشاء مساحة اسم مراكز الأحداث
  • استنساخ مثال للمشروع
  • تكوين Kafka Connect لمراكز الأحداث
  • تشغيل Kafka Connect
  • أنشئ موصلات

المتطلبات الأساسية

لإكمال هذه الإرشادات التفصيلية، تأكد من توفر المتطلبات الأساسية التالية لديك:

إنشاء مساحة اسم مراكز الأحداث

مطلوب مساحة اسم مراكز الأحداث للإرسال والتلقي من أي خدمة مراكز أحداث. راجع إنشاء مركز أحداث للحصول على إرشادات لإنشاء مساحة اسم ومركز أحداث. الحصول على سلسلة اتصال مراكز الأحداث واسم المجال المؤهل بالكامل (FQDN) لاستخدامها لاحقاً. للحصول على مفتاح الوصول، راجع الحصول على سلسلة اتصال مراكز الأحداث.

استنساخ مثال للمشروع

انسخ مستودع مراكز الأحداث وانتقل إلى البرامج التعليمية / ووصل المجلد الفرعي:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

تكوين Kafka Connect لمراكز الأحداث

إعادة التشكيل الأدنى ضروري عند إعادة توجيه معدل نقل Kafka Connect من Kafka إلى مراكز الأحداث. يوضح النموذج التالي connect-distributed.properties كيفية تكوين الاتصال للمصادقة والاتصال مع نقطة نهاية Kafka على مراكز الأحداث:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

هام

استبدل {YOUR.EVENTHUBS.CONNECTION.STRING} بسلسلة الاتصال لمساحة اسم مراكز أحداث. لإرشادات حول الحصول على سلسلة الاتصال، راجع الحصول على سلسلة اتصال مراكز أحداث. هنا مثال على التكوين: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

تشغيل Kafka Connect

في هذه الخطوة، يتم بدء عامل Kafka Connect محليا في وضع الموزع، باستخدام "مراكز الأحداث" للحفاظ على حالة المجموعة.

  1. حفظ الملف أعلاه connect-distributed.properties محلياً. تأكد من استبدال كافة القيم في الأقواس.
  2. انتقل إلى موقع إصدار Kafka على جهازك.
  3. شغّل ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. يكون Connect worker REST API جاهزاً للتفاعل عندما ترى 'INFO Finished starting connectors and tasks'.

إشعار

يستخدم Kafka Connect واجهة برمجة تطبيقات Kafka AdminClient لإنشاء مواضيع ذات تكوينات موصى بها تلقائيا، بما في ذلك الضغط. يكشف فحص سريع لمساحة الاسم في مدخل Microsoft Azure أنه تم إنشاء الموضوعات الداخلية الاتصال للعامل تلقائياً.

المواضيع الداخلية لـ Kafka Connectيجب استخدام الضغط. فريق "مراكز الأحداث" غير مسؤول عن إصلاح تكوينات غير صحيحة إذا تم تكوين مواضيع الاتصال الداخلية بشكل غير صحيح.

أنشئ موصلات

يرشدك هذا القسم خلال تدوير موصلات FileStreamSource وFileStreamSink.

  1. أنشئ دليلاً لملفات بيانات الإدخال والإخراج.

    mkdir ~/connect-quickstart
    
  2. أنشئ ملفين: ملفاً يحتوي على بيانات أولية يقرأ منها موصل FileStreamSource، والآخر يكتب عليه موصل FileStreamSink.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. أنشئ موصل FileStreamSource. تأكد من استبدال الأقواس المتعرجة بمسار الدليل الرئيسي.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    يجب أن تشاهد مركز connect-quickstart الأحداث على مثيل Event Hubs بعد تشغيل الأمر أعلاه.

  4. تحقق من حالة موصل المصدر.

    curl -s http://localhost:8083/connectors/file-source/status
    

    اختيارياً، يمكنك استخدام Service Bus Explorer للتحقق من وصول الأحداث إلى موضوعconnect-quickstart.

  5. أنشئ رابط FileStreamSink. مرة أخرى، تأكد من استبدال الأقواس المتعرجة بمسار الدليل الرئيسي.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. تحقق من حالة موصل الجهة.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. تحقق من أن البيانات نُسخت بشكل متماثل بين الملفات وأن البيانات متطابقة عبر كلا الملفين.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

التنظيف

ينشئ Kafka الاتصال مواضيع مراكز الأحداث لتخزين التكوينات والإزاحات والحالة التي تستمر حتى بعد إيقاف نظام مجموعة الاتصال. ما لم يكن هذا الثبات مرغوباً، فمن المستحسن حذف هذه الموضوعات. قد تحتاج أيضا إلى حذف connect-quickstart مراكز الأحداث التي تم إنشاؤها أثناء هذه المعاينة.

الخطوات التالية

لمعرفة المزيد حول Event Hubs for Kafka، راجع المقالات التالية: