Parsa JSON- och Avro-data i Azure Stream Analytics

Azure Stream Analytics stöder bearbetning av händelser i CSV-, JSON- och Avro-dataformat. Både JSON- och Avro-data kan struktureras och innehålla vissa komplexa typer, till exempel kapslade objekt (poster) och matriser.

Anteckning

AVRO-filer som skapats av Event Hub Capture använder ett visst format som kräver att du använder den anpassade deserialiserarfunktionen . Mer information finns i Läsa indata i valfritt format med anpassade .NET-deserialiserare.

Registrera datatyper

Postdatatyper används för att representera JSON- och Avro-matriser när motsvarande format används i indataströmmarna. De här exemplen visar en exempelsensor som läser indatahändelser i JSON-format. Här är ett exempel på en enskild händelse:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Få åtkomst till kapslade fält i ett känt schema

Använd punktnotation (.) för att enkelt komma åt kapslade fält direkt från din fråga. Den här frågan väljer till exempel koordinaterna Latitud och Longitud under egenskapen Plats i föregående JSON-data. Punktnotationen kan användas för att navigera i flera nivåer enligt nedan.

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

Resultatet är:

Deviceid Lat Lång Temperatur Version
12345 47 122 80 1.2.45

Markera alla egenskaper

Du kan välja alla egenskaper för en kapslad post med jokertecknet *. Se följande exempel:

SELECT
    DeviceID,
    Location.*
FROM input

Resultatet är:

Deviceid Lat Lång
12345 47 122

Få åtkomst till kapslade fält när egenskapsnamnet är en variabel

Använd funktionen GetRecordPropertyValue om egenskapsnamnet är en variabel. På så sätt kan du skapa dynamiska frågor utan att hårdkoda egenskapsnamn.

Anta till exempel att exempeldataströmmen måste kopplas till referensdata som innehåller tröskelvärden för varje enhetssensor. Ett kodfragment med sådana referensdata visas nedan.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

Målet här är att koppla vår exempeldatauppsättning överst i artikeln till dessa referensdata och mata ut en händelse för varje sensormått över tröskelvärdet. Det innebär att vår enda händelse ovan kan generera flera utdatahändelser om flera sensorer ligger över sina respektive tröskelvärden tack vare kopplingen. Om du vill uppnå liknande resultat utan koppling kan du läsa avsnittet nedan.

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue väljer egenskapen i SensorReadings, vilket namn matchar egenskapsnamnet som kommer från referensdata. Sedan extraheras det associerade värdet från SensorReadings .

Resultatet är:

Deviceid SensorName AlertMessage
12345 Luftfuktighet Avisering: Sensorn överskrider tröskelvärdet

Konvertera postfält till separata händelser

Om du vill konvertera postfält till separata händelser använder du apply-operatorn tillsammans med funktionen GetRecordProperties .

Med ursprungliga exempeldata kan följande fråga användas för att extrahera egenskaper till olika händelser.

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

Resultatet är:

Deviceid SensorName AlertMessage
12345 Temperatur 80
12345 Luftfuktighet 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 SensorMetadata [objektobjekt]

Med hjälp av WITH kan du sedan dirigera dessa händelser till olika mål:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

Parsa JSON-post i SQL-referensdata

När du använder Azure SQL Database som referensdata i jobbet kan du ha en kolumn som har data i JSON-format. Ett exempel på detta visas nedan.

Deviceid Data
12345 {"key": "value1"}
54321 {"key": "value2"}

Du kan parsa JSON-posten i kolumnen Data genom att skriva en enkel användardefinierad JavaScript-funktion.

function parseJson(string) {
return JSON.parse(string);
}

Du kan sedan skapa ett steg i Stream Analytics-frågan enligt nedan för att komma åt fälten i dina JSON-poster.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Matrisdatatyper

Matrisdatatyper är en ordnad samling värden. Några typiska åtgärder för matrisvärden beskrivs nedan. I de här exemplen används funktionerna GetArrayElement, GetArrayElements, GetArrayLength och APPLY-operatorn .

Här är ett exempel på en händelse. Både CustomSensor03 och SensorMetadata är av typen matris:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Arbeta med ett specifikt matriselement

Välj matriselement vid ett angivet index (välj det första matriselementet):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

Resultatet är:

firstElement
12

Välj matrislängd

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

Resultatet är:

arrayLength
3

Konvertera matriselement till separata händelser

Markera alla matriselement som enskilda händelser. APPLY-operatorn tillsammans med den inbyggda funktionen GetArrayElements extraherar alla matriselement som enskilda händelser:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

Resultatet är:

DeviceId ArrayIndex ArrayValue
12345 0 12
12345 1 -5
12345 2 0
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

Resultatet är:

DeviceId smKey smValue
12345 Tillverkare ABC
12345 Version 1.2.45

Om de extraherade fälten måste visas i kolumner är det möjligt att pivoteras datauppsättningen med hjälp av WITH-syntaxen utöver JOIN-åtgärden . Den kopplingen kräver ett tidsgränsvillkor som förhindrar duplicering:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

Resultatet är:

DeviceId Lat Lång smVersion smManufacturer
12345 47 122 1.2.45 ABC

Se även

Datatyper i Azure Stream Analytics