LAG (Azure Stream Analytics)

Der LAG-Analyseoperator ermöglicht es, innerhalb bestimmter Einschränkungen ein "vorheriges" Ereignis in einem Ereignisdatenstrom nachzuschlagen. Es ist sehr nützlich, um die Wachstumsrate einer Variablen zu berechnen und zu erkennen, wann eine Variable einen Schwellenwert überschreitet oder wenn eine Bedingung beginnt oder nicht mehr wahr ist.

In Stream Analytics ist der Lag-Bereich (d. h. wie weit zurück im Verlauf von dem aktuellen Ereignis gesucht werden muss) immer auf ein endliches Zeitintervall beschränkt, wobei die LIMIT DURATION-Klausel verwendet wird. Lag kann optional auf ereignisse beschränkt werden, die mit dem aktuellen Ereignis für eine bestimmte Eigenschaft oder Bedingung übereinstimmen, indem die PARTITION BY- und WHEN-Klauseln verwendet werden.

LAG ist nicht von Prädikaten in der WHERE-Klausel, Joinbedingungen in der JOIN-Klausel oder Gruppierungsausdrücken in der GROUP BY-Klausel der aktuellen Abfrage betroffen, da sie vor diesen Klauseln ausgewertet wird.

Syntax

LAG(<scalar_expression >, [<offset >], [<default>])  
     OVER ([PARTITION BY <partition key>] LIMIT DURATION(<unit>, <length>) [WHEN boolean_expression])
  

Zum Beispiel:

LAG(reading) OVER (LIMIT DURATION(hour, 3))  
LAG(name, 2, 'none such') OVER (PARTITION BY userId LIMIT DURATION(minute, 2))  

Argumente

scalar_expression

Der zurückzugebende Wert auf Grundlage des angegebenen Offsets. Es ist entweder ein Ausdruck eines beliebigen Typs, der einen einzelnen (skalaren) Wert zurückgibt, oder der Platzhalterzeichenausdruck "*". Für "*" wird das gesamte Ereignis gemäß dem angegebenen Offset zurückgegeben und im Ergebnisereignis (geschachtelter Datensatz) enthalten sein.
scalar_expression darf keine anderen analytischen oder externen Funktionen enthalten.

offset

Die Anzahl der Ereignisse, die ab dem aktuellen Ereignis zurückzugehen ist, aus dem ein Wert abgerufen werden soll. Wenn nicht angegeben, ist der Standardwert 1, was bedeutet, dass das vorherige Ereignis zurückgegeben wird. Offset muss eine ganze Zahl größer oder gleich 1 sein. Ereignisse werden in der zeitlichen Reihenfolge verarbeitet. Treten mehrere Ereignisse mit dem gleichen Zeitstempel auf, so werden Ereignisse in der eingetroffenen Reihenfolge verarbeitet.

default

Der zurückzugebende Wert, wenn am angegebenen Offset kein Ereignis vorhanden ist. Wenn kein Standardwert angegeben ist, wird NULL zurückgegeben. "No event at the specified offset" can the case 1) if the number of corresponding events seen is kleiner than the specified offset or 2) if the event at the specified offset is timed out according to the specified limit_duration_clause 3) events exist, but not match boolean condition specified in the when_clause.

Wenn das Ereignis am angegebenen Offset vorhanden ist und der Wert von scalar_expression NULL ist, dann NULL
wird zurückgegeben. default kann eine Spalte, Unterabfrage oder ein anderer Ausdruck sein, aber er kann keine anderen Ausdrücke enthalten.
Analysefunktionen oder externe Funktionen. default muss genau denselben Typ aufweisen wie
Scalar_expression.

OVER ( [ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause Partition BY-Partitionsschlüsselklausel <> fordert nur Ereignisse an, deren Wert von
<Der Partitionsschlüssel> ist identisch mit dem des aktuellen Ereignisses, das berücksichtigt werden soll. Beispiel:

LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))  

gibt den vorherigen Wert desselben Sensors wie das aktuelle Ereignis zurück (wenn ein solcher innerhalb der vorherigen 1 Stunde aufgetreten ist).

limit_duration-Klausel DURATION(<unit>, <length>)

Gibt an, wie viel des Verlaufs aus dem aktuellen Ereignis berücksichtigt werden muss. Eine ausführliche Beschreibung der unterstützten Einheiten und deren Abkürzungen finden Sie in DATEDIFF. Wenn nicht genügend übereinstimmende Ereignisse innerhalb des DURATION-Intervalls gefunden werden, wird der <Standardwert> zurückgegeben.

when_clause
Gibt eine boolesche Bedingung für die Ereignisse an, die bei der LAG-Berechnung berücksichtigt werden sollen. Wenn nicht genügend übereinstimmende Ereignisse innerhalb des DURATION-Intervalls gefunden werden, wird der <Standardwert> zurückgegeben. Die when_clause ist optional.

Rückgabetypen

Der Datentyp des angegebenen scalar_expression. NULL wird zurückgegeben, wenn scalar_expression

Allgemeine Hinweise

LAG ist nicht deterministisch. Ereignisse werden in der zeitlichen Reihenfolge verarbeitet. Treten mehrere Ereignisse mit dem gleichen Zeitstempel auf, so werden Ereignisse in der eingetroffenen Reihenfolge verarbeitet.

Das Anwenden von LAG auf das Resultset einer Fensterfunktion kann zu unerwarteten Ergebnissen führen. Fensterfunktionen ändern den Zeitstempel von Ereignissen, da jeder Fenstervorgang ein Ereignis am Ende des Fensters ausgibt. Auf den aktuellen Zeitstempel eines Ereignisses kann mit system.timestamp()zugegriffen werden. Nach einem Fenstervorgang unterscheidet er sich vom ursprünglichen Ereigniszeit-Attribut. Wenn die LAG vor dem Fenstervorgang nicht verschoben werden kann, sollten Sie CollectTop verwenden und nach der ursprünglichen Ereigniszeit sortieren.

Beispiele

Berechnen Sie die Wachstumsrate pro Sensor:

SELECT sensorId,  
       growth = reading -
                        LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))  
FROM input  
  

Suchen Sie den vorherigen Sensorwert ungleich NULL:

SELECT  
     sensorId,  
     LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1) WHEN reading IS NOT NULL)  
     FROM input  
  

Suchen Sie den vorherigen Sensorwert ungleich NULL für einen bestimmten Sensortyp:

WITH filterSensor AS
(
  SELECT *
  FROM input
  WHERE input.sensorType = 4 AND sensorId IS NOT NULL
)

SELECT
  LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))
FROM filterSensor

Bestimmen Sie, wann eine Variable einen Schwellenwert überschreitet:

SELECT
    sensorId, reading
FROM input
WHERE
    devicetype = 'thermostat'
    AND reading > 100
    AND LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1) WHEN devicetype = 'thermostat') <= 100

Weitere Informationen

ISFIRST (Azure Stream Analytics)
LAST (Azure Stream Analytics)