Interagera med Apache Kafka-kluster i Azure HDInsight med hjälp av en REST-proxy

Med Kafka REST Proxy kan du interagera med ditt Kafka-kluster via ett REST API via HTTPS. Den här åtgärden innebär att dina Kafka-klienter kan finnas utanför ditt virtuella nätverk. Klienter kan göra enkla, säkra HTTPS-anrop till Kafka-klustret i stället för att förlita sig på Kafka-bibliotek. Den här artikeln visar hur du skapar ett REST-proxyaktiverat Kafka-kluster. Innehåller också en exempelkod som visar hur du gör anrop till REST-proxy.

Referens för REST-API

Åtgärder som stöds av Kafka REST API finns i REFERENS för HDInsight Kafka REST Proxy API.

Bakgrund

Kafka REST proxy design.

Fullständig specifikation av åtgärder som stöds av API:et finns i Apache Kafka REST Proxy API.

REST Proxy-slutpunkt

När du skapar ett HDInsight Kafka-kluster med REST-proxy skapas en ny offentlig slutpunkt för klustret, som du hittar i hdinsight-klusteregenskaperna på Azure-portalen.

Säkerhet

Åtkomst till Kafka REST-proxyn som hanteras med Microsoft Entra-säkerhetsgrupper. När du skapar Kafka-klustret anger du säkerhetsgruppen Microsoft Entra med REST-slutpunktsåtkomst. Kafka-klienter som behöver åtkomst till REST-proxyn bör registreras i den här gruppen av gruppägaren. Gruppägaren kan registrera sig via portalen eller via PowerShell.

För REST-proxyslutpunktsbegäranden bör klientprogram hämta en OAuth-token. Token används för att verifiera medlemskap i säkerhetsgrupper. Leta upp ett klientprogramexempel som visar hur du hämtar en OAuth-token. Klientprogrammet skickar OAuth-token i HTTPS-begäran till REST-proxyn.

Kafka REST-proxy med nätverkssäkerhetsgrupper

Om du tar med ditt eget virtuella nätverk och styr nätverkstrafiken med nätverkssäkerhetsgrupper tillåter du inkommande trafik på port 9400 utöver port 443. Detta säkerställer att Kafka REST-proxyservern kan nås.

Förutsättningar

  1. Registrera ett program med Microsoft Entra ID. De klientprogram som du skriver för att interagera med Kafka REST-proxyn använder programmets ID och hemlighet för att autentisera till Azure.

  2. Skapa en Microsoft Entra-säkerhetsgrupp. Lägg till programmet som du har registrerat med Microsoft Entra-ID i säkerhetsgruppen som medlem i gruppen. Den här säkerhetsgruppen används för att styra vilka program som tillåts interagera med REST-proxyn. Mer information om hur du skapar Microsoft Entra-grupper finns i Skapa en grundläggande grupp och lägga till medlemmar med hjälp av Microsoft Entra-ID.

    Kontrollera att gruppen är av typen Säkerhet. Security Group.

    Verifiera att programmet är medlem i Grupp. Check Membership.

Skapa ett Kafka-kluster med REST-proxy aktiverat

Stegen använder Azure-portalen. Ett exempel på hur du använder Azure CLI finns i Skapa Apache Kafka REST-proxykluster med Azure CLI.

  1. Under arbetsflödet för att skapa Kafka-kluster går du till fliken Säkerhet + nätverk och markerar alternativet Aktivera Kafka REST-proxy .

    Screenshot shows the Create HDInsight cluster page with Security + networking selected.

  2. Klicka på Välj säkerhetsgrupp. I listan över säkerhetsgrupper väljer du den säkerhetsgrupp som du vill ha åtkomst till REST-proxyn. Du kan använda sökrutan för att hitta rätt säkerhetsgrupp. Klicka på knappen Välj längst ned.

    Screenshot shows the Create HDInsight cluster page with the option to select a security group.

  3. Slutför de återstående stegen för att skapa klustret enligt beskrivningen i Skapa Apache Kafka-kluster i Azure HDInsight med Hjälp av Azure-portalen.

  4. När klustret har skapats går du till klusteregenskaperna för att registrera Kafka REST-proxy-URL:en.

    view REST proxy URL.

Exempel på klientprogram

Du kan använda Python-koden för att interagera med REST-proxyn i ditt Kafka-kluster. Följ dessa steg om du vill använda kodexemplet:

  1. Spara exempelkoden på en dator med Python installerat.

  2. Installera nödvändiga Python-beroenden genom att pip3 install msalköra .

  3. Ändra kodavsnittet Konfigurera dessa egenskaper och uppdatera följande egenskaper för din miljö:

    Property beskrivning
    Klientorganisations-ID Den Azure-klientorganisation där din prenumeration finns.
    Client ID ID:t för programmet som du registrerade i säkerhetsgruppen.
    Klienthemlighet Hemligheten för programmet som du registrerade i säkerhetsgruppen.
    Kafkarest_endpoint Hämta det här värdet från fliken Egenskaper i klusteröversikten enligt beskrivningen i distributionsavsnittet. Det bör vara i följande format – https://<clustername>-kafkarest.azurehdinsight.net
  4. Från kommandoraden kör du Python-filen genom att köra sudo python3 <filename.py>

Den här koden utför följande åtgärd:

  1. Hämtar en OAuth-token från Microsoft Entra-ID.
  2. Visar hur du gör en begäran till Kafka REST-proxyn.

Mer information om hur du hämtar OAuth-token i Python finns i Klassen Python AuthenticationContext. Du kan se en fördröjning när topics den inte skapas eller tas bort via Kafka REST-proxyn återspeglas där. Den här fördröjningen beror på cacheuppdatering. Värdefältet för producent-API:et har förbättrats. Nu accepterar den JSON-objekt och alla serialiserade formulär.

#Required Python packages
#pip3 install msal

import json
import msal
import random
import requests
import string
import sys
import time

def get_random_string():
    letters = string.ascii_letters
    random_string = ''.join(random.choice(letters) for i in range(7))

    return random_string


#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'ABCDEFGH-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Application Id
client_id = 'XYZABCDE-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#

# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id

app = msal.ConfidentialClientApplication(
    client_id , client_secret, authority,
    #cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)

# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10

# Print access token
print("Access token: " + accessToken)

# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}'  # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'

# Request header
headers = {
    'Authorization': 'Bearer ' + accessToken,
    'Content-type': 'application/json'          # set Content-type to 'application/json'
}

# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")

topics = []

# Create a  new topic
# Example of topic config
topic_config = {
    "partition_count": 1,
    "replication_factor": 1,
    "topic_properties": {
        "retention.ms": 604800000,
        "min.insync.replicas": "1"
    }
}

create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)

if response.ok:
    while new_topic not in topics:
        print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
        time.sleep(30)
        # List Topic
        get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)

        response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
        topic_list = response.json()
        topics = topic_list.get("topics", [])
else:
    print("Topic " + new_topic + " was created. Exit.")
    sys.exit(1)

# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
    "records": [
        {
            "key": "key1",
            "value": "**********"         # A string                              
        },
        {
            "partition": 0,
            "value": 5                    # An integer
        },
        {
            "value": 3.14                 # A floating number
        },
        {
            "value": {                    # A JSON object
                "id": 1,
                "name": "HDInsight Kafka REST proxy"
            }
        },
        {
            "value": [                    # A list of JSON objects
                {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                {
                    "id": 2,
                    "name": "HDInsight Kafka REST proxy 2"
                },
                {
                    "id": 3,
                    "name": "HDInsight Kafka REST proxy 3"
                }
            ]
        },
        {
            "value": {                  # A nested JSON object
                "group id": 1,
                "HDI Kafka REST": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                "HDI Kafka REST server info": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1",
                    "servers": [
                        {
                            "server id": 1,
                            "server name": "HDInsight Kafka REST proxy server 1"
                        },
                        {
                            "server id": 2,
                            "server name": "HDInsight Kafka REST proxy server 2"
                        },
                        {
                            "server id": 3,
                            "server name": "HDInsight Kafka REST proxy server 3"
                        }
                    ]
                }
            }
        }
    ]
}

print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)

# Consume messages from the topic
partition_id = 0
offset = 0
count = 2

while True:
    consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
    print("Consuming " + str(count) + " messages from offset " + str(offset))

    response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)

    if response.ok:
        messages = response.json()
        print("Consumed messages: \n" + json.dumps(messages, indent=2))
        next_offset = response.headers.get("NextOffset")
        if offset == next_offset or not messages.get("records", []):
            print("Consumer caught up with producer. Exit for now...")
            break

        offset = next_offset

    else:
        print("Error " + str(response.status_code))
        break
        
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from  " + get_partitions_url)

response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))

# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from  " + get_partition_url)

response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))

Nedan hittar du ett annat exempel på hur du hämtar en token från Azure for REST-proxyn med hjälp av ett curl-kommando. Observera att vi behöver den scope=https://hib.azurehdinsight.net/.default angivna när du hämtar en token.

curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'

Nästa steg