تحليل بيانات JSON وAvro في Azure Stream Analytics
يدعم Azure Stream Analytics معالجة الأحداث بتنسيقات بيانات CSV وJSON وAvro. يمكن هيكلة بيانات JSON وAvro وتحتوي على بعض الأنواع المعقدة مثل الكائنات (السجلات) والمصفوفات المتداخلة.
ملاحظة
تستخدم ملفات AVRO التي تم إنشاؤها بواسطة Event Hub Capture تنسيقًا معينًا يتطلب منك استخدام ميزة إلغاء التسلسل المخصص. لمزيد من المعلومات، راجع قراءة الإدخال بأي تنسيق باستخدام أدوات إلغاء التسلسل المخصصة لـ .NET.
أنواع بيانات السجل
تُستخدم أنواع بيانات السجل لتمثيل مصفوفات JSON وAvro عند استخدام التنسيقات المقابلة في تدفقات بيانات الإدخال. توضح هذه الأمثلة مستشعر عينة، والذي يقرأ أحداث الإدخال بتنسيق JSON. فيما يلي مثال على حدث واحد:
{
"DeviceId" : "12345",
"Location" :
{
"Lat": 47,
"Long": 122
},
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"SensorMetadata" :
{
"Manufacturer":"ABC",
"Version":"1.2.45"
}
}
}
الوصول إلى الحقول المتداخلة في المخطط المعروف
استخدم تدوين النقطة (.) للوصول بسهولة إلى الحقول المتداخلة مباشرة من استعلامك. على سبيل المثال، يحدد هذا الاستعلام إحداثيات خطوط الطول والعرض ضمن خاصية الموقع في بيانات JSON السابقة. يمكن استخدام التدوين النقطي للتنقل بين مستويات متعددة كما هو موضح أدناه.
SELECT
DeviceID,
Location.Lat,
Location.Long,
SensorReadings.Temperature,
SensorReadings.SensorMetadata.Version
FROM input
النتيجة هي:
DeviceID | Lat | Long | درجة الحرارة | إصدار |
---|---|---|---|---|
12345 | 47 | 122 | 80 | 1.2.45 |
حدد جميع الخصائص
يمكنك تحديد جميع خصائص السجل المتداخل باستخدام حرف البدل "*". انظر في المثال التالي:
SELECT
DeviceID,
Location.*
FROM input
النتيجة هي:
DeviceID | Lat | Long |
---|---|---|
12345 | 47 | 122 |
الوصول إلى الحقول المتداخلة عندما يكون اسم الخاصية متغيرًا
استخدم الدالة GetRecordPropertyValue إذا كان اسم الخاصية متغير. يسمح هذا بإنشاء استعلامات ديناميكية دون ترميز أسماء الخصائص.
على سبيل المثال، تخيل أن تدفق بيانات العينة يحتاج إلى أن ينضم مع البيانات المرجعية التي تحتوي على حدود لكل جهاز استشعار. يُعرض مقتطف من هذه البيانات المرجعية أدناه.
{
"DeviceId" : "12345",
"SensorName" : "Temperature",
"Value" : 85
},
{
"DeviceId" : "12345",
"SensorName" : "Humidity",
"Value" : 65
}
الهدف هنا هو الانضمام إلى عينة مجموعة البيانات من أعلى المقالة إلى تلك البيانات المرجعية، وإخراج حدث واحد لكل مقياس استشعار أعلى من حده. وهذا يعني أن حدثنا الفردي أعلاه يمكن أن يولد أحداث إخراج متعددة إذا كانت أجهزة الاستشعار المتعددة أعلى من الحدود الخاصة بها، وذلك بفضل الصلة. لتحقيق نتائج مماثلة دون الانضمام، راجع القسم أدناه.
SELECT
input.DeviceID,
thresholds.SensorName,
"Alert: Sensor above threshold" AS AlertMessage
FROM input -- stream input
JOIN thresholds -- reference data input
ON
input.DeviceId = thresholds.DeviceId
WHERE
GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value
يحدد GetRecordPropertyValue الخاصية في SensorReadings، والذي يطابق الاسم اسم الخاصية الوارد من البيانات المرجعية. ثم يتم استخراج القيمة المقترنة من SensorReadings.
النتيجة هي:
DeviceID | SensorName | AlertMessage |
---|---|---|
12345 | الرطوبة | تنبيه : أداة استشعار فوق الحد |
تحويل حقول التسجيل إلى أحداث منفصلة
لتحويل حقول السجلات إلى أحداث منفصلة، استخدم عامل التشغيل APPLY باستخدام الدالة GetRecordProperties.
باستخدام بيانات العينة الأصلية، يمكن استخدام الاستعلام التالي لاستخراج الخصائص في أحداث مختلفة.
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
النتيجة هي:
DeviceID | SensorName | AlertMessage |
---|---|---|
12345 | درجة الحرارة | 80 |
12345 | الرطوبة | 70 |
12345 | CustomSensor01 | 5 |
12345 | CustomSensor02 | 99 |
12345 | SensorMetadata | [object Object] |
باستخدام WITH، من الممكن بعد ذلك توجيه هذه الأحداث إلى وجهات مختلفة:
WITH Stage0 AS
(
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)
SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'
تحليل سجل JSON في بيانات مرجع SQL
عند استخدام قاعدة بيانات Azure SQL كبيانات مرجعية في وظيفتك، من الممكن أن يكون لديك عمود يحتوي على بيانات بتنسيق JSON. إليك مثال موضح أدناه.
DeviceID | البيانات |
---|---|
12345 | {"key": "value1"} |
54321 | {"key": "value2"} |
يمكنك تحليل سجل JSON في عمود البيانات عن طريق كتابة دالة JavaScript بسيطة معرفة بواسطة المستخدم.
function parseJson(string) {
return JSON.parse(string);
}
يمكنك بعد ذلك إنشاء خطوة في استعلام Stream Analytics كما هو موضح أدناه للوصول إلى حقول سجلات JSON.
WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)
SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson
ON streamInput.DeviceID = parseJson.DeviceID
أنواع بيانات المصفوفة
أنواع بيانات المصفوفة هي مجموعة مرتبة من القيم. بعض العمليات النموذجية على قيم المصفوفة مفصلة أدناه. تستخدم هذه الأمثلة الدالات GetArrayElement وGetArrayElements وGetArrayLength وعامل APPLY.
فيما يلي مثال على حدث. كل من CustomSensor03
وSensorMetadata
من نوع الصفيف:
{
"DeviceId" : "12345",
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"CustomSensor03": [12,-5,0]
},
"SensorMetadata":[
{
"smKey":"Manufacturer",
"smValue":"ABC"
},
{
"smKey":"Version",
"smValue":"1.2.45"
}
]
}
العمل مع عنصر مصفوفة محدد
حدد عنصر مصفوفة في فهرس محدد (تحديد عنصر المصفوفة الأول):
SELECT
GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input
النتيجة هي:
firstElement |
---|
12 |
تحديد طول المصفوفة
SELECT
GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input
النتيجة هي:
arrayLength |
---|
3 |
تحويل عناصر المصفوفة إلى أحداث منفصلة
تحويل جميع عناصر المصفوفة كأحداث فردية. عامل APPLY باستخدام استخدام دالة المضمنة GetArrayElements يستخرج كافة عناصر الصفيف كأحداث فردية:
SELECT
DeviceId,
CustomSensor03Record.ArrayIndex,
CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record
النتيجة هي:
DeviceId | ArrayIndex | ArrayValue |
---|---|---|
12345 | 0 | 12 |
12345 | 1 | -5 |
12345 | 2 | 0 |
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
النتيجة هي:
DeviceId | smKey | smValue |
---|---|---|
12345 | الشركة المُصنّعة | ABC |
12345 | إصدار | 1.2.45 |
إذا كانت الحقول المستخرجة بحاجة إلى الظهور في أعمدة، فمن الممكن تدوير مجموعة البيانات باستخدام بناء الجملة WITH بالإضافة إلى عملية JOIN. تتطلب هذه الصلة شرط حد زمني يمنع التكرار:
WITH DynamicCTE AS (
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
)
SELECT
i.DeviceId,
i.Location.*,
V.smValue AS 'smVersion',
M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0
النتيجة هي:
DeviceId | Lat | Long | smVersion | smManufacturer |
---|---|---|---|---|
12345 | 47 | 122 | 1.2.45 | ABC |