تفاعل مع مجموعات Apache Kafka في Azure HDInsight باستخدام وكيل REST

يتيح لك Kafka REST Proxy التفاعل مع نظام مجموعة Kafka عبر واجهة برمجة تطبيقات REST عبر HTTPS. يعني هذا الإجراء أن عملاء Kafka الخاصين بك يمكن أن يكونوا خارج شبكتك الافتراضية. يمكن للعملاء إجراء مكالمات HTTPS بسيطة وآمنة إلى مجموعة Kafka بدلاً من الاعتماد على مكتبات Kafka. توضح هذه المقالة كيفية إنشاء نظام مجموعة Kafka ممكن لوكيل REST. يوفر أيضاً نموذج رمز يوضح كيفية إجراء مكالمات إلى وكيل REST.

مرجع REST API

للعمليات التي تدعمها واجهة برمجة تطبيقات Kafka REST API، راجع مرجع واجهة برمجة تطبيقات وكيل HDInsight Kafka REST .

خلفية

Kafka REST proxy design.

للحصول على المواصفات الكاملة للعمليات التي تدعمها واجهة برمجة التطبيقات، راجع Apache Kafka REST Proxy API .

نقطة نهاية وكيل REST

يؤدي إنشاء مجموعة HDInsight Kafka باستخدام وكيل REST إلى إنشاء نقطة نهاية عامة جديدة لمجموعتك، والتي يمكنك العثور عليها في مجموعة HDInsight الخصائص على مدخل Microsoft Azure.

Security

الوصول إلى وكيل Kafka REST المدار مع مجموعات أمان Microsoft Entra. عند إنشاء نظام مجموعة Kafka، قم بتوفير مجموعة أمان Microsoft Entra مع الوصول إلى نقطة نهاية REST. يجب أن يتم تسجيل عملاء Kafka الذين يحتاجون إلى الوصول إلى وكيل REST في هذه المجموعة بواسطة مالك المجموعة. يمكن لمالك المجموعة التسجيل عبر البوابة أو عبر PowerShell.

بالنسبة لطلبات نقطة نهاية وكيل REST، يجب أن تحصل تطبيقات العميل على رمز OAuth المميز. يستخدم الرمز المميز للتحقق من عضوية مجموعة الأمان. البحث عن نموذج تطبيق العميل يوضح كيفية الحصول على رمز OAuth المميز. يمرر تطبيق العميل رمز OAuth المميز في طلب HTTPS إلى وكيل REST.

إشعار

راجع إدارة الوصول إلى التطبيقات والموارد باستخدام مجموعات Microsoft Entra، لمعرفة المزيد حول مجموعات أمان Microsoft Entra. لمزيد من المعلومات حول كيفية عمل رموز OAuth المميزة، راجع تخويل الوصول إلى تطبيقات الويب Microsoft Entra باستخدام تدفق منح التعليمات البرمجية OAuth 2.0.

وكيل Kafka REST مع مجموعات أمان الشبكة

إذا قمت بإحضار VNet الخاص بك والتحكم في حركة مرور الشبكة باستخدام مجموعات أمان الشبكة، فقم بالسماح بحركة المرور الواردة على المنفذ 9400 بالإضافة إلى المنفذ 443. وهذا يضمن إمكانية الوصول إلى خادم وكيل Kafka REST.

المتطلبات الأساسية

  1. تسجيل تطبيق باستخدام معرف Microsoft Entra. تستخدم تطبيقات العميل التي تكتبها للتفاعل مع وكيل Kafka REST معرف التطبيق والسر للمصادقة على Azure.

  2. إنشاء مجموعة أمان Microsoft Entra. أضف التطبيق الذي قمت بتسجيله باستخدام معرف Microsoft Entra إلى مجموعة الأمان كعضو في المجموعة. سيتم استخدام مجموعة الأمان هذه للتحكم في التطبيقات التي تسمح بالتفاعل مع وكيل REST. لمزيد من المعلومات حول إنشاء مجموعات Microsoft Entra، راجع إنشاء مجموعة أساسية وإضافة أعضاء باستخدام معرف Microsoft Entra.

    تحقق من أن المجموعة من النوع الأمان . Security Group.

    تحقق من أن التطبيق عضو في المجموعة. Check Membership.

إنشاء كتلة Kafka مع تمكين وكيل REST

تستخدم الخطوات مدخل Microsoft Azure. للحصول على مثال باستخدام Azure CLI، راجع إنشاء مجموعة وكيل Apache Kafka REST باستخدام Azure CLI .

  1. أثناء سير عمل إنشاء مجموعة Kafka، في علامة التبويب الأمان + الشبكات ، حدد الخيار تمكين خادم وكيل Kafka REST.

    Screenshot shows the Create HDInsight cluster page with Security + networking selected.

  2. انقر فوق تحديد مجموعة الأمان . من قائمة مجموعات الأمان، حدد مجموعة الأمان التي تريد أن يكون لها وصول إلى وكيل REST. يمكنك استخدام مربع البحث للعثور على مجموعة الأمان المناسبة. انقر فوق الزر تحديد في الجزء السفلي.

    Screenshot shows the Create HDInsight cluster page with the option to select a security group.

  3. أكمل الخطوات المتبقية لإنشاء المجموعة الخاصة بك كما هو موضح في إنشاء مجموعة Apache Kafka في Azure HDInsight باستخدام مدخل Microsoft Azure .

  4. بمجرد إنشاء الكتلة، انتقل إلى خصائص الكتلة لتسجيل عنوان URL لوكيل Kafka REST.

    view REST proxy URL.

نموذج تطبيق العميل

يمكنك استخدام التعليمات البرمجية ل Python للتفاعل مع وكيل REST على نظام مجموعة Kafka. لاستخدام نموذج التعليمات البرمجية، اتبع الخطوات التالية:

  1. احفظ نموذج التعليمات البرمجية على جهاز مثبت عليه Python.

  2. ثبت تبعيات Python المطلوبة بتنفيذ pip3 install msal.

  3. قم بتعديل قسم الرمز قم بتكوين هذه الخصائص وقم بتحديث الخصائص التالية لبيئتك:

    الخاصية ‏‏الوصف
    معرف المستأجر مستأجر Azure حيث يوجد اشتراكك.
    معرف العميل معرّف التطبيق الذي قمت بتسجيله في مجموعة الأمان.
    سر العميل سر التطبيق الذي قمت بتسجيله في مجموعة الأمان. بتسجيله في مجموعة الأمان.
    Kafkarest_endpoint احصل على هذه القيمة من علامة التبويب الخصائص في نظرة عامة على المجموعة كما هو موضح في قسم النشر . يجب أن يكون بالتنسيق التالي - https://<clustername>-kafkarest.azurehdinsight.net
  4. من سطر الأوامر، قم بتنفيذ ملف python بتنفيذ sudo python3 <filename.py>

يقوم هذا الرمز بالإجراء التالي:

  1. إحضار رمز OAuth المميز من معرف Microsoft Entra.
  2. يوضح كيفية تقديم طلب إلى وكيلKafka REST.

لمزيد من المعلومات حول الحصول على رموز OAuth المميزة في Python، راجع فئة Python AuthenticationContext . قد ترى تأخيرا أثناء topics عدم إنشاء ذلك أو حذفه من خلال وكيل Kafka REST تنعكس هناك. هذا التأخير بسبب تحديث ذاكرة التخزين المؤقت. تم تحسين حقل القيمة لـ Producer API. بات يقبل الآن عناصر JSON وأي شكل متسلسل.

#Required Python packages
#pip3 install msal

import json
import msal
import random
import requests
import string
import sys
import time

def get_random_string():
    letters = string.ascii_letters
    random_string = ''.join(random.choice(letters) for i in range(7))

    return random_string


#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'ABCDEFGH-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Application Id
client_id = 'XYZABCDE-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#

# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id

app = msal.ConfidentialClientApplication(
    client_id , client_secret, authority,
    #cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)

# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10

# Print access token
print("Access token: " + accessToken)

# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}'  # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'

# Request header
headers = {
    'Authorization': 'Bearer ' + accessToken,
    'Content-type': 'application/json'          # set Content-type to 'application/json'
}

# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")

topics = []

# Create a  new topic
# Example of topic config
topic_config = {
    "partition_count": 1,
    "replication_factor": 1,
    "topic_properties": {
        "retention.ms": 604800000,
        "min.insync.replicas": "1"
    }
}

create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)

if response.ok:
    while new_topic not in topics:
        print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
        time.sleep(30)
        # List Topic
        get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)

        response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
        topic_list = response.json()
        topics = topic_list.get("topics", [])
else:
    print("Topic " + new_topic + " was created. Exit.")
    sys.exit(1)

# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
    "records": [
        {
            "key": "key1",
            "value": "**********"         # A string                              
        },
        {
            "partition": 0,
            "value": 5                    # An integer
        },
        {
            "value": 3.14                 # A floating number
        },
        {
            "value": {                    # A JSON object
                "id": 1,
                "name": "HDInsight Kafka REST proxy"
            }
        },
        {
            "value": [                    # A list of JSON objects
                {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                {
                    "id": 2,
                    "name": "HDInsight Kafka REST proxy 2"
                },
                {
                    "id": 3,
                    "name": "HDInsight Kafka REST proxy 3"
                }
            ]
        },
        {
            "value": {                  # A nested JSON object
                "group id": 1,
                "HDI Kafka REST": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                "HDI Kafka REST server info": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1",
                    "servers": [
                        {
                            "server id": 1,
                            "server name": "HDInsight Kafka REST proxy server 1"
                        },
                        {
                            "server id": 2,
                            "server name": "HDInsight Kafka REST proxy server 2"
                        },
                        {
                            "server id": 3,
                            "server name": "HDInsight Kafka REST proxy server 3"
                        }
                    ]
                }
            }
        }
    ]
}

print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)

# Consume messages from the topic
partition_id = 0
offset = 0
count = 2

while True:
    consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
    print("Consuming " + str(count) + " messages from offset " + str(offset))

    response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)

    if response.ok:
        messages = response.json()
        print("Consumed messages: \n" + json.dumps(messages, indent=2))
        next_offset = response.headers.get("NextOffset")
        if offset == next_offset or not messages.get("records", []):
            print("Consumer caught up with producer. Exit for now...")
            break

        offset = next_offset

    else:
        print("Error " + str(response.status_code))
        break
        
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from  " + get_partitions_url)

response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))

# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from  " + get_partition_url)

response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))

ابحث أدناه عن نموذج آخر حول كيفية الحصول على رمز مميز من وكيل Azure لـ REST باستخدام أمر curl. لاحظ أننا نحتاج إلى scope=https://hib.azurehdinsight.net/.default المحدد أثناء الحصول على رمز.

curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'

الخطوات التالية