Berinteraksi dengan kluster Apache Kafka di Azure HDInsight menggunakan proksi REST

Kafka REST Proxy memungkinkan Anda berinteraksi dengan kluster Kafka melalui REST API lewat HTTPS. Tindakan ini artinya klien Kafka Anda dapat berada di luar jaringan virtual Anda. Klien dapat melakukan perintah HTTPS yang sederhana dan aman ke kluster Kafka, daripada mengandalkan pustaka Kafka. Artikel ini menunjukkan kepada Anda cara membuat kluster Kafka yang diaktifkan proksi REST. Juga menyediakan kode sampel yang menunjukkan cara melakukan perintah ke proksi REST.

Referensi REST API

Untuk operasi yang didukung oleh Kafka REST API, lihat Referensi HDInsight Kafka REST Proxy API.

Latar belakang

Kafka REST proxy design.

Untuk spesifikasi lengkap operasi yang didukung oleh API, lihat Apache Kafka REST Proxy API.

Titik akhir REST Proxy

Pembuatan kluster HDInsight Kafka dengan proksi REST membuat titik akhir publik baru untuk kluster Anda, yang dapat Anda temukan di Properti kluster HDInsight Anda pada portal Microsoft Azure.

Keamanan

Akses ke proksi Kafka REST yang dikelola dengan grup keamanan Microsoft Entra. Saat membuat kluster Kafka, sediakan grup keamanan Microsoft Entra dengan akses titik akhir REST. Klien Kafka yang membutuhkan akses ke proksi REST harus didaftarkan ke grup ini oleh pemilik grup. Pemilik grup dapat mendaftar melalui Portal atau melalui PowerShell.

Untuk permintaan titik akhir proksi REST, aplikasi klien harus mendapatkan token OAuth. Token menggunakan untuk memverifikasi keanggotaan grup keamanan. Temukan sampel aplikasi Klien menunjukkan cara mendapatkan token OAuth. Aplikasi klien meneruskan token OAuth dalam permintaan HTTPS ke proksi REST.

Catatan

Lihat Mengelola akses aplikasi dan sumber daya menggunakan grup Microsoft Entra, untuk mempelajari selengkapnya tentang grup keamanan Microsoft Entra. Untuk informasi selengkapnya tentang cara kerja token OAuth, lihat Mengotorisasi akses ke aplikasi web Microsoft Entra menggunakan alur pemberian kode OAuth 2.0.

Proksi Kafka REST dengan Grup Keamanan Jaringan

Jika Anda membawa VNet Anda sendiri dan mengontrol lalu lintas jaringan dengan grup keamanan jaringan, perbolehkan lalu lintas masuk pada port 9400 selain ke port 443. Ini memastikan bahwa server proksi Kafka REST dapat dijangkau.

Prasyarat

  1. Daftarkan aplikasi dengan ID Microsoft Entra. Aplikasi klien yang Anda tulis untuk berinteraksi dengan proksi Kafka REST menggunakan ID dan rahasia aplikasi ini untuk mengautentikasi ke Azure.

  2. Buat grup keamanan Microsoft Entra. Tambahkan aplikasi yang telah Anda daftarkan dengan ID Microsoft Entra ke grup keamanan sebagai anggota grup. Grup keamanan ini akan digunakan untuk mengontrol aplikasi mana yang diizinkan untuk berinteraksi dengan proksi REST. Untuk informasi selengkapnya tentang membuat grup Microsoft Entra, lihat Membuat grup dasar dan menambahkan anggota menggunakan ID Microsoft Entra.

    Validasikan apakah grup merupakan jenis Keamanan. Security Group.

    Validasikan apakah aplikasi adalah anggota Grup. Check Membership.

Membuat kluster Kafka dengan proksi REST diaktifkan

Langkah-langkahnya menggunakan portal Azure. Untuk contoh penggunaan Azure CLI, lihat Membuat kluster proksi Apache Kafka REST menggunakan Azure CLI.

  1. Selama alur kerja pembuatan kluster Kafka, di tab Keamanan + jaringan, centang opsi Aktifkan proksi Kafka REST.

    Screenshot shows the Create HDInsight cluster page with Security + networking selected.

  2. Klik Pilih Grup Keamanan. Dari daftar grup keamanan, pilih grup keamanan tempat Anda ingin memiliki akses ke proksi REST. Anda bisa menggunakan kotak pencarian untuk menemukan grup keamanan yang sesuai. Klik tombol Pilih di bagian bawah.

    Screenshot shows the Create HDInsight cluster page with the option to select a security group.

  3. Selesaikan langkah-langkah yang tersisa untuk membuat kluster Anda seperti yang dijelaskan di Membuat kluster Apache Kafka di Azure HDInsight menggunakan portal Microsoft Azure.

  4. Setelah kluster dibuat, buka properti kluster untuk merekam URL proksi Kafka REST.

    view REST proxy URL.

Sampel aplikasi klien

Anda dapat menggunakan kode Python untuk berinteraksi dengan proksi REST di kluster Kafka Anda. Untuk menggunakan sampel kode, ikuti langkah-langkah berikut:

  1. Simpan kode sampel pada mesin dengan Python terpasang.

  2. Instal dependensi Python yang diperlukan dengan menjalankan pip3 install msal.

  3. Ubah bagian kode Konfigurasikan properti ini dan perbarui properti berikut untuk lingkungan Anda:

    Properti Deskripsi
    ID Penyewa Penyewa Azure tempat langganan Anda berada.
    ID klien ID untuk aplikasi yang Anda daftarkan di grup keamanan.
    Rahasia Klien Rahasia untuk aplikasi yang Anda daftarkan di grup keamanan.
    Kafkarest_endpoint Dapatkan nilai ini dari tab Properti di ikhtisar kluster seperti yang dijelaskan di bagian penyebaran. Nilai harus dalam format berikut – https://<clustername>-kafkarest.azurehdinsight.net
  4. Dari baris perintah, jalankan file Python dengan menjalankan sudo python3 <filename.py>

Kode ini melakukan tindakan berikut:

  1. Mengambil token OAuth dari ID Microsoft Entra.
  2. Menampilkan cara membuat permintaan ke proksi Kafka REST.

Untuk informasi selengkapnya tentang mendapatkan token OAuth di Python, lihat Kelas Python AuthenticationContext. Anda mungkin melihat penundaan saat topics tidak dibuat atau dihapus melalui proksi Kafka REST tercermin di sana. Penundaan ini dikarenakan refresh tembolok. Bidang nilai Producer API telah ditingkatkan. Sekarang, ia menerima objek JSON dan bentuk serial apa pun.

#Required Python packages
#pip3 install msal

import json
import msal
import random
import requests
import string
import sys
import time

def get_random_string():
    letters = string.ascii_letters
    random_string = ''.join(random.choice(letters) for i in range(7))

    return random_string


#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'ABCDEFGH-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Application Id
client_id = 'XYZABCDE-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#

# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id

app = msal.ConfidentialClientApplication(
    client_id , client_secret, authority,
    #cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)

# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10

# Print access token
print("Access token: " + accessToken)

# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}'  # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'

# Request header
headers = {
    'Authorization': 'Bearer ' + accessToken,
    'Content-type': 'application/json'          # set Content-type to 'application/json'
}

# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")

topics = []

# Create a  new topic
# Example of topic config
topic_config = {
    "partition_count": 1,
    "replication_factor": 1,
    "topic_properties": {
        "retention.ms": 604800000,
        "min.insync.replicas": "1"
    }
}

create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)

if response.ok:
    while new_topic not in topics:
        print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
        time.sleep(30)
        # List Topic
        get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)

        response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
        topic_list = response.json()
        topics = topic_list.get("topics", [])
else:
    print("Topic " + new_topic + " was created. Exit.")
    sys.exit(1)

# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
    "records": [
        {
            "key": "key1",
            "value": "**********"         # A string                              
        },
        {
            "partition": 0,
            "value": 5                    # An integer
        },
        {
            "value": 3.14                 # A floating number
        },
        {
            "value": {                    # A JSON object
                "id": 1,
                "name": "HDInsight Kafka REST proxy"
            }
        },
        {
            "value": [                    # A list of JSON objects
                {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                {
                    "id": 2,
                    "name": "HDInsight Kafka REST proxy 2"
                },
                {
                    "id": 3,
                    "name": "HDInsight Kafka REST proxy 3"
                }
            ]
        },
        {
            "value": {                  # A nested JSON object
                "group id": 1,
                "HDI Kafka REST": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                "HDI Kafka REST server info": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1",
                    "servers": [
                        {
                            "server id": 1,
                            "server name": "HDInsight Kafka REST proxy server 1"
                        },
                        {
                            "server id": 2,
                            "server name": "HDInsight Kafka REST proxy server 2"
                        },
                        {
                            "server id": 3,
                            "server name": "HDInsight Kafka REST proxy server 3"
                        }
                    ]
                }
            }
        }
    ]
}

print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)

# Consume messages from the topic
partition_id = 0
offset = 0
count = 2

while True:
    consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
    print("Consuming " + str(count) + " messages from offset " + str(offset))

    response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)

    if response.ok:
        messages = response.json()
        print("Consumed messages: \n" + json.dumps(messages, indent=2))
        next_offset = response.headers.get("NextOffset")
        if offset == next_offset or not messages.get("records", []):
            print("Consumer caught up with producer. Exit for now...")
            break

        offset = next_offset

    else:
        print("Error " + str(response.status_code))
        break
        
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from  " + get_partitions_url)

response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))

# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from  " + get_partition_url)

response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))

Temukan di bawah sampel lain tentang cara mendapatkan token dari Azure untuk proksi REST menggunakan perintah curl. Perhatikan bahwa kita membutuhkan scope=https://hib.azurehdinsight.net/.default yang ditentukan saat mendapatkan token.

curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'

Langkah berikutnya