Capture datos de Event Hubs en Azure Storage y léalos mediante Python (azure-eventhub versión 5)Capture Event Hubs data in Azure Storage and read it by using Python (azure-eventhub version 5)

Puede configurar un centro de eventos para que los datos que se envíen al mismo se capturen en una cuenta de Azure Storage o en Azure Data Lake Storage Gen 1 o Gen 2.You can configure an event hub so that the data that's sent to an event hub is captured in an Azure storage account or Azure Data Lake Storage Gen 1 or Gen 2. En este artículo se muestra cómo escribir código de Python para enviar eventos a un centro de eventos y cómo leer los datos capturados de Azure Blob Storage.This article shows you how to write Python code to send events to an event hub and read the captured data from Azure Blob storage. Para más información sobre esta característica, consulte la introducción a la característica de captura de Event Hubs.For more information about this feature, see Event Hubs Capture feature overview.

Este inicio rápido usa el SDK de Azure Python para mostrar la característica de captura.This quickstart uses the Azure Python SDK to demonstrate the Capture feature. La aplicación sender.py envía datos telemétricos del entorno simulados a Event Hubs en formato JSON.The sender.py app sends simulated environmental telemetry to event hubs in JSON format. El centro de eventos está configurado para usar la característica Capture para escribir estos datos en Blob Storage en lotes.The event hub is configured to use the Capture feature to write this data to Blob storage in batches. La aplicación capturereader.py lee estos blobs y crea un archivo de anexos para cada dispositivo.The capturereader.py app reads these blobs and creates an append file for each device. Luego, la aplicación escribe los datos en archivos .CSV.The app then writes the data into CSV files.

En esta guía de inicio rápido:In this quickstart, you:

  • Crear una cuenta de Azure Blob Storage y un contenedor en Azure Portal.Create an Azure Blob storage account and container in the Azure portal.
  • Crear un espacio de nombres de Event Hubs mediante Azure Portal.Create an Event Hubs namespace by using the Azure portal.
  • Cree un centro de eventos con la característica de Captura habilitada y conéctelo a su cuenta de almacenamiento.Create an event hub with the Capture feature enabled and connect it to your storage account.
  • Enviar datos al centro de eventos con un script de Python.Send data to your event hub by using a Python script.
  • Leer y procesar los archivos de Capture de Event Hubs con otro script de Python.Read and process files from Event Hubs Capture by using another Python script.

Requisitos previosPrerequisites

Creación de un script de Python para enviar eventos a un centro de eventosCreate a Python script to send events to your event hub

En esta sección, creará un script de Python que envía 200 eventos (10 dispositivos x 20 eventos) a un centro de eventos.In this section, you create a Python script that sends 200 events (10 devices * 20 events) to an event hub. Estos eventos son una lectura del entorno de ejemplo que se envía en formato JSON.These events are a sample environmental reading that's sent in JSON format.

  1. Abra el editor de Python que prefiera, como Visual Studio Code.Open your favorite Python editor, such as Visual Studio Code.

  2. Crear un script denominado sender.py.Create a script called sender.py.

  3. Pegue el código siguiente en sender.py.Paste the following code into sender.py.

    import time
    import os
    import uuid
    import datetime
    import random
    import json
    
    from azure.eventhub import EventHubProducerClient, EventData
    
    # This script simulates the production of events for 10 devices.
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))
    
    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    
    for y in range(0,20):    # For each device, produce 20 events. 
        event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 
        for dev in devices:
            # Create a dummy reading.
            reading = {'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100)}
            s = json.dumps(reading) # Convert the reading into a JSON string.
            event_data_batch.add(EventData(s)) # Add event data to the batch.
        producer.send_batch(event_data_batch) # Send the batch of events to the event hub.
    
    # Close the producer.    
    producer.close()
    
  4. Reemplace los valores siguientes en los scripts:Replace the following values in the scripts:

    • Reemplace EVENT HUBS NAMESPACE CONNECTION STRING por la cadena de conexión para el espacio de nombres de Event Hubs.Replace EVENT HUBS NAMESPACE CONNECTION STRING with the connection string for your Event Hubs namespace.
    • Reemplace EVENT HUB NAME por el nombre del centro de eventos.Replace EVENT HUB NAME with the name of your event hub.
  5. Ejecute el script para enviar eventos al centro de eventos.Run the script to send events to the event hub.

  6. En Azure Portal, puede comprobar que el centro de eventos ha recibido los mensajes.In the Azure portal, you can verify that the event hub has received the messages. Cambie a la vista Mensajes en la sección Métricas.Switch to Messages view in the Metrics section. Actualice la página para actualizar el gráfico.Refresh the page to update the chart. Pueden pasar unos segundos hasta que aparezca en la página que los mensajes se han recibido.It might take a few seconds for the page to display that the messages have been received.

    Comprobación de que el centro de eventos ha recibido los mensajesVerify that the event hub received the messages

Creación de un script de Python que lea archivos de CaptureCreate a Python script to read your Capture files

En este ejemplo, los datos capturados se almacenan en Azure Blob Storage.In this example, the captured data is stored in Azure Blob storage. El script de esta sección lee los archivos de datos capturados de la cuenta de Azure Storage y genera archivos CSV que podrá abrir y ver fácilmente.The script in this section reads the captured data files from your Azure storage account and generates CSV files for you to easily open and view. Verá 10 archivos en el directorio de trabajo actual de la aplicación.You will see 10 files in the current working directory of the application. Estos archivos contendrán las lecturas de entorno para los 10 dispositivos.These files will contain the environmental readings for the 10 devices.

  1. En el editor de Python, cree un script denominado capturereader.py.In your Python editor, create a script called capturereader.py. Este script lee los archivos capturados y crea un archivo para cada dispositivo, con el fin de escribir los datos solo de dicho dispositivo.This script reads the captured files and creates a file for each device to write the data only for that device.

  2. Pegue el código siguiente en capturereader.py.Paste the following code into capturereader.py.

    import os
    import string
    import json
    import uuid
    import avro.schema
    
    from azure.storage.blob import ContainerClient, BlobClient
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    
    
    def processBlob2(filename):
        reader = DataFileReader(open(filename, 'rb'), DatumReader())
        dict = {}
        for reading in reader:
            parsed_json = json.loads(reading["Body"])
            if not 'id' in parsed_json:
                return
            if not parsed_json['id'] in dict:
                list = []
                dict[parsed_json['id']] = list
            else:
                list = dict[parsed_json['id']]
                list.append(parsed_json)
        reader.close()
        for device in dict.keys():
            filename = os.getcwd() + '\\' + str(device) + '.csv'
            deviceFile = open(filename, "a")
            for r in dict[device]:
                deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')
    
    def startProcessing():
        print('Processor started using path: ' + os.getcwd())
        # Create a blob container client.
        container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME")
        blob_list = container.list_blobs() # List all the blobs in the container.
        for blob in blob_list:
            # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).        
            if blob.size > 508:
                print('Downloaded a non empty blob: ' + blob.name)
                # Create a blob client for the blob.
                blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
                # Construct a file name based on the blob name.
                cleanName = str.replace(blob.name, '/', '_')
                cleanName = os.getcwd() + '\\' + cleanName 
                with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                    my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
                processBlob2(cleanName) # Convert the file into a CSV file.
                os.remove(cleanName) # Remove the original downloaded file.
                # Delete the blob from the container after it's read.
                container.delete_blob(blob.name)
    
    startProcessing()    
    
  3. Reemplace AZURE STORAGE CONNECTION STRING por la cadena de conexión de su cuenta de Azure Storage.Replace AZURE STORAGE CONNECTION STRING with the connection string for your Azure storage account. El nombre del contenedor que creó en este inicio rápido es capture.The name of the container you created in this quickstart is capture. Si ha usado otro nombre para el contenedor, reemplace capture por el nombre del contenedor en la cuenta de almacenamiento.If you used a different name for the container, replace capture with the name of the container in the storage account.

Ejecución de los scriptsRun the scripts

  1. Abra un símbolo del sistema que tiene Python en su ruta de acceso y, después, ejecute dichos comandos para instalar los paquetes de requisitos previos de Python:Open a command prompt that has Python in its path, and then run these commands to install Python prerequisite packages:

    pip install azure-storage-blob
    pip install azure-eventhub
    pip install avro-python3
    
  2. Vaya al directorio en que guardó sender.py y capturereader.py y ejecute este comando:Change your directory to the directory where you saved sender.py and capturereader.py, and run this command:

    python sender.py
    

    Este comando inicia un nuevo proceso de Python para ejecutar el remitente.This command starts a new Python process to run the sender.

  3. Espere unos minutos a que se ejecute la captura y, después, escriba el siguiente comando en la ventana de comandos original:Wait a few minutes for the capture to run, and then enter the following command in your original command window:

    python capturereader.py
    

    Este procesador de captura usa el directorio local para descargar todos los blobs de la cuenta de almacenamiento y del contenedor.This capture processor uses the local directory to download all the blobs from the storage account and container. Procesa los que no estén vacíos y escribe los resultados en forma de archivos .CSV en el directorio local.It processes any that are not empty, and it writes the results as CSV files into the local directory.

Pasos siguientesNext steps

Consulte los ejemplos de Python en GitHub.Check out Python samples on GitHub.