AnomalyDetection_ChangePoint (Azure Stream Analytics)

Обнаруживает постоянные аномалии в потоке событий временных рядов. Базовая модель машинного обучения использует алгоритм Мартингейла с возможностью обмена.

Синтаксис

AnomalyDetection_ChangePoint(
        <scalar_expression>, 
        <confidence>, 
        <historySize>)
    OVER ([PARTITION BY <partition key>] 
        LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression])

Аргументы

scalar_expression

Столбец событий или вычисляемое поле, по которому модель выполняет обнаружение аномалий. Допустимые значения для этого параметра включают типы данных FLOAT или BIGINT, возвращающие одно (скалярное) значение.

Выражение с подстановочными знаками * запрещено. Кроме того, scalar_expression не может содержать другие аналитические или внешние функции.

Уверенность

Процентное число от 1,00 до 100 (включительно), которое задает чувствительность модели машинного обучения. Чем ниже достоверность, тем выше число обнаруженных аномалий и наоборот. Начните с произвольного числа от 70 до 90 и измените его на основе результатов, наблюдаемых при разработке или тестировании.

historySize

Количество событий в скользящем окне, которые модель постоянно изучает и использует для оценки следующего события на предмет аномальной активности. Как правило, это должен представлять период времени нормального поведения, чтобы позволить модели пометить последующую аномалию. Начните с обученного предположения, используя журналы журнала, и корректируйте их на основе результатов, наблюдаемых при разработке или тестировании.

OVER ([ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause

Используется для секционирования обучения модели на основе определенного столбца в событиях. Модель применяет одинаковые параметры параметров функции ко всем секциям.

limit_duration_clause DURATION(единица, длина)

Размер скользящего окна в Stream Analytics с точки зрения времени. Рекомендуемый размер этого временного окна эквивалентен времени, затрачиваемого на создание historySize числа событий в устойчивом состоянии.

when_clause

Задает логическое условие для событий, предоставляемых модели для обнаружения аномалий. When_clause является необязательным.

Типы возвращаемых данных

Функция возвращает вложенную запись, состоящую из следующих столбцов:

IsAnomaly

BigINT (0 или 1), указывающий, было ли событие аномальным или нет.

Оценка

Вычисленная оценка Мартингейла (float), указывающая, насколько аномальным является событие. Эта оценка растет экспоненциально с аномальными значениями.

Примеры

В следующем примере запроса первый запрос предполагает событие каждые 5 минут, а второй запрос предполагает событие каждую секунду. Уровень достоверности для обеих моделей равен 75.

AnomalyDetection_ChangePoint(reading, 75, 72)
    OVER (LIMIT DURATION(hour, 6))

AnomalyDetection_ChangePoint(temperature, 75, 120)
    OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))

Пример, предполагающий единую скорость ввода 1 событие в секунду в 20-минутном скользящем окне с размером журнала 1200 событий. Заключительная инструкция SELECT извлекает и выводит оценку и состояние аномалии с уровнем достоверности 80 %.

WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Пример с неоднородным входным потоком, который становится однородным с помощью переворачивающегося окна в 1 секунду:

WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Пример секционированного запроса для обучения отдельной модели для каждого датчика:

WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep