إرسال الأحداث إلى أو استقبالها من مراكز الأحداث باستخدام Python

يوضح هذا التشغيل السريع كيفية إرسال الأحداث إلى مركز الأحداث وتلقيها منه باستخدام حزمة azure-eventhub Python.

المتطلبات الأساسية

إذا كنت مستخدماً جديداً لـ Azure Event Hubs، فراجع نظرة عامة على Event Hubs قبل إجراء هذا التشغيل السريع.

تحتاج إلى المتطلبات الأساسية التالية لإكمال هذا التشغيل السريع:

  • الاشتراك في Microsoft Azure. تحتاج إلى اشتراك لاستخدام خدمات Azure، بما في ذلك مراكز الأحداث في Azure. إذا لم يكن لديك حساب Azure موجود، فسجل للحصول على نسخة تجريبية مجانية.
  • Python 3.8 أو أحدث، مع تثبيت pip وتحديثه.
  • Visual Studio Code (موصى به) أو أي بيئة تطوير متكاملة أخرى (IDE).
  • أنشئ مساحة اسم لـ Event Hubs ومركز الأحداث. الخطوة الأولى هي استخدام مدخل Microsoft Azure لإنشاء مساحة اسم مراكز الأحداث، والحصول على بيانات اعتماد الإدارة التي يحتاجها تطبيقك للاتصال بمركز الأحداث. لإنشاء مساحة اسم ومركز أحداث، اتبع الإجراء الوارد في هذه المقالة.

تثبيت الحزم لإرسال الأحداث

لتثبيت حزم Python لمراكز الأحداث، افتح موجه أوامر يحتوي على Python في مساره. قم بتغيير الدليل إلى المجلد حيث تريد الاحتفاظ بالعينات الخاصة بك.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

مصادقة التطبيق إلى Azure

يوضح لك هذا التشغيل السريع طريقتين للاتصال بمراكز أحداث Azure: بدون كلمة مرور سلسلة الاتصال. يوضح لك الخيار الأول كيفية استخدام أساس الأمان في معرف Microsoft Entra والتحكم في الوصول المستند إلى الدور (RBAC) للاتصال بمساحة اسم مراكز الأحداث. لا داعي للقلق بشأن وجود سلسلة الاتصال ذات تعليمات برمجية مضمنة في التعليمات البرمجية الخاصة بك أو في ملف تكوين أو في تخزين آمن مثل Azure Key Vault. يوضح لك الخيار الثاني كيفية استخدام سلسلة الاتصال للاتصال بمساحة اسم مراكز الأحداث. إذا كنت جديدا على Azure، فقد تجد خيار سلسلة الاتصال أسهل في المتابعة. نوصي باستخدام الخيار بدون كلمة مرور في التطبيقات وبيئات الإنتاج في العالم الحقيقي. لمزيد من المعلومات، راجع المصادقة والتخويل. يمكنك أيضا قراءة المزيد حول المصادقة بدون كلمة مرور في صفحة النظرة العامة.

تعيين أدوار لمستخدم Microsoft Entra

عند التطوير محليا، تأكد من أن حساب المستخدم الذي يتصل ب Azure Event Hubs لديه الأذونات الصحيحة. ستحتاج إلى دور مالك بيانات مراكز الأحداث من أجل إرسال الرسائل وتلقيها. لتعيين هذا الدور لنفسك، ستحتاج إلى دور المستخدم Access مسؤول istrator أو دور آخر يتضمن Microsoft.Authorization/roleAssignments/write الإجراء. يمكنك تعيين أدوار Azure RBAC لمستخدم باستخدام مدخل Microsoft Azure أو Azure CLI أو Azure PowerShell. تعرف على المزيد حول النطاقات المتوفرة لتعيينات الأدوار في صفحة نظرة عامة على النطاق.

يعين Azure Event Hubs Data Owner المثال التالي الدور إلى حساب المستخدم الخاص بك، والذي يوفر الوصول الكامل إلى موارد مراكز الأحداث Azure. في سيناريو حقيقي، اتبع مبدأ الامتياز الأقل لمنح المستخدمين الحد الأدنى فقط من الأذونات اللازمة لبيئة إنتاج أكثر أمانا.

أدوار Azure مضمنة لمراكز أحداث Azure

بالنسبة لمراكز أحداث Azure، فإن إدارة مساحات الأسماء وجميع الموارد ذات الصلة من خلال مدخل Microsoft Azure وواجهة برمجة تطبيقات إدارة موارد Azure محمية بالفعل باستخدام نموذج Azure RBAC. يوفر Azure الأدوار المضمنة أدناه ل Azure لتخويل الوصول إلى مساحة اسم مراكز الأحداث:

  • مالك بيانات مراكز الأحداث: تمكين الوصول إلى البيانات إلى مساحة اسم مراكز الأحداث وكياناتها (قوائم الانتظار والموضوعات والاشتراكات وعوامل التصفية)
  • مرسل بيانات Azure Event Hubs: استخدم هذا الدور لمنح المرسل حق الوصول إلى مساحة اسم مراكز الأحداث وكياناتها.
  • Azure Event Hubs Data Receiver: استخدم هذا الدور لمنح المتلقي حق الوصول إلى مساحة اسم مراكز الأحداث وكياناتها.

إذا كنت ترغب في إنشاء دور مخصص، فشاهد الحقوق المطلوبة لعمليات مراكز الأحداث.

هام

في معظم الحالات، سيستغرق نشر تعيين الدور في Azure دقيقة أو دقيقتين. في حالات نادرة، قد يستغرق الأمر ما يصل إلى ثماني دقائق. إذا تلقيت أخطاء المصادقة عند تشغيل التعليمات البرمجية لأول مرة، فانتظر بضع لحظات وحاول مرة أخرى.

  1. في مدخل Microsoft Azure، حدد موقع مساحة اسم Event Hubs باستخدام شريط البحث الرئيسي أو التنقل الأيسر.

  2. في صفحة النظرة العامة، حدد Access control (IAM) من القائمة اليسرى.

  3. حدد صفحة التحكم بالوصول (IAM)، وحدد علامة تبويب تعيينات الدور.

  4. حدد + إضافة من القائمة العلوية ثم إضافة تعيين الدور من القائمة المنسدلة الناتجة.

    A screenshot showing how to assign a role.

  5. استخدم مربع البحث لتصفية النتائج إلى الدور المطلوب. في هذا المثال، ابحث Azure Event Hubs Data Owner عن النتيجة المطابقة وحددها. ثم اختر التالي.

  6. ضمن تعيين الوصول إلى، حدد المستخدم أو المجموعة أو كيان الخدمة، ثم اختر + تحديد الأعضاء.

  7. في مربع الحوار، ابحث عن اسم مستخدم Microsoft Entra (عنوان بريدك الإلكتروني user@domain عادة) ثم اختر تحديد في أسفل مربع الحوار.

  8. حدد مراجعة + تعيين للانتقال إلى الصفحة النهائية، ثم مراجعة + تعيين مرة أخرى لإكمال العملية.

إرسال الأحداث

في هذا القسم، قم بإنشاء برنامج نصي Python لإرسال الأحداث إلى مركز الأحداث الذي قمت بإنشائه سابقا.

  1. افتح محرر Python المفضل لديك، مثل Visual Studio Code.

  2. أنشئ برنامجًا نصيًّا باسم send.py. يرسل هذا البرنامج النصي دفعة من الأحداث إلى مركز الحدث الذي قمت بإنشائه سابقًا.

  3. ألصق التعليمات البرمجية التالية في send.py.

    في التعليمات البرمجية، استخدم القيم الحقيقية لاستبدال العناصر النائبة التالية:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    إشعار

    للحصول على أمثلة لخيارات أخرى لإرسال الأحداث إلى Event Hub بشكل غير متزامن باستخدام سلسلة الاتصال، راجع صفحة send_async.py GitHub. الأنماط المعروضة هناك قابلة للتطبيق أيضا على إرسال الأحداث بدون كلمة مرور.

استقبال الأحداث

يستخدم quickstart هذا مخزن Azure Blob كمخزن نقطة تحقق. يتم استخدام مخزن نقطة التحقق لاستمرار نقاط التحقق (أي مواقف القراءة الأخيرة).

اتبع هذه التوصيات عند استخدام Azure Blob Storage كمخزن نقطة تحقق:

  • استخدم حاوية منفصلة لكل مجموعة مستهلكين. يمكنك استخدام نفس حساب التخزين، ولكن استخدام حاوية واحدة لكل مجموعة.
  • لا تستخدم الحاوية لأي شيء آخر، ولا تستخدم حساب التخزين لأي شيء آخر.
  • يجب أن يكون حساب التخزين في نفس المنطقة التي يوجد فيها التطبيق المنشور. إذا كان التطبيق محليا، فحاول اختيار أقرب منطقة ممكنة.

في صفحة Storage account في مدخل Microsoft Azure، في قسم Blob service ، تأكد من تعطيل الإعدادات التالية.

  • مساحة الاسم الهرمية
  • حذف مبدئي لكائن ثنائي كبير الحجم
  • تعيين الإصدار

إنشاء حساب مخزن Azure وحاوية الكائنات الثنائية كبيرة الحجم

قم بإنشاء حساب تخزين Azure و حاوية الكائنات الثنائية كبيرة الحجم فيه عن طريق القيام بالخطوات التالية:

  1. أنشئ حساب Azure Storage
  2. إنشاء حاوية كائن ثنائي كبير الحجم.
  3. المصادقة على حاوية الكائن الثنائي كبير الحجم.

تأكد من تسجيل سلسلة الاتصال واسم الحاوية لاستخدامها لاحقًا في التعليمات البرمجية الخاصة بالاستلام.

عند التطوير محليًا، تأكد من أن حساب المستخدم الذي يصل إلى بيانات الكائن الثنائي كبير الحجم لديه الأذونات الصحيحة. ستحتاج إلى Storage Blob Data Contributor لقراءة بيانات الكائن الثنائي كبير الحجم وكتابتها. لتعيين هذا الدور لنفسك، ستحتاج إلى تعيين دور المستخدم Access مسؤول istrator، أو دور آخر يتضمن إجراء Microsoft.Authorization/roleAssignments/write. يمكنك تعيين أدوار Azure RBAC لمستخدم باستخدام مدخل Microsoft Azure أو Azure CLI أو Azure PowerShell. يمكنك معرفة المزيد حول النطاقات المتوفرة لتعيينات الأدوار في صفحة نظرة عامة على النطاق.

في هذا السيناريو، ستقوم بتعيين أذونات لحساب المستخدم الخاص بك، محددة النطاق إلى حساب التخزين، لاتباع مبدأ أقل الامتيازات. تمنح هذه الممارسة المستخدمين الحد الأدنى من الأذونات المطلوبة فقط وتنشئ بيئات تشغيل أكثر أمانًا.

سيقوم المثال التالي بتعيين دور Storage Blob Data Contributor إلى حساب المستخدم الخاص بك، والذي يوفر حق الوصول للقراءة والكتابة إلى بيانات blob في حساب التخزين الخاص بك.

هام

في معظم الحالات، سيستغرق نشر تعيين الدور في Azure دقيقة أو دقيقتين، ولكن في حالات نادرة قد يستغرق الأمر ما يصل إلى ثماني دقائق. إذا تلقيت أخطاء المصادقة عند تشغيل التعليمات البرمجية لأول مرة، فانتظر بضع لحظات وحاول مرة أخرى.

  1. في مدخل Microsoft Azure، حدد موقع حساب التخزين الخاص بك باستخدام شريط البحث الرئيسي أو شريط التنقل الأيسر.

  2. في صفحة نظرة عامة على حساب التخزين، حدد التحكم بالوصول (IAM) من القائمة اليسرى.

  3. حدد صفحة التحكم بالوصول (IAM)، وحدد علامة تبويب تعيينات الدور.

  4. حدد + إضافة من القائمة العلوية ثم إضافة تعيين الدور من القائمة المنسدلة الناتجة.

    A screenshot showing how to assign a storage account role.

  5. استخدم مربع البحث لتصفية النتائج إلى الدور المطلوب. في هذا المثال، ابحث عن مساهم بيانات Storage Blob وحدد النتيجة المطابقة ثم اختر التالي.

  6. ضمن تعيين الوصول إلى، حدد المستخدم أو المجموعة أو كيان الخدمة، ثم اختر + تحديد الأعضاء.

  7. في مربع الحوار، ابحث عن اسم مستخدم Microsoft Entra (عنوان بريدك الإلكتروني user@domain عادة) ثم اختر تحديد في أسفل مربع الحوار.

  8. حدد مراجعة + تعيين للانتقال إلى الصفحة النهائية، ثم مراجعة + تعيين مرة أخرى لإكمال العملية.

تثبيت الحزم لتلقي الأحداث

بالنسبة للجانب المتلقي، تحتاج إلى تثبيت حزمة واحدة أو أكثر. في هذا التشغيل السريع، يمكنك استخدام تخزين Azure Blob لاستمرار نقاط التحقق حتى لا يقرأ البرنامج الأحداث التي قرأها بالفعل. ينفذ نقاط فحص بيانات التعريف على الرسائل المستلمة على فترات منتظمة في النقطة. يسهّل هذا الأسلوب متابعة تلقي الرسائل لاحقًا من حيث توقفت.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

قم بإنشاء برنامج Python النصي لتلقي الأحداث

في هذا المقطع، يمكنك إنشاء برنامج Python النصي لتلقي الأحداث من مركز الأحداث الخاص بك:

  1. افتح محرر Python المفضل لديك، مثل Visual Studio Code.

  2. قم بإنشاء برنامج نصي باسم recv.py.

  3. ألصق التعليمات البرمجية التالية في recv.py.

    في التعليمات البرمجية، استخدم القيم الحقيقية لاستبدال العناصر النائبة التالية:

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    إشعار

    للحصول على أمثلة لخيارات أخرى لتلقي الأحداث من Event Hub بشكل غير متزامن باستخدام سلسلة الاتصال، راجع صفحة recv_with_checkpoint_store_async.py GitHub. الأنماط المعروضة هناك قابلة للتطبيق أيضا على تلقي الأحداث بدون كلمة مرور.

استخدم تطبيق جهاز الاستقبال.

لتشغيل البرنامج النصي، افتح موجه الأوامر الذي يحتوي على Python في المسار الخاص به، ثم قم بتشغيل هذا الأمر:

python recv.py

قم بتشغيل تطبيق المرسل

لتشغيل البرنامج النصي، افتح موجه الأوامر الذي يحتوي على Python في المسار الخاص به، ثم قم بتشغيل هذا الأمر:

python send.py

يجب أن يعرض إطار المتلقي الرسائل التي تم إرسالها إلى مركز الأحداث.

استكشاف الأخطاء وإصلاحها

إذا كنت لا ترى أحداثا في نافذة المتلقي أو كانت التعليمات البرمجية تبلغ عن خطأ، فجرب تلميحات استكشاف الأخطاء وإصلاحها التالية:

  • إذا لم تشاهد نتائج من recy.py، فقم بتشغيل send.py عدة مرات.

  • إذا رأيت أخطاء حول "coroutine" عند استخدام التعليمات البرمجية بدون كلمة مرور (مع بيانات الاعتماد)، فتأكد من أنك تستخدم الاستيراد من azure.identity.aio.

  • إذا رأيت "جلسة عمل العميل غير المقفلة" مع رمز بدون كلمة مرور (مع بيانات الاعتماد)، فتأكد من إغلاق بيانات الاعتماد عند الانتهاء. لمزيد من المعلومات، راجع بيانات الاعتماد غير المتزامنة.

  • إذا رأيت أخطاء التخويل مع recv.py عند الوصول إلى التخزين، فتأكد من اتباع الخطوات الواردة في إنشاء حساب تخزين Azure وحاوية كائن ثنائي كبير الحجم وتعيين دور Storage Blob Data Contributor إلى كيان الخدمة.

  • إذا تلقيت أحداثا بمعرفات أقسام مختلفة، فمن المتوقع الحصول على هذه النتيجة. الأقسام هي آلية تنظيم البيانات التي تتعلق بالتوازي النازل والمطلوبة في التطبيقات المستهلكة. يتعلق عدد الأقسام في مركز الأحداث مباشرةً بعدد القراء المتزامنين الذي تتوقع أن يكونوا موجودين. لمزيد من المعلومات، راجع معرفة المزيد حول الأقسام.

الخطوات التالية

في هذه البداية السريعة، قمت بإرسال الأحداث وتلقيها بشكل غير متزامن. لمعرفة كيفية إرسال الأحداث وتلقيها بشكل متزامن، انتقل إلى صفحة GitHub sync_samples.

لكافة النماذج (متزامن وغير متزامن) على GitHub، انتقل إلى مكتبة عملاء Azure Event Hubs لنماذج Python.