البرنامج التعليمي: معالجة Apache Kafka لأحداث مراكز الأحداث باستخدام Stream Analytics
توضح هذه المقالة كيفية تدفق البيانات إلى مراكز الأحداث ومعالجتها باستخدام Azure Stream Analytics. حيث إنه يرشدك من خلال الخطوات التالية:
- إنشاء مساحة اسم "مراكز الأحداث".
- إنشاء عميل Kafka الذي يرسل رسائل إلى مركز الحدث.
- إنشاء مشروع Stream Analytics التي تنسخ البيانات من مركز الأحداث إلى مخزن البيانات الثنائية كبيرة الحجم لـ Azure.
لا تحتاج إلى تغيير عملاء البروتوكول أو تشغيل أنظمة المجموعات الخاصة بك عند استخدام نقطة النهاية Kafka التي كشفها Event Hub. يدعم Azure Event Hubs الإصدار 1.0 من Apache Kafka. والإصدارات الأحدث.
المتطلبات الأساسية
لإكمال هذا التشغيل السريع، تأكد من أن لديك المتطلبات الأساسية التالية:
- اشتراك Azure. إذا لم يكن لديك حساب، فأنشئ حسابًا مجانيًّا قبل أن تبدأ.
- Java Development Kit (JDK) 1.7+.
- قم بتنزيل ثم تثبيت أرشيف Maven ثنائي.
- Git
- حساب Azure Storage. إذا لم يكن لديك حساب فأنشئ حسابًا قبل المضي قُدما. تكون مهمة Stream Analytics في هذه المعاينة تخزين بيانات الإخراج في مخزن البيانات الثنائية كبيرة الحجم لـ Azure.
إنشاء مساحة اسم لـ Event Hubs
عند إنشاء مساحة اسم مراكز الأحداث، يتم تمكين نقطة النهاية Kafka لمساحة الاسم تلقائيًا. يمكنك بث الأحداث من تطبيقاتك التي تستخدم بروتوكول Kafka في مراكز الأحداث. اتبع الإرشادات خطوة بخطوة في إنشاء مركز حدث باستخدام مدخل Microsoft Azure لإنشاء مساحة اسم مراكز أحداث. إذا كنت تستخدم مجموعة مخصصة، فراجع إنشاء مساحة اسم ومركز حدث في نظام مجموعة مخصصة.
ملاحظة
مراكز الأحداث لـ Kafka غير مدعوم في المستوى الأساسي.
إرسال الرسائل عبر Kafka في مراكز الأحداث
استنساخ مستودع مراكز الأحداث Azure لـ Kafka إلى جهازك.
انتقل إلى المجلد:
azure-event-hubs-for-kafka/quickstart/java/producer
.حدِّث تفاصيل التكوين للمنتِج في
src/main/resources/producer.config
. حدد الاسم وأيضًا سلسلة الاتصالمن أجل مساحة الاسم لمركز الحدث.bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
انتقل إلى
azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/
ملف TestDataReporter.java وافتحه في محرر من اختيارك.التعليق خارج السطر التالي من التعليمات البرمجية:
//final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
أضف السطر التالي من التعليمات البرمجية بدلاً من التعليمات البرمجية المعلقة:
final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");
يرسل هذا الرمز بيانات الحدث بتنسيق JSON. عند تكوين الإدخال لوظيفة Stream Analytics، يمكنك تحديد JSON كتنسيق لبيانات الإدخال.
تشغيل المنتج والتدفق إلى "مراكز الأحداث". على جهاز Windows، عند استخدام Node.js command promptقم بالتبديل إلى
azure-event-hubs-for-kafka/quickstart/java/producer
المجلد قبل تشغيل هذه الأوامر.mvn clean package mvn exec:java -Dexec.mainClass="TestProducer"
تحقق من أن مركز الحدث يتلقى البيانات
حدد مراكز الأحداث ضمن ENTITIES. تأكد من أنك ترى مركز حدث باسم اختبار.
تأكد من أنك ترى رسائل قادمة إلى مركز الحدث.
معالجة بيانات الحدث باستخدام وظيفة Stream Analytics
في هذا القسم، يمكنك إنشاء وظيفة Azure Stream Analytics. يرسل عميل Kafka الأحداث إلى مركز الحدث. إنشاء وظيفة Stream Analytics والتي تأخذ بيانات الحدث كمدخلات ومخرجات إلى مخزن البيانات الثنائية كبيرة الحجم لـ Azure. إذا لم يكن لديك حساب Azure Storage، فأنشئ حساباً.
يمر الاستعلام في وظيفة Stream Analytics عبر البيانات دون إجراء أي تحليلات. يمكنك إنشاء استعلام يحول بيانات الإدخال لإنتاج بيانات الإخراج بتنسيق مختلف أو من خلال رؤى مكتسبة.
إنشاء مشروع Stream Analytics
- حدد + Create a resource في مدخل Microsoft Azure.
- حدد Analytics في قائمة Microsoft Azure Marketplace وحدد Stream Analytics job.
- في صفحة New Stream Analytics نفذ الإجراءات التالية:
أدخل name للوظيفة.
حدد Subscription الخاص بك.
حدد Create new من أجل resource group وأدخل الاسم. يمكنك أيضا استخدام مجموعة موارد موجودة.
حدد موقعاً للمجموعة.
حدد إنشاء لإنشاء الوظيفة.
تكوين إدخال المهمة
في رسالة الإعلام، حدد Go to resource لرؤية صفحة Stream Analytics job.
حدد Inputs في قسم JOB TOPOLOGY في القائمة اليسرى.
حدد Add stream input وEvent hub.
في صفحة تكوين Event Hub input نفذ الإجراءات التالية:
حدد alias للإدخال.
حدد اشتراك Azure الخاص بك.
حدد مساحة اسم مركز الأحداث التي تم إنشاؤها مسبقا.
حدد اختبارمن أجل مركز الأحداث.
حدد حفظ.
تكوين بيانات الإخراج
- حدد Outputs في قسم JOB TOPOLOGY في القائمة.
- حدد + Add على شريط الأدوات، وحدد تخزين Blob storage
- في صفحة إعدادات إخراج تخزين كائن ثنائي كبير الحجم، قم بالإجراءات التالية:
حدد alias للإخراج.
حدد اشتراك Azure.
حدد Azure Storage accountالخاص بك.
أدخل اسما للحاوية التي تخزن بيانات الإخراج من استعلام Stream Analytics.
حدد حفظ.
حدد الاستعلام
بعد أن يكون لديك إعداد مهمة Stream Analytics لقراءة تدفق بيانات واردة، تكون الخطوة التالية هي إنشاء تحويل يحلل البيانات في الوقت الفعلي. يمكنك تعريف استعلام التحويل باستخدام Stream Analytics Query Language. في هذه المعاينة التفصيلية، يمكنك تحديد استعلام يمر عبر البيانات دون إجراء أي تحويل.
حدد استعلام.
في إطار الاستعلام، استبدل
[YourOutputAlias]
بالاسم المستعار للإخراج الذي أنشأته سابقًا.استبدل
[YourInputAlias]
بالاسم المستعار للإدخال الذي أنشأته سابقا.تحديد Save في شريط الأدوات.
تشغيل وظيفة Stream Analytics
حدد Overview على القائمة اليمنى.
حدد Start.
في صفحة Start job حدد Start.
انتظر حتى تتغير حالة المهمة من Starting إلى running.
اختبار السيناريو
تشغيل Kafka producer مرة أخرى لإرسال الأحداث إلى مركز الحدث.
mvn exec:java -Dexec.mainClass="TestProducer"
تأكد من أن ترى بيانات الإخراج تُنشأ في مخزن البيانات الثنائية كبيرة الحجم لـ Azure. تشاهد ملف JSON في الحاوية مع 100 صف والتي تبدو وكأنها الصفوف العينة التالية:
{"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
تلقت وظيفة Azure Stream Analytics بيانات إدخال من مركز الأحداث وخزنتها في مخزن البيانات الثنائية كبيرة الحجم لـ Azure في هذا السيناريو.
الخطوات التالية
في هذه المقالة، تعلمت كيف تبث إلى مراكز الأحداث دون تغيير عملاء بروتوكولك أو تشغيل مجموعاتك الخاصة. لمعرفة المزيد حول مراكز الأحداث ل Apache Kafka، راجع دليل مطور Apache Kafka لمراكز أحداث Azure.