التشغيل السريع: التقاط بيانات مراكز الأحداث في Azure Storage وقراءتها باستخدام Python (azure-eventhub)
يمكنك تكوين مركز أحداث؛ بحيث يتم التقاط البيانات التي يتم إرسالها إلى مركز أحداث في حساب تخزين Azure، أو إلى Azure Data Lake Storage من الجيل الأول أو الجيل الثاني. توضح هذه المقالة كيفية كتابة تعليمة Python البرمجية لإرسال الأحداث إلى مركز أحداث وقراءة البيانات الملتقطة من تخزين Azure Blob. لمزيد من المعلومات حول هذه الميزة، راجع نظرة عامة على ميزة التقاط مراكز الأحداث.
يستخدم هذا التشغيل السريع Azure Python SDK لإظهار ميزة الالتقاط. يرسل تطبيق sender.py بيانات تتبع الاستخدام البيئية المحاكية إلى مراكز الأحداث بتنسيق JSON. يتم تكوين مركز الأحداث لاستخدام ميزة الالتقاط لكتابة هذه البيانات في مساحة تخزين Blob على دفعات. يقرأ تطبيق capturereader.py هذه الكائنات الثنائية كبيرة الحجم وينشئ ملف إلحاق لكل جهاز. ثم يكتب التطبيق البيانات في ملفات بتنسيق CSV.
في هذه البداية السريعة، قمت بـ:
- إنشاء حساب تخزين Azure Blob، وحاوية في مدخل Azure.
- إنشاء مساحة اسم لمراكز الأحداث باستخدام مدخل Azure.
- إنشاء مركز أحداث مع تمكين ميزة الالتقاط، وتوصيلها بحساب التخزين الخاص بك.
- إرسال البيانات إلى مركز الأحداث الخاص بك باستخدام برنامج نصي بلغة Python.
- قراءة الملفات ومعالجتها من ميزة الالتقاط في مراكز الأحداث باستخدام برنامج نصي آخر بلغة Python.
المتطلبات الأساسية
Python 3.8 أو أحدث، مع تثبيت pip وتحديثه.
اشتراك Azure. في حال لم يكن لديك اشتراك، أنشئ حسابًا مجانيًا قبل البدء.
مساحة اسم نشطة لمراكز الأحداث، ومركز أحداث. إنشاء مساحة اسم مراكز الأحداث ومركز أحداث في مساحة الاسم. سجّل اسم مساحة اسم مراكز الأحداث، واسم مركز الأحداث، ومفتاح الوصول الأساسي لمساحة الاسم. للحصول على مفتاح الوصول، راجع الحصول على مراكز الأحداث سلسلة الاتصال. اسم المفتاح الافتراضي هو RootManageSharedAccessKey. في هذا التشغيل السريع، لست بحاجة سوى إلى المفتاح الأساسي. لن تحتاج إلى سلسة الاتصال.
حساب تخزين في Azure، وحاوية نقاط في حساب التخزين، وسلسلة اتصال بحساب التخزين. إذا لم يكن لديك هذه العناصر، فقم بالخطوات التالية:
- أنشئ حساب خدمة في Azure
- إنشاء حاوية كائن ثنائي كبير الحجم في حساب التخزين
- الحصول على سلسة الاتصال بحساب التخزين
تأكد من تسجيل سلسلة الاتصال واسم الحاوية لاستخدامهما لاحقًا في هذا التشغيل السريع.
تمكين ميزة الالتقاط لمركز الأحداث
مكّن ميزة الالتقاط لمركز الأحداث هذا. للقيام بذلك، اتبع الإرشادات الموجودة في تمكين التقاط مراكز الأحداث باستخدام مدخل Microsoft Azure. حدد حساب التخزين وحاوية النقاط اللذين أنشأتهما في الخطوة السابقة. حدد Avro لتنسيق تسلسل حدث الإخراج.
إنشاء برنامج نصي بلغة Python لإرسال الأحداث إلى مركز الأحداث الخاص بك
في هذا القسم، تنشئ برنامجًا نصيًا بلغة Python يرسل 200 حدث (10 أجهزة × 20 حدثًا) إلى مركز أحداث. هذه الأحداث هي عينة من القراءة البيئية التي يتم إرسالها بتنسيق JSON.
افتح محرر Python المفضل لديك، مثل Visual Studio Code.
إنشاء برنامج نصي يسمى sender.py.
الصق التعليمات البرمجية التالية في sender.py.
import time import os import uuid import datetime import random import json from azure.eventhub import EventHubProducerClient, EventData # This script simulates the production of events for 10 devices. devices = [] for x in range(0, 10): devices.append(str(uuid.uuid4())) # Create a producer client to produce and publish events to the event hub. producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME") for y in range(0,20): # For each device, produce 20 events. event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. for dev in devices: # Create a dummy reading. reading = { 'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100) } s = json.dumps(reading) # Convert the reading into a JSON string. event_data_batch.add(EventData(s)) # Add event data to the batch. producer.send_batch(event_data_batch) # Send the batch of events to the event hub. # Close the producer. producer.close()
استبدل القيم الآتية في البرامج النصية:
- استبدل
EVENT HUBS NAMESPACE CONNECTION STRING
بسلسلة الاتصال لمساحة اسم مراكز أحداث. - استبدل
EVENT HUB NAME
باسم مركز الأحداث الخاص بك.
- استبدل
شغّل البرنامج النصي لإرسال الأحداث إلى مركز الأحداث.
في مدخل Azure، يمكنك التحقق من مركز الأحداث قد استلم الرسائل. بدّل إلى طريقة عرض Messages في قسم Metrics. أعد تنشيط الصفحة لتحديث المخطط. قد تستغرق الصفحة بضع ثوانٍ حتى تعرض الرسائل التي تم استلامها.
إنشاء برنامج نصي بلغة Python لقراءة ملفات الالتقاط الخاصة بك
في هذا المثال، يتم تخزين البيانات المُلتقطة في مساحة تخزين Azure Blob. يقرأ البرنامج النصي في هذا القسم ملفات البيانات المُلتقطة من حساب تخزين Azure الخاص بك، وينشئ ملفات بتنسيق CSV لكي يسهُل عليك فتحها وعرضها. ترى 10 ملفات في دليل العمل الحالي للتطبيق. تحتوي هذه الملفات على قراءات بيئية للأجهزة العشرة.
في محرر Python، قم بإنشاء برنامج نصي يسمى capturereader.py. يقرأ هذا البرنامج النصي الملفات المُلتقطة، وينشئ ملفًا لكل جهاز لكتابة البيانات لهذا الجهاز وحده.
الصق التعليمات البرمجية التالية في capturereader.py.
import os import string import json import uuid import avro.schema from azure.storage.blob import ContainerClient, BlobClient from avro.datafile import DataFileReader, DataFileWriter from avro.io import DatumReader, DatumWriter def processBlob2(filename): reader = DataFileReader(open(filename, 'rb'), DatumReader()) dict = {} for reading in reader: parsed_json = json.loads(reading["Body"]) if not 'id' in parsed_json: return if not parsed_json['id'] in dict: list = [] dict[parsed_json['id']] = list else: list = dict[parsed_json['id']] list.append(parsed_json) reader.close() for device in dict.keys(): filename = os.getcwd() + '\\' + str(device) + '.csv' deviceFile = open(filename, "a") for r in dict[device]: deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n') def startProcessing(): print('Processor started using path: ' + os.getcwd()) # Create a blob container client. container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME") blob_list = container.list_blobs() # List all the blobs in the container. for blob in blob_list: # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files). if blob.size > 508: print('Downloaded a non empty blob: ' + blob.name) # Create a blob client for the blob. blob_client = ContainerClient.get_blob_client(container, blob=blob.name) # Construct a file name based on the blob name. cleanName = str.replace(blob.name, '/', '_') cleanName = os.getcwd() + '\\' + cleanName with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file. processBlob2(cleanName) # Convert the file into a CSV file. os.remove(cleanName) # Remove the original downloaded file. # Delete the blob from the container after it's read. container.delete_blob(blob.name) startProcessing()
استبدل
AZURE STORAGE CONNECTION STRING
سلسلة الاتصال لحساب تخزين Azure الخاص بك. اسم الحاوية التي قمت بإنشائها في هذا التشغيل السريع هو الالتقاط. إذا استخدمت اسما مختلفا للحاوية، فاستبدل capture باسم الحاوية في حساب التخزين.
تشغيل البرامج النصية
افتح مطالبة أمر تتضمن Python في مسارها، ثم شغّل هذه الأوامر لتثبيت حزم متطلبات Python الأساسية:
pip install azure-storage-blob pip install azure-eventhub pip install avro-python3
قم بتغيير الدليل إلى الدليل حيث قمت بحفظ sender.py capturereader.py، وقم بتشغيل هذا الأمر:
python sender.py
يبدأ هذا الأمر عملية Python جديدة لتشغيل المرسل.
انتظر لبضع دقائق حتى يتم تشغيل الالتقاط، ثم أدخل الأمر التالي في نافذة الأوامر الأصلية لديك:
python capturereader.py
يستخدم معالج الالتقاط هذا الدليل المحلي لتنزيل كافة النقاط من حساب التخزين والحاوية. يعالج الملفات غير الفارغة، ويكتب النتائج كملفات CSV في الدليل المحلي.
الخطوات التالية
تحقق من عينات Python على GitHub.