Samla Event Hubs data i Azure Storage och läsa dem med hjälp av Python (azure-eventhub)

Du kan konfigurera en händelsehubb så att de data som skickas till en händelsehubb samlas in i ett Azure-lagringskonto eller Azure Data Lake Storage Gen 1 eller Gen 2. Den här artikeln visar hur du skriver Python-kod för att skicka händelser till en händelsehubb och läsa inlästa data från Azure Blob Storage. Mer information om den här funktionen finns i Event Hubs Översikt över capture-funktioner.

Den här snabbstarten använder Azure Python SDK för att demonstrera capture-funktionen. Appen sender.py skickar simulerad telemetri om miljön till händelsehubben i JSON-format. Händelsehubben är konfigurerad för att använda capture-funktionen för att skriva dessa data till Blob Storage i batchar. Appen capturereader.py läser dessa blobar och skapar en tilläggsfil för varje enhet. Appen skriver sedan data till CSV-filer.

I den här snabbstarten kommer du att göra följande:

  • Skapa ett Azure Blob Storage-konto och en container i Azure Portal.
  • Skapa ett Event Hubs namnområde med hjälp av Azure Portal.
  • Skapa en händelsehubb med funktionen Capture aktiverad och anslut den till ditt lagringskonto.
  • Skicka data till din händelsehubb med hjälp av ett Python-skript.
  • Läsa och bearbeta filer från Event Hubs Capture med hjälp av ett annat Python-skript.

Förutsättningar

Skapa ett Python-skript för att skicka händelser till din händelsehubb

I det här avsnittet skapar du ett Python-skript som skickar 200 händelser (10 enheter * 20 händelser) till en händelsehubb. Dessa händelser är exempel på miljöläsning som skickas i JSON-format.

  1. Öppna din Python-favoritredigerare, till exempel Visual Studio Code.

  2. Skapa ett skript med namnet sender.py.

  3. Klistra in följande kod i sender.py.

    import time
    import os
    import uuid
    import datetime
    import random
    import json
    
    from azure.eventhub import EventHubProducerClient, EventData
    
    # This script simulates the production of events for 10 devices.
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))
    
    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    
    for y in range(0,20):    # For each device, produce 20 events. 
        event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 
        for dev in devices:
            # Create a dummy reading.
            reading = {'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100)}
            s = json.dumps(reading) # Convert the reading into a JSON string.
            event_data_batch.add(EventData(s)) # Add event data to the batch.
        producer.send_batch(event_data_batch) # Send the batch of events to the event hub.
    
    # Close the producer.    
    producer.close()
    
  4. Ersätt följande värden i skripten:

    • Ersätt EVENT HUBS NAMESPACE CONNECTION STRING med anslutningssträngen för Event Hubs namnområdet.
    • Ersätt EVENT HUB NAME med namnet på din händelsehubb.
  5. Kör skriptet för att skicka händelser till händelsehubben.

  6. I Azure Portal kan du kontrollera att händelsehubben har tagit emot meddelandena. Växla till vyn Meddelanden i avsnittet Mått. Uppdatera sidan för att uppdatera diagrammet. Det kan ta några sekunder för sidan att visa att meddelandena har tagits emot.

    Kontrollera att händelsehubben tog emot meddelandena

Skapa ett Python-skript för att läsa dina capture-filer

I det här exemplet lagras infångade data i Azure Blob Storage. Skriptet i det här avsnittet läser in avbildade datafiler från ditt Azure-lagringskonto och genererar CSV-filer så att du enkelt kan öppna och visa dem. Du ser 10 filer i den aktuella arbetskatalogen för programmet. De här filerna innehåller miljöavläsningarna för de 10 enheterna.

  1. I Python-redigeraren skapar du ett skript med namnet capturereader.py. Det här skriptet läser de avbildade filerna och skapar en fil för varje enhet för att endast skriva data för den enheten.

  2. Klistra in följande kod i capturereader.py.

    import os
    import string
    import json
    import uuid
    import avro.schema
    
    from azure.storage.blob import ContainerClient, BlobClient
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    
    
    def processBlob2(filename):
        reader = DataFileReader(open(filename, 'rb'), DatumReader())
        dict = {}
        for reading in reader:
            parsed_json = json.loads(reading["Body"])
            if not 'id' in parsed_json:
                return
            if not parsed_json['id'] in dict:
                list = []
                dict[parsed_json['id']] = list
            else:
                list = dict[parsed_json['id']]
                list.append(parsed_json)
        reader.close()
        for device in dict.keys():
            filename = os.getcwd() + '\\' + str(device) + '.csv'
            deviceFile = open(filename, "a")
            for r in dict[device]:
                deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')
    
    def startProcessing():
        print('Processor started using path: ' + os.getcwd())
        # Create a blob container client.
        container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME")
        blob_list = container.list_blobs() # List all the blobs in the container.
        for blob in blob_list:
            # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).        
            if blob.size > 508:
                print('Downloaded a non empty blob: ' + blob.name)
                # Create a blob client for the blob.
                blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
                # Construct a file name based on the blob name.
                cleanName = str.replace(blob.name, '/', '_')
                cleanName = os.getcwd() + '\\' + cleanName 
                with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                    my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
                processBlob2(cleanName) # Convert the file into a CSV file.
                os.remove(cleanName) # Remove the original downloaded file.
                # Delete the blob from the container after it's read.
                container.delete_blob(blob.name)
    
    startProcessing()    
    
  3. Ersätt AZURE STORAGE CONNECTION STRING med anslutningssträngen för ditt Azure Storage-konto. Namnet på containern som du skapade i den här snabbstarten är capture. Om du använde ett annat namn för containern ersätter du capture med namnet på containern i lagringskontot.

Kör skripten

  1. Öppna en kommandotolk som har Python i sökvägen och kör sedan följande kommandon för att installera nödvändiga Python-paket:

    pip install azure-storage-blob
    pip install azure-eventhub
    pip install avro-python3
    

    Anteckning

    Koden i den här artikeln har testats mot dessa versioner.

    • Python 3.7
    • azure-eventhub 5.2.0
    • azure-storage-blob 12.6.0
    • avro-python3 1.10.1
  2. Ändra katalogen till den katalog där du sparade sender.py capturereader.py och kör följande kommando:

    python sender.py
    

    Det här kommandot startar en ny Python-process för att köra avsändaren.

  3. Vänta några minuter tills avfångstfilen har körts och ange sedan följande kommando i det ursprungliga kommandofönstret:

    python capturereader.py
    

    Den här avbildningsprocessorn använder den lokala katalogen för att ladda ned alla blobar från lagringskontot och containern. Den bearbetar alla som inte är tomma och skriver resultatet som CSV-filer till den lokala katalogen.

Nästa steg

Kolla in Python-exempel på GitHub.