Azure Event Hubs verileri yakalama Depolama Python kullanarak okuma (azure-eventhub)

Olay hub'larını bir olay hub' a gönderilen verilerin bir Azure depolama hesabında veya Azure Data Lake Depolama 1. Nesil'de veya 2. Nesil'de yakalanması için yapılandırabilirsiniz. Bu makalede bir olay hub'ını olay hub'ını göndermek ve yakalanan verileri Azure Blob depolamadan okumak için Python kodu yazma hakkında bilgi ve bilgiler yer alır. Bu özellik hakkında daha fazla bilgi için bkz. Event Hubs özelliğine genel bakış.

Bu hızlı başlangıçta Yakalama özelliğini göstermek için Azure Python SDK'sı lanmıştır. Uygulama sender.py, olay hub'larına JSON biçiminde sanal ortam telemetrisi gönderir. Olay hub'ı, bu verileri toplu olarak Blob depolamaya yazmak için Yakalama özelliğini kullanmak üzere yapılandırılmıştır. Uygulama capturereader.py bu blobları okur ve her cihaz için bir ekleme dosyası oluşturur. Uygulama daha sonra verileri CSV dosyalarına yazar.

Bu hızlı başlangıçta:

  • Azure Blob depolama hesabı ve kapsayıcısı oluşturmak için Azure portal.
  • Event Hubs kullanarak bir ad alanı Azure portal.
  • Yakalama özelliğinin etkin olduğu bir olay hub'ı oluşturun ve bunu depolama hesabınıza bağlayın.
  • Python betiği kullanarak olay hub'ınıza veri gönderme.
  • Başka bir Python betiği Event Hubs Capture'dan dosyaları okuma ve işleme.

Önkoşullar

  • PIP ve aşağıdaki paketlerin yüklü olduğu Python. Bu makaledeki kod, bu sürümlere karşı test edilmiştir.

    • Python 3.7
    • azure-eventhub 5.2.0
    • azure-storage-blob 12.6.0
    • avro-python3 1.10.1
  • Azure aboneliği. Hesabınız yoksa başlamadan önce ücretsiz bir hesap oluşturun.

  • Etkin bir Event Hubs ve olay hub'ı. ad Event Hubs bir olay hub'ı oluşturun. Event Hubs alanının adını, olay hub'ını ve ad alanının birincil erişim anahtarını girin. Erişim anahtarını almak için bkz. Bağlantı Event Hubs dizesini al. Varsayılan anahtar adı RootManageSharedAccessKey'dir. Bu hızlı başlangıç için yalnızca birincil anahtar gerekir. Bağlantı dizesine ihtiyacınız yok.

  • Azure depolama hesabı, depolama hesabı içinde bir blob kapsayıcısı ve depolama hesabına bağlantı dizesi. Bu öğelere sahip değilsanız, şunları yapın:

    1. Azure depolama hesabı oluşturma
    2. Depolama hesabında blob kapsayıcısı oluşturma
    3. Depolama hesabına bağlantı dizesini al

    Bu hızlı başlangıçta daha sonra kullanmak üzere bağlantı dizesini ve kapsayıcı adını kaydetmeyi deneyin.

  • Olay hub'ı için Yakalama özelliğini etkinleştirin. Bunu yapmak için, Event Hubs Capture'ı Azure portal. Önceki adımda oluşturduğunuz depolama hesabını ve blob kapsayıcıyı seçin. Özelliği bir olay hub'ı oluşturma sırasında da etkinleştirebilirsiniz.

Olay hub'ınıza olay göndermek için Python betiği oluşturma

Bu bölümde, bir olay hub'sine 200 olay (10 cihaz * 20 olay) gönderen bir Python betiği oluşturabilirsiniz. Bu olaylar, JSON biçiminde gönderilen örnek bir ortam okumadır.

  1. gibi sık kullanılan Python düzenleyicinizi Visual Studio Code.

  2. sender.py.

  3. Aşağıdaki kodu sender.py.

    import time
    import os
    import uuid
    import datetime
    import random
    import json
    
    from azure.eventhub import EventHubProducerClient, EventData
    
    # This script simulates the production of events for 10 devices.
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))
    
    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    
    for y in range(0,20):    # For each device, produce 20 events. 
        event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 
        for dev in devices:
            # Create a dummy reading.
            reading = {'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100)}
            s = json.dumps(reading) # Convert the reading into a JSON string.
            event_data_batch.add(EventData(s)) # Add event data to the batch.
        producer.send_batch(event_data_batch) # Send the batch of events to the event hub.
    
    # Close the producer.    
    producer.close()
    
  4. Betiklerde aşağıdaki değerleri değiştirin:

    • yerine EVENT HUBS NAMESPACE CONNECTION STRING ad alanınız için bağlantı dizesini Event Hubs değiştirin.
    • yerine EVENT HUB NAME olay hub'nizin adını yazın.
  5. Olay hub'a olay göndermek için betiği çalıştırın.

  6. Bu Azure portal, olay hub' ın iletileri almış olduğunu doğrularsınız. Ölçümler bölümünde İletiler görünümüne geçiş. Grafiği güncelleştirmek için sayfayı yenileyin. Sayfanın iletilerin alınmıştır görüntülemesi birkaç saniye sürebilir.

    Olay hub' ın iletileri aldıklarını doğrulayın

Yakalama dosyalarınızı okumak için Python betiği oluşturma

Bu örnekte yakalanan veriler Azure Blob depolama alanında depolanır. Bu bölümdeki betik, Yakalanan veri dosyalarını Azure depolama hesabından okur ve kolayca açıp görüntülemeniz için CSV dosyaları üretir. Uygulamanın geçerli çalışma dizininde 10 dosya görüyorsunuz. Bu dosyalar 10 cihaz için ortam okumalarını içerir.

  1. Python düzenleyicide, capturereader.py adlı bir betik oluşturun. Bu betik, yakalanan dosyaları okur ve her cihaz için yalnızca o cihaza veri yazmak için bir dosya oluşturur.

  2. Aşağıdaki kodu capturereader.py.

    import os
    import string
    import json
    import uuid
    import avro.schema
    
    from azure.storage.blob import ContainerClient, BlobClient
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    
    
    def processBlob2(filename):
        reader = DataFileReader(open(filename, 'rb'), DatumReader())
        dict = {}
        for reading in reader:
            parsed_json = json.loads(reading["Body"])
            if not 'id' in parsed_json:
                return
            if not parsed_json['id'] in dict:
                list = []
                dict[parsed_json['id']] = list
            else:
                list = dict[parsed_json['id']]
                list.append(parsed_json)
        reader.close()
        for device in dict.keys():
            filename = os.getcwd() + '\\' + str(device) + '.csv'
            deviceFile = open(filename, "a")
            for r in dict[device]:
                deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')
    
    def startProcessing():
        print('Processor started using path: ' + os.getcwd())
        # Create a blob container client.
        container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME")
        blob_list = container.list_blobs() # List all the blobs in the container.
        for blob in blob_list:
            # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).        
            if blob.size > 508:
                print('Downloaded a non empty blob: ' + blob.name)
                # Create a blob client for the blob.
                blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
                # Construct a file name based on the blob name.
                cleanName = str.replace(blob.name, '/', '_')
                cleanName = os.getcwd() + '\\' + cleanName 
                with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                    my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
                processBlob2(cleanName) # Convert the file into a CSV file.
                os.remove(cleanName) # Remove the original downloaded file.
                # Delete the blob from the container after it's read.
                container.delete_blob(blob.name)
    
    startProcessing()    
    
  3. yerine AZURE STORAGE CONNECTION STRING Azure depolama hesabınız için bağlantı dizesini kullanın. Bu hızlı başlangıçta oluşturduğunuz kapsayıcının adı yakalamadır. Kapsayıcı için farklı bir ad kullandıysanız capture ifadesini depolama hesabı içinde kapsayıcının adıyla değiştirin.

Betikleri çalıştırma

  1. Yolunda Python olan bir komut istemi açın ve ardından Python önkoşul paketlerini yüklemek için şu komutları çalıştırın:

    pip install azure-storage-blob
    pip install azure-eventhub
    pip install avro-python3
    

    Not

    Bu makaledeki kod, bu sürümlere karşı test edilmiştir.

    • Python 3.7
    • azure-eventhub 5.2.0
    • azure-storage-blob 12.6.0
    • avro-python3 1.10.1
  2. Dizininizi, dosyanızı ve dosyanızı sender.py capturereader.py dizinine capturereader.py şu komutu çalıştırın:

    python sender.py
    

    Bu komut, göndereni çalıştırmak için yeni bir Python işlemi başlatır.

  3. Yakalamanın çalışması için birkaç dakika bekleyin ve ardından özgün komut pencerenize aşağıdaki komutu girin:

    python capturereader.py
    

    Bu yakalama işlemcisi, depolama hesabı ve kapsayıcıdan tüm blobları indirmek için yerel dizini kullanır. Boş olan tüm dosyaları işler ve sonuçları yerel dizine CSV dosyaları olarak yazar.

Sonraki adımlar

üzerinde Python örneklerine göz GitHub.