Analyzovat data JSON a Avro v Azure Stream Analytics
Azure Stream Analytics podporovat zpracování událostí v datových formátech CSV, JSON a Avro. Data JSON a Avro mohou být strukturovaná a obsahují některé komplexní typy, jako jsou například vnořené objekty (záznamy) a pole.
Poznámka
Soubory AVRO vytvořené pomocí centra událostí zachycení používají konkrétní formát, který vyžaduje použití vlastní funkce deserializace . Další informace najdete v tématu čtení vstupu v jakémkoli formátu pomocí vlastních deserializátorů .NET.
Deserializace Stream Analytics AVRO nepodporuje typ mapy. Stream Analytics nemůže číst objekty blob pro zachycení EventHub, protože zachycení EventHub používá mapu.
Záznam typů dat
Datové typy záznamu se používají k reprezentování polí JSON a Avro, když se v datových proudech vstupních dat používají odpovídající formáty. Tyto příklady ukazují vzorový senzor, který čte vstupní události ve formátu JSON. Tady je příklad jedné události:
{
"DeviceId" : "12345",
"Location" :
{
"Lat": 47,
"Long": 122
},
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"SensorMetadata" :
{
"Manufacturer":"ABC",
"Version":"1.2.45"
}
}
}
Přístup k vnořeným polím ve známém schématu
Pro snadný přístup k vnořeným polím přímo z dotazu použijte tečku (.). Tento dotaz například vybírá souřadnice Zeměpisná šířka a délka pod vlastností Location (umístění) v předchozích datech JSON. Zápis tečky lze použít k procházení více úrovní, jak je znázorněno níže.
SELECT
DeviceID,
Location.Lat,
Location.Long,
SensorReadings.Temperature,
SensorReadings.SensorMetadata.Version
FROM input
Výsledek je následující:
| DeviceID | Připojí | Dlouhou | Teplota | Verze |
|---|---|---|---|---|
| 12345 | 47 | 122 | 80 | 1.2.45 |
Vybrat všechny vlastnosti
Můžete vybrat všechny vlastnosti vnořeného záznamu pomocí zástupného znaku *. Uvažujte následující příklad:
SELECT
DeviceID,
Location.*
FROM input
Výsledek je následující:
| DeviceID | Připojí | Dlouhou |
|---|---|---|
| 12345 | 47 | 122 |
Přístup k vnořeným polím, pokud je název vlastnosti proměnná
Použijte funkci GetRecordPropertyValue , pokud je název vlastnosti proměnná. To umožňuje vytvářet dynamické dotazy bez názvů vlastností zakódujeme.
Představte si například, že vzorový datový proud musí být spojen s referenčními daty obsahujícími prahové hodnoty pro každý senzor zařízení. Fragment těchto referenčních dat je uveden níže.
{
"DeviceId" : "12345",
"SensorName" : "Temperature",
"Value" : 85
},
{
"DeviceId" : "12345",
"SensorName" : "Humidity",
"Value" : 65
}
Cílem je připojit se k naší ukázkové sadě dat z horní části článku k těmto referenčním datům a výstupem jedné události pro každou míru snímače nad rámec její prahové hodnoty. To znamená, že naše jediná událost může vygenerovat více výstupních událostí, pokud je více senzorů nad rámec jejich odpovídajících prahových hodnot, a to díky JOIN. Chcete-li dosáhnout podobných výsledků bez připojení, přečtěte si část níže.
SELECT
input.DeviceID,
thresholds.SensorName,
"Alert : Sensor above threshold" AS AlertMessage
FROM input -- stream input
JOIN thresholds -- reference data input
ON
input.DeviceId = thresholds.DeviceId
WHERE
GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value
GetRecordPropertyValue vybere vlastnost v SensorReadings, která název odpovídá názvu vlastnosti přicházejícímu z referenčních dat. Pak je extrahována přidružená hodnota z SensorReadings .
Výsledek je následující:
| DeviceID | Senzor | Zadaná hodnota alertmessage |
|---|---|---|
| 12345 | Vlhkost | Výstraha: senzor nad prahovou hodnotou |
Převod polí záznamu na samostatné události
Chcete-li převést pole záznamu na samostatné události, použijte operátor Apply společně s funkcí GetRecordProperties .
S původními ukázkovými daty můžete k extrakci vlastností do různých událostí použít následující dotaz.
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
Výsledek je následující:
| DeviceID | Senzor | Zadaná hodnota alertmessage |
|---|---|---|
| 12345 | Teplota | 80 |
| 12345 | Vlhkost | 70 |
| 12345 | CustomSensor01 | 5 |
| 12345 | CustomSensor02 | 99 |
| 12345 | SensorMetadata | [objekt objektu] |
Pomocí smůžete tyto události směrovat do různých umístění:
WITH Stage0 AS
(
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)
SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'
Analyzovat záznam JSON v referenčních datech SQL
Při použití Azure SQL Database jako referenčních dat v rámci úlohy je možné mít sloupec, který má data ve formátu JSON. Příklad najdete níže.
| DeviceID | Data |
|---|---|
| 12345 | {"Key": "hodnota1"} |
| 54321 | {"Key": "hodnota2"} |
Záznam JSON můžete analyzovat ve sloupci dat tak, že napíšete jednoduchou uživatelsky definovanou funkci JavaScriptu.
function parseJson(string) {
return JSON.parse(string);
}
Pak můžete vytvořit krok v dotazu Stream Analytics, jak je znázorněno níže, a získat tak přístup k polím záznamů JSON.
WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)
SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson
ON streamInput.DeviceID = parseJson.DeviceID
Datové typy polí
Datové typy pole jsou seřazené kolekce hodnot. Některé typické operace s hodnotami polí jsou popsány níže. V těchto příkladech se používají funkce GetArrayElement, GetArrayElements, GetArrayLengtha operátor Apply .
Tady je příklad jedné události. CustomSensor03A SensorMetadata jsou typu Array:
{
"DeviceId" : "12345",
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"CustomSensor03": [12,-5,0]
},
"SensorMetadata":[
{
"smKey":"Manufacturer",
"smValue":"ABC"
},
{
"smKey":"Version",
"smValue":"1.2.45"
}
]
}
Práce s konkrétním prvkem pole
Vybrat prvek pole v zadaném indexu (Výběr prvního prvku pole):
SELECT
GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input
Výsledek je následující:
| firstElement |
|---|
| 12 |
Vybrat délku pole
SELECT
GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input
Výsledek je následující:
| arrayLength |
|---|
| 3 |
Převést elementy pole na samostatné události
Vyberte všechny prvky pole jako jednotlivé události. Operátor Apply společně s vestavěnou funkcí GetArrayElements extrahuje všechny prvky pole jako jednotlivé události:
SELECT
DeviceId,
CustomSensor03Record.ArrayIndex,
CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record
Výsledek je následující:
| DeviceId | ArrayIndex | ArrayValue |
|---|---|---|
| 12345 | 0 | 12 |
| 12345 | 1 | -5 |
| 12345 | 2 | 0 |
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
Výsledek je následující:
| DeviceId | smKey | smValue |
|---|---|---|
| 12345 | Manufacturer | ABC |
| 12345 | Verze | 1.2.45 |
Pokud se extrahovaná pole musí zobrazit ve sloupcích, je možné datovou sadu pivotovat pomocí syntaxe with kromě operace Join . Toto spojení bude vyžadovat podmínku časové hranice , která zabrání duplikaci:
WITH DynamicCTE AS (
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
)
SELECT
i.DeviceId,
i.Location.*,
V.smValue AS 'smVersion',
M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0
Výsledek je následující:
| DeviceId | Připojí | Dlouhou | smVersion | smManufacturer |
|---|---|---|---|---|
| 12345 | 47 | 122 | 1.2.45 | ABC |