Anomalieerkennung in Azure Stream AnalyticsAnomaly detection in Azure Stream Analytics

Azure Stream Analytics ist sowohl in der Cloud als auch in Azure IoT Edge verfügbar und bietet integrierte Anomalieerkennungsfunktionen auf Machine Learning-Basis, die zum Überwachen der beiden am häufigsten auftretenden Anomalien verwendet werden können: temporäre und permanente.Available in both the cloud and Azure IoT Edge, Azure Stream Analytics offers built-in machine learning based anomaly detection capabilities that can be used to monitor the two most commonly occurring anomalies: temporary and persistent. Mit den Funktionen AnomalyDetection_SpikeAndDip und AnomalyDetection_ChangePoint können Sie die Erkennung von Anomalien direkt in Ihrem Stream Analytics-Auftrag ausführen.With the AnomalyDetection_SpikeAndDip and AnomalyDetection_ChangePoint functions, you can perform anomaly detection directly in your Stream Analytics job.

Die Machine Learning-Modelle setzen einheitlich als Stichprobe entnommene Zeitreihen voraus.The machine learning models assume a uniformly sampled time series. Wenn die Zeitreihen nicht einheitlich sind, können Sie einen Aggregationsschritt mit einem rollierenden Fenster vor dem Aufrufen der Erkennung von Anomalien einfügen.If the time series is not uniform, you may insert an aggregation step with a tumbling window prior to calling anomaly detection.

Die Machine Learning-Vorgänge unterstützen derzeit keine saisonalen Trends oder Korrelationen mit mehreren Varianten.The machine learning operations do not support seasonality trends or multi-variate correlations at this time.

Anomalieerkennung unter Verwendung von Machine Learning in Azure Stream AnalyticsAnomaly detection using machine learning in Azure Stream Analytics

Das folgende Video veranschaulicht, wie eine Anomalie in Echtzeit mithilfe von Machine Learning-Funktionen in Azure Stream Analytics erkannt wird.The following video demonstrates how to detect an anomaly in real time using machine learning functions in Azure Stream Analytics.

ModellverhaltenModel behavior

In der Regel wird die Genauigkeit des Modells besser, je mehr Daten das gleitende Fenster enthält.Generally, the model's accuracy improves with more data in the sliding window. Die Daten im angegebenen gleitenden Fenster werden als Teil des normalen Wertebereichs für diesen Zeitrahmen behandelt.The data in the specified sliding window is treated as part of its normal range of values for that time frame. Das Modell berücksichtigt nur den Ereignisverlauf im gleitenden Fenster, um zu überprüfen, ob das aktuelle Ereignis anomal ist.The model only considers event history over the sliding window to check if the current event is anomalous. Wenn das gleitende Fenster verschoben wird, werden die alten Werte des Trainings des Modells entfernt.As the sliding window moves, old values are evicted from the model’s training.

Die Funktionen legen basierend auf dem bisher Beobachteten einen bestimmten Normalwert ein.The functions operate by establishing a certain normal based on what they have seen so far. Ausreißer werden durch Vergleich mit dem festgelegten Normalwert innerhalb des Zuverlässigkeitsgrads identifiziert.Outliers are identified by comparing against the established normal, within the confidence level. Die Größe des Fensters sollte auf der Anzahl von Ereignissen basieren, die mindestens erforderlich sind, um das Modell für das normale Verhalten zu trainieren, sodass es eine Anomalie erkennen kann, sobald sie auftritt.The window size should be based on the minimum events required to train the model for normal behavior so that when an anomaly occurs, it would be able to recognize it.

Die Antwortzeit des Modells nimmt mit der Größe des Verlaufs zu, da eine höhere Anzahl vergangener Ereignisse verglichen werden muss.The model's response time increases with history size because it needs to compare against a higher number of past events. Zur Verbesserung der Leistung sollte nur die erforderliche Anzahl von Ereignissen einbezogen werden.It is recommended to only include the necessary number of events for better performance.

Lücken in der Zeitreihe können darauf zurückzuführen sein, dass das Modell zu bestimmten Zeitpunkten keine Ereignisse empfangen hat.Gaps in the time series can be a result of the model not receiving events at certain points in time. Diese Situation wird von Stream Analytics mit Annahmelogik behandelt.This situation is handled by Stream Analytics using imputation logic. Für das gleiche gleitende Fenster wird die Größe des Verlaufs ebenso wie die Dauer zum Berechnen der durchschnittlichen Rate verwendet, mit der Ereignisse auftreten.The history size, as well as a time duration, for the same sliding window is used to calculate the average rate at which events are expected to arrive.

Mit einem hier verfügbaren Anomalie-Generator können Sie für einen IoT-Hub Daten mit verschiedenen Anomaliemustern bereitstellen.An anomaly generator available here can be used to feed an Iot Hub with data with different anomaly patterns. Mit diesen Funktionen für die Anomalieerkennung kann ein ASA-Auftrag so eingerichtet werden, dass Daten aus diesem IoT-Hub gelesen und Anomalien erkannt werden.An ASA job can be set up with these anomaly detection functions to read from this Iot Hub and detect anomalies.

Spitzen und SenkenSpike and dip

Temporäre Anomalien im Ereignisdatenstrom einer Zeitreihe werden als Spitzen und Senken bezeichnet.Temporary anomalies in a time series event stream are known as spikes and dips. Spitzen und Senken können mithilfe des auf Machine Learning basierenden AnomalyDetection_SpikeAndDip-Operators überwacht werden.Spikes and dips can be monitored using the Machine Learning based operator, AnomalyDetection_SpikeAndDip.

Beispiel für Anomalien bei Spitzen und Senken

Wenn im gleichen gleitenden Fenster eine zweite Spitze kleiner ist als die erste, ist das berechnete Ergebnis für die kleinere Spitze wahrscheinlich nicht signifikant genug im Vergleich zum Ergebnis für die erste Spitze innerhalb des angegebenen Zuverlässigkeitsgrads.In the same sliding window, if a second spike is smaller than the first one, the computed score for the smaller spike is probably not significant enough compared to the score for the first spike within the confidence level specified. Sie können den Zuverlässigkeitsgrad des Modells verringern, um solche Anomalien zu erkennen.You can try decreasing the model's confidence level to detect such anomalies. Sollten Sie jedoch zu viele Warnungen erhalten, können Sie ein höheres Konfidenzintervall verwenden.However, if you start to get too many alerts, you can use a higher confidence interval.

Die folgende Beispielabfrage setzt eine einheitliche Eingangsrate von einem Ereignis pro Sekunde in einem zweiminütigen gleitenden Fenster mit einer Verlaufsgröße von 120 Ereignissen voraus.The following example query assumes a uniform input rate of one event per second in a 2-minute sliding window with a history of 120 events. Mit einem Zuverlässigkeitsgrad von 95% extrahiert die letzte SELECT-Anweisung Ergebnis und Anomalienstatus und gibt sie aus.The final SELECT statement extracts and outputs the score and anomaly status with a confidence level of 95%.

WITH AnomalyDetectionStep AS
(
    SELECT
        EVENTENQUEUEDUTCTIME AS time,
        CAST(temperature AS float) AS temp,
        AnomalyDetection_SpikeAndDip(CAST(temperature AS float), 95, 120, 'spikesanddips')
            OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
    FROM input
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') AS float) AS
    SpikeAndDipScore,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS bigint) AS
    IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep

ÄnderungspunktChange point

Permanente Anomalien im Ereignisdatenstrom einer Zeitreihe sind Änderungen bei der Verteilung der Werte im Ereignisdatenstrom, wie Änderungen des Zuverlässigkeitsgrads und Trends.Persistent anomalies in a time series event stream are changes in the distribution of values in the event stream, like level changes and trends. In Stream Analytics werden diese Anomalien mithilfe des auf Machine Learning basierenden AnomalyDetection_ChangePoint-Operators erkannt.In Stream Analytics, such anomalies are detected using the Machine Learning based AnomalyDetection_ChangePoint operator.

Permanente Änderungen dauern viel länger als Spitzen und Senken und könnten auf katastrophale Ereignisse hinweisen.Persistent changes last much longer than spikes and dips and could indicate catastrophic event(s). Permanente Änderungen sind in der Regel mit bloßem Auge nicht sichtbar, können aber mit dem AnomalyDetection_ChangePoint-Operator erkannt werden.Persistent changes are not usually visible to the naked eye, but can be detected with the AnomalyDetection_ChangePoint operator.

Die folgende Abbildung ist ein Beispiel für die Änderung des Zuverlässigkeitsgrads:The following image is an example of a level change:

Beispiel für eine Anomalie der Änderung des Zuverlässigkeitsgrads

Die folgende Abbildung ist ein Beispiel für eine Trendänderung:The following image is an example of a trend change:

Beispiel für eine Anomalie der Trendänderung

Die folgende Beispielabfrage setzt eine einheitliche Eingangsrate von einem Ereignis pro Sekunde in einem 20-minütigen gleitenden Fenster mit einer Verlaufsgröße von 1.200 Ereignissen voraus.The following example query assumes a uniform input rate of one event per second in a 20-minute sliding window with a history size of 1200 events. Mit einem Zuverlässigkeitsgrad von 80% extrahiert die letzte SELECT-Anweisung Ergebnis und Anomalienstatus und gibt sie aus.The final SELECT statement extracts and outputs the score and anomaly status with a confidence level of 80%.

WITH AnomalyDetectionStep AS
(
    SELECT
        EVENTENQUEUEDUTCTIME AS time,
        CAST(temperature AS float) AS temp,
        AnomalyDetection_ChangePoint(CAST(temperature AS float), 80, 1200) 
        OVER(LIMIT DURATION(minute, 20)) AS ChangePointScores
    FROM input
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(ChangePointScores, 'Score') AS float) AS
    ChangePointScore,
    CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') AS bigint) AS
    IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep

LeistungsmerkmalePerformance characteristics

Die Leistungsfähigkeit dieser Modelle hängt von der Verlaufsgröße, der Fensterdauer, der Ereignislast und davon ab, ob die Partitionierung auf Funktionsebene verwendet wird.The performance of these models depends on the history size, window duration, event load, and whether function level partitioning is used. In diesem Abschnitt werden die Konfigurationen erläutert und Beispiele für die Unterstützung von Datenerfassungsraten von 1 K, 5 K und 10 K Ereignissen pro Sekunde gezeigt.This section discusses these configurations and provides samples for how to sustain ingestion rates of 1K, 5K and 10K events per second.

  • Verlaufsgröße – Die Modellleistung verhält sich linear zur Verlaufsgröße.History size - These models perform linearly with history size. Je länger die Verlaufsgröße, umso mehr Zeit benötigen die Modelle, um ein neues Ereignis zu bewerten.The longer the history size, the longer the models take to score a new event. Dies liegt daran, dass das neue Ereignis von den Modellen mit jedem vergangenen Ereignis im Verlaufspuffer verglichen wird.This is because the models compare the new event with each of the past events in the history buffer.
  • Fensterdauer – Die Fensterdauer sollte widerspiegeln, wie lange die Erfassung der Anzahl von Ereignissen dauert, die durch die Verlaufsgröße angegeben ist.Window duration - The Window duration should reflect how long it takes to receive as many events as specified by the history size. Wenn diese Anzahl von Ereignisse im Fenster nicht erreicht wird, werden die fehlenden Werte von Azure Stream Analytics vervollständigt.Without that many events in the window, Azure Stream Analytics would impute missing values. Daher besteht eine direkte Beziehung zwischen der CPU-Auslastung und der Verlaufsgröße.Hence, CPU consumption is a function of the history size.
  • Ereignislast – Je größer die Ereignislast, desto mehr Arbeit müssen die Modelle leisten, was sich wiederum auf die CPU-Auslastung auswirkt.Event load - The greater the event load, the more work that is performed by the models, which impacts CPU consumption. Der Auftrag kann durch eine extreme Parallelverarbeitung horizontal skaliert werden. Dabei muss die Verwendung zusätzlicher Eingabepartitionen für die Geschäftslogik jedoch sinnvoll sein.The job can be scaled out by making it embarrassingly parallel, assuming it makes sense for business logic to use more input partitions.
  • Bei der Partitionierung auf Funktionsebene - Partitionierung auf Funktionsebene wird PARTITION BY innerhalb des Funktionsaufrufs der Anomalieerkennung verwendet.Function level partitioning - Function level partitioning is done by using PARTITION BY within the anomaly detection function call. Durch diese Partitionierung entsteht ein Mehraufwand, da der Zustand für mehrere Modelle gleichzeitig aufrechterhalten werden muss.This type of partitioning adds an overhead, as state needs to be maintained for multiple models at the same time. Die Partitionierung auf Funktionsebene wird in Szenarien wie der Partitionierung auf Geräteebene verwendet.Function level partitioning is used in scenarios like device level partitioning.

BeziehungRelationship

Zwischen der Verlaufsgröße, der Fensterdauer und der Gesamtereignislast besteht folgende Beziehung:The history size, window duration, and total event load are related in the following way:

windowDuration (in ms) = 1000 * historySize/(Eingabeereignisse gesamt [s]/Anzahl der Eingabepartitionen)windowDuration (in ms) = 1000 * historySize / (Total Input Events Per Sec / Input Partition Count)

Wenn Sie die Funktion nach „deviceId“ partitionieren, fügen Sie dem Funktionsaufruf der Anomalieerkennung „PARTITION BY deviceId“ hinzu.When partitioning the function by deviceId, add “PARTITION BY deviceId” to the anomaly detection function call.

BeobachtungenObservations

In der folgenden Tabelle sind die beobachteten Durchsätze bei einem einzelnen Knoten (6 SU) ohne Partitionierung aufgeführt:The following table includes the throughput observations for a single node (6 SU) for the non-partitioned case:

Verlaufsgröße (Ereignisse)History size (events) Fensterdauer (ms)Window duration (ms) Eingabeereignisse gesamt/sTotal input events per sec
6060 5555 2.2002,200
600600 728728 1.6501,650
6.0006,000 10.91010,910 1.1001,100

In der folgenden Tabelle sind die beobachteten Durchsätze bei einem einzelnen Knoten (6 SU) mit Partitionierung aufgeführt:The following table includes the throughput observations for a single node (6 SU) for the partitioned case:

Verlaufsgröße (Ereignisse)History size (events) Fensterdauer (ms)Window duration (ms) Eingabeereignisse gesamt/sTotal input events per sec GeräteanzahlDevice count
6060 1.0911,091 1.1001,100 1010
600600 10.91010,910 1.1001,100 1010
6.0006,000 218.182218,182 < 550<550 1010
6060 21.81921,819 550550 100100
600600 218.182218,182 550550 100100
6.0006,000 2.181.8192,181,819 < 550<550 100100

Codebeispiele zum Ausführen der oben genannten Konfigurationen ohne Partitionierung finden Sie in den Azure-Beispielen im Streaming At Scale-Repository.Sample code to run the non-partitioned configurations above is located in the Streaming At Scale repo of Azure Samples. Im Code wird ein Stream Analytics-Auftrag ohne Partitionierung auf Funktionsebene erstellt. Die Ein- und Ausgabe erfolgt über Event Hub.The code creates a stream analytics job with no function level partitioning, which uses Event Hub as input and output. Die Eingabelast wird mithilfe von Testclients generiert.The input load is generated using test clients. Jedes Eingabeereignis ist ein JSON-Dokument von 1 KB.Each input event is a 1KB json document. Mit den Ereignissen wird ein IoT-Gerät simuliert, das JSON-Daten (für bis zu 1.000 Geräte) sendet.Events simulate an IoT device sending JSON data (for up to 1K devices). Die Verlaufsgröße, die Fensterdauer und die Gesamtereignislast verteilen sich auf zwei Eingabepartitionen:The history size, window duration, and total event load are varied over 2 input partitions.

Hinweis

Um eine genauere Schätzung zu erhalten, passen Sie die Beispiele an Ihr Szenario an.For a more accurate estimate, customize the samples to fit your scenario.

Identifizieren von EngpässenIdentifying bottlenecks

Verwenden Sie den Bereich „Metriken“ in ihrem Azure Stream Analytics-Auftrag, um Engpässe in Ihrer Pipeline zu identifizieren.Use the Metrics pane in your Azure Stream Analytics job to identify bottlenecks in your pipeline. Überprüfen Sie Eingabe-/Ausgabeereignisse hinsichtlich des Durchsatzes sowie "Wasserzeichenverzögerung" oder Ereignisse im Rückstand, um festzustellen, ob der Auftrag mit der Eingangsrate Schritt halten kann.Review Input/Output Events for throughput and "Watermark Delay" or Backlogged Events to see if the job is keeping up with the input rate. Suchen Sie für Event Hub-Metriken nach Gedrosselte Anforderungen, und passen Sie die Schwellenwerteinheiten (TU) entsprechend an.For Event Hub metrics, look for Throttled Requests and adjust the Threshold Units accordingly. Überprüfen Sie für Cosmos DB-Metriken Max. genutzte RU/Sek. pro Partitionsschlüsselbereich unter „Durchsatz“, um sicherzustellen, dass Ihre Partitionsschlüsselbereiche gleichmäßig genutzt werden.For Cosmos DB metrics, review Max consumed RU/s per partition key range under Throughput to ensure your partition key ranges are uniformly consumed. Überwachen Sie für Azure SQL-DB Protokoll-E/A und CPU.For Azure SQL DB, monitor Log IO and CPU.

Nächste SchritteNext steps