استخدام Java لإرسال الأحداث إلى أو استقبالها من مراكز أحداث Azure

توضح هذه البداية السريعة كيفية إرسال الأحداث إلى مركز الأحداث وتلقيها منه باستخدام مكتبة حزمة azure-messaging-eventhubs Java.

تلميح

إذا كنت تعمل مع موارد Azure Event Hubs في تطبيق Spring، نوصيك بمراعاة Spring Cloud Azure كبديل. Spring Cloud Azure هو مشروع مفتوح المصدر يوفر تكامل Spring سلس مع خدمات Azure. لمعرفة المزيد حول Spring Cloud Azure، والاطلاع على مثال باستخدام مراكز الأحداث، راجع Spring Cloud Stream مع Azure Event Hubs.

المتطلبات الأساسية

إذا كنت مستخدماً جديداً لـ Azure Event Hubs، فراجع نظرة عامة على Event Hubs قبل إجراء هذا التشغيل السريع.

تحتاج إلى المتطلبات الأساسية التالية لإكمال هذا التشغيل السريع:

  • الاشتراك في Microsoft Azure. تحتاج إلى اشتراك لاستخدام خدمات Azure، بما في ذلك مراكز الأحداث في Azure. إذا لم يكن حساب Azure متوفرًا لديك، يمكنك التسجيل للحصول على نسخة تجريبية مجانية، أو استخدام مزايا الاشتراك في MSDN عند إنشاء حساب.
  • بيئة تطوير Java. يستخدم هذا التشغيل السريع "القطع". مطلوب Java Development Kit (SDK) الإصدار 8 أو أعلى.
  • أنشئ مساحة اسم لـ Event Hubs ومركز الأحداث. تتمثل الخطوة الأولى في استخدام مدخل Azure لإنشاء مساحة اسم من نوع Event Hubs، والحصول على بيانات اعتماد الإدارة التي يحتاجها التطبيق للتواصل مع "مركز الأحداث". لإنشاء مساحة اسم ومركز أحداث، اتبع الإجراء الوارد في هذه المقالة. بعد ذلك، يلزم الحصول على سلسلة الاتصال لمساحة اسم Event Hubs باتباع الإرشادات الواردة في مقالة: الحصول على سلسلة اتصال. ويمكنك استخدام سلسلة الاتصال بعد ذلك في هذا التشغيل السريع.

إرسال الأحداث

يوضح هذا القسم كيفية إنشاء تطبيق Java لإرسال الأحداث إلى "مركز أحداث".

أضف مرجعاً إلى مكتبة Azure Event Hubs

أولا، قم بإنشاء مشروع Maven جديد لتطبيق وحدة تحكم/shell في بيئة تطوير Java المفضلة لديك. قم بتحديث pom.xml الملف كما يلي. مكتبة عميل Java لـ"مراكز الأحداث" متاحة في Maven Central Repository .

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.18.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.11.2</version>
		    <scope>compile</scope>
		</dependency>

إشعار

تحديث الإصدار إلى أحدث إصدار منشور في مستودع Maven.

مصادقة التطبيق إلى Azure

يوضح لك هذا التشغيل السريع طريقتين للاتصال بمراكز أحداث Azure: بدون كلمة مرور سلسلة الاتصال. يوضح لك الخيار الأول كيفية استخدام أساس الأمان في معرف Microsoft Entra والتحكم في الوصول المستند إلى الدور (RBAC) للاتصال بمساحة اسم مراكز الأحداث. لا داعي للقلق بشأن وجود سلسلة الاتصال ذات تعليمات برمجية مضمنة في التعليمات البرمجية الخاصة بك أو في ملف تكوين أو في تخزين آمن مثل Azure Key Vault. يوضح لك الخيار الثاني كيفية استخدام سلسلة الاتصال للاتصال بمساحة اسم مراكز الأحداث. إذا كنت جديدا على Azure، فقد تجد خيار سلسلة الاتصال أسهل في المتابعة. نوصي باستخدام الخيار بدون كلمة مرور في التطبيقات وبيئات الإنتاج في العالم الحقيقي. لمزيد من المعلومات، راجع المصادقة والتخويل. يمكنك أيضا قراءة المزيد حول المصادقة بدون كلمة مرور في صفحة النظرة العامة.

تعيين أدوار لمستخدم Microsoft Entra

عند التطوير محليا، تأكد من أن حساب المستخدم الذي يتصل ب Azure Event Hubs لديه الأذونات الصحيحة. ستحتاج إلى دور مالك بيانات مراكز الأحداث من أجل إرسال الرسائل وتلقيها. لتعيين هذا الدور لنفسك، ستحتاج إلى دور المستخدم Access مسؤول istrator أو دور آخر يتضمن Microsoft.Authorization/roleAssignments/write الإجراء. يمكنك تعيين أدوار Azure RBAC لمستخدم باستخدام مدخل Microsoft Azure أو Azure CLI أو Azure PowerShell. تعرف على المزيد حول النطاقات المتوفرة لتعيينات الأدوار في صفحة نظرة عامة على النطاق.

يعين Azure Event Hubs Data Owner المثال التالي الدور إلى حساب المستخدم الخاص بك، والذي يوفر الوصول الكامل إلى موارد مراكز الأحداث Azure. في سيناريو حقيقي، اتبع مبدأ الامتياز الأقل لمنح المستخدمين الحد الأدنى فقط من الأذونات اللازمة لبيئة إنتاج أكثر أمانا.

أدوار Azure مضمنة لمراكز أحداث Azure

بالنسبة لمراكز أحداث Azure، فإن إدارة مساحات الأسماء وجميع الموارد ذات الصلة من خلال مدخل Microsoft Azure وواجهة برمجة تطبيقات إدارة موارد Azure محمية بالفعل باستخدام نموذج Azure RBAC. يوفر Azure الأدوار المضمنة أدناه ل Azure لتخويل الوصول إلى مساحة اسم مراكز الأحداث:

  • مالك بيانات مراكز الأحداث: تمكين الوصول إلى البيانات إلى مساحة اسم مراكز الأحداث وكياناتها (قوائم الانتظار والموضوعات والاشتراكات وعوامل التصفية)
  • مرسل بيانات Azure Event Hubs: استخدم هذا الدور لمنح المرسل حق الوصول إلى مساحة اسم مراكز الأحداث وكياناتها.
  • Azure Event Hubs Data Receiver: استخدم هذا الدور لمنح المتلقي حق الوصول إلى مساحة اسم مراكز الأحداث وكياناتها.

إذا كنت ترغب في إنشاء دور مخصص، فشاهد الحقوق المطلوبة لعمليات مراكز الأحداث.

هام

في معظم الحالات، سيستغرق نشر تعيين الدور في Azure دقيقة أو دقيقتين. في حالات نادرة، قد يستغرق الأمر ما يصل إلى ثماني دقائق. إذا تلقيت أخطاء المصادقة عند تشغيل التعليمات البرمجية لأول مرة، فانتظر بضع لحظات وحاول مرة أخرى.

  1. في مدخل Microsoft Azure، حدد موقع مساحة اسم Event Hubs باستخدام شريط البحث الرئيسي أو التنقل الأيسر.

  2. في صفحة النظرة العامة، حدد Access control (IAM) من القائمة اليسرى.

  3. حدد صفحة التحكم بالوصول (IAM)، وحدد علامة تبويب تعيينات الدور.

  4. حدد + إضافة من القائمة العلوية ثم إضافة تعيين الدور من القائمة المنسدلة الناتجة.

    A screenshot showing how to assign a role.

  5. استخدم مربع البحث لتصفية النتائج إلى الدور المطلوب. في هذا المثال، ابحث Azure Event Hubs Data Owner عن النتيجة المطابقة وحددها. ثم اختر التالي.

  6. ضمن تعيين الوصول إلى، حدد المستخدم أو المجموعة أو كيان الخدمة، ثم اختر + تحديد الأعضاء.

  7. في مربع الحوار، ابحث عن اسم مستخدم Microsoft Entra (عنوان بريدك الإلكتروني user@domain عادة) ثم اختر تحديد في أسفل مربع الحوار.

  8. حدد مراجعة + تعيين للانتقال إلى الصفحة النهائية، ثم مراجعة + تعيين مرة أخرى لإكمال العملية.

كتابة التعليمة البرمجية لإرسال رسائل إلى "مركز الأحداث"

إضافة فئة باسم Sender، وأضف الكود التالي إلى الفئة:

هام

  • قم بالتحديث <NAMESPACE NAME> باسم مساحة اسم Event Hubs.
  • قم بالتحديث <EVENT HUB NAME> باسم مركز الأحداث الخاص بك.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

بناء البرنامج، والتأكد من عدم وجود أخطاء. ستقوم بتشغيل هذا البرنامج بعد تشغيل برنامج الاستقبال.

استقبال الأحداث

تستند الشفرة في هذا البرنامج التعليمي إلى رمز EventProcessorClient sample on GitHub، والذي يمكن فحصه لمشاهدة تطبيق العمل الكامل.

اتبع هذه التوصيات عند استخدام Azure Blob Storage كمخزن نقطة تحقق:

  • استخدم حاوية منفصلة لكل مجموعة مستهلكين. يمكنك استخدام نفس حساب التخزين، ولكن استخدام حاوية واحدة لكل مجموعة.
  • لا تستخدم الحاوية لأي شيء آخر، ولا تستخدم حساب التخزين لأي شيء آخر.
  • يجب أن يكون حساب التخزين في نفس المنطقة التي يوجد فيها التطبيق المنشور. إذا كان التطبيق محليا، فحاول اختيار أقرب منطقة ممكنة.

في صفحة Storage account في مدخل Microsoft Azure، في قسم Blob service ، تأكد من تعطيل الإعدادات التالية.

  • مساحة الاسم الهرمية
  • حذف مبدئي لكائن ثنائي كبير الحجم
  • تعيين الإصدار

إنشاء Azure Storage وحاوية الكائنات الثنائية كبيرة الحجم

يمكنك في هذا التشغيل السريع استخدام Azure Storage (خاصة، Blob Storage) كمخزن نقطة تحقق. نقطة التحقق هي عملية يقوم معالج الأحداث من خلالها بوضع علامة على موضع آخر حدث تمت معالجته بنجاح أو الالتزام به داخل قسم. يتم وضع علامة نقطة اختبار عادة ضمن الدالة التي تعالج الأحداث. لمعرفة المزيد حول نقاط التفتيش، راجع معالج الأحداث.

اتبع هذه الخطوات لإنشاء حساب Azure Storage.

  1. أنشئ حساب Azure Storage
  2. إنشاء حاوية كائن ثنائي كبير الحجم
  3. المصادقة على حاوية الكائن الثنائي كبير الحجم

عند التطوير محليًا، تأكد من أن حساب المستخدم الذي يصل إلى بيانات الكائن الثنائي كبير الحجم لديه الأذونات الصحيحة. ستحتاج إلى Storage Blob Data Contributor لقراءة بيانات الكائن الثنائي كبير الحجم وكتابتها. لتعيين هذا الدور لنفسك، ستحتاج إلى تعيين دور المستخدم Access مسؤول istrator، أو دور آخر يتضمن إجراء Microsoft.Authorization/roleAssignments/write. يمكنك تعيين أدوار Azure RBAC لمستخدم باستخدام مدخل Microsoft Azure أو Azure CLI أو Azure PowerShell. يمكنك معرفة المزيد حول النطاقات المتوفرة لتعيينات الأدوار في صفحة نظرة عامة على النطاق.

في هذا السيناريو، ستقوم بتعيين أذونات لحساب المستخدم الخاص بك، محددة النطاق إلى حساب التخزين، لاتباع مبدأ أقل الامتيازات. تمنح هذه الممارسة المستخدمين الحد الأدنى من الأذونات المطلوبة فقط وتنشئ بيئات تشغيل أكثر أمانًا.

سيقوم المثال التالي بتعيين دور Storage Blob Data Contributor إلى حساب المستخدم الخاص بك، والذي يوفر حق الوصول للقراءة والكتابة إلى بيانات blob في حساب التخزين الخاص بك.

هام

في معظم الحالات، سيستغرق نشر تعيين الدور في Azure دقيقة أو دقيقتين، ولكن في حالات نادرة قد يستغرق الأمر ما يصل إلى ثماني دقائق. إذا تلقيت أخطاء المصادقة عند تشغيل التعليمات البرمجية لأول مرة، فانتظر بضع لحظات وحاول مرة أخرى.

  1. في مدخل Microsoft Azure، حدد موقع حساب التخزين الخاص بك باستخدام شريط البحث الرئيسي أو شريط التنقل الأيسر.

  2. في صفحة نظرة عامة على حساب التخزين، حدد التحكم بالوصول (IAM) من القائمة اليسرى.

  3. حدد صفحة التحكم بالوصول (IAM)، وحدد علامة تبويب تعيينات الدور.

  4. حدد + إضافة من القائمة العلوية ثم إضافة تعيين الدور من القائمة المنسدلة الناتجة.

    A screenshot showing how to assign a storage account role.

  5. استخدم مربع البحث لتصفية النتائج إلى الدور المطلوب. في هذا المثال، ابحث عن مساهم بيانات Storage Blob وحدد النتيجة المطابقة ثم اختر التالي.

  6. ضمن تعيين الوصول إلى، حدد المستخدم أو المجموعة أو كيان الخدمة، ثم اختر + تحديد الأعضاء.

  7. في مربع الحوار، ابحث عن اسم مستخدم Microsoft Entra (عنوان بريدك الإلكتروني user@domain عادة) ثم اختر تحديد في أسفل مربع الحوار.

  8. حدد مراجعة + تعيين للانتقال إلى الصفحة النهائية، ثم مراجعة + تعيين مرة أخرى لإكمال العملية.

إضافة مكتبات مراكز الأحداث إلى مشروع Java

إضافة التبعيات التالية في ملف pom.xml.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.15.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.16.1</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.8.0</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. أضف العبارات التالية import في أعلى ملف Java.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. قم بإنشاء فئة باسم Receiver، وأضف الكود التالي إلى الفئة. استبدل العناصر النائبة بالقيم الصحيحة.

    هام

    استبدل العناصر النائبة بالقيم الصحيحة.

    • <NAMESPACE NAME> باسم مساحة اسم Event Hub.
    • <EVENT HUB NAME> باسم لوحة وصل الحدث في مساحة الاسم.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. إضافة main الأسلوب التالي إلى الفئة.

    هام

    استبدل العناصر النائبة بالقيم الصحيحة.

    • <STORAGE ACCOUNT NAME> باسم حساب Azure Storage الخاص بك.
    • <CONTAINER NAME> باسم حاوية الكائن الثنائي كبير الحجم في حساب التخزين
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. إضافة أساليب المساعد اثنين (PARTITION_PROCESSOR وERROR_HANDLER) التي تعالج الأحداث والأخطاء إلى Receiver الفئة.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. بناء البرنامج، والتأكد من عدم وجود أخطاء.

تشغيل التطبيقات

  1. استخدم تطبيق المتلقي.

  2. ثم شغل تطبيق المرسل.

  3. في إطار تطبيق المتلقي تأكد من مشاهدة الأحداث التي تم نشرها بواسطة تطبيق المرسل.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. اضغط إدخال في إطار تطبيق المتلقي لإيقاف التطبيق.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

الخطوات التالية

راجع العينات التالية على GitHub: