البرنامج التعليمي: معالجة Apache Kafka لأحداث مراكز الأحداث باستخدام Stream Analytics

توضح هذه المقالة كيفية تدفق البيانات إلى مراكز الأحداث ومعالجتها باستخدام Azure Stream Analytics. حيث إنه يرشدك من خلال الخطوات التالية:

  1. إنشاء مساحة اسم "مراكز الأحداث".
  2. إنشاء عميل Kafka الذي يرسل رسائل إلى مركز الحدث.
  3. إنشاء مشروع Stream Analytics التي تنسخ البيانات من مركز الأحداث إلى مخزن البيانات الثنائية كبيرة الحجم لـ Azure.

لا تحتاج إلى تغيير عملاء البروتوكول أو تشغيل أنظمة المجموعات الخاصة بك عند استخدام نقطة النهاية Kafka التي كشفها Event Hub. يدعم Azure Event Hubs الإصدار 1.0 من Apache Kafka. والإصدارات الأحدث.

المتطلبات الأساسية

لإكمال هذا التشغيل السريع، تأكد من أن لديك المتطلبات الأساسية التالية:

إنشاء مساحة اسم لـ Event Hubs

عند إنشاء مساحة اسم مراكز الأحداث، يتم تمكين نقطة النهاية Kafka لمساحة الاسم تلقائيًا. يمكنك بث الأحداث من تطبيقاتك التي تستخدم بروتوكول Kafka في مراكز الأحداث. اتبع الإرشادات خطوة بخطوة في إنشاء مركز حدث باستخدام مدخل Microsoft Azure لإنشاء مساحة اسم مراكز أحداث. إذا كنت تستخدم مجموعة مخصصة، فراجع إنشاء مساحة اسم ومركز حدث في نظام مجموعة مخصصة.

ملاحظة

مراكز الأحداث لـ Kafka غير مدعوم في المستوى الأساسي.

إرسال الرسائل عبر Kafka في مراكز الأحداث

  1. استنساخ مستودع مراكز الأحداث Azure لـ Kafka إلى جهازك.

  2. انتقل إلى المجلد: azure-event-hubs-for-kafka/quickstart/java/producer.

  3. حدِّث تفاصيل التكوين للمنتِج في src/main/resources/producer.config. حدد الاسم وأيضًا سلسلة الاتصالمن أجل مساحة الاسم لمركز الحدث.

    bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
    
  4. انتقل إلى azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/ ملف TestDataReporter.java وافتحه في محرر من اختيارك.

  5. التعليق خارج السطر التالي من التعليمات البرمجية:

                //final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
    
  6. أضف السطر التالي من التعليمات البرمجية بدلاً من التعليمات البرمجية المعلقة:

                final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");            
    

    يرسل هذا الرمز بيانات الحدث بتنسيق JSON. عند تكوين الإدخال لوظيفة Stream Analytics، يمكنك تحديد JSON كتنسيق لبيانات الإدخال.

  7. تشغيل المنتج والتدفق إلى "مراكز الأحداث". على جهاز Windows، عند استخدام Node.js command promptقم بالتبديل إلى azure-event-hubs-for-kafka/quickstart/java/producer المجلد قبل تشغيل هذه الأوامر.

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    

تحقق من أن مركز الحدث يتلقى البيانات

  1. حدد مراكز الأحداث ضمن ENTITIES. تأكد من أنك ترى مركز حدث باسم اختبار.

    مركز الحدث - اختبار

  2. تأكد من أنك ترى رسائل قادمة إلى مركز الحدث.

    مركز الأحداث - رسائل

معالجة بيانات الحدث باستخدام وظيفة Stream Analytics

في هذا القسم، يمكنك إنشاء وظيفة Azure Stream Analytics. يرسل عميل Kafka الأحداث إلى مركز الحدث. إنشاء وظيفة Stream Analytics والتي تأخذ بيانات الحدث كمدخلات ومخرجات إلى مخزن البيانات الثنائية كبيرة الحجم لـ Azure. إذا لم يكن لديك حساب Azure Storage، فأنشئ حساباً.

يمر الاستعلام في وظيفة Stream Analytics عبر البيانات دون إجراء أي تحليلات. يمكنك إنشاء استعلام يحول بيانات الإدخال لإنتاج بيانات الإخراج بتنسيق مختلف أو من خلال رؤى مكتسبة.

إنشاء مشروع Stream Analytics

  1. حدد + Create a resource في مدخل Microsoft Azure.
  2. حدد Analytics في قائمة Microsoft Azure Marketplace وحدد Stream Analytics job.
  3. في صفحة New Stream Analytics نفذ الإجراءات التالية:
    1. أدخل name للوظيفة.

    2. حدد Subscription الخاص بك.

    3. حدد Create new من أجل resource group وأدخل الاسم. يمكنك أيضا استخدام مجموعة موارد موجودة.

    4. حدد موقعاً للمجموعة.

    5. حدد إنشاء لإنشاء الوظيفة.

      بدء وظيفة Stream Analytics جديدة

تكوين إدخال المهمة

  1. في رسالة الإعلام، حدد Go to resource لرؤية صفحة Stream Analytics job.

  2. حدد Inputs في قسم JOB TOPOLOGY في القائمة اليسرى.

  3. حدد Add stream input وEvent hub.

    إضافة مركز حدث كمدخل

  4. في صفحة تكوين Event Hub input نفذ الإجراءات التالية:

    1. حدد alias للإدخال.

    2. حدد اشتراك Azure الخاص بك.

    3. حدد مساحة اسم مركز الأحداث التي تم إنشاؤها مسبقا.

    4. حدد اختبارمن أجل مركز الأحداث.

    5. حدد ⁧⁩حفظ⁧⁩.

      تكوين إدخال مركز الأحداث

تكوين بيانات الإخراج

  1. حدد Outputs في قسم JOB TOPOLOGY في القائمة.
  2. حدد + Add على شريط الأدوات، وحدد تخزين Blob storage
  3. في صفحة إعدادات إخراج تخزين كائن ثنائي كبير الحجم، قم بالإجراءات التالية:
    1. حدد alias للإخراج.

    2. حدد اشتراك Azure.

    3. حدد Azure Storage accountالخاص بك.

    4. أدخل اسما للحاوية التي تخزن بيانات الإخراج من استعلام Stream Analytics.

    5. حدد ⁧⁩حفظ⁧⁩.

      تكوين إخراج تخزين كائن ثنائي كبير الحجم

حدد الاستعلام

بعد أن يكون لديك إعداد مهمة Stream Analytics لقراءة تدفق بيانات واردة، تكون الخطوة التالية هي إنشاء تحويل يحلل البيانات في الوقت الفعلي. يمكنك تعريف استعلام التحويل باستخدام Stream Analytics Query Language. في هذه المعاينة التفصيلية، يمكنك تحديد استعلام يمر عبر البيانات دون إجراء أي تحويل.

  1. حدد استعلام.

  2. في إطار الاستعلام، استبدل [YourOutputAlias] بالاسم المستعار للإخراج الذي أنشأته سابقًا.

  3. استبدل [YourInputAlias] بالاسم المستعار للإدخال الذي أنشأته سابقا.

  4. تحديد Save في شريط الأدوات.

    يظهر التقاط الشاشة إطار الاستعلام مع قيم لمتغيرات الإدخال والإخراج.

تشغيل وظيفة Stream Analytics

  1. حدد Overview على القائمة اليمنى.

  2. حدد Start.

    قائمة البدء

  3. في صفحة Start job حدد Start.

    صفحة بدء الوظيفة

  4. انتظر حتى تتغير حالة المهمة من Starting إلى running.

    حالة الوظيفة - قيد التشغيل

اختبار السيناريو

  1. تشغيل Kafka producer مرة أخرى لإرسال الأحداث إلى مركز الحدث.

    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    
  2. تأكد من أن ترى بيانات الإخراج تُنشأ في مخزن البيانات الثنائية كبيرة الحجم لـ Azure. تشاهد ملف JSON في الحاوية مع 100 صف والتي تبدو وكأنها الصفوف العينة التالية:

    {"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    

    تلقت وظيفة Azure Stream Analytics بيانات إدخال من مركز الأحداث وخزنتها في مخزن البيانات الثنائية كبيرة الحجم لـ Azure في هذا السيناريو.

الخطوات التالية

في هذه المقالة، تعلمت كيف تبث إلى مراكز الأحداث دون تغيير عملاء بروتوكولك أو تشغيل مجموعاتك الخاصة. لمعرفة المزيد حول مراكز الأحداث ل Apache Kafka، راجع دليل مطور Apache Kafka لمراكز أحداث Azure.