مراكز أحداث Azure

Azure Event Hubs هي خدمة استيعاب بيانات تتبع الاستخدام واسعة النطاق تجمع ملايين الأحداث وتحولها وتخزنها. كمنصة تدفق موزعة، فإنه يمنحك زمن انتقال منخفض واستبقاء وقت قابل للتكوين، ما يمكنك من إدخال كميات هائلة من بيانات تتبع الاستخدام في السحابة وقراءة البيانات من تطبيقات متعددة باستخدام دلالات النشر والاشتراك.

توضح هذه المقالة كيفية استخدام Structured Streaming مع مراكز الأحداث ومجموعات Azure Databricks.

إشعار

توفر Azure Event Hubs نقطة نهاية متوافقة مع Apache Kafka التي يمكنك استخدامها مع موصل Structured Streaming Kafka، المتوفر في Databricks Runtime، لمعالجة الرسائل من Azure Event Hubs. توصي Databricks باستخدام موصل Structured Streaming Kafka لمعالجة الرسائل من Azure Event Hubs.

المتطلبات

للحصول على دعم الإصدار الحالي، راجع "أحدث الإصدارات" في Azure Event Hubs Spark الاتصال أو ملف readme للمشروع.

  1. إنشاء مكتبة في مساحة عمل Azure Databricks باستخدام إحداثيات com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17Maven .

    إشعار

    يتم تحديث هذا الموصل بانتظام، وقد يتوفر إصدار أحدث: نوصي بسحب أحدث موصل من مستودع Maven

  2. تثبيت المكتبة التي تم إنشاؤها في نظام المجموعة.

مخطط

مخطط السجلات هو:

Column النوع
body binary
partition سلسلة
offset سلسلة
sequenceNumber طويل
enqueuedTime الطابع الزمني
publisher سلسلة
partitionKey سلسلة
properties map[string,json]

body يتم توفير دائما كصفيف بايت. استخدم cast("string") لإلغاء تسلسل body العمود بشكل صريح.

بداية سريعة

لنبدأ بمثال سريع: WordCount. دفتر الملاحظات التالي هو كل ما يلزم لتشغيل WordCount باستخدام Structured Streaming مع Azure Event Hubs.

Azure Event Hubs WordCount مع دفتر ملاحظات Structured Streaming

الحصول على دفتر الملاحظات

التكوين

يناقش هذا القسم إعدادات التكوين التي تحتاجها للعمل مع مراكز الأحداث.

للحصول على إرشادات مفصلة حول تكوين Structured Streaming باستخدام Azure Event Hubs، راجع دليل تكامل Structured Streaming وAzure Event Hubs الذي طورته Microsoft.

للحصول على إرشادات مفصلة حول استخدام Structured Streaming، راجع البث على Azure Databricks.

سلسلة الاتصال

مطلوب سلسلة الاتصال مراكز الأحداث للاتصال بخدمة مراكز الأحداث. يمكنك الحصول على سلسلة الاتصال لمثيل مراكز الأحداث من مدخل Microsoft Azure أو باستخدام ConnectionStringBuilder في المكتبة.

مدخل Azure

عند الحصول على سلسلة الاتصال من مدخل Microsoft Azure، قد يكون لديه أو لا يحتوي على EntityPath المفتاح. اعتبر:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

للاتصال ب EventHubs الخاص بك، EntityPath يجب أن يكون موجودا. إذا لم يكن سلسلة الاتصال لديك واحدا، فلا تقلق. هذا سوف يعتني به:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

الاتصال ionStringBuilder

بدلا من ذلك، يمكنك استخدام ConnectionStringBuilder لجعل سلسلة الاتصال الخاص بك.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

تحدث جميع التكوينات المتعلقة بمراكز الأحداث في .EventHubsConf لإنشاء EventHubsConf، يجب تمرير سلسلة الاتصال:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

راجع سلسلة الاتصال ion للحصول على مزيد من المعلومات حول الحصول على سلسلة الاتصال صالح.

للحصول على قائمة كاملة بالتكوينات، راجع EventHubsConf. فيما يلي مجموعة فرعية من التكوينات لمساعدتك على البدء:

خيار القيمة‬ Default نوع الاستعلام ‏‏الوصف
consumerGroup السلسلة‬ "$Default" الدفق والدفعة مجموعة المستهلكين هي طريقة عرض لمركز أحداث بأكمله. وتمكن مجموعات المستهلكين التطبيقات المستهلكة المتعددة ليحصل كل منها على عرض منفصل لدفق الحدث وقراءة الدفق بشكل مستقل بسرعتها وبإزاحتها الخاصة. تتوفر المزيد من المعلومات في وثائق Microsoft.
startingPosition موضع الحدث بداية الدفق الدفق والدفعة موضع البداية لمهمة Structured Streaming. راجع بدء تشغيل الموضع للحصول على معلومات حول الترتيب الذي تتم به قراءة الخيارات.
maxEventsPerTrigger طويل حساب عدد الأقسام

* 1000
تدفق الاستعلام حد المعدل على الحد الأقصى لعدد الأحداث التي تمت معالجتها لكل فاصل زمني للمشغل. سيتم تقسيم العدد الإجمالي المحدد للأحداث بشكل متناسب عبر أقسام وحدة تخزين مختلفة.

لكل خيار، يوجد إعداد مطابق في EventHubsConf. على سبيل المثال:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

موضع الحدث

EventHubsConf يسمح للمستخدمين بتحديد مواضع البدء (والنهاية) مع EventPosition الفئة . EventPosition يحدد موضع حدث في قسم Event Hub. يمكن أن يكون الموضع وقتا مدرجا في قائمة الانتظار أو إزاحة أو رقم تسلسل أو بداية الدفق أو نهاية الدفق.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

إذا كنت ترغب في البدء (أو الانتهاء) في موضع معين، فما عليك سوى إنشاء الصحيح EventPosition وتعيينه في EventHubsConf:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

تدفق منظم للإنتاج باستخدام مراكز أحداث Azure

عند تشغيل استعلامات الدفق في الإنتاج، من المحتمل أنك تريد ضمانات أكثر قوة ووقت تشغيل مما كنت تريد عندما تقوم ببساطة بإرفاق دفتر ملاحظات بمجموعة وتشغيل استعلامات الدفق بشكل تفاعلي. قم باستيراد وتشغيل دفتر الملاحظات التالي لعرض توضيحي لكيفية تكوين وتشغيل Structured Streaming في الإنتاج باستخدام Azure Event Hubs وAzure Databricks.

لمزيد من المعلومات، راجع اعتبارات الإنتاج للبث المنظم.

Production Structured Streaming مع دفتر ملاحظات Azure Event Hubs

الحصول على دفتر الملاحظات