AnomalyDetection_ChangePoint (Azure Stream Analytics)

Détecte les anomalies persistantes dans un flux d’événements de série chronologique. Le modèle Machine Learning sous-jacent utilise l’algorithme Exchangeability Martingales.

Syntaxe

AnomalyDetection_ChangePoint(
        <scalar_expression>, 
        <confidence>, 
        <historySize>)
    OVER ([PARTITION BY <partition key>] 
        LIMIT DURATION(<unit>, <length>)
    [WHEN boolean_expression])

Arguments

scalar_expression

Colonne d’événement ou champ calculé sur lequel le modèle effectue la détection d’anomalies. Les valeurs autorisées pour ce paramètre incluent les types de données FLOAT ou BIGINT qui retournent une valeur unique (scalaire).

L’expression générique * n’est pas autorisée. En outre, scalar_expression ne peut pas contenir d’autres fonctions analytiques ou fonctions externes.

Confiance

Nombre de pourcentages compris entre 1,00 et 100 (inclus) qui définit la sensibilité du modèle Machine Learning. Plus la confiance est faible, plus le nombre d’anomalies détectées est élevé, et inversement. Commencez à partir d’un nombre arbitraire compris entre 70 et 90 et ajustez-le en fonction des résultats observés dans le développement ou les tests.

historySize

Nombre d’événements dans une fenêtre glissante que le modèle apprend en continu et utilise pour noter l’événement suivant en cas d’anomalie. En règle générale, cela doit représenter la période de temps de comportement normal pour permettre au modèle de signaler une anomalie ultérieure. Commencez par une estimation instruite à l’aide des journaux d’activité historiques et ajustez en fonction des résultats observés dans le développement ou le test.

OVER ([ partition_by_clause ] limit_duration_clause [when_clause])

clause_partition_by

Permet de partitionner l’entraînement d’un modèle en fonction d’une colonne particulière dans les événements. Le modèle applique les mêmes paramètres de fonction sur toutes les partitions.

limit_duration_clause DURATION(unit, length)

Taille de la fenêtre glissante dans Stream Analytics en termes de temps. La taille recommandée de cette fenêtre de temps est l’équivalent du temps nécessaire pour générer le nombre d’événements historySize dans un état stable.

clause_when

Spécifie la condition booléenne pour les événements à fournir au modèle afin d’effectuer la détection des anomalies. La when_clause est facultative.

Types de retour

La fonction retourne un enregistrement imbriqué composé des colonnes suivantes :

IsAnomaly

BIGINT (0 ou 1) indiquant si l’événement était anormal ou non.

Score

Le score martingale calculé (float) indiquant à quel point un événement est anormal. Ce score augmente de façon exponentielle avec des valeurs anormales.

Exemples

Dans l’exemple de requête suivant, la première requête suppose un événement toutes les 5 minutes, et la deuxième requête suppose un événement toutes les secondes. Le niveau de confiance est défini sur 75 pour les deux modèles.

AnomalyDetection_ChangePoint(reading, 75, 72)
    OVER (LIMIT DURATION(hour, 6))

AnomalyDetection_ChangePoint(temperature, 75, 120)
    OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))

Exemple en supposant un taux d’entrée uniforme de 1 événement par seconde dans une fenêtre glissante de 20 minutes avec une taille d’historique de 1 200 événements. La dernière instruction SELECT extrait et génère le score et le statut d’anomalie avec un niveau de confiance de 80 %.

WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Exemple avec un flux d’entrée non uniforme rendu uniforme à l’aide d’une fenêtre bascule de 1 seconde :

WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200) 
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)

SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep

Exemple avec une requête partitionnée pour entraîner un modèle distinct par capteur :

WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200) 
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)

SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly

INTO output
FROM AnomalyDetectionStep