التشغيل السريع: التقاط بيانات مراكز الأحداث في Azure Storage وقراءتها باستخدام Python (azure-eventhub)

يمكنك تكوين مركز أحداث؛ بحيث يتم التقاط البيانات التي يتم إرسالها إلى مركز أحداث في حساب تخزين Azure، أو إلى Azure Data Lake Storage من الجيل الأول أو الجيل الثاني. توضح هذه المقالة كيفية كتابة تعليمة Python البرمجية لإرسال الأحداث إلى مركز أحداث وقراءة البيانات الملتقطة من تخزين Azure Blob. لمزيد من المعلومات حول هذه الميزة، راجع نظرة عامة على ميزة التقاط مراكز الأحداث.

يستخدم هذا التشغيل السريع Azure Python SDK لإظهار ميزة الالتقاط. يرسل تطبيق sender.py بيانات تتبع الاستخدام البيئية المحاكية إلى مراكز الأحداث بتنسيق JSON. يتم تكوين مركز الأحداث لاستخدام ميزة الالتقاط لكتابة هذه البيانات في مساحة تخزين Blob على دفعات. يقرأ تطبيق capturereader.py هذه الكائنات الثنائية كبيرة الحجم وينشئ ملف إلحاق لكل جهاز. ثم يكتب التطبيق البيانات في ملفات بتنسيق CSV.

في هذه البداية السريعة، قمت بـ:

  • إنشاء حساب تخزين Azure Blob، وحاوية في مدخل Azure.
  • إنشاء مساحة اسم لمراكز الأحداث باستخدام مدخل Azure.
  • إنشاء مركز أحداث مع تمكين ميزة الالتقاط، وتوصيلها بحساب التخزين الخاص بك.
  • إرسال البيانات إلى مركز الأحداث الخاص بك باستخدام برنامج نصي بلغة Python.
  • قراءة الملفات ومعالجتها من ميزة الالتقاط في مراكز الأحداث باستخدام برنامج نصي آخر بلغة Python.

المتطلبات الأساسية

تمكين ميزة الالتقاط لمركز الأحداث

مكّن ميزة الالتقاط لمركز الأحداث هذا. للقيام بذلك، اتبع الإرشادات الموجودة في تمكين التقاط مراكز الأحداث باستخدام مدخل Microsoft Azure. حدد حساب التخزين وحاوية النقاط اللذين أنشأتهما في الخطوة السابقة. حدد Avro لتنسيق تسلسل حدث الإخراج.

إنشاء برنامج نصي بلغة Python لإرسال الأحداث إلى مركز الأحداث الخاص بك

في هذا القسم، تنشئ برنامجًا نصيًا بلغة Python يرسل 200 حدث (10 أجهزة × 20 حدثًا) إلى مركز أحداث. هذه الأحداث هي عينة من القراءة البيئية التي يتم إرسالها بتنسيق JSON.

  1. افتح محرر Python المفضل لديك، مثل Visual Studio Code.

  2. إنشاء برنامج نصي يسمى sender.py.

  3. الصق التعليمات البرمجية التالية في sender.py.

    import time
    import os
    import uuid
    import datetime
    import random
    import json
    
    from azure.eventhub import EventHubProducerClient, EventData
    
    # This script simulates the production of events for 10 devices.
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))
    
    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    
    for y in range(0,20):    # For each device, produce 20 events. 
        event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 
        for dev in devices:
            # Create a dummy reading.
        reading = {
                'id': dev, 
                'timestamp': str(datetime.datetime.utcnow()), 
                'uv': random.random(), 
                'temperature': random.randint(70, 100), 
                'humidity': random.randint(70, 100)
            }
            s = json.dumps(reading) # Convert the reading into a JSON string.
            event_data_batch.add(EventData(s)) # Add event data to the batch.
        producer.send_batch(event_data_batch) # Send the batch of events to the event hub.
    
    # Close the producer.    
    producer.close()
    
  4. استبدل القيم الآتية في البرامج النصية:

    • استبدل EVENT HUBS NAMESPACE CONNECTION STRING بسلسلة الاتصال لمساحة اسم مراكز أحداث.
    • استبدل EVENT HUB NAME باسم مركز الأحداث الخاص بك.
  5. شغّل البرنامج النصي لإرسال الأحداث إلى مركز الأحداث.

  6. في مدخل Azure، يمكنك التحقق من مركز الأحداث قد استلم الرسائل. بدّل إلى طريقة عرض Messages في قسم Metrics. أعد تنشيط الصفحة لتحديث المخطط. قد تستغرق الصفحة بضع ثوانٍ حتى تعرض الرسائل التي تم استلامها.

    Verify that the event hub received the messages

إنشاء برنامج نصي بلغة Python لقراءة ملفات الالتقاط الخاصة بك

في هذا المثال، يتم تخزين البيانات المُلتقطة في مساحة تخزين Azure Blob. يقرأ البرنامج النصي في هذا القسم ملفات البيانات المُلتقطة من حساب تخزين Azure الخاص بك، وينشئ ملفات بتنسيق CSV لكي يسهُل عليك فتحها وعرضها. ترى 10 ملفات في دليل العمل الحالي للتطبيق. تحتوي هذه الملفات على قراءات بيئية للأجهزة العشرة.

  1. في محرر Python، قم بإنشاء برنامج نصي يسمى capturereader.py. يقرأ هذا البرنامج النصي الملفات المُلتقطة، وينشئ ملفًا لكل جهاز لكتابة البيانات لهذا الجهاز وحده.

  2. الصق التعليمات البرمجية التالية في capturereader.py.

    import os
    import string
    import json
    import uuid
    import avro.schema
    
    from azure.storage.blob import ContainerClient, BlobClient
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    
    
    def processBlob2(filename):
        reader = DataFileReader(open(filename, 'rb'), DatumReader())
        dict = {}
        for reading in reader:
            parsed_json = json.loads(reading["Body"])
            if not 'id' in parsed_json:
                return
            if not parsed_json['id'] in dict:
                list = []
                dict[parsed_json['id']] = list
            else:
                list = dict[parsed_json['id']]
                list.append(parsed_json)
        reader.close()
        for device in dict.keys():
            filename = os.getcwd() + '\\' + str(device) + '.csv'
            deviceFile = open(filename, "a")
            for r in dict[device]:
                deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')
    
    def startProcessing():
        print('Processor started using path: ' + os.getcwd())
        # Create a blob container client.
        container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME")
        blob_list = container.list_blobs() # List all the blobs in the container.
        for blob in blob_list:
            # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).        
            if blob.size > 508:
                print('Downloaded a non empty blob: ' + blob.name)
                # Create a blob client for the blob.
                blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
                # Construct a file name based on the blob name.
                cleanName = str.replace(blob.name, '/', '_')
                cleanName = os.getcwd() + '\\' + cleanName 
                with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                    my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
                processBlob2(cleanName) # Convert the file into a CSV file.
                os.remove(cleanName) # Remove the original downloaded file.
                # Delete the blob from the container after it's read.
                container.delete_blob(blob.name)
    
    startProcessing()    
    
  3. استبدل AZURE STORAGE CONNECTION STRING سلسلة الاتصال لحساب تخزين Azure الخاص بك. اسم الحاوية التي قمت بإنشائها في هذا التشغيل السريع هو الالتقاط. إذا استخدمت اسما مختلفا للحاوية، فاستبدل capture باسم الحاوية في حساب التخزين.

تشغيل البرامج النصية

  1. افتح مطالبة أمر تتضمن Python في مسارها، ثم شغّل هذه الأوامر لتثبيت حزم متطلبات Python الأساسية:

    pip install azure-storage-blob
    pip install azure-eventhub
    pip install avro-python3
    
  2. قم بتغيير الدليل إلى الدليل حيث قمت بحفظ sender.py capturereader.py، وقم بتشغيل هذا الأمر:

    python sender.py
    

    يبدأ هذا الأمر عملية Python جديدة لتشغيل المرسل.

  3. انتظر لبضع دقائق حتى يتم تشغيل الالتقاط، ثم أدخل الأمر التالي في نافذة الأوامر الأصلية لديك:

    python capturereader.py
    

    يستخدم معالج الالتقاط هذا الدليل المحلي لتنزيل كافة النقاط من حساب التخزين والحاوية. يعالج الملفات غير الفارغة، ويكتب النتائج كملفات CSV في الدليل المحلي.

الخطوات التالية

تحقق من عينات Python على GitHub.