Zachytávání Event Hubs dat v Azure Storage a jejich čtení pomocí Pythonu (azure-eventhub)

Centrum událostí můžete nakonfigurovat tak, aby se data odesílaná do centra událostí zachytála v účtu úložiště Azure nebo v Azure Data Lake Storage Gen 1 nebo Gen 2. Tento článek ukazuje, jak napsat kód Pythonu pro odesílání událostí do centra událostí a čtení zachycených dat z úložiště objektů blob v Azure. Další informace o této funkci najdete v tématu Event Hubs Capture.

V tomto rychlém startu se k předvedení funkce Capture používá sada Azure Python SDK. Aplikace sender.py simulovaná telemetrická data prostředí do center událostí ve formátu JSON. Centrum událostí je nakonfigurované tak, aby k zápisu dat do úložiště objektů blob v dávkách používá funkci Capture. Aplikace capturereader.py tyto objekty blob načte a pro každé zařízení vytvoří připojovací soubor. Aplikace pak data zapíše do souborů CSV.

V tomto rychlém startu:

  • Ve službě Azure Blob Storage vytvořte účet a kontejner Azure Portal.
  • Vytvořte Event Hubs oboru názvů pomocí Azure Portal.
  • Vytvořte centrum událostí s povolenou funkcí Capture a připojte ji ke svému účtu úložiště.
  • Odešlete data do centra událostí pomocí skriptu Pythonu.
  • Čtení a zpracování souborů z Event Hubs Capture pomocí jiného skriptu Pythonu.

Požadavky

Vytvoření skriptu Pythonu pro odesílání událostí do centra událostí

V této části vytvoříte skript Pythonu, který do centra událostí odešle 200 událostí (10 zařízení * 20 událostí). Tyto události jsou ukázkovým čtením prostředí, které se odesílá ve formátu JSON.

  1. Otevřete oblíbený editor Pythonu, například Visual Studio Code.

  2. Vytvořte skript s názvem sender.py.

  3. Do souboru vložte následující sender.py.

    import time
    import os
    import uuid
    import datetime
    import random
    import json
    
    from azure.eventhub import EventHubProducerClient, EventData
    
    # This script simulates the production of events for 10 devices.
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))
    
    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    
    for y in range(0,20):    # For each device, produce 20 events. 
        event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 
        for dev in devices:
            # Create a dummy reading.
            reading = {'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100)}
            s = json.dumps(reading) # Convert the reading into a JSON string.
            event_data_batch.add(EventData(s)) # Add event data to the batch.
        producer.send_batch(event_data_batch) # Send the batch of events to the event hub.
    
    # Close the producer.    
    producer.close()
    
  4. Ve skriptech nahraďte následující hodnoty:

    • Nahraďte EVENT HUBS NAMESPACE CONNECTION STRING připojovacím řetězcem pro váš Event Hubs názvů.
    • Nahraďte EVENT HUB NAME názvem vašeho centra událostí.
  5. Spusťte skript pro odesílání událostí do centra událostí.

  6. V Azure Portal můžete ověřit, že centrum událostí přijalo zprávy. V části Metriky přepněte do zobrazení Zprávy. Aktualizujte stránku a aktualizujte graf. Může trvat několik sekund, než se na stránce zobrazí, že byly přijaty zprávy.

    Ověření, že centrum událostí přijalo zprávy

Vytvoření skriptu Pythonu pro čtení zachytávání souborů

V tomto příkladu jsou zachycená data uložená ve službě Azure Blob Storage. Skript v této části načte zachycené datové soubory z vašeho účtu úložiště Azure a vygeneruje soubory CSV, abyste je mohli snadno otevřít a zobrazit. V aktuálním pracovním adresáři aplikace se zobrazí 10 souborů. Tyto soubory budou obsahovat odečty prostředí pro 10 zařízení.

  1. V editoru Pythonu vytvořte skript s názvem capturereader.py. Tento skript přečte zachycené soubory a vytvoří soubor pro každé zařízení, který bude zapisovat data pouze pro toto zařízení.

  2. Do souboru vložte následující capturereader.py.

    import os
    import string
    import json
    import uuid
    import avro.schema
    
    from azure.storage.blob import ContainerClient, BlobClient
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    
    
    def processBlob2(filename):
        reader = DataFileReader(open(filename, 'rb'), DatumReader())
        dict = {}
        for reading in reader:
            parsed_json = json.loads(reading["Body"])
            if not 'id' in parsed_json:
                return
            if not parsed_json['id'] in dict:
                list = []
                dict[parsed_json['id']] = list
            else:
                list = dict[parsed_json['id']]
                list.append(parsed_json)
        reader.close()
        for device in dict.keys():
            filename = os.getcwd() + '\\' + str(device) + '.csv'
            deviceFile = open(filename, "a")
            for r in dict[device]:
                deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')
    
    def startProcessing():
        print('Processor started using path: ' + os.getcwd())
        # Create a blob container client.
        container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME")
        blob_list = container.list_blobs() # List all the blobs in the container.
        for blob in blob_list:
            # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).        
            if blob.size > 508:
                print('Downloaded a non empty blob: ' + blob.name)
                # Create a blob client for the blob.
                blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
                # Construct a file name based on the blob name.
                cleanName = str.replace(blob.name, '/', '_')
                cleanName = os.getcwd() + '\\' + cleanName 
                with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                    my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
                processBlob2(cleanName) # Convert the file into a CSV file.
                os.remove(cleanName) # Remove the original downloaded file.
                # Delete the blob from the container after it's read.
                container.delete_blob(blob.name)
    
    startProcessing()    
    
  3. Nahraďte AZURE STORAGE CONNECTION STRING připojovacím řetězcem pro váš účet úložiště Azure. Název kontejneru, který jste vytvořili v tomto rychlém startu, je capture. Pokud jste pro kontejner použili jiný název, nahraďte capture názvem kontejneru v účtu úložiště.

Spuštění skriptů

  1. Otevřete příkazový řádek, který má v cestě Python, a potom spuštěním těchto příkazů nainstalujte balíčky požadovaných součástí Pythonu:

    pip install azure-storage-blob
    pip install azure-eventhub
    pip install avro-python3
    

    Poznámka

    Kód v tomto článku byl na těchto verzích otestován.

    • Python 3.7
    • azure-eventhub 5.2.0
    • azure-storage-blob 12.6.0
    • avro-python3 1.10.1
  2. Změňte adresář na adresář, kam jste uložili sender.py a capturereader.py, a spusťte tento příkaz:

    python sender.py
    

    Tento příkaz spustí nový proces Pythonu pro spuštění odesílatele.

  3. Počkejte několik minut, než se zachycení spustí, a pak do původního příkazového okna zadejte následující příkaz:

    python capturereader.py
    

    Tento procesor zachycení pomocí místního adresáře stáhne všechny objekty blob z účtu úložiště a kontejneru. Zpracuje všechny, které nejsou prázdné, a zapíše výsledky jako soubory CSV do místního adresáře.

Další kroky

Podívejte se na ukázky Pythonu na GitHub.