Interactie met Apache Kafka clusters in Azure HDInsight met behulp van een REST-proxy

Met de Kafka REST-proxy kunt u communiceren met uw Kafka-cluster via een REST API via HTTPS. Deze actie betekent dat uw Kafka-clients buiten uw virtuele netwerk kunnen zijn. Clients kunnen eenvoudige, veilige HTTPS-aanroepen naar het Kafka-cluster maken in plaats van te vertrouwen op Kafka-bibliotheken. In dit artikel wordt beschreven hoe u een Kafka-cluster met REST-proxy kunt maken. Bevat ook een voorbeeldcode die laat zien hoe u REST-proxy kunt aanroepen.

Naslaginformatie over REST-API

Zie HDInsight Kafka REST Proxy API Reference (Naslaginformatie over de REST-API voor HDInsight Kafka REST)voor bewerkingen die worden ondersteund door de Kafka-REST API.

Achtergrond

Kafka REST-proxyontwerp

Zie rest proxy-API voor de volledige specificatie van de bewerkingen die door de API worden Apache Kafka.

REST Proxy-eindpunt

Als u een HDInsight Kafka-cluster met REST-proxy maakt, wordt er een nieuw openbaar eindpunt voor uw cluster gemaakt. U vindt dit in de eigenschappen van uw HDInsight-cluster op de Azure Portal.

Beveiliging

Toegang tot de Kafka REST-proxy wordt beheerd Azure Active Directory beveiligingsgroepen. Wanneer u het Kafka-cluster maakt, geeft u de Azure AD-beveiligingsgroep rest-eindpunttoegang. Kafka-clients die toegang tot de REST-proxy nodig hebben, moeten door de groepseigenaar worden geregistreerd bij deze groep. De groepseigenaar kan zich registreren via de portal of via PowerShell.

Voor aanvragen van REST-proxy-eindpunten moeten clienttoepassingen een OAuth-token krijgen. Het token wordt gebruikt om het lidmaatschap van de beveiligingsgroep te controleren. Zoek hieronder een voorbeeld van een clienttoepassing die laat zien hoe u een OAuth-token kunt krijgen. De clienttoepassing geeft het OAuth-token in de HTTPS-aanvraag door aan de REST-proxy.

Notitie

Zie Toegang tot apps en resources beheren Azure Active Directory groepenvoor meer informatie over AAD-beveiligingsgroepen. Zie Toegang verlenen tot Azure Active Directory-webtoepassingen met behulp van de OAuth 2.0-stroom voor hetverlenen van code voor meer informatie over hoe OAuth-tokens werken.

Kafka REST-proxy met netwerkbeveiligingsgroepen

Als u uw eigen VNet gebruikt en netwerkverkeer met netwerkbeveiligingsgroepen controleert, staat u naast poort 443 ook inkomende verkeer toe op poort 9400. Dit zorgt ervoor dat de Kafka REST-proxyserver bereikbaar is.

Vereisten

  1. U registreert een toepassing met Azure AD. De clienttoepassingen die u schrijft om te communiceren met de Kafka REST-proxy gebruiken de id en het geheim van deze toepassing voor verificatie bij Azure.

  2. Maak een Azure AD-beveiligingsgroep. Voeg de toepassing die u hebt geregistreerd bij Azure AD toe aan de beveiligingsgroep als lid van de groep. Deze beveiligingsgroep wordt gebruikt om te bepalen welke toepassingen mogen communiceren met de REST-proxy. Zie Een basisgroep maken en leden toevoegen met Azure Active Directory voor meer informatie over het maken van Azure AD-groepen.

    Controleer of de groep van het type Beveiliging is. Beveiligingsgroep

    Controleer of de toepassing lid is van groep. Lidmaatschap controleren

Een Kafka-cluster maken met REST-proxy ingeschakeld

In de onderstaande stappen wordt de Azure Portal. Zie Create Apache Kafka REST proxy cluster using Azure CLI (Een REST-proxyclustermaken met behulp van Azure CLI) voor een voorbeeld van het gebruik van Azure CLI.

  1. Schakel tijdens het maken van het Kafka-cluster op het tabblad Beveiliging en netwerken de optie Kafka REST-proxy inschakelen in.

    Schermopname van de pagina H D Insight-cluster maken met Beveiliging en netwerken geselecteerd.

  2. Klik op Beveiligingsgroep selecteren. Selecteer in de lijst met beveiligingsgroepen de beveiligingsgroep die u toegang wilt geven tot de REST-proxy. U kunt het zoekvak gebruiken om de juiste beveiligingsgroep te vinden. Klik onderaan op de knop Selecteren.

    Schermopname van de pagina H D Insight-cluster maken met de optie om een beveiligingsgroep te selecteren.

  3. Voltooi de resterende stappen voor het maken van uw cluster, zoals beschreven in Apache Kafka cluster maken in Azure HDInsight met behulp Azure Portal.

  4. Zodra het cluster is gemaakt, gaat u naar de clustereigenschappen om de Kafka REST-proxy-URL vast te stellen.

    REST-proxy-URL weergeven

Voorbeeld van clienttoepassing

U kunt de onderstaande Python-code gebruiken om te communiceren met de REST-proxy in uw Kafka-cluster. Volg deze stappen om het codevoorbeeld te gebruiken:

  1. Sla de voorbeeldcode op een computer op met Python geïnstalleerd.

  2. Installeer vereiste Python-afhankelijkheden door uit te pip3 install msal voeren.

  3. Wijzig de codesectie Configureer deze eigenschappen en werk de volgende eigenschappen bij voor uw omgeving:

    Eigenschap Beschrijving
    Tenant-id De Azure-tenant waar uw abonnement zich is.
    Client-id De id voor de toepassing die u hebt geregistreerd in de beveiligingsgroep.
    Clientgeheim Het geheim voor de toepassing die u hebt geregistreerd in de beveiligingsgroep.
    Kafkarest_endpoint Haal deze waarde op via het tabblad Eigenschappen in het clusteroverzicht, zoals beschreven in de implementatiesectie. Deze moet de volgende indeling hebben: https://<clustername>-kafkarest.azurehdinsight.net
  4. Voer vanaf de opdrachtregel het Python-bestand uit door uit te voeren sudo python3 <filename.py>

Met deze code wordt de volgende actie ondernomen:

  1. Haalt een OAuth-token op uit Azure AD.
  2. Laat zien hoe u een aanvraag kunt indienen bij de Kafka REST-proxy.

Zie Python AuthenticationContext-klassevoor meer informatie over het verkrijgen van OAuth-tokens in Python. Mogelijk ziet u een vertraging terwijl die niet is gemaakt of verwijderd via topics de Kafka REST-proxy daar wordt weergegeven. Deze vertraging is het gevolg van het vernieuwen van de cache. Het waardeveld van de Producer-API is verbeterd. Nu worden JSON-objecten en alle geseraliseerde vormen geaccepteerd.

#Required python packages
#pip3 install msal

import json
import msal
import random
import requests
import string
import sys
import time

def get_random_string():
    letters = string.ascii_letters
    random_string = ''.join(random.choice(letters) for i in range(7))

    return random_string


#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'ABCDEFGH-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Application Id
client_id = 'XYZABCDE-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#

# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id

app = msal.ConfidentialClientApplication(
    client_id , client_secret, authority,
    #cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)

# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10

# Print access token
print("Access token: " + accessToken)

# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}'  # by default count = 1

# Request header
headers = {
    'Authorization': 'Bearer ' + accessToken,
    'Content-type': 'application/json'          # set Content-type to 'application/json'
}

# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")

topics = []

# Create a  new topic
# Example of topic config
topic_config = {
    "partition_count": 1,
    "replication_factor": 1,
    "topic_properties": {
        "retention.ms": 604800000,
        "min.insync.replicas": "1"
    }
}

create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)

if response.ok:
    while new_topic not in topics:
        print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
        time.sleep(30)
        # List Topic
        get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)

        response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
        topic_list = response.json()
        topics = topic_list.get("topics", [])
else:
    print("Topic " + new_topic + " was created. Exit.")
    sys.exit(1)

# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
    "records": [
        {
            "key": "key1",
            "value": "**********"         # A string                              
        },
        {
            "partition": 0,
            "value": 5                    # An integer
        },
        {
            "value": 3.14                 # A floating number
        },
        {
            "value": {                    # A JSON object
                "id": 1,
                "name": "HDInsight Kafka REST proxy"
            }
        },
        {
            "value": [                    # A list of JSON objects
                {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                {
                    "id": 2,
                    "name": "HDInsight Kafka REST proxy 2"
                },
                {
                    "id": 3,
                    "name": "HDInsight Kafka REST proxy 3"
                }
            ]
        },
        {
            "value": {                  # A nested JSON object
                "group id": 1,
                "HDI Kafka REST": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                "HDI Kafka REST server info": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1",
                    "servers": [
                        {
                            "server id": 1,
                            "server name": "HDInsight Kafka REST proxy server 1"
                        },
                        {
                            "server id": 2,
                            "server name": "HDInsight Kafka REST proxy server 2"
                        },
                        {
                            "server id": 3,
                            "server name": "HDInsight Kafka REST proxy server 3"
                        }
                    ]
                }
            }
        }
    ]
}

print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)

# Consume messages from the topic
partition_id = 0
offset = 0
count = 2

while True:
    consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
    print("Consuming " + str(count) + " messages from offset " + str(offset))

    response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)

    if response.ok:
        messages = response.json()
        print("Consumed messages: \n" + json.dumps(messages, indent=2))
        next_offset = response.headers.get("NextOffset")
        if offset == next_offset or not messages.get("records", []):
            print("Consumer caught up with producer. Exit for now...")
            break

        offset = next_offset

    else:
        print("Error " + str(response.status_code))
        break
        
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from  " + get_partitions_url)

response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))

# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from  " + get_partition_url)

response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))

Hieronder vindt u nog een voorbeeld van het krijgen van een token uit Azure for REST-proxy met behulp van een curl-opdracht. U ziet dat de opgegeven scope=https://hib.azurehdinsight.net/.default moet zijn tijdens het verkrijgen van een token.

curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'

Volgende stappen