REST ara sunucusu kullanarak Azure HDInsight'ta Apache Kafka kümeleriyle etkileşim kurma

Kafka REST Proxy, HTTPS üzerinden bir REST API aracılığıyla Kafka kümenizle etkileşim kurmanızı sağlar. Bu eylem, Kafka istemcilerinizin sanal ağınızın dışında olabileceği anlamına gelir. İstemciler Kafka kitaplıklarına güvenmek yerine Kafka kümesine basit ve güvenli HTTPS çağrıları yapabilir. Bu makalede REST proxy özellikli Kafka kümesinin nasıl oluşturulacağı gösterilmektedir. Ayrıca REST ara sunucusuna nasıl çağrı yapılacağını gösteren örnek bir kod sağlar.

REST API başvurusu

Kafka REST API tarafından desteklenen işlemler için bkz . HDInsight Kafka REST Proxy API Başvurusu.

Background

Kafka REST proxy design.

API tarafından desteklenen işlemlerin tam belirtimi için bkz . Apache Kafka REST Proxy API'si.

REST Proxy uç noktası

REST proxy ile HDInsight Kafka kümesi oluşturmak, kümeniz için azure portalındaki HDInsight küme özelliklerinde bulabileceğiniz yeni bir genel uç nokta oluşturur.

Güvenlik

Microsoft Entra güvenlik gruplarıyla yönetilen Kafka REST proxy'sine erişim. Kafka kümesini oluştururken Microsoft Entra güvenlik grubuna REST uç nokta erişimi sağlayın. REST proxy'sine erişmesi gereken Kafka istemcileri, grup sahibi tarafından bu gruba kaydedilmelidir. Grup sahibi Portal veya PowerShell aracılığıyla kaydolabilir.

REST proxy uç noktası istekleri için istemci uygulamaları bir OAuth belirteci almalıdır. Belirteç, güvenlik grubu üyeliğini doğrulamak için kullanır. İstemci uygulaması örneği bulma, OAuth belirtecinin nasıl alınacaklarını gösterir. İstemci uygulaması HTTPS isteğindeki OAuth belirtecini REST proxy'ye geçirir.

Not

Microsoft Entra güvenlik grupları hakkında daha fazla bilgi edinmek için bkz . Microsoft Entra gruplarını kullanarak uygulama ve kaynak erişimini yönetme. OAuth belirteçlerinin nasıl çalıştığı hakkında daha fazla bilgi için bkz . OAuth 2.0 kod verme akışını kullanarak Microsoft Entra web uygulamalarına erişimi yetkilendirme.

Ağ Güvenlik Grupları ile Kafka REST proxy

Kendi sanal ağınızı getirir ve ağ güvenlik gruplarıyla ağ trafiğini denetlerseniz, 443 numaralı bağlantı noktasına ek olarak 9400 numaralı bağlantı noktasında gelen trafiğe izin verin. Bu, Kafka REST proxy sunucusuna ulaşılabilir olmasını sağlar.

Önkoşullar

  1. Microsoft Entra ID ile bir uygulamayı kaydetme. Kafka REST proxy'si ile etkileşime geçmek için yazdığınız istemci uygulamaları, Azure'da kimlik doğrulaması yapmak için bu uygulamanın kimliğini ve gizli dizisini kullanır.

  2. Bir Microsoft Entra güvenlik grubu oluşturun. Microsoft Entra Id ile kaydettiğiniz uygulamayı güvenlik grubuna grubun bir üyesi olarak ekleyin. Bu güvenlik grubu, REST proxy ile etkileşime izin veren uygulamaları denetlemek için kullanılır. Microsoft Entra grupları oluşturma hakkında daha fazla bilgi için bkz . Temel grup oluşturma ve Microsoft Entra Id kullanarak üye ekleme.

    Grubun Güvenlik türünde olduğunu doğrulayın. Security Group.

    Uygulamanın Grubun üyesi olduğunu doğrulayın. Check Membership.

REST proxy'si etkin bir Kafka kümesi oluşturma

Bu adımlarda Azure portalı kullanılır. Azure CLI kullanan bir örnek için bkz . Azure CLI kullanarak Apache Kafka REST proxy kümesi oluşturma.

  1. Kafka kümesi oluşturma iş akışı sırasında, Güvenlik + ağ sekmesinde Kafka REST proxy'sini etkinleştir seçeneğini işaretleyin.

    Screenshot shows the Create HDInsight cluster page with Security + networking selected.

  2. Güvenlik Grubu Seç'e tıklayın. Güvenlik grupları listesinden REST ara sunucusuna erişimi olmasını istediğiniz güvenlik grubunu seçin. Uygun güvenlik grubunu bulmak için arama kutusunu kullanabilirsiniz. Alttaki Seç düğmesine tıklayın.

    Screenshot shows the Create HDInsight cluster page with the option to select a security group.

  3. Azure portalını kullanarak Azure HDInsight'ta Apache Kafka kümesi oluşturma bölümünde açıklandığı gibi kümenizi oluşturmak için kalan adımları tamamlayın.

  4. Küme oluşturulduktan sonra, Kafka REST proxy URL'sini kaydetmek için küme özelliklerine gidin.

    view REST proxy URL.

İstemci uygulaması örneği

Python kodunu kullanarak Kafka kümenizdeki REST proxy ile etkileşim kurabilirsiniz. Kod örneğini kullanmak için şu adımları izleyin:

  1. Örnek kodu Python yüklü bir makineye kaydedin.

  2. yürüterek pip3 install msalgerekli Python bağımlılıklarını yükleyin.

  3. Kod bölümünü değiştirin Bu özellikleri yapılandırın ve ortamınız için aşağıdaki özellikleri güncelleştirin:

    Özellik Açıklama
    Kiracı kimliği Aboneliğinizin bulunduğu Azure kiracısı.
    Client ID Güvenlik grubuna kaydettiğiniz uygulamanın kimliği.
    İstemci Gizli Anahtarı Güvenlik grubuna kaydettiğiniz uygulamanın gizli dizisi.
    Kafkarest_endpoint Bu değeri dağıtım bölümünde açıklandığı gibi kümeye genel bakış bölümündeki Özellikler sekmesinden alın. Aşağıdaki biçimde olmalıdır – https://<clustername>-kafkarest.azurehdinsight.net
  4. Komut satırından python dosyasını yürüterek sudo python3 <filename.py>

Bu kod aşağıdaki eylemi yapar:

  1. Microsoft Entra Id'den bir OAuth belirteci getirir.
  2. Kafka REST proxy'sine istekte bulunmayı gösterir.

Python'da OAuth belirteçlerini alma hakkında daha fazla bilgi için bkz . Python AuthenticationContext sınıfı. Kafka REST proxy'si aracılığıyla oluşturulmayan veya silinmemiş bir gecikmenin topics buraya yansıtıldığını görebilirsiniz. Bu gecikmenin nedeni önbellek yenilemesidir. Üretici API'sinin değer alanı geliştirildi. Şimdi JSON nesnelerini ve herhangi bir serileştirilmiş formu kabul eder.

#Required Python packages
#pip3 install msal

import json
import msal
import random
import requests
import string
import sys
import time

def get_random_string():
    letters = string.ascii_letters
    random_string = ''.join(random.choice(letters) for i in range(7))

    return random_string


#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'ABCDEFGH-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Application Id
client_id = 'XYZABCDE-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#

# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id

app = msal.ConfidentialClientApplication(
    client_id , client_secret, authority,
    #cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)

# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10

# Print access token
print("Access token: " + accessToken)

# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}'  # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'

# Request header
headers = {
    'Authorization': 'Bearer ' + accessToken,
    'Content-type': 'application/json'          # set Content-type to 'application/json'
}

# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")

topics = []

# Create a  new topic
# Example of topic config
topic_config = {
    "partition_count": 1,
    "replication_factor": 1,
    "topic_properties": {
        "retention.ms": 604800000,
        "min.insync.replicas": "1"
    }
}

create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)

if response.ok:
    while new_topic not in topics:
        print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
        time.sleep(30)
        # List Topic
        get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)

        response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
        topic_list = response.json()
        topics = topic_list.get("topics", [])
else:
    print("Topic " + new_topic + " was created. Exit.")
    sys.exit(1)

# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
    "records": [
        {
            "key": "key1",
            "value": "**********"         # A string                              
        },
        {
            "partition": 0,
            "value": 5                    # An integer
        },
        {
            "value": 3.14                 # A floating number
        },
        {
            "value": {                    # A JSON object
                "id": 1,
                "name": "HDInsight Kafka REST proxy"
            }
        },
        {
            "value": [                    # A list of JSON objects
                {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                {
                    "id": 2,
                    "name": "HDInsight Kafka REST proxy 2"
                },
                {
                    "id": 3,
                    "name": "HDInsight Kafka REST proxy 3"
                }
            ]
        },
        {
            "value": {                  # A nested JSON object
                "group id": 1,
                "HDI Kafka REST": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                "HDI Kafka REST server info": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1",
                    "servers": [
                        {
                            "server id": 1,
                            "server name": "HDInsight Kafka REST proxy server 1"
                        },
                        {
                            "server id": 2,
                            "server name": "HDInsight Kafka REST proxy server 2"
                        },
                        {
                            "server id": 3,
                            "server name": "HDInsight Kafka REST proxy server 3"
                        }
                    ]
                }
            }
        }
    ]
}

print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)

# Consume messages from the topic
partition_id = 0
offset = 0
count = 2

while True:
    consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
    print("Consuming " + str(count) + " messages from offset " + str(offset))

    response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)

    if response.ok:
        messages = response.json()
        print("Consumed messages: \n" + json.dumps(messages, indent=2))
        next_offset = response.headers.get("NextOffset")
        if offset == next_offset or not messages.get("records", []):
            print("Consumer caught up with producer. Exit for now...")
            break

        offset = next_offset

    else:
        print("Error " + str(response.status_code))
        break
        
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from  " + get_partitions_url)

response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))

# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from  " + get_partition_url)

response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))

Curl komutu kullanarak REST için Azure proxy'sinden belirteç alma hakkında başka bir örneği aşağıda bulabilirsiniz. Belirteç alırken belirtilene scope=https://hib.azurehdinsight.net/.default ihtiyacımız olduğuna dikkat edin.

curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'

Sonraki adımlar