البرنامج التعليمي: استخدام واجهات برمجة التطبيقات للمنتِج والمستهلك من Apache Kafka

تعرف على كيفية استخدام واجهات برمجة التطبيقات للمنتِج والمستهلك من Apache Kafka مع Kafka على HDInsight.

تسمح واجهة برمجة تطبيقات Kafka للمنتِج بإرسال تدفقات البيانات إلى مجموعة Kafka. واجهة برمجة تطبيقات المستهلك من Kafka تسمح للتطبيقات بقراءة تدفقات البيانات من المجموعة.

في هذا البرنامج التعليمي، تتعلم كيفية:

  • المتطلبات الأساسية
  • فهم الرمز
  • إنشاء التطبيق ونشره
  • تشغيل التطبيق على المجموعة

لمزيد من المعلومات حول واجهات برمجة التطبيقات، راجع وثائق Apache على واجهة برمجة تطبيقات المنتِج وواجهة برمجة تطبيقات المستهلك.

المتطلبات الأساسية

فهم الرمز

يوجد التطبيق المثال في https://github.com/Azure-Samples/hdinsight-kafka-java-get-started ، في دليل Producer-Consumer الفرعي. إذا كنت تستخدم مجموعة Kafka مع تمكين حزمة أمان المؤسسات (ESP)، فعليك استخدام إصدار التطبيق الموجود في دليل DomainJoined-Producer-Consumer الفرعي.

يتكون التطبيق بالأساس من أربعة ملفات:

  • pom.xml: يعرّف هذا الملف تبعيات المشروع، وإصدار Java، وأساليب التغليف.
  • Producer.java: يرسل هذا الملف عبارات عشوائية إلى Kafka باستخدام واجهة برمجة تطبيقات المنتِج.
  • Consumer.java: يستخدم هذا الملف واجهة برمجة تطبيقات المستهلك لقراءة البيانات من Kafka، ويرسلها إلى STDOUT.
  • AdminClientWrapper.java: يستخدم هذا الملف واجهة برمجة تطبيقات المسؤول لإنشاء موضوعات Kafka ووصفها وحذفها.
  • Run.java: واجهة سطر الأوامر المُستخدمة لتشغيل التعليمات البرمجية للمنتِج والمستهلك.

Pom.xml

الأمور التالية من المهم فهمها في ملف pom.xml:

  • التبعيات: يعتمد المشروع على واجهات برمجة تطبيقات المنتِج والمستهلك في Kafka، والتي توفرها حزمة kafka-clients. تعرّف تعليمات XML البرمجية التالية هذه التبعية:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    يُعلن إدخال ${kafka.version} في قسم <properties>..</properties> من pom.xml، ويتم تكوينه إلى إصدار Kafka من مجموعة HDInsight.

  • الوظائف الإضافية: توفر وظائف Maven الإضافية إمكانيات متنوعة. في هذا المشروع، تُستخدم الوظائف الإضافية التالية:

    • maven-compiler-plugin: تُستخدم لتعيين إصدار Java الذي يستخدمه المشروع إلى 8. هذا هو إصدار Java الذي يستخدمه الإصدار HDInsight 3.6.
    • maven-shade-plugin: تُستخدم لإنشاء ملف Uber JAR يحتوي على هذا التطبيق كما يحتوي على أية تبعيات. كما تُستخدم لتعيين نقطة الإدخال للتطبيق، بحيث يمكنك مباشرةً تشغيل ملف JAR بدون الحاجة إلى تحديد الفئة الرئيسية.

Producer.java

يتواصل المنتِج مع مضيفي وسيط Kafka (عُقد العمال)، ويرسل البيانات إلى موضوع Kafka. قصاصة التعليمات البرمجية التالية من ملف Producer.java من مستودع GitHub، وتستعرض كيفية إعداد خصائص المنتِج. بالنسبة للمجموعات التي تم تمكين "أمان المؤسسات" فيها، يجب إضافة خاصية إضافية "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"

Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

Consumer.java

يتواصل المستهلك مع مضيفي وسيط Kafka (عُقد العمال)، ويقرأ السجلات في حلقة. تعمل قصاصة التعليمات البرمجية التالية من ملف Consumer.java على تعيين خصائص المستهلك. بالنسبة للمجموعات التي تم تمكين "أمان المؤسسات" فيها، يجب إضافة خاصية إضافية "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"

KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");

consumer = new KafkaConsumer<>(properties);

في هذه التعليمة البرمجية، يتم تكوين المستهلك للقراءة من بداية الموضوع (يتم تعيينه auto.offset.reset إلى earliest).

Run.java

يوفر الملف Run.java واجهة لسطر الأوامر تشغّل المنتج التعليمة البرمجية للمنتِج أو المستهلك. يجب عليك توفير معلومات مضيف وسيط Kafka كمعلمة. يمكنك اختياريًا تضمين قيمة معرّف مجموعة، حيث تستخدمها عملية المستهلك. إذا أنشأت مثيلات متعددة للمستهلكين باستخدام معرّف المجموعة نفسه، فسوف تحمّل المثيلات قراءة الرصيد من الموضوع.

إنشاء المثال ونشره

استخدام ملفات JAR المُنشأة مسبقًا

نزّل ملفات JAR من عينة بدء تشغيل Kafka من Azure. إذا تم تمكين حزمة أمان المؤسسة (ESP) في مجموعتك، فاستخدم الملف kafka-producer-consumer-esp.jar. استخدم الأمر أدناه لنسخ ملفات JAR إلى مجموعتك.

scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar

إنشاء ملفات JAR من التعليمات البرمجية

إذا أردت تخطي هذه الخطوة، يمكن تنزيل ملفات JAR المُنشأة مسبقًا من دليل Prebuilt-Jars الفرعي. نزّل الملف kafka-producer-consumer.jar. إذا تم تمكين حزمة أمان المؤسسة (ESP) في مجموعتك، فاستخدم الملف kafka-producer-consumer-esp.jar. نفّذ الخطوة 3 لنسخ ملف JAR إلى مجموعة HDInsight.

  1. نزّل الأمثلة واستخرجها من https://github.com/Azure-Samples/hdinsight-kafka-java-get-started .

  2. عيّن الدليل الحالي إلى موقع دليل hdinsight-kafka-java-get-started\Producer-Consumer. إذا كنت تستخدم مجموعة Kafka مع تمكين حزمة أمان المؤسسات (ESP)، فعليك تعيين الموقع إلى دليل DomainJoined-Producer-Consumer الفرعي. شغّل الأمر التالي لإنشاء التطبيق:

    mvn clean package
    

    ينشئ هذا الأمر دليلاً باسم target يحتوي على ملف باسم kafka-producer-consumer-1.0-SNAPSHOT.jar. بالنسبة لمجموعات حزمة أمان المؤسسات، سيكون الملف kafka-producer-consumer-esp-1.0-SNAPSHOT.jar

  3. استبدل sshuser بمستخدم Secure Shell لمجموعتك، واستبدل CLUSTERNAME باسم مجموعتك. أدخِل الأمر التالي لنسخ ملف kafka-producer-consumer-1.0-SNAPSHOT.jar إلى مجموعة HDInsight. أدخِل كلمة السر لمستخدم Secure Shell عند المطالبة.

    scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

تشغيل المثال

  1. استبدل sshuser بمستخدم Secure Shell لمجموعتك، واستبدل CLUSTERNAME باسم مجموعتك. افتح اتصال SSH بالمجموعة، عن طريق إدخال الأمر التالي. أدخل كلمة المرور لحساب مستخدم Secure Shell إذا ظهرت لك مطالبة بذلك.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. للحصول على مضيفي وسيط Kafka، استبدل قيم <clustername> و<password> بالأوامر التالية ونفّذها. استخدم نفس حالة الأحرف في <clustername> كما هو موضح في مدخل Azure. استبدل <password> بكلمة مرور تسجيل الدخول إلى المجموعة، ثم نفّذ ما يلي:

    sudo apt -y install jq
    export CLUSTER_NAME='<clustername>'
    export PASSWORD='<password>'
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    ملاحظة

    يتطلب هذا الأمر الوصول إلى Ambari. إذا كان نظام المجموعة خلف NSG، فقم بتشغيل هذا الأمر من جهاز يمكنه الوصول إلى Ambari.

  3. أنشئ موضوع Kafka، myTest، من خلال إدخال الأمر التالي:

    java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
    
  4. لتشغيل المنتِج وكتابة البيانات إلى الموضوع، استخدم الأمر التالي:

    java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
    
  5. بمجرد انتهاء مهمة المنتِج، استخدم الأمر التالي للقراءة من الموضوع:

    java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS
    scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

    تُعرض السجلات المقروءة، بالإضافة إلى عدد السجلات.

  6. استخدم الاختصار Ctrl + C للخروج من المستهلك.

المستهلكون المتعددون

يستخدم مستهلكو Kafka مجموعة مستهلكين عند قراءة السجلات. يؤدي استخدام العديد من المستهلكين للمجموعة نفسها إلى قراءات متوازنة للتحميل من موضوع. يستلم كل مستهلك في المجموعة جزءًا من السجلات.

يقبل تطبيق المستهلك معلمة تُستخدم كمعرّف المجموعة. على سبيل المثال، يبدأ الأمر التالي مستهلكًا باستخدام معرّف مجموعة من myGroup:

java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup

استخدم الاختصار Ctrl + C للخروج من المستهلك.

للاطلاع على هذه العملية قيد التنفيذ، استخدم الأمر التالي:

tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach

يستخدم هذا الأمر tmux لتقسيم المحطة الطرفية إلى عمودين. يتم بدء تشغيل المستهلك في كل عمود، بنفس قيمة معرّف المجموعة. بمجرد انتهاء المستهلكين من القراءة، لاحظ أن كلاً منهم يقرأ جزءًا فقط من السجلات. استخدم الاختصار Ctrl + C مرتين للخروج من tmux.

تتم معالجة الاستهلاك من جانب العملاء الموجودين داخل نفس المجموعة من خلال أقسام الموضوع. في عينة التعليمات البرمجية هذه، يحتوي موضوع test المُنشأ سابقًا على ثمانية أقسام. إذا بدأت تشغيل ثمانية مستهلكين، فسوف يقرأ كل مستهلك السجلات من قسم واحد للموضوع.

هام

لا يمكن وجود عدد من مثيلات المستهلكين في مجموعة مستهلكين أكبر من عدد الأقسام. في هذا المثال، يمكن أن تحتوي مجموعة مستهلكين واحدة على ثمانية مستهلكين بحد أقصى؛ لأن هذا هو عدد الأقسام في الموضوع. أو يمكن أن تكون لديك مجموعات مستهلكين متعددة، لا تتضمن كلٌ منها أكثر من ثمانية مستهلكين.

يتم تخزين السجلات المخزنة في Kafka بترتيب استلامها داخل القسم. لتحقيق التسليم المطلوب للسجلات داخل القسم، أنشئ مجموعة مستهلكين يتطابق فيها عدد مثيلات المستهلكين مع عدد الأقسام. لتحقيق التسليم المطلوب للسجلات داخل الموضوع، أنشئ مجموعة مستهلكين تتضمن مثيل مستهلك واحدًا فقط.

المشكلات الشائعة

  1. إنشاء الموضوع يفشل في حال تمكين حزمة أمان المؤسسات في مجموعتك، استخدم ملفات JAR المُنشأة مسبقًا للمنتِج والمستهلك. يمكن بناء ملف JAR لحزمة أمان المؤسسات من التعليمات البرمجية الموجودة في دليل DomainJoined-Producer-Consumer الفرعي. تشمل خصائص المنتِج والمستهلك خاصية CommonClientConfigs.SECURITY_PROTOCOL_CONFIG إضافية للمجموعات التي يتم تمكين حزمة أمان المؤسسات فيها.

  2. فشل في المجموعات التي يتم تمكين حزمة أمان المؤسسات فيها: إذا فشل إنتاج العمليات واستهلاكها، وكنت تستخدم مجموعة تم تمكين حزمة أمان المؤسسات فيها، فتحقق من أن المستخدم kafka موجود في جميع نُهج Ranger. وإذا لم يكن موجودًا، فعليك إضافته إلى جميع نُهج Ranger.

تنظيف الموارد

لتنظيف الموارد المُنشأة خلال هذا البرنامج التعليمي، يمكنك حذف مجموعة الموارد. حذف مجموعة الموارد يحذف أيضاً نظام مجموعة HDInsight المقترن، وأي موارد أخرى مقترنة بمجموعة الموارد.

لإزالة مجموعة الموارد باستخدام مدخل Microsoft Azure:

  1. في المدخل Microsoft Azure، توسيع القائمة على الجانب الأيمن لفتح قائمة الخدمات، ثم اختيار Resource Groups لعرض قائمة مجموعات الموارد.
  2. تحديد موقع مجموعة الموارد لحذفها، ثم التحديد بزر الماوس الأيمن فوق زر المزيد (...) على الجانب الأيمن من القائمة.
  3. تحديد «Delete resource group» من شريط الأدوات.

الخطوات التالية

في هذا المستند، تعرفت على كيفية استخدام واجهة برمجة تطبيقات المنتِج والمستهلك من Kafka على HDInsight. استخدم ما يلي للتعرف على المزيد عن استخدام Kafka: