Azure 流分析 (AnomalyDetection_ChangePoint)

检测时序事件流中的持久异常。 底层机器学习模型使用 Exchangeability Martingales 算法。

语法

AnomalyDetection_ChangePoint(
        <scalar_expression>, 
        <confidence>, 
        <historySize>)
    OVER ([PARTITION BY <partition key>] 
        LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression])

参数

scalar_expression

模型用于执行异常检测的事件列或计算字段。 此参数允许的值包括返回单个 (标量) 值的 FLOAT 或 BIGINT 数据类型。

不允许使用通配符表达式 *。 此外, scalar_expression 不能包含其他分析函数或外部函数。

confidence

用于设置机器学习模型敏感度的从1.00 到 100 (包含) 的百分比数字。 置信度越低,检测到的异常数量就越高,反之亦然。 从70到90之间的任意数字开始,根据开发或测试中观测到的结果调整此值。

historySize

滑动窗口中模型持续进行的事件数,并将其用于计分 anomalousness 的下一个事件。 通常情况下,这应该表示正常行为的时间段,以使模型可以标记后续异常。 首先,使用历史日志进行合理推测,并根据开发或测试中观测到的结果进行调整。

超出 ( [partition_by_clause] limit_duration_clause [when_clause] )

partition_by_clause

用于基于事件中的特定列对模型的定型进行分区。 模型在所有分区中应用相同的函数参数设置。

limit_duration_clause 持续时间 (单位,长度)

流分析中的滑动窗口的大小(以时间为依据)。 此时间范围的建议大小等效于生成稳定状态的 historySize 事件所花的时间。

when_clause

为要提供给模型的事件指定布尔条件以执行异常检测。 When_clause是可选的。

返回类型

函数返回由以下各列组成的嵌套记录:

IsAnomaly

BIGINT (0 或 1) 指示事件是否异常。

评分

计算出的鞅分数 (float) 指示事件的异常程度。 此分数与异常值呈指数级增长。

示例

在下面的查询示例中,第一个查询假设每5分钟发生一次事件,第二个查询每秒假定一个事件。 对于这两个模型,置信度设置为75。

AnomalyDetection_ChangePoint(reading, 75, 72)
    OVER (LIMIT DURATION(hour, 6))

AnomalyDetection_ChangePoint(temperature, 75, 120)
    OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))

示例假设20分钟滑动窗口中每秒1个事件的统一输入速率,历史记录大小为1200事件。 最终的 SELECT 语句将提取事件,并输出评分和置信度级别为 80% 的异常状态。

WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

例如,使用非统一输入流,该流使用1秒的翻转窗口进行统一:

WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

使用分区查询为每个传感器定型单独的模型的示例:

WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep