استخدام Akka Streams مع Event Hubs الخاصة بـ Apache Kafka

يوضح لك هذا البرنامج التعليمي كيفية توصيل Akka Streams من خلال دعم Event Hubs لـ Apache Kafka دون تغيير عملاء البروتوكول أو تشغيل مجموعاتك الخاصة.

في هذا البرنامج التعليمي، تتعلم كيفية:

  • إنشاء مساحة اسم لـ Event Hubs
  • استنساخ مثال للمشروع
  • تشغيل منتج Akka Streams
  • تشغيل مستهلك Akka Streams

ملاحظة

تلك العينة متاحة على GitHub

المتطلبات الأساسية

لإكمال هذا البرنامج التعليمي، تأكد من توفر المتطلبات الأساسية التالية لديك:

إنشاء مساحة اسم مراكز الأحداث

مطلوب مساحة اسم Event Hubs للإرسال أو الاستلام من أي خدمة من خدمات Event Hubs. راجع إنشاء مركز أحداث للحصول على معلومات مفصلة. تأكد من نسخ سلسلة اتصال Event Hubs لاستخدامها لاحقاً.

استنساخ مثال للمشروع

الآن بعد أن أصبحت لديك سلسلة اتصال Event Hubs، قم باستنساخ Azure Event Hubs لمستودع Kafka وانتقل إلى المجلد الفرعي akka:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/akka/java

تشغيل منتج Akka Streams

باستخدام مثال منتج Akka Streams المقدم، أرسل رسائل إلى خدمة Event Hubs.

قم بتوفير نقطة نهاية Event Hubs Kafka

تطبيق المنتِج .conf

قم بتحديث قيمتَي bootstrap.servers وsasl.jaas.config في producer/src/main/resources/application.conf لتوجيه المنتج إلى نقطة نهاية Event Hubs Kafka باستخدام المصادقة الصحيحة.

akka.kafka.producer {
    #Akka Kafka producer properties can be defined here


    # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
    # can be defined in this configuration section.
    kafka-clients {
        bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
        sasl.mechanism=PLAIN
        security.protocol=SASL_SSL
        sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
    }
}

هام

استبدل {YOUR.EVENTHUBS.CONNECTION.STRING} بسلسلة الاتصال لمساحة اسم مراكز أحداث. لإرشادات حول الحصول على سلسلة الاتصال، راجع الحصول على سلسلة اتصال مراكز أحداث. هنا مثال على التكوين: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

قم بتشغيل المنتج من سطر الأوامر

لتشغيل المنتج من سطر الأوامر، قم بإنشاء JAR ثم قم بتشغيله من داخل Maven (أو قم بإنشاء JAR باستخدام Maven، ثم قم بتشغيل Java عن طريق إضافة Kafka JAR (s) الضرورية إلى classpath):

mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestProducer"

يبدأ المنتج في إرسال الأحداث إلى مركز الحدث في الموضوع test، ويطبع الأحداث إلى stdout.

تشغيل مستهلك Akka Streams

باستخدام مثال المستهلك المقدم، استقبل الرسائل من مركز الحدث.

قم بتوفير نقطة نهاية Event Hubs Kafka

تطبيق المستهلك

حدِّث قيمتَي bootstrap.servers وsasl.jaas.config في consumer/src/main/resources/application.conf لتوجيه المستهلك إلى نقطة نهاية Event Hubs Kafka باستخدام المصادقة الصحيحة.

akka.kafka.consumer {
    #Akka Kafka consumer properties defined here
    wakeup-timeout=60s

    # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
    # defined in this configuration section.
    kafka-clients {
       request.timeout.ms=60000
       group.id=akka-example-consumer

       bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
       sasl.mechanism=PLAIN
       security.protocol=SASL_SSL
       sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
    }
}

هام

استبدل {YOUR.EVENTHUBS.CONNECTION.STRING} بسلسلة الاتصال لمساحة اسم مراكز أحداث. لإرشادات حول الحصول على سلسلة الاتصال، راجع الحصول على سلسلة اتصال مراكز أحداث. هنا مثال على التكوين: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

قم بتشغيل المستهلك من سطر الأوامر

لتشغيل المستهلك من سطر الأوامر، قم بإنشاء JAR ثم قم بتشغيله من داخل Maven (أو قم بإنشاء JAR باستخدام Maven، ثم قم بتشغيل Java عن طريق إضافة Kafka JAR (s) الضرورية إلى classpath):

mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestConsumer"

إذا كان مركز الأحداث يحتوي على أحداث (على سبيل المثال، إذا كان منتجك يعمل أيضاً)، فسيبدأ المستهلك في تلقي الأحداث من الموضوع test.

راجع Akka Streams Kafka Guide لمزيد من المعلومات التفصيلية حول Akka Streams.

الخطوات التالية

لمعرفة المزيد حول Event Hubs لـ Kafka، راجع المقالات التالية: