AnomalyDetection_ChangePoint (Azure Stream Analytics)

時系列イベント ストリーム内の永続的な異常を検出します。 基になる機械学習モデルでは、Exchangeability Martingales アルゴリズムが使用されます。

構文

AnomalyDetection_ChangePoint(
        <scalar_expression>, 
        <confidence>, 
        <historySize>)
    OVER ([PARTITION BY <partition key>] 
        LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression])

引数

scalar_expression

モデルが異常検出を実行するイベント列または計算フィールド。 このパラメーターに使用できる値には、1 つの (スカラー) 値を返す FLOAT または BIGINT データ型が含まれます。

ワイルドカード式 * は使用できません。 また、 scalar_expression には、他の分析関数や外部関数を含めることはできません。

自信

機械学習モデルの感度を設定する 1.00 ~ 100 (含む) のパーセンテージ値。 信頼度が低いほど、検出された異常の数が多くなり、その逆も同様です。 70 から 90 までの任意の数値から開始し、開発またはテストで観察された結果に基づいてこれを調整します。

historySize

モデルが継続的に学習し、異常のために次のイベントのスコア付けに使用するスライディング ウィンドウ内のイベントの数。 通常、これは、モデルが後続の異常にフラグを設定できるようにするために、通常の動作の期間を表す必要があります。 履歴ログを使用して教育を受けた推測から始め、開発またはテストで観察された結果に基づいて調整します。

OVER ([ partition_by_clause ] limit_duration_clause [when_clause])

partition_by_clause

イベント内の特定の列に基づいてモデルのトレーニングをパーティション分割するために使用されます。 モデルは、すべてのパーティションに同じ関数パラメーター設定を適用します。

limit_duration_clause DURATION(unit, length)

Stream Analytics 内のスライディング ウィンドウの時間のサイズ。 この時間枠の推奨サイズは、 historySize イベントの数が安定した状態で生成されるまでにかかる時間と同じです。

when_clause

異常検出を実行するためにモデルに提供されるイベントのブール条件を指定します。 when_clauseは省略可能です。

戻り値の型

関数は、次の列で構成される入れ子になったレコードを返します。

IsAnomaly

イベントが異常かどうかを示す BIGINT (0 または 1)。

スコア

イベントの異常を示す計算されたマルチンゲール スコア (float)。 このスコアは、異常な値で指数関数的に増加します。

次のクエリ サンプルでは、最初のクエリは 5 分ごとにイベントを想定し、2 番目のクエリでは 1 秒ごとにイベントを想定しています。 信頼度レベルは、両方のモデルで 75 に設定されます。

AnomalyDetection_ChangePoint(reading, 75, 72)
    OVER (LIMIT DURATION(hour, 6))

AnomalyDetection_ChangePoint(temperature, 75, 120)
    OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))

履歴サイズが 1200 イベントの 20 分間のスライディング ウィンドウで、1 秒あたり 1 イベントの均一な入力レートを想定した例。 最後の SELECT ステートメントで、80% の信頼度レベルでスコアと異常状態を抽出して出力します。

WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

タンブリング ウィンドウ 1 秒を使用して統一された非均一入力ストリームの例:

WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

センサーごとに個別のモデルをトレーニングするためのパーティション分割クエリの例:

WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep