Zachytávání Event Hubs dat v Azure Storage a jejich čtení pomocí Pythonu (azure-eventhub)
Centrum událostí můžete nakonfigurovat tak, aby se data odesílaná do centra událostí zachytála v účtu úložiště Azure nebo v Azure Data Lake Storage Gen 1 nebo Gen 2. Tento článek ukazuje, jak napsat kód Pythonu pro odesílání událostí do centra událostí a čtení zachycených dat z úložiště objektů blob v Azure. Další informace o této funkci najdete v tématu Event Hubs Capture.
V tomto rychlém startu se k předvedení funkce Capture používá sada Azure Python SDK. Aplikace sender.py simulovaná telemetrická data prostředí do center událostí ve formátu JSON. Centrum událostí je nakonfigurované tak, aby k zápisu dat do úložiště objektů blob v dávkách používá funkci Capture. Aplikace capturereader.py tyto objekty blob načte a pro každé zařízení vytvoří připojovací soubor. Aplikace pak data zapíše do souborů CSV.
V tomto rychlém startu:
- Ve službě Azure Blob Storage vytvořte účet a kontejner Azure Portal.
- Vytvořte Event Hubs oboru názvů pomocí Azure Portal.
- Vytvořte centrum událostí s povolenou funkcí Capture a připojte ji ke svému účtu úložiště.
- Odešlete data do centra událostí pomocí skriptu Pythonu.
- Čtení a zpracování souborů z Event Hubs Capture pomocí jiného skriptu Pythonu.
Požadavky
Python s NAINSTALOVANÝM PIP a následujícími balíčky. Kód v tomto článku byl na těchto verzích otestován.
- Python 3.7
- azure-eventhub 5.2.0
- azure-storage-blob 12.6.0
- avro-python3 1.10.1
Předplatné Azure. Pokud ho nemáte, vytvořte si bezplatný účet před tím, než začnete.
Aktivní aktivní Event Hubs oboru názvů a centra událostí. Vytvořte obor názvů Event Hubs a centrum událostí v oboru názvů. Zaznamená název oboru názvů Event Hubs, název centra událostí a primární přístupový klíč pro obor názvů. Pokud chcete získat přístupový klíč, podívejte se na Event Hubs připojovací řetězec. Výchozí název klíče je RootManageSharedAccessKey. Pro tento rychlý start potřebujete pouze primární klíč. Připojovací řetězec nepotřebujete.
Účet úložiště Azure, kontejner objektů blob v účtu úložiště a připojovací řetězec k účtu úložiště. Pokud tyto položky nemáte, postupujte následovně:
- Vytvoření účtu úložiště Azure
- Vytvoření kontejneru objektů blob v účtu úložiště
- Získání připojovacího řetězce k účtu úložiště
Nezapomeňte si zaznamenat připojovací řetězec a název kontejneru pro pozdější použití v tomto rychlém startu.
Povolte funkci Zachytávání pro centrum událostí. Postupujte podle pokynů v tématu Povolení Event Hubs Capture pomocí Azure Portal. Vyberte účet úložiště a kontejner objektů blob, které jste vytvořili v předchozím kroku. Tuto funkci můžete povolit také při vytváření centra událostí.
Vytvoření skriptu Pythonu pro odesílání událostí do centra událostí
V této části vytvoříte skript Pythonu, který do centra událostí odešle 200 událostí (10 zařízení * 20 událostí). Tyto události jsou ukázkovým čtením prostředí, které se odesílá ve formátu JSON.
Otevřete oblíbený editor Pythonu, například Visual Studio Code.
Vytvořte skript s názvem sender.py.
Do souboru vložte následující sender.py.
import time import os import uuid import datetime import random import json from azure.eventhub import EventHubProducerClient, EventData # This script simulates the production of events for 10 devices. devices = [] for x in range(0, 10): devices.append(str(uuid.uuid4())) # Create a producer client to produce and publish events to the event hub. producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME") for y in range(0,20): # For each device, produce 20 events. event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. for dev in devices: # Create a dummy reading. reading = {'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100)} s = json.dumps(reading) # Convert the reading into a JSON string. event_data_batch.add(EventData(s)) # Add event data to the batch. producer.send_batch(event_data_batch) # Send the batch of events to the event hub. # Close the producer. producer.close()Ve skriptech nahraďte následující hodnoty:
- Nahraďte
EVENT HUBS NAMESPACE CONNECTION STRINGpřipojovacím řetězcem pro váš Event Hubs názvů. - Nahraďte
EVENT HUB NAMEnázvem vašeho centra událostí.
- Nahraďte
Spusťte skript pro odesílání událostí do centra událostí.
V Azure Portal můžete ověřit, že centrum událostí přijalo zprávy. V části Metriky přepněte do zobrazení Zprávy. Aktualizujte stránku a aktualizujte graf. Může trvat několik sekund, než se na stránce zobrazí, že byly přijaty zprávy.
Vytvoření skriptu Pythonu pro čtení zachytávání souborů
V tomto příkladu jsou zachycená data uložená ve službě Azure Blob Storage. Skript v této části načte zachycené datové soubory z vašeho účtu úložiště Azure a vygeneruje soubory CSV, abyste je mohli snadno otevřít a zobrazit. V aktuálním pracovním adresáři aplikace se zobrazí 10 souborů. Tyto soubory budou obsahovat odečty prostředí pro 10 zařízení.
V editoru Pythonu vytvořte skript s názvem capturereader.py. Tento skript přečte zachycené soubory a vytvoří soubor pro každé zařízení, který bude zapisovat data pouze pro toto zařízení.
Do souboru vložte následující capturereader.py.
import os import string import json import uuid import avro.schema from azure.storage.blob import ContainerClient, BlobClient from avro.datafile import DataFileReader, DataFileWriter from avro.io import DatumReader, DatumWriter def processBlob2(filename): reader = DataFileReader(open(filename, 'rb'), DatumReader()) dict = {} for reading in reader: parsed_json = json.loads(reading["Body"]) if not 'id' in parsed_json: return if not parsed_json['id'] in dict: list = [] dict[parsed_json['id']] = list else: list = dict[parsed_json['id']] list.append(parsed_json) reader.close() for device in dict.keys(): filename = os.getcwd() + '\\' + str(device) + '.csv' deviceFile = open(filename, "a") for r in dict[device]: deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n') def startProcessing(): print('Processor started using path: ' + os.getcwd()) # Create a blob container client. container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME") blob_list = container.list_blobs() # List all the blobs in the container. for blob in blob_list: # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files). if blob.size > 508: print('Downloaded a non empty blob: ' + blob.name) # Create a blob client for the blob. blob_client = ContainerClient.get_blob_client(container, blob=blob.name) # Construct a file name based on the blob name. cleanName = str.replace(blob.name, '/', '_') cleanName = os.getcwd() + '\\' + cleanName with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file. processBlob2(cleanName) # Convert the file into a CSV file. os.remove(cleanName) # Remove the original downloaded file. # Delete the blob from the container after it's read. container.delete_blob(blob.name) startProcessing()Nahraďte
AZURE STORAGE CONNECTION STRINGpřipojovacím řetězcem pro váš účet úložiště Azure. Název kontejneru, který jste vytvořili v tomto rychlém startu, je capture. Pokud jste pro kontejner použili jiný název, nahraďte capture názvem kontejneru v účtu úložiště.
Spuštění skriptů
Otevřete příkazový řádek, který má v cestě Python, a potom spuštěním těchto příkazů nainstalujte balíčky požadovaných součástí Pythonu:
pip install azure-storage-blob pip install azure-eventhub pip install avro-python3Poznámka
Kód v tomto článku byl na těchto verzích otestován.
- Python 3.7
- azure-eventhub 5.2.0
- azure-storage-blob 12.6.0
- avro-python3 1.10.1
Změňte adresář na adresář, kam jste uložili sender.py a capturereader.py, a spusťte tento příkaz:
python sender.pyTento příkaz spustí nový proces Pythonu pro spuštění odesílatele.
Počkejte několik minut, než se zachycení spustí, a pak do původního příkazového okna zadejte následující příkaz:
python capturereader.pyTento procesor zachycení pomocí místního adresáře stáhne všechny objekty blob z účtu úložiště a kontejneru. Zpracuje všechny, které nejsou prázdné, a zapíše výsledky jako soubory CSV do místního adresáře.
Další kroky
Podívejte se na ukázky Pythonu na GitHub.
