Interacción con clústeres de Apache Kafka en Azure HDInsight mediante un proxy de REST

El proxy REST de Kafka le permite interactuar con el clúster de Kafka mediante una API REST a través de HTTPS. Esta acción significa que los clientes de Kafka pueden estar fuera de la red virtual. Los clientes pueden realizar llamadas HTTPS sencillas y seguras al clúster de Kafka, en lugar de depender de las bibliotecas de Kafka. En este artículo se muestra cómo crear un clúster de Kafka habilitado para servidor proxy REST. También proporciona código de ejemplo que muestra cómo realizar llamadas al servidor proxy REST.

Referencia de la API REST

Para conocer las operaciones que admite la API REST de Kafka, consulte Referencia de la API del servidor proxy REST de Kafka en HDInsight.

Información previa

Kafka REST proxy design.

Para obtener la especificación completa de las operaciones que admite la API, consulte API del servidor proxy REST de Apache Kafka.

Punto de conexión de proxy de REST

Al crear un clúster de Kafka en HDInsight con el servidor proxy REST, se crea un nuevo punto de conexión público para el clúster, que se puede encontrar en las Propiedades del clúster de HDInsight en Azure Portal.

Seguridad

Acceso al proxy REST de Kafka administrado con grupos de seguridad de Microsoft Entra. Al crear el clúster Kafka, proporcione al grupo de seguridad Microsoft Entra acceso al punto de conexión REST. El propietario del grupo debe registrar en este grupo los clientes de Kafka que necesitan acceso al servidor proxy REST. El propietario del grupo puede realizar el registro mediante el portal o PowerShell.

Para las solicitudes al punto de conexión proxy REST, las aplicaciones cliente deben obtener un token de OAuth. El token se usa para comprobar la pertenencia al grupo de seguridad. Busque un ejemplo de aplicación cliente que muestre cómo obtener un token de OAuth. La aplicación cliente pasa el token de OAuth de la solicitud HTTPS al servidor proxy REST.

Nota:

Consulte Administrar el acceso a aplicaciones y recursos mediante grupos de Microsoft Entra, para obtener más información sobre los grupos de seguridad de Microsoft Entra. Para obtener más información sobre cómo funcionan los tokens OAuth, consulte Autorizar el acceso a aplicaciones web Microsoft Entra utilizando el flujo de concesión de código OAuth 2.0.

Proxy REST de Kafka con grupos de seguridad de red

Si aporta su propia red virtual y controla el tráfico de red con grupos de seguridad de red, permita el tráfico entrante en el puerto 9400, además del puerto 443. Esto garantiza que el proxy REST de Kafka sea accesible.

Requisitos previos

  1. Registro de una aplicación con Microsoft Entra ID. Las aplicaciones cliente que escriba para interactuar con el proxy de REST de Kafka usan el identificador y el secreto de la aplicación para autenticarse en Azure.

  2. Cree un grupo de seguridad Microsoft Entra. Agregue la aplicación que ha registrado con Microsoft Entra ID al grupo de seguridad como miembro del grupo. Este grupo de seguridad se usará para controlar qué aplicaciones pueden interactuar con el proxy de REST. Para obtener más información sobre la creación de grupos de Microsoft Entra, consulte Crear un grupo básico y agregar miembros utilizando Microsoft Entra ID.

    Validar que el grupo es de tipo Seguridad. Security Group.

    Valide que la aplicación es miembro del grupo. Check Membership.

Creación de un clúster de Kafka con el proxy de REST habilitado

Los pasos usan Azure Portal. Para ver un ejemplo con la CLI de Azure, consulte Creación de un clúster para el proxy REST de Apache Kafka con la CLI de Azure.

  1. Durante el flujo de trabajo de creación del clúster de Kafka, en la pestaña Seguridad y redes, active la opción Habilitar proxy REST de Kafka.

    Screenshot shows the Create HDInsight cluster page with Security + networking selected.

  2. Haga clic en Seleccionar grupo de seguridad. En la lista de grupos de seguridad, seleccione el que quiere que tenga acceso al proxy de REST. Puede usar el cuadro de búsqueda para buscar el grupo de seguridad adecuado. Haga clic en el botón Seleccionar de la parte inferior.

    Screenshot shows the Create HDInsight cluster page with the option to select a security group.

  3. Complete los pasos restantes para crear el clúster como se describe en Creación de un clúster de Apache Kafka en Azure HDInsight mediante Azure Portal.

  4. Una vez creado el clúster, vaya a las propiedades del clúster para registrar la URL del proxy de REST de Kafka.

    view REST proxy URL.

Nombre de la aplicación cliente

Puede usar el código de Python para interactuar con el proxy de REST en el clúster de Kafka. Para ejecutar el ejemplo de código, siga estos pasos:

  1. Guarde el código de ejemplo en una máquina con Python instalado.

  2. Para instalar las dependencias de Python necesarias, ejecute pip3 install msal.

  3. Modifique la sección de código Configure these properties (Configurar estas propiedades) y actualice las siguientes propiedades en su entorno:

    Propiedad Descripción
    Id. de inquilino Inquilino de Azure donde se encuentra la suscripción.
    Id. de cliente Identificador de la aplicación que registró en el grupo de seguridad.
    Secreto del cliente Secreto de la aplicación que registró en el grupo de seguridad.
    Kafkarest_endpoint Obtenga este valor en la pestaña Propiedades de la información general del clúster, tal como se describe en la sección de implementación. Debería tener el siguiente formato: https://<clustername>-kafkarest.azurehdinsight.net.
  4. Desde la línea de comandos, ejecute el archivo Python mediante la ejecución de sudo python3 <filename.py>.

Este código realiza la siguiente acción:

  1. Obtiene un token OAuth de Microsoft Entra ID.
  2. Muestra cómo realizar una solicitud al servidor proxy REST de Kafka.

Para obtener más información sobre cómo obtener tokens de OAuth en Python, consulte Clase AuthenticationContext de Python. Es posible que vea un retraso en tanto que el topics que no se crea o elimina mediante el servidor proxy REST de Kafka se reflejan. Este retraso se debe a la actualización de la memoria caché. Se ha mejorado el valor campo de Producer API. Ahora, acepta objetos JSON y cualquier forma serializada.

#Required Python packages
#pip3 install msal

import json
import msal
import random
import requests
import string
import sys
import time

def get_random_string():
    letters = string.ascii_letters
    random_string = ''.join(random.choice(letters) for i in range(7))

    return random_string


#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'ABCDEFGH-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Application Id
client_id = 'XYZABCDE-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#

# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id

app = msal.ConfidentialClientApplication(
    client_id , client_secret, authority,
    #cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)

# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10

# Print access token
print("Access token: " + accessToken)

# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}'  # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'

# Request header
headers = {
    'Authorization': 'Bearer ' + accessToken,
    'Content-type': 'application/json'          # set Content-type to 'application/json'
}

# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")

topics = []

# Create a  new topic
# Example of topic config
topic_config = {
    "partition_count": 1,
    "replication_factor": 1,
    "topic_properties": {
        "retention.ms": 604800000,
        "min.insync.replicas": "1"
    }
}

create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)

if response.ok:
    while new_topic not in topics:
        print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
        time.sleep(30)
        # List Topic
        get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)

        response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
        topic_list = response.json()
        topics = topic_list.get("topics", [])
else:
    print("Topic " + new_topic + " was created. Exit.")
    sys.exit(1)

# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
    "records": [
        {
            "key": "key1",
            "value": "**********"         # A string                              
        },
        {
            "partition": 0,
            "value": 5                    # An integer
        },
        {
            "value": 3.14                 # A floating number
        },
        {
            "value": {                    # A JSON object
                "id": 1,
                "name": "HDInsight Kafka REST proxy"
            }
        },
        {
            "value": [                    # A list of JSON objects
                {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                {
                    "id": 2,
                    "name": "HDInsight Kafka REST proxy 2"
                },
                {
                    "id": 3,
                    "name": "HDInsight Kafka REST proxy 3"
                }
            ]
        },
        {
            "value": {                  # A nested JSON object
                "group id": 1,
                "HDI Kafka REST": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                "HDI Kafka REST server info": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1",
                    "servers": [
                        {
                            "server id": 1,
                            "server name": "HDInsight Kafka REST proxy server 1"
                        },
                        {
                            "server id": 2,
                            "server name": "HDInsight Kafka REST proxy server 2"
                        },
                        {
                            "server id": 3,
                            "server name": "HDInsight Kafka REST proxy server 3"
                        }
                    ]
                }
            }
        }
    ]
}

print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)

# Consume messages from the topic
partition_id = 0
offset = 0
count = 2

while True:
    consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
    print("Consuming " + str(count) + " messages from offset " + str(offset))

    response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)

    if response.ok:
        messages = response.json()
        print("Consumed messages: \n" + json.dumps(messages, indent=2))
        next_offset = response.headers.get("NextOffset")
        if offset == next_offset or not messages.get("records", []):
            print("Consumer caught up with producer. Exit for now...")
            break

        offset = next_offset

    else:
        print("Error " + str(response.status_code))
        break
        
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from  " + get_partitions_url)

response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))

# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from  " + get_partition_url)

response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))

Consulte a continuación otro ejemplo sobre cómo obtener un token de Azure para el servidor proxy REST mediante el comando curl. Observe que se tiene que especificar scope=https://hib.azurehdinsight.net/.default al obtener un token.

curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'

Pasos siguientes