Envío o recepción de eventos desde Event Hubs mediante Python

En este inicio rápido se muestra cómo enviar y recibir eventos desde un centro de eventos mediante el paquete de Python azure-eventhub.

Requisitos previos

Si es la primera vez que usa Azure Event Hubs, consulte la información general de Event Hubs antes de continuar con este inicio rápido.

Para completar este tutorial de inicio rápido, debe cumplir los siguientes requisitos previos:

  • Una suscripción a Microsoft Azure. Para usar los servicios de Azure, entre los que se incluye Azure Event Hubs, se necesita una suscripción. Si no tiene una cuenta de Azure, puede registrarse para obtener una evaluación gratuita.
  • Python 3.8, o posterior, con PIP instalado y actualizado.
  • Visual Studio Code (se recomienda) o cualquier otro entorno de desarrollo integrado (IDE).
  • Creación de un espacio de nombres de Event Hubs y un centro de eventos. El primer paso consiste en usar Azure Portal para crear un espacio de nombres de Event Hubs y obtener las credenciales de administración que la aplicación necesita para comunicarse con el centro de eventos. Para crear un espacio de nombres y un centro de eventos, siga el procedimiento que se indica en este artículo.

Instalación de los paquetes para el envío de eventos

Para instalar los paquetes de Python para Event Hubs, abra un símbolo del sistema que tenga Python en la ruta de acceso. Cambie el directorio a la carpeta donde desea guardar los ejemplos.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Autenticación de la aplicación en Azure

En este inicio rápido se muestran dos maneras de conectarse a Azure Event Hubs: sin contraseña y con la cadena de conexión. La primera opción muestra cómo usar la entidad de seguridad de Microsoft Entra ID y el control de acceso basado en roles (RBAC) para conectarse a un espacio de nombres de Event Hubs. No es necesario preocuparse por tener cadenas de conexión codificadas de forma rígida en el código, en un archivo de configuración o en un almacenamiento seguro, como Azure Key Vault. La segunda opción muestra cómo usar una cadena de conexión para conectarse a un espacio de nombres de Event Hubs. Si no está familiarizado con Azure, puede que le resulte más fácil la opción de la cadena de conexión. Se recomienda usar la opción sin contraseña en aplicaciones reales y entornos de producción. Para más información, consulte Autenticación y autorización. También puede obtener más información sobre la autenticación sin contraseña en la página de información general.

Asignación de roles al usuario de Microsoft Entra

Al desarrollar localmente, asegúrese de que la cuenta de usuario que se conecta a Azure Event Hubs tenga los permisos correctos. Necesitará el rol propietario de datos Azure Event Hubs para enviar y recibir mensajes. Para asignarse este rol, necesitará el rol Administrador de accesos de usuarios, u otro que incluya la acción Microsoft.Authorization/roleAssignments/write. Puede asignar roles RBAC de Azure a un usuario mediante Azure Portal, la CLI de Azure o Azure PowerShell. Puede obtener más información sobre los ámbitos disponibles para las asignaciones de roles en la página de información general del ámbito.

En el ejemplo siguiente se asigna el rol Azure Event Hubs Data Owner a la cuenta de usuario, que proporciona un acceso completo a los recursos de Azure Event Hubs. En un escenario real, siga el Principio de privilegios mínimos para conceder a los usuarios solo los permisos mínimos necesarios para un entorno de producción más seguro.

Roles integrados de Azure para Azure Event Hubs

En el caso de Azure Event Hubs, la administración de los espacios de nombres y de todos los recursos relacionados mediante Azure Portal y la API de administración de recursos de Azure, ya se ha protegido mediante el modelo de Azure RBAC. Azure proporciona los siguientes roles integrados de Azure para autorizar el acceso a un espacio de nombres de Event Hubs:

Si desea crear un rol personalizado, consulte Derechos necesarios para las operaciones de Event Hubs.

Importante

En la mayoría de los casos, la asignación de roles tardará un minuto o dos en propagarse en Azure. En ocasiones excepcionales, puede tardar hasta ocho minutos. Si recibe errores de autenticación al ejecutar por primera vez el código, espere unos instantes e inténtelo de nuevo.

  1. En Azure Portal, localice el espacio de nombres de Event Hubs mediante la barra de búsqueda principal o el panel de navegación de la izquierda.

  2. En la página de información general, seleccione Control de acceso (IAM) en el menú de la izquierda.

  3. En la página Control de acceso (IAM), seleccione la pestaña Asignación de roles.

  4. Seleccione + Agregar en el menú superior y, a continuación, Agregar asignación de roles en el menú desplegable resultante.

    A screenshot showing how to assign a role.

  5. Puede usar el cuadro de búsqueda para filtrar los resultados por el rol deseado. En este ejemplo, busque Azure Event Hubs Data Owner y seleccione el resultado coincidente. Después, haga clic en Siguiente.

  6. En la pestaña Asignar acceso a, seleccione Usuario, grupo o entidad de servicio y, a continuación, elija + Seleccionar miembros.

  7. En el cuadro de diálogo, busque el nombre de usuario de Microsoft Entra (normalmente su dirección de correo electrónico de user@domain) y, a continuación, elija Seleccionar en la parte inferior del cuadro de diálogo.

  8. Seleccione Revisar y asignar para ir a la página final y, a continuación, de nuevo Revisar y asignar para completar el proceso.

Envío de eventos

En esta sección, cree un script de Python para enviar eventos al centro de eventos que creó anteriormente.

  1. Abra el editor de Python que prefiera, como Visual Studio Code.

  2. Cree un script llamado send.py. Este script envía un lote de eventos al centro de eventos que creó anteriormente.

  3. Pegue el siguiente código en send.py:

    En el código, use valores reales para reemplazar los siguientes marcadores de posición:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Nota

    Para ver ejemplos de otras opciones de envío de eventos a Event Hubs de forma asincrónica usando una cadena de conexión, consulte la página send_async.py de GitHub. Los patrones que se muestran también son aplicables al envío de eventos sin contraseña.

Recepción de eventos

En este inicio rápido se usa Azure Blob Storage como almacén de puntos de control. El almacén de puntos de control se usa para conservar los puntos de control (es decir, las últimas posiciones de lectura).

Siga estas recomendaciones al usar Azure Blob Storage como almacén de puntos de control:

  • Use un contenedor independiente para cada grupo de consumidores. Puede usar la misma cuenta de almacenamiento, pero usar un contenedor por cada grupo.
  • No use el contenedor ni la cuenta de almacenamiento para otras actividades.
  • La cuenta de almacenamiento debe estar en la misma región en la que se encuentra la aplicación implementada. Si la aplicación es local, intente elegir la región más cercana posible.

En la página Cuenta de almacenamiento de Azure Portal, en la sección Blob service, asegúrese de que la siguiente configuración está deshabilitada.

  • Espacio de nombres jerárquico
  • Eliminación temporal de blobs
  • Control de versiones

Creación de una cuenta de Azure Storage y un contenedor de blobs

Cree una cuenta de Azure Storage y un contenedor de blobs en ella, para lo que debe seguir estos pasos:

  1. Creación de una cuenta de Azure Storage
  2. Cree un contenedor de blobs.
  3. Autenticación en el contenedor de blobs.

Asegúrese de registrar la cadena de conexión y el nombre del contenedor para usarlo posteriormente en el código de recepción.

Al desarrollar localmente, asegúrese de que la cuenta de usuario que accede a los datos de blob tenga los permisos correctos. Necesitará el Colaborador de datos de blobs de almacenamiento para leer y escribir datos de blob. Para asignarse este rol a sí mismo, necesitará que se le asigne el rol Administrador de acceso de usuario u otro rol que incluya la acción Microsoft.Authorization/roleAssignments/write. Puede asignar roles RBAC de Azure a un usuario mediante Azure Portal, la CLI de Azure o Azure PowerShell. Puede obtener más información sobre los ámbitos disponibles para las asignaciones de roles en la página de información general del ámbito.

En este escenario, asignará permisos a la cuenta de usuario, cuyo ámbito es la cuenta de almacenamiento, a fin de seguir el principio de privilegios mínimos. Esta práctica solo proporciona a los usuarios los permisos mínimos necesarios y crea entornos de producción más seguros.

En el ejemplo siguiente se asignará el rol Colaborador de datos de blobs de almacenamiento a la cuenta de usuario, que proporciona acceso de lectura y escritura a los datos de blobs de la cuenta de almacenamiento.

Importante

En la mayoría de los casos, la asignación de roles tardará un minuto o dos en propagarse en Azure, pero en casos excepcionales puede tardar hasta ocho minutos. Si recibe errores de autenticación al ejecutar por primera vez el código, espere unos instantes e inténtelo de nuevo.

  1. En Azure Portal, busque la cuenta de almacenamiento mediante la barra de búsqueda principal o el panel de navegación de la izquierda.

  2. En la página de información general de la cuenta de almacenamiento, seleccione Control de acceso (IAM) en el menú de la izquierda.

  3. En la página Control de acceso (IAM), seleccione la pestaña Asignación de roles.

  4. Seleccione + Agregar en el menú superior y, a continuación, Agregar asignación de roles en el menú desplegable resultante.

    A screenshot showing how to assign a storage account role.

  5. Puede usar el cuadro de búsqueda para filtrar los resultados por el rol deseado. En este ejemplo, busque Colaborador de datos de blobs de almacenamiento y seleccione el resultado coincidente y, a continuación, elija Siguiente.

  6. En la pestaña Asignar acceso a, seleccione Usuario, grupo o entidad de servicio y, a continuación, elija + Seleccionar miembros.

  7. En el cuadro de diálogo, busque el nombre de usuario de Microsoft Entra (normalmente su dirección de correo electrónico de user@domain) y, a continuación, elija Seleccionar en la parte inferior del cuadro de diálogo.

  8. Seleccione Revisar y asignar para ir a la página final y, a continuación, de nuevo Revisar y asignar para completar el proceso.

Instalación de los paquetes para la recepción de eventos

Para la recepción, debe instalar uno o varios paquetes más. En este inicio rápido se usar Azure Blob Storage para conservar los puntos de control, con el fin de que el programa no lea los eventos que ya ha leído. Ejecuta los puntos de comprobación de metadatos en los mensajes recibidos a intervalos regulares en un blob. Este enfoque facilita la tarea de seguir recibiendo posteriormente mensajes desde donde lo dejó.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Creación de un script de Python para recibir eventos

En esta sección, creará un script de Python para recibir eventos del centro de eventos:

  1. Abra el editor de Python que prefiera, como Visual Studio Code.

  2. Cree un script llamado recv.py.

  3. Pegue el siguiente código en recv.py:

    En el código, use valores reales para reemplazar los siguientes marcadores de posición:

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Nota

    Para ver ejemplos de otras opciones de recepción de eventos de Event Hubs de forma asincrónica usando una cadena de conexión, consulte la página recv_with_checkpoint_store_async.py de GitHub. Los patrones que se muestran también son aplicables a la recepción de eventos sin contraseña.

Ejecución de la aplicación del receptor

Para ejecutar el script, abra un símbolo del sistema que tenga Python en su ruta de acceso y, después, ejecute el siguiente comando:

python recv.py

Ejecución de la aplicación del remitente

Para ejecutar el script, abra un símbolo del sistema que tenga Python en su ruta de acceso y, después, ejecute el siguiente comando:

python send.py

La ventana del destinatario debería mostrar los mensajes que se enviaron al centro de eventos.

Solución de problemas

Si no ve eventos en la ventana del receptor o el código notifica un error, pruebe las siguientes sugerencias de solución de problemas:

  • Si no ve resultados de recy.py, ejecute send.py varias veces.

  • Si obtiene errores de "corrutina" al usar el código sin contraseña (con credenciales), asegúrese de que está realizando la importación desde azure.identity.aio.

  • Si obtiene el mensaje "Sesión de cliente no cerrada" con el código sin contraseña (con credenciales), asegúrese de cerrar la credencial cuando termine. Para obtener más información, consulte Credenciales asincrónicas.

  • Si obtiene errores de autorización con recv.py al acceder al almacenamiento, asegúrese de seguir los pasos descritos en Creación de una cuenta de Azure Storage y un contenedor de blobs y de asignar el rol Colaborador de datos de Storage Blob a la entidad de servicio.

  • Si recibe eventos con identificadores de partición diferentes, es un resultado normal. Las particiones son un mecanismo de organización de datos relacionado con el paralelismo de bajada necesario para consumir las aplicaciones. El número de particiones de un centro de eventos está directamente relacionado con el número de lectores simultáneos que espera tener. Para obtener más información, consulte Particiones.

Pasos siguientes

En este inicio rápido, ha enviado y recibido eventos de forma asincrónica. Para aprender a enviar y recibir eventos de forma sincrónica, vaya a la página sync_samples de GitHub.

Para ver todos los ejemplos (tanto sincrónicos como asincrónicos) de GitHub, vaya a Biblioteca cliente de Azure Event Hubs para ejemplos de Python.