LAG (Azure Stream Analytics)

Оператор аналитики LAG позволяет искать "предыдущее" событие в потоке событий в пределах определенных ограничений. Это очень полезно для вычисления скорости роста переменной, обнаружения того, когда переменная пересекает пороговое значение или когда условие начинается или перестает быть истинным.

В Stream Analytics область LAG (т. е. насколько далеко от текущего события оно должно искать) всегда ограничивается конечным интервалом времени с помощью предложения LIMIT DURATION. При необходимости LAG можно ограничить только событиями, которые соответствуют текущему событию для определенного свойства или условия, с помощью предложений PARTITION BY и WHEN.

На LAG не влияют предикаты в предложении WHERE, условия соединения в предложении JOIN или выражения группирования в предложении GROUP BY текущего запроса, так как они вычисляются раньше этих предложений.

Синтаксис

LAG(<scalar_expression >, [<offset >], [<default>])  
     OVER ([PARTITION BY <partition key>] LIMIT DURATION(<unit>, <length>) [WHEN boolean_expression])
  

Например:

LAG(reading) OVER (LIMIT DURATION(hour, 3))  
LAG(name, 2, 'none such') OVER (PARTITION BY userId LIMIT DURATION(minute, 2))  

Аргументы

scalar_expression

Возвращаемое значение основано на указанном смещении. Это либо выражение любого типа, которое возвращает одно (скалярное) значение, либо выражение с подстановочными знаками "*". Для "*" возвращается все событие в соответствии с указанным смещением и будет содержаться в событии результата (вложенная запись).
scalar_expression не должно содержать других аналитических функций или внешних функций.

offset

Количество событий, возвращаемых текущим событием, из которого требуется получить значение. Если значение не указано, значение по умолчанию равно 1, то есть возвращается предыдущее событие. Смещение должно быть целым числом больше или равно 1. События обрабатываются во временной последовательности. Если существует несколько событий с одинаковыми отметками времени, то они обрабатываются в порядке их получения.

default

Значение, возвращаемое при отсутствии события с указанным смещением. Если значение по умолчанию не задано, то возвращается NULL. "Нет события с указанным смещением" может быть вариантом 1) если количество соответствующих событий, наблюдаемых до сих пор, меньше указанного смещения или 2) если событие с указанным смещением истекло в соответствии с указанным limit_duration_clause 3) события существуют, но не соответствуют логическому условию, указанному в when_clause.

Если событие с указанным смещением существует, а значение scalar_expression равно NULL, то значение NULL
. Значение default может быть столбцом, вложенным запросом или другим выражением, но не может содержать другое выражение.
аналитические функции или внешние функции. значение по умолчанию должно иметь тот же тип, что и
scalar_expression.

OVER ( [ предложение_partition_by ] предложение_limit_duration [предложение_when])

partition_by_clause Предложение partition by <partition key> запрашивает только события, для которых задано значение
<ключ секции совпадает с ключом> текущего события, которое следует рассматривать. Например,

LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))  

возвращает предыдущее чтение того же датчика, что и текущее событие (если это произошло в течение предыдущего часа).

предложение limit_duration DURATION(<unit>, <length>)

Указывает, какая часть журнала из текущего события должна учитываться. В описании функции DATEDIFF приводятся подробные сведения о поддерживаемых единицах и их сокращения. Если в интервале DURATION обнаружено недостаточное количество соответствующих событий, <возвращается значение по умолчанию> .

when_clause
Задает логическое условие для событий, которые будут учитываться при вычислении LAG. Если в интервале DURATION обнаружено недостаточное количество соответствующих событий, <возвращается значение по умолчанию> . When_clause является необязательным.

Типы возвращаемых данных

Тип данных указанного выражения scalar_expression. Возвращается значение NULL, если scalar_expression

Общие замечания

Функция LAG не детерминирована. События обрабатываются во временной последовательности. Если существует несколько событий с одинаковыми отметками времени, то они обрабатываются в порядке их получения.

Применение LAG к результирующем набору оконной функции может привести к непредвиденным результатам. Оконные функции изменяют метку времени событий, так как каждая операция окна выводит событие в конце окна. Доступ к текущей метке времени события можно получить с помощью system.timestamp() и после операции окна она будет отличаться от исходного атрибута времени события. Если LAG не удается переместить до операции окна, рассмотрите возможность использования CollectTop, упорядочив его по исходному времени события.

Примеры

Вычисление скорости роста для каждого датчика:

SELECT sensorId,  
       growth = reading -
                        LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))  
FROM input  
  

Найти предыдущие показания датчика, не равного null:

SELECT  
     sensorId,  
     LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1) WHEN reading IS NOT NULL)  
     FROM input  
  

Найдите предыдущие показания датчика, отличного от NULL, для определенного типа датчика:

WITH filterSensor AS
(
  SELECT *
  FROM input
  WHERE input.sensorType = 4 AND sensorId IS NOT NULL
)

SELECT
  LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1))
FROM filterSensor

Определите, когда переменная пересекает пороговое значение:

SELECT
    sensorId, reading
FROM input
WHERE
    devicetype = 'thermostat'
    AND reading > 100
    AND LAG(reading) OVER (PARTITION BY sensorId LIMIT DURATION(hour, 1) WHEN devicetype = 'thermostat') <= 100

См. также:

ISFIRST (Azure Stream Analytics)
LAST (Azure Stream Analytics)