JSON- en Avro-gegevens parseren in Azure Stream Analytics

Azure Stream Analytics biedt ondersteuning voor het verwerken van gebeurtenissen in CSV-, JSON- en Avro-gegevensindelingen. Zowel JSON- als Avro-gegevens kunnen worden gestructureerd en bevatten enkele complexe typen, zoals geneste objecten (records) en matrices.

Notitie

AVRO-bestanden die door Event Hub Capture zijn gemaakt, gebruiken een specifieke indeling waarvoor u de aangepaste deserialisatiefunctie moet gebruiken. Zie Invoer lezen in elke indeling met aangepaste .NET-deserializers voor meer informatie.

Gegevenstypen vastleggen

Recordgegevenstypen worden gebruikt om JSON- en Avro-matrices weer te geven wanneer overeenkomende indelingen worden gebruikt in de invoergegevensstromen. In deze voorbeelden ziet u een voorbeeldsensor, die invoerevenementen in JSON-indeling leest. Hier volgt een voorbeeld van één gebeurtenis:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Geneste velden in een bekend schema openen

Gebruik puntnotatie (.) om eenvoudig geneste velden rechtstreeks vanuit uw query te openen. Deze query selecteert bijvoorbeeld de coördinaten Breedtegraad en Lengtegraad onder de eigenschap Locatie in de voorgaande JSON-gegevens. De puntnotatie kan worden gebruikt om door meerdere niveaus te navigeren, zoals hieronder wordt weergegeven.

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

Het resultaat is:

DeviceID Lat Lange Temperatuur Versie
12345 47 122 80 1.2.45

Alle eigenschappen selecteren

U kunt alle eigenschappen van een geneste record selecteren met behulp van het jokerteken *. Kijk eens naar het volgende voorbeeld:

SELECT
    DeviceID,
    Location.*
FROM input

Het resultaat is:

DeviceID Lat Lange
12345 47 122

Geneste velden openen wanneer de eigenschapsnaam een variabele is

Gebruik de functie GetRecordPropertyValue als de naam van de eigenschap een variabele is. Hierdoor kunt u dynamische query's maken zonder eigenschapsnamen hardcoding.

Stel dat de voorbeeldgegevensstroom moet worden samengevoegd met referentiegegevens met drempelwaarden voor elke apparaatsensor. Hieronder ziet u een fragment van dergelijke referentiegegevens.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

Het doel hier is om onze voorbeeldgegevensset vanaf het begin van het artikel samen te voegen met die referentiegegevens en één gebeurtenis uit te voeren voor elke sensormeting boven de drempelwaarde. Dit betekent dat de bovenstaande gebeurtenis meerdere uitvoergebeurtenissen kan genereren als meerdere sensoren hun respectieve drempelwaarden overschrijden, dankzij de join. Als u vergelijkbare resultaten wilt bereiken zonder een join, raadpleegt u de onderstaande sectie.

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue selecteert de eigenschap in SensorReadings, die overeenkomt met de naam van de eigenschap die afkomstig is van de referentiegegevens. Vervolgens wordt de bijbehorende waarde uit SensorReadings geëxtraheerd.

Het resultaat is:

DeviceID SensorName AlertMessage
12345 Vochtigheid Waarschuwing: sensor boven drempelwaarde

Recordvelden converteren naar afzonderlijke gebeurtenissen

Als u recordvelden wilt converteren naar afzonderlijke gebeurtenissen, gebruikt u de operator APPLY samen met de functie GetRecordProperties .

Met de oorspronkelijke voorbeeldgegevens kan de volgende query worden gebruikt om eigenschappen te extraheren in verschillende gebeurtenissen.

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

Het resultaat is:

DeviceID SensorName AlertMessage
12345 Temperatuur 80
12345 Vochtigheid 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 SensorMetadata [objectobject]

Met BEHULP van WITH is het vervolgens mogelijk om deze gebeurtenissen naar verschillende bestemmingen te routeren:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

JSON-record parseren in SQL-referentiegegevens

Wanneer u Azure SQL Database gebruikt als referentiegegevens in uw taak, is het mogelijk om een kolom met gegevens in JSON-indeling te hebben. Hieronder kunt u een voorbeeld bekijken.

DeviceID Gegevens
12345 {"key": "value1"}
54321 {"key": "value2"}

U kunt de JSON-record in de kolom Gegevens parseren door een eenvoudige door de gebruiker gedefinieerde JavaScript-functie te schrijven.

function parseJson(string) {
return JSON.parse(string);
}

U kunt vervolgens een stap maken in uw Stream Analytics-query, zoals hieronder wordt weergegeven, om toegang te krijgen tot de velden van uw JSON-records.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Matrixgegevenstypen

Matrixgegevenstypen zijn een geordende verzameling waarden. Hieronder worden enkele typische bewerkingen voor matrixwaarden beschreven. In deze voorbeelden worden de functies GetArrayElement, GetArrayElements, GetArrayLength en de operator APPLY gebruikt.

Hier volgt een voorbeeld van een gebeurtenis. Zowel CustomSensor03 als SensorMetadata zijn van het type matrix:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Werken met een specifiek matrixelement

Selecteer het matrixelement in een opgegeven index (het eerste matrixelement selecteren):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

Het resultaat is:

firstElement
12

Matrixlengte selecteren

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

Het resultaat is:

arrayLength
3

Matrixelementen converteren naar afzonderlijke gebeurtenissen

Selecteer alle matrixelementen als afzonderlijke gebeurtenissen. De operator APPLY samen met de ingebouwde functie GetArrayElements extraheert alle matrixelementen als afzonderlijke gebeurtenissen:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

Het resultaat is:

DeviceId MatrixIndex MatrixWaarde
12345 0 12
12345 1 5
12345 2 0
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

Het resultaat is:

DeviceId smKey smValue
12345 Fabrikant ABC
12345 Versie 1.2.45

Als de geëxtraheerde velden in kolommen moeten worden weergegeven, is het mogelijk om de gegevensset te draaien met behulp van de with-syntaxis naast de JOIN-bewerking . Voor deze join is een tijdsgrensvoorwaarde vereist die duplicatie voorkomt:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

Het resultaat is:

DeviceId Lat Lange smVersion smManufacturer
12345 47 122 1.2.45 ABC

Zie ook

Gegevenstypen in Azure Stream Analytics