Détection d’anomalies dans Azure Stream AnalyticsAnomaly detection in Azure Stream Analytics

Disponible à la fois dans le cloud et Azure IoT Edge, Azure Stream Analytics offre des fonctionnalités de détection des anomalies basées sur un Machine Learning intégré qui peuvent permettre de surveiller les deux anomalies qui se produisent le plus souvent : temporaires et persistantes.Available in both the cloud and Azure IoT Edge, Azure Stream Analytics offers built-in machine learning based anomaly detection capabilities that can be used to monitor the two most commonly occurring anomalies: temporary and persistent. Les fonctions AnomalyDetection_SpikeAndDip et AnomalyDetection_ChangePoint permettent de détecter des anomalies directement dans votre travail Stream Analytics.With the AnomalyDetection_SpikeAndDip and AnomalyDetection_ChangePoint functions, you can perform anomaly detection directly in your Stream Analytics job.

Les modèles Machine Learning supposent une série chronologique équitablement échantillonnée.The machine learning models assume a uniformly sampled time series. Si la série chronologique n’est pas uniforme, vous pouvez insérer une étape d’agrégation avec une fenêtre bascule avant l’appel de la détection des anomalies.If the time series is not uniform, you may insert an aggregation step with a tumbling window prior to calling anomaly detection.

Actuellement, les opérations Machine Learning ne prennent pas en charge les tendances de saisonnalité ni les corrélations multivariées.The machine learning operations do not support seasonality trends or multi-variate correlations at this time.

Détection des anomalies à laide de Machine Learning dans Azure Stream AnalyticsAnomaly detection using machine learning in Azure Stream Analytics

La vidéo suivante montre comment détecter une anomalie en temps réel à l’aide des fonctions Machine Learning dans Azure Stream Analytics.The following video demonstrates how to detect an anomaly in real time using machine learning functions in Azure Stream Analytics.

Comportement du modèleModel behavior

En règle générale, la précision du modèle s’améliore avec l’augmentation des données dans la fenêtre glissante.Generally, the model's accuracy improves with more data in the sliding window. Les données de la fenêtre glissante spécifiée sont traitées comme si elles faisaient partie de la plage normale de valeurs pour cette période.The data in the specified sliding window is treated as part of its normal range of values for that time frame. Le modèle ne prend en considération que l’historique des événements de la fenêtre glissante pour vérifier si l’événement actuel est anormal.The model only considers event history over the sliding window to check if the current event is anomalous. À mesure que la fenêtre glissante se déplace, les anciennes valeurs sont supprimées de la formation du modèle.As the sliding window moves, old values are evicted from the model’s training.

Les fonctions établissent une certaine normale selon ce qu’elles ont déjà vu jusqu’à présent.The functions operate by establishing a certain normal based on what they have seen so far. Les valeurs hors norme sont identifiées par la comparaison par rapport à la normale établie au sein du niveau de confiance.Outliers are identified by comparing against the established normal, within the confidence level. La taille de la fenêtre doit être basée sur le nombre minimal d’événements requis pour former le modèle à un comportement normal afin qu’il puisse reconnaître une anomalie lorsqu’elle se produit.The window size should be based on the minimum events required to train the model for normal behavior so that when an anomaly occurs, it would be able to recognize it.

Le temps de réponse du modèle augmente avec la taille de l’historique, car il doit effectuer une comparaison avec un nombre plus élevé d’événements passés.The model's response time increases with history size because it needs to compare against a higher number of past events. Il est recommandé de n’inclure que le nombre d’événements nécessaires pour optimiser les performances.It is recommended to only include the necessary number of events for better performance.

Des lacunes dans la série chronologique peuvent être le résultat du modèle qui ne reçoit pas d’événements à certains moments au fil du temps.Gaps in the time series can be a result of the model not receiving events at certain points in time. Cette situation est gérée par Stream Analytics à l’aide de la logique d'imputation.This situation is handled by Stream Analytics using imputation logic. La taille de l’historique, ainsi que la durée, pour la même fenêtre glissante sont utilisées pour calculer la vitesse moyenne à laquelle les événements sont attendus.The history size, as well as a time duration, for the same sliding window is used to calculate the average rate at which events are expected to arrive.

Un générateur d’anomalies disponible ici peut être utilisé pour alimenter un Iot Hub en données avec différents modèles d’anomalies.An anomaly generator available here can be used to feed an Iot Hub with data with different anomaly patterns. Une tâche ASA peut être configurée avec ces fonctions de détection d’anomalies à des fins de lecture à partir de cet Iot Hub et de détection des anomalies.An ASA job can be set up with these anomaly detection functions to read from this Iot Hub and detect anomalies.

Pics et baissesSpike and dip

Les anomalies temporaires d’un flux d’événements d’une série chronologique sont appelées « pics et baisses ».Temporary anomalies in a time series event stream are known as spikes and dips. Les pics et baisses peuvent être surveillés à l’aide de l’opérateur basé sur Machine Learning AnomalyDetection_SpikeAndDip.Spikes and dips can be monitored using the Machine Learning based operator, AnomalyDetection_SpikeAndDip.

Exemple d’anomalie de pic et de baisse

Dans la même fenêtre glissante, si un deuxième pic est plus petit que le premier, le score calculé pour le plus petit pic n’est probablement pas suffisamment important par rapport au score du premier pic au niveau de confiance spécifié.In the same sliding window, if a second spike is smaller than the first one, the computed score for the smaller spike is probably not significant enough compared to the score for the first spike within the confidence level specified. Vous pouvez essayer de réduire le paramètre de niveau de confiance du modèle pour détecter ces anomalies.You can try decreasing the model's confidence level to detect such anomalies. Toutefois, si vous commencez à recevoir trop d’alertes, vous pouvez utiliser un intervalle de confiance plus élevé.However, if you start to get too many alerts, you can use a higher confidence interval.

L’exemple de requête suivant suppose une vitesse d’entrée uniforme d’un événement par seconde dans une fenêtre glissante de 2 minutes avec un historique de 120 événements.The following example query assumes a uniform input rate of one event per second in a 2-minute sliding window with a history of 120 events. La dernière instruction SELECT extrait et génère le score et le statut d’anomalie avec un niveau de confiance de 95 %.The final SELECT statement extracts and outputs the score and anomaly status with a confidence level of 95%.

WITH AnomalyDetectionStep AS
(
    SELECT
        EVENTENQUEUEDUTCTIME AS time,
        CAST(temperature AS float) AS temp,
        AnomalyDetection_SpikeAndDip(CAST(temperature AS float), 95, 120, 'spikesanddips')
            OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
    FROM input
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') AS float) AS
    SpikeAndDipScore,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS bigint) AS
    IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep

Point de changementChange point

Les anomalies persistantes d’un flux d’événements d’une série chronologique sont des modifications de la distribution des valeurs du flux d’événements, telles que des modifications du niveau et des tendances.Persistent anomalies in a time series event stream are changes in the distribution of values in the event stream, like level changes and trends. Dans Stream Analytics, ces anomalies sont détectées à l’aide de l’opérateur basé sur Machine Learning AnomalyDetection_ChangePoint.In Stream Analytics, such anomalies are detected using the Machine Learning based AnomalyDetection_ChangePoint operator.

Les modifications persistantes durent beaucoup plus longtemps que les pics et les baisses, et peuvent indiquer un ou des événements catastrophiques.Persistent changes last much longer than spikes and dips and could indicate catastrophic event(s). Les modifications persistantes ne sont généralement pas visibles à l’œil nu, mais peuvent être détectées avec l’opérateur AnomalyDetection_ChangePoint.Persistent changes are not usually visible to the naked eye, but can be detected with the AnomalyDetection_ChangePoint operator.

L’image suivante est un exemple de modification de niveau :The following image is an example of a level change:

Exemple d’anomalie de modification de niveau

L’image suivante est un exemple de modification de tendance :The following image is an example of a trend change:

Exemple d’anomalie de modification de tendance

L’exemple de requête suivant suppose une vitesse d’entrée uniforme d’un événement par seconde dans une fenêtre glissante de 20 minutes avec une taille d’historique de 1 200 événements.The following example query assumes a uniform input rate of one event per second in a 20-minute sliding window with a history size of 1200 events. La dernière instruction SELECT extrait et génère le score et le statut d’anomalie avec un niveau de confiance de 80 %.The final SELECT statement extracts and outputs the score and anomaly status with a confidence level of 80%.

WITH AnomalyDetectionStep AS
(
    SELECT
        EVENTENQUEUEDUTCTIME AS time,
        CAST(temperature AS float) AS temp,
        AnomalyDetection_ChangePoint(CAST(temperature AS float), 80, 1200) 
        OVER(LIMIT DURATION(minute, 20)) AS ChangePointScores
    FROM input
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(ChangePointScores, 'Score') AS float) AS
    ChangePointScore,
    CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') AS bigint) AS
    IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep

Caractéristiques du niveau de performancePerformance characteristics

Le niveau de performance de ces modèles dépend de la taille de l’historique, de la durée de la fenêtre, de la charge d'événements, ainsi que de l'utilisation du partitionnement au niveau de la fonction.The performance of these models depends on the history size, window duration, event load, and whether function level partitioning is used. Cette section traite de ces configurations et propose des exemples illustrant comment prendre en charge des taux d’ingestion d'événements de 1 Ko, 5 Ko et 10 Ko par seconde.This section discusses these configurations and provides samples for how to sustain ingestion rates of 1K, 5K and 10K events per second.

  • Taille de l’historique - Ces modèles fonctionnent de manière linéaire avec la taille de l’historique.History size - These models perform linearly with history size. Plus l’historique est volumineux, plus les modèles mettent du temps à évaluer un nouvel événement.The longer the history size, the longer the models take to score a new event. En effet, les modèles comparent le nouvel événement avec les différents événements passés dans la mémoire tampon de l’historique.This is because the models compare the new event with each of the past events in the history buffer.
  • Durée de la fenêtre - La durée de la fenêtre doit refléter le temps nécessaire à la réception du nombre d'événements défini par la taille de l’historique.Window duration - The Window duration should reflect how long it takes to receive as many events as specified by the history size. Sans ce nombre d’événements dans la fenêtre, Azure Stream Analytics impute les valeurs manquantes.Without that many events in the window, Azure Stream Analytics would impute missing values. Par conséquent, la consommation du processeur est une fonction de la taille de l’historique.Hence, CPU consumption is a function of the history size.
  • Charge d'événements - Plus la charge d’événements est importante, plus les modèles sont sollicités, ce qui a une incidence sur la consommation du processeur.Event load - The greater the event load, the more work that is performed by the models, which impacts CPU consumption. Le travail peut être monté en charge moyennant un parallélisme massif, en supposant qu'il soit judicieux, en terme de logique métier, d'utiliser plus de partitions d’entrée.The job can be scaled out by making it embarrassingly parallel, assuming it makes sense for business logic to use more input partitions.
  • Partitionnement au niveau de la fonction - Le partitionnement au niveau de la fonction s’effectue avec PARTITION BY au sein de l’appel de fonction de détection des anomalies.Function level partitioning - Function level partitioning is done by using PARTITION BY within the anomaly detection function call. Ce type de partitionnement ajoute une charge, car l'état doit être conservé pour plusieurs modèles en même temps.This type of partitioning adds an overhead, as state needs to be maintained for multiple models at the same time. Le partitionnement au niveau de la fonction est utilisé dans des scénarios tels que le partitionnement au niveau de l'appareil.Function level partitioning is used in scenarios like device level partitioning.

RelationRelationship

La taille de l’historique, la durée de la fenêtre et la charge totale d’événements sont associées comme suit :The history size, window duration, and total event load are related in the following way:

windowDuration (en ms) = 1000 * historySize / (total d'événements d'entrée par seconde / nombre de partitions d’entrée)windowDuration (in ms) = 1000 * historySize / (Total Input Events Per Sec / Input Partition Count)

Lors du partitionnement de la fonction par deviceId, ajoutez « PARTITION par deviceId » à l’appel de la fonction de détection des anomalies.When partitioning the function by deviceId, add “PARTITION BY deviceId” to the anomaly detection function call.

ObservationsObservations

Le tableau suivant présente les observations de débit pour un seul nœud (6 SU) pour le cas non partitionné :The following table includes the throughput observations for a single node (6 SU) for the non-partitioned case:

Taille de l’historique (événements)History size (events) Durée de la fenêtre (ms)Window duration (ms) Nombre total d’événements d’entrée par secondeTotal input events per sec
6060 5555 2 2002,200
600600 728728 1 6501,650
6 000 / 7506,000 10 91010,910 1 1001,100

Le tableau suivant présente les observations de débit pour un seul nœud (6 SU) pour le cas partitionné :The following table includes the throughput observations for a single node (6 SU) for the partitioned case:

Taille de l’historique (événements)History size (events) Durée de la fenêtre (ms)Window duration (ms) Nombre total d’événements d’entrée par secondeTotal input events per sec Nombre d’appareilsDevice count
6060 1 0911,091 1 1001,100 1010
600600 10 91010,910 1 1001,100 1010
6 000 / 7506,000 218 182218,182 <550<550 1010
6060 21 81921,819 550550 100100
600600 218 182218,182 550550 100100
6 000 / 7506,000 2 181 8192,181,819 <550<550 100100

Un exemple de code permettant d'exécuter les configurations non partitionnées ci-dessus est disponible dans le référentiel Streaming At Scale des exemples Azure.Sample code to run the non-partitioned configurations above is located in the Streaming At Scale repo of Azure Samples. Le code crée un travail Stream Analytics sans partitionnement au niveau de la fonction, utilisant Event Hub comme entrée et sortie.The code creates a stream analytics job with no function level partitioning, which uses Event Hub as input and output. La charge d’entrée est générée à l’aide de clients de test.The input load is generated using test clients. Chaque événement d’entrée correspond à un document json de 1 Ko.Each input event is a 1KB json document. Les événements simulent un appareil IoT envoyant des données JSON (jusqu'à 1 Ko).Events simulate an IoT device sending JSON data (for up to 1K devices). La taille de l’historique, la durée de la fenêtre et la charge totale d'événements varient sur deux partitions d'entrée.The history size, window duration, and total event load are varied over 2 input partitions.

Nota

Pour une estimation plus précise, personnalisez les échantillons en fonction de votre scénario.For a more accurate estimate, customize the samples to fit your scenario.

Identification des goulots d’étranglementIdentifying bottlenecks

Utilisez le volet Métriques de votre travail Azure Stream Analytics pour identifier les goulots d’étranglement de votre pipeline.Use the Metrics pane in your Azure Stream Analytics job to identify bottlenecks in your pipeline. Examinez les événements d’entrée/sortie pour le débit, ainsi que le « Délai en filigrane » ou les Événements en backlog, pour voir si le travail suit la vitesse d’entrée.Review Input/Output Events for throughput and "Watermark Delay" or Backlogged Events to see if the job is keeping up with the input rate. Pour les métriques Event Hub, recherchez les Demandes limitées et ajustez les Unités de seuil en conséquence.For Event Hub metrics, look for Throttled Requests and adjust the Threshold Units accordingly. Pour les métriques de Cosmos DB, examinez la valeur Nombre maximal de RU/s consommées par groupe de clés de partition sous Débit pour vous assurer que les groupes de clés de partition sont consommés de manière uniforme.For Cosmos DB metrics, review Max consumed RU/s per partition key range under Throughput to ensure your partition key ranges are uniformly consumed. Pour Azure SQL DB, surveillez E/S journal et UC.For Azure SQL DB, monitor Log IO and CPU.

Étapes suivantesNext steps