Envío o recepción de eventos en Event Hubs mediante Python (azure-eventhub)
En este inicio rápido se muestra cómo enviar y recibir eventos desde un centro de eventos mediante el paquete de Python azure-eventhub.
Requisitos previos
Si es la primera vez que usa Azure Event Hubs, consulte la información general de Event Hubs antes de continuar con este inicio rápido.
Para completar este tutorial de inicio rápido, debe cumplir los siguientes requisitos previos:
Una suscripción a Microsoft Azure. Para usar los servicios de Azure, entre los que se incluye Azure Event Hubs, se necesita una suscripción. Si no se dispone de una cuenta de Azure, es posible registrarse para obtener una evaluación gratuita, o bien usar las ventajas que disfrutan los suscriptores MSDN al crear una cuenta.
Python 2.7 o 3.6 o cualquier versión posterior, con PIP instalado y actualizado.
El paquete de Python para Event Hubs.
Para instalar el paquete, ejecute este comando en un símbolo del sistema que tenga Python en su ruta de acceso:
pip install azure-eventhubInstale el siguiente paquete para recibir los eventos y usar Azure Blob Storage como almacén de puntos de control:
pip install azure-eventhub-checkpointstoreblob-aioCreación de un espacio de nombres de Event Hubs y un centro de eventos. El primer paso consiste en usar Azure Portal para crear un espacio de nombres de tipo Event Hubs y obtener las credenciales de administración que la aplicación necesita para comunicarse con el centro de eventos. Para crear un espacio de nombres y un centro de eventos, siga el procedimiento que se indica en este artículo. Después, obtenga la cadena de conexión para el espacio de nombres de Event Hubs. Para ello, siga las instrucciones del artículo: Obtenga la cadena de conexión. La utilizará más adelante en este inicio rápido.
Envío de eventos
En esta sección, se crea un script de Python para enviar eventos al centro de eventos que creó anteriormente.
Abra el editor de Python que prefiera, como Visual Studio Code.
Cree un script llamado send.py. Este script envía un lote de eventos al centro de eventos que creó anteriormente.
Pegue el siguiente código en send.py:
import asyncio from azure.eventhub.aio import EventHubProducerClient from azure.eventhub import EventData async def run(): # Create a producer client to send messages to the event hub. # Specify a connection string to your event hubs namespace and # the event hub name. producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE - CONNECTION STRING", eventhub_name="EVENT HUB NAME") async with producer: # Create a batch. event_data_batch = await producer.create_batch() # Add events to the batch. event_data_batch.add(EventData('First event ')) event_data_batch.add(EventData('Second event')) event_data_batch.add(EventData('Third event')) # Send the batch of events to the event hub. await producer.send_batch(event_data_batch) loop = asyncio.get_event_loop() loop.run_until_complete(run())Nota
Para ver todo el código fuente, incluidos los comentarios informativos, vaya a la página send_async de GitHub.
Recepción de eventos
En este inicio rápido se usa Azure Blob Storage como almacén de puntos de control. El almacén de puntos de control se usa para conservar los puntos de control (es decir, las últimas posiciones de lectura).
Advertencia
Si ejecuta este código en Azure Stack Hub, experimentará errores en tiempo de ejecución a menos que tenga como destino una versión específica de la API de Storage. Esto se debe a que el SDK de Event Hubs usa la API de Azure Storage más reciente disponible en Azure, que puede que no esté disponible en la plataforma de Azure Stack Hub. Azure Stack Hub puede admitir una versión diferente del SDK de Blob Storage que las que suelen estar disponibles en Azure. Si usa Azure Blob Storage como almacén de puntos de control, compruebe la versión de la API de Azure Storage admitida para la compilación de Azure Stack Hub y establezca esa versión como destino en el código.
Por ejemplo, si trabaja en la versión 2005 de Azure Stack Hub, la versión más reciente disponible para el servicio Storage es la 2019-02-02. De forma predeterminada, la biblioteca de cliente del SDK de Event Hubs usa la versión más reciente disponible en Azure (2019-07-07 en el momento de la versión del SDK). En este caso, además de seguir los pasos de esta sección, también tendrá que agregar código para usar como destino la versión 2019-02-02 de la API del servicio de almacenamiento. Para ver un ejemplo de cómo usar como destino una versión de la API de Storage, consulte los ejemplos sincrónicos y asincrónicos en GitHub.
Creación de una cuenta de Azure Storage y un contenedor de blobs
Cree una cuenta de Azure Storage y un contenedor de blobs en ella, para lo que debe seguir estos pasos:
- Creación de una cuenta de Azure Storage
- Creación de un contenedor de blobs
- Obtención de la cadena de conexión para una cuenta de almacenamiento
Asegúrese de registrar la cadena de conexión y el nombre del contenedor para usarlo posteriormente en el código de recepción.
Creación de un script de Python para recibir eventos
En esta sección, creará un script de Python para recibir eventos del centro de eventos:
Abra el editor de Python que prefiera, como Visual Studio Code.
Cree un script llamado recv.py.
Pegue el siguiente código en recv.py:
import asyncio from azure.eventhub.aio import EventHubConsumerClient from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore async def on_event(partition_context, event): # Print the event data. print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id)) # Update the checkpoint so that the program doesn't read the events # that it has already read when you run it next time. await partition_context.update_checkpoint(event) async def main(): # Create an Azure blob checkpoint store to store the checkpoints. checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME") # Create a consumer client for the event hub. client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store) async with client: # Call the receive method. Read from the beginning of the partition (starting_position: "-1") await client.receive(on_event=on_event, starting_position="-1") if __name__ == '__main__': loop = asyncio.get_event_loop() # Run the main method. loop.run_until_complete(main())Nota
Para ver todo el código fuente, incluidos los comentarios informativos adicionales, vaya a la página recv_with_checkpoint_store_async.py de GitHub.
Ejecución de la aplicación del receptor
Para ejecutar el script, abra un símbolo del sistema que tenga Python en su ruta de acceso y, después, ejecute el siguiente comando:
python recv.py
Ejecución de la aplicación del remitente
Para ejecutar el script, abra un símbolo del sistema que tenga Python en su ruta de acceso y, después, ejecute el siguiente comando:
python send.py
La ventana del destinatario debería mostrar los mensajes que se enviaron al centro de eventos.
Pasos siguientes
En este inicio rápido, ha enviado y recibido eventos de forma asincrónica. Para aprender a enviar y recibir eventos de forma sincrónica, vaya a la página sync_samples de GitHub.
Para ver todos los ejemplos (tanto sincrónicos como asincrónicos) de GitHub, vaya a Biblioteca cliente de Azure Event Hubs para ejemplos de Python.