Analyzovat data JSON a Avro v Azure Stream Analytics

Azure Stream Analytics podporovat zpracování událostí v datových formátech CSV, JSON a Avro. Data JSON a Avro mohou být strukturovaná a obsahují některé komplexní typy, jako jsou například vnořené objekty (záznamy) a pole.

Poznámka

Soubory AVRO vytvořené pomocí centra událostí zachycení používají konkrétní formát, který vyžaduje použití vlastní funkce deserializace . Další informace najdete v tématu čtení vstupu v jakémkoli formátu pomocí vlastních deserializátorů .NET.

Deserializace Stream Analytics AVRO nepodporuje typ mapy. Stream Analytics nemůže číst objekty blob pro zachycení EventHub, protože zachycení EventHub používá mapu.

Záznam typů dat

Datové typy záznamu se používají k reprezentování polí JSON a Avro, když se v datových proudech vstupních dat používají odpovídající formáty. Tyto příklady ukazují vzorový senzor, který čte vstupní události ve formátu JSON. Tady je příklad jedné události:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Přístup k vnořeným polím ve známém schématu

Pro snadný přístup k vnořeným polím přímo z dotazu použijte tečku (.). Tento dotaz například vybírá souřadnice Zeměpisná šířka a délka pod vlastností Location (umístění) v předchozích datech JSON. Zápis tečky lze použít k procházení více úrovní, jak je znázorněno níže.

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

Výsledek je následující:

DeviceID Připojí Dlouhou Teplota Verze
12345 47 122 80 1.2.45

Vybrat všechny vlastnosti

Můžete vybrat všechny vlastnosti vnořeného záznamu pomocí zástupného znaku *. Uvažujte následující příklad:

SELECT
    DeviceID,
    Location.*
FROM input

Výsledek je následující:

DeviceID Připojí Dlouhou
12345 47 122

Přístup k vnořeným polím, pokud je název vlastnosti proměnná

Použijte funkci GetRecordPropertyValue , pokud je název vlastnosti proměnná. To umožňuje vytvářet dynamické dotazy bez názvů vlastností zakódujeme.

Představte si například, že vzorový datový proud musí být spojen s referenčními daty obsahujícími prahové hodnoty pro každý senzor zařízení. Fragment těchto referenčních dat je uveden níže.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

Cílem je připojit se k naší ukázkové sadě dat z horní části článku k těmto referenčním datům a výstupem jedné události pro každou míru snímače nad rámec její prahové hodnoty. To znamená, že naše jediná událost může vygenerovat více výstupních událostí, pokud je více senzorů nad rámec jejich odpovídajících prahových hodnot, a to díky JOIN. Chcete-li dosáhnout podobných výsledků bez připojení, přečtěte si část níže.

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert : Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue vybere vlastnost v SensorReadings, která název odpovídá názvu vlastnosti přicházejícímu z referenčních dat. Pak je extrahována přidružená hodnota z SensorReadings .

Výsledek je následující:

DeviceID Senzor Zadaná hodnota alertmessage
12345 Vlhkost Výstraha: senzor nad prahovou hodnotou

Převod polí záznamu na samostatné události

Chcete-li převést pole záznamu na samostatné události, použijte operátor Apply společně s funkcí GetRecordProperties .

S původními ukázkovými daty můžete k extrakci vlastností do různých událostí použít následující dotaz.

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

Výsledek je následující:

DeviceID Senzor Zadaná hodnota alertmessage
12345 Teplota 80
12345 Vlhkost 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 SensorMetadata [objekt objektu]

Pomocí smůžete tyto události směrovat do různých umístění:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

Analyzovat záznam JSON v referenčních datech SQL

Při použití Azure SQL Database jako referenčních dat v rámci úlohy je možné mít sloupec, který má data ve formátu JSON. Příklad najdete níže.

DeviceID Data
12345 {"Key": "hodnota1"}
54321 {"Key": "hodnota2"}

Záznam JSON můžete analyzovat ve sloupci dat tak, že napíšete jednoduchou uživatelsky definovanou funkci JavaScriptu.

function parseJson(string) {
return JSON.parse(string);
}

Pak můžete vytvořit krok v dotazu Stream Analytics, jak je znázorněno níže, a získat tak přístup k polím záznamů JSON.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Datové typy polí

Datové typy pole jsou seřazené kolekce hodnot. Některé typické operace s hodnotami polí jsou popsány níže. V těchto příkladech se používají funkce GetArrayElement, GetArrayElements, GetArrayLengtha operátor Apply .

Tady je příklad jedné události. CustomSensor03A SensorMetadata jsou typu Array:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Práce s konkrétním prvkem pole

Vybrat prvek pole v zadaném indexu (Výběr prvního prvku pole):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

Výsledek je následující:

firstElement
12

Vybrat délku pole

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

Výsledek je následující:

arrayLength
3

Převést elementy pole na samostatné události

Vyberte všechny prvky pole jako jednotlivé události. Operátor Apply společně s vestavěnou funkcí GetArrayElements extrahuje všechny prvky pole jako jednotlivé události:

SELECT
    DeviceId,
    CustomSensor03Record.ArrayIndex,
    CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

Výsledek je následující:

DeviceId ArrayIndex ArrayValue
12345 0 12
12345 1 -5
12345 2 0
SELECT   
    i.DeviceId, 
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

Výsledek je následující:

DeviceId smKey smValue
12345 Manufacturer ABC
12345 Verze 1.2.45

Pokud se extrahovaná pole musí zobrazit ve sloupcích, je možné datovou sadu pivotovat pomocí syntaxe with kromě operace Join . Toto spojení bude vyžadovat podmínku časové hranice , která zabrání duplikaci:

WITH DynamicCTE AS (
    SELECT   
        i.DeviceId,
        SensorMetadataRecords.ArrayValue.smKey as smKey,
        SensorMetadataRecords.ArrayValue.smValue as smValue
    FROM input i
    CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
    i.DeviceId,
    i.Location.*,
    V.smValue AS 'smVersion',
    M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

Výsledek je následující:

DeviceId Připojí Dlouhou smVersion smManufacturer
12345 47 122 1.2.45 ABC

Viz také

Datové typy v Azure Stream Analytics