Hızlı Başlangıç: Azure Depolama Event Hubs verilerini yakalama ve Python (azure-eventhub) kullanarak okuma

Olay hub'ını yapılandırarak olay hub'ına gönderilen verilerin bir Azure depolama hesabında veya Azure Data Lake Depolama 1. Nesil veya 2. Nesil'de yakalanmasını sağlayabilirsiniz. Bu makalede, bir olay hub'ına olay göndermek ve Azure Blob depolamadan yakalanan verileri okumak için Python kodu yazma işlemi gösterilmektedir. Bu özellik hakkında daha fazla bilgi için bkz . Event Hubs Yakalama özelliğine genel bakış.

Bu hızlı başlangıçta Yakalama özelliğini göstermek için Azure Python SDK'sı kullanılmaktadır. sender.py uygulaması JSON biçiminde olay hub'larına sanal ortam telemetrisi gönderir. Olay hub'ı, bu verileri toplu olarak Blob depolamaya yazmak için Yakalama özelliğini kullanacak şekilde yapılandırılmıştır. capturereader.py uygulaması bu blobları okur ve her cihaz için bir ekleme dosyası oluşturur. Uygulama daha sonra verileri CSV dosyalarına yazar.

Bu hızlı başlangıçta:

  • Azure portalında bir Azure Blob depolama hesabı ve kapsayıcısı oluşturun.
  • Azure portalını kullanarak bir Event Hubs ad alanı oluşturun.
  • Yakalama özelliğinin etkinleştirildiği bir olay hub'ı oluşturun ve depolama hesabınıza bağlayın.
  • Python betiği kullanarak olay hub'ınıza veri gönderin.
  • Başka bir Python betiği kullanarak Event Hubs Capture'tan dosyaları okuyun ve işleyin.

Önkoşullar

Olay hub'ı için Yakalama özelliğini etkinleştirme

Olay hub'ı için Yakalama özelliğini etkinleştirin. Bunu yapmak için Azure portalını kullanarak Event Hubs Yakalamayı Etkinleştirme başlığındaki yönergeleri izleyin. Önceki adımda oluşturduğunuz depolama hesabını ve blob kapsayıcısını seçin. Çıkış olayı serileştirme biçimi için Avro'ya tıklayın.

Olay hub'ınıza olay göndermek için Python betiği oluşturma

Bu bölümde, bir olay hub'ına 200 olay (10 cihaz * 20 olay) gönderen bir Python betiği oluşturacaksınız. Bu olaylar, JSON biçiminde gönderilen örnek bir ortam okumasıdır.

  1. Visual Studio Code gibi sık kullandığınız Python düzenleyicisini açın.

  2. sender.py adlı bir betik oluşturun.

  3. Aşağıdaki kodu sender.py yapıştırın.

    import time
    import os
    import uuid
    import datetime
    import random
    import json
    
    from azure.eventhub import EventHubProducerClient, EventData
    
    # This script simulates the production of events for 10 devices.
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))
    
    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    
    for y in range(0,20):    # For each device, produce 20 events. 
        event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 
        for dev in devices:
            # Create a dummy reading.
        reading = {
                'id': dev, 
                'timestamp': str(datetime.datetime.utcnow()), 
                'uv': random.random(), 
                'temperature': random.randint(70, 100), 
                'humidity': random.randint(70, 100)
            }
            s = json.dumps(reading) # Convert the reading into a JSON string.
            event_data_batch.add(EventData(s)) # Add event data to the batch.
        producer.send_batch(event_data_batch) # Send the batch of events to the event hub.
    
    # Close the producer.    
    producer.close()
    
  4. Betiklerde aşağıdaki değerleri değiştirin:

    • değerini Event Hubs ad alanınızın bağlantı dizesi ile değiştirinEVENT HUBS NAMESPACE CONNECTION STRING.
    • değerini olay hub'ınızın adıyla değiştirin EVENT HUB NAME .
  5. Olay hub'ına olay göndermek için betiği çalıştırın.

  6. Azure portalında, olay hub'ında iletilerin alındığını doğrulayabilirsiniz. Ölçümler bölümünde İletiler görünümüne geçin. Grafiği güncelleştirmek için sayfayı yenileyin. Sayfanın iletilerin alındığını görüntülemesi birkaç saniye sürebilir.

    Verify that the event hub received the messages

Capture dosyalarınızı okumak için Python betiği oluşturma

Bu örnekte, yakalanan veriler Azure Blob depolamada depolanır. Bu bölümdeki betik, Azure depolama hesabınızdan yakalanan veri dosyalarını okur ve kolayca açıp görüntüleyebilmek için CSV dosyaları oluşturur. Uygulamanın geçerli çalışma dizininde 10 dosya görürsünüz. Bu dosyalar 10 cihaz için ortam okumalarını içerir.

  1. Python düzenleyicinizde capturereader.py adlı bir betik oluşturun. Bu betik, yakalanan dosyaları okur ve her cihaz için yalnızca bu cihaz için veri yazmak üzere bir dosya oluşturur.

  2. Aşağıdaki kodu capturereader.py yapıştırın.

    import os
    import string
    import json
    import uuid
    import avro.schema
    
    from azure.storage.blob import ContainerClient, BlobClient
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    
    
    def processBlob2(filename):
        reader = DataFileReader(open(filename, 'rb'), DatumReader())
        dict = {}
        for reading in reader:
            parsed_json = json.loads(reading["Body"])
            if not 'id' in parsed_json:
                return
            if not parsed_json['id'] in dict:
                list = []
                dict[parsed_json['id']] = list
            else:
                list = dict[parsed_json['id']]
                list.append(parsed_json)
        reader.close()
        for device in dict.keys():
            filename = os.getcwd() + '\\' + str(device) + '.csv'
            deviceFile = open(filename, "a")
            for r in dict[device]:
                deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')
    
    def startProcessing():
        print('Processor started using path: ' + os.getcwd())
        # Create a blob container client.
        container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME")
        blob_list = container.list_blobs() # List all the blobs in the container.
        for blob in blob_list:
            # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).        
            if blob.size > 508:
                print('Downloaded a non empty blob: ' + blob.name)
                # Create a blob client for the blob.
                blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
                # Construct a file name based on the blob name.
                cleanName = str.replace(blob.name, '/', '_')
                cleanName = os.getcwd() + '\\' + cleanName 
                with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                    my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
                processBlob2(cleanName) # Convert the file into a CSV file.
                os.remove(cleanName) # Remove the original downloaded file.
                # Delete the blob from the container after it's read.
                container.delete_blob(blob.name)
    
    startProcessing()    
    
  3. değerini Azure depolama hesabınızın bağlantı dizesi ile değiştirinAZURE STORAGE CONNECTION STRING. Bu hızlı başlangıçta oluşturduğunuz kapsayıcının adı yakalamadır. Kapsayıcı için farklı bir ad kullandıysanız capture değerini depolama hesabındaki kapsayıcının adıyla değiştirin.

Betikleri çalıştırma

  1. Yolunda Python bulunan bir komut istemi açın ve python önkoşul paketlerini yüklemek için şu komutları çalıştırın:

    pip install azure-storage-blob
    pip install azure-eventhub
    pip install avro-python3
    
  2. Dizininizi, sender.py ve capturereader.py kaydettiğiniz dizinle değiştirin ve şu komutu çalıştırın:

    python sender.py
    

    Bu komut, göndereni çalıştırmak için yeni bir Python işlemi başlatır.

  3. Yakalamanın çalışması için birkaç dakika bekleyin ve ardından özgün komut pencerenize aşağıdaki komutu girin:

    python capturereader.py
    

    Bu yakalama işlemcisi, depolama hesabından ve kapsayıcısından tüm blobları indirmek için yerel dizini kullanır. Boş olmayan dosyaları işler ve sonuçları yerel dizine CSV dosyaları olarak yazar.

Sonraki adımlar

GitHub'da Python örneklerine göz atın.