Azure Stream Analytics での異常検出Anomaly detection in Azure Stream Analytics

クラウドと Azure IoT Edge の両方で利用できます。Azure Stream Analytics には、機械学習をベースにした異常検出機能が組み込まれています。これを使用して、最も多く発生する 2 種類の異常 (一時的な異常と永続的な異常) を監視できます。Available in both the cloud and Azure IoT Edge, Azure Stream Analytics offers built-in machine learning based anomaly detection capabilities that can be used to monitor the two most commonly occurring anomalies: temporary and persistent. AnomalyDetection_SpikeAndDipAnomalyDetection_ChangePoint 関数を使用して、異常検出を Stream Analytics ジョブ内で直接実行することができます。With the AnomalyDetection_SpikeAndDip and AnomalyDetection_ChangePoint functions, you can perform anomaly detection directly in your Stream Analytics job.

機械学習モデルでは、均等にサンプリングされたタイム シリーズを想定しています。The machine learning models assume a uniformly sampled time series. タイム シリーズが均等でない場合は、異常検出を呼び出す前に、タンブリング ウィンドウを使って集計手順を挿入してもかまいません。If the time series is not uniform, you may insert an aggregation step with a tumbling window prior to calling anomaly detection.

現時点で、機械学習の処理は、季節性の傾向や多変量の相関関係には対応していません。The machine learning operations do not support seasonality trends or multi-variate correlations at this time.

モデルの動作Model behavior

一般に、スライディング ウィンドウ内のデータが多いほどモデルの精度が向上します。Generally, the model's accuracy improves with more data in the sliding window. 指定したスライディング ウィンドウ内のデータは、その期間の正常範囲の値の一部として扱われます。The data in the specified sliding window is treated as part of its normal range of values for that time frame. モデルで現在のイベントの異常性を調べるときは、スライディング ウィンドウ内のイベント履歴のみが考慮されます。The model only considers event history over the sliding window to check if the current event is anomalous. スライディング ウィンドウが動くと、モデルのトレーニングから古い値が削除されます。As the sliding window moves, old values are evicted from the model’s training.

この機能は、これまでの履歴を基に一定の標準を確立することによって動作します。The functions operate by establishing a certain normal based on what they have seen so far. 外れ値は、信頼度レベル内で、確立された標準と比較することで識別されます。Outliers are identified by comparing against the established normal, within the confidence level. 異常が発生したときに認識できるように、ウィンドウのサイズは、正常な動作でモデルをトレーニングするのに必要な最小イベント数を基準にしてください。The window size should be based on the minimum events required to train the model for normal behavior so that when an anomaly occurs, it would be able to recognize it.

履歴のサイズが大きくなると、比較対象となる過去のイベント数も増えるため、モデルの応答時間が増えます。The model's response time increases with history size because it needs to compare against a higher number of past events. パフォーマンス向上のため、イベントは必要な数だけ含めることをお勧めします。It is recommended to only include the necessary number of events for better performance.

タイム シリーズにギャップがある場合、モデルが特定の時点のイベントを受け取っていないことが原因である可能性があります。Gaps in the time series can be a result of the model not receiving events at certain points in time. このような状況は、Stream Analytics の補完ロジックによって処理されます。This situation is handled by Stream Analytics using imputation logic. 同じスライディング ウィンドウの履歴サイズと期間の両方を使用して、イベントの平均出現率が計算されます。The history size, as well as a time duration, for the same sliding window is used to calculate the average rate at which events are expected to arrive.

ここで公開されている異常ジェネレーターは、IoT Hub にさまざまな異常パターン データをフィードするために使用できます。An anomaly generator available here can be used to feed an Iot Hub with data with different anomaly patterns. ASA ジョブをこの異常検知機能と合わせて設定すると、この IoT Hub から読み取って異常を検出することができます。An ASA job can be set up with these anomaly detection functions to read from this Iot Hub and detect anomalies.

スパイクとディップSpike and dip

タイム シリーズ イベント ストリーム内の一時的な異常は、スパイクとディップと呼ばれます。Temporary anomalies in a time series event stream are known as spikes and dips. スパイクとディップは、機械学習をベースにした演算子 AnomalyDetection_SpikeAndDip を使って監視できます。Spikes and dips can be monitored using the Machine Learning based operator, AnomalyDetection_SpikeAndDip.

スパイクとディップの異常の例

同じスライディング ウィンドウ内の 2 番目のスパイクが 1 番目のものよりも小さい場合、小さい方のスパイクに対して計算されるスコアが、指定した信頼度レベル内の最初のスパイクのスコアに比べて十分な大きさにならないことがあります。In the same sliding window, if a second spike is smaller than the first one, the computed score for the smaller spike is probably not significant enough compared to the score for the first spike within the confidence level specified. このような異常を検知するには、モデルの信頼度レベルの設定を低くしてみてください。You can try decreasing the model's confidence level to detect such anomalies. ただし、それによってアラートの数が増えすぎてしまう場合は、より高い信頼区間を使用できます。However, if you start to get too many alerts, you can use a higher confidence interval.

次のサンプル クエリでは、120 件のイベント履歴を持つ 2 分間のスライディング ウィンドウで、1 秒あたり 1 件という一定の割合でイベントの入力があるものとします。The following example query assumes a uniform input rate of one event per second in a 2-minute sliding window with a history of 120 events. 最後の SELECT ステートメントで、95% の信頼度レベルでスコアと異常状態を抽出して出力します。The final SELECT statement extracts and outputs the score and anomaly status with a confidence level of 95%.

WITH AnomalyDetectionStep AS
(
    SELECT
        EVENTENQUEUEDUTCTIME AS time,
        CAST(temperature AS float) AS temp,
        AnomalyDetection_SpikeAndDip(CAST(temperature AS float), 95, 120, 'spikesanddips')
            OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
    FROM input
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') AS float) AS
    SpikeAndDipScore,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS bigint) AS
    IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep

変化点Change point

タイム シリーズ イベント ストリーム内の永続的な異常は、レベルの変化や傾向の変化などのような、イベント ストリーム内の値の分布内で起きる変化です。Persistent anomalies in a time series event stream are changes in the distribution of values in the event stream, like level changes and trends. Stream Analytics では、このような異常を機械学習をベースにした AnomalyDetection_ChangePoint 演算子を使って検出します。In Stream Analytics, such anomalies are detected using the Machine Learning based AnomalyDetection_ChangePoint operator.

永続的な変化は、スパイクやディップよりもはるかに長期間継続するため、致命的なイベントを示している可能性があります。Persistent changes last much longer than spikes and dips and could indicate catastrophic event(s). 永続的な変化は、肉眼では確認できないことがほとんどですが、AnomalyDetection_ChangePoint 演算子を使えば検出できます。Persistent changes are not usually visible to the naked eye, but can be detected with the AnomalyDetection_ChangePoint operator.

次の画像はレベルの変化の例です。The following image is an example of a level change:

レベルの変化の異常の例

次の画像は傾向の変化の例です。The following image is an example of a trend change:

傾向の変化の異常の例

次のサンプル クエリでは、1,200 件のイベント履歴サイズを持つ 20 分間のスライディング ウィンドウで、1 秒あたり 1 件という一定の割合でイベントの入力があるものとします。The following example query assumes a uniform input rate of one event per second in a 20-minute sliding window with a history size of 1200 events. 最後の SELECT ステートメントで、80% の信頼度レベルでスコアと異常状態を抽出して出力します。The final SELECT statement extracts and outputs the score and anomaly status with a confidence level of 80%.

WITH AnomalyDetectionStep AS
(
    SELECT
        EVENTENQUEUEDUTCTIME AS time,
        CAST(temperature AS float) AS temp,
        AnomalyDetection_ChangePoint(CAST(temperature AS float), 80, 1200) 
        OVER(LIMIT DURATION(minute, 20)) AS ChangePointScores
    FROM input
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(ChangePointScores, 'Score') AS float) AS
    ChangePointScore,
    CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') AS bigint) AS
    IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep

パフォーマンス特性Performance characteristics

これらのモデルのパフォーマンスは、履歴のサイズ、ウィンドウ期間、イベントの負荷、関数レベルのパーティション分割を使用しているかどうかによります。The performance of these models depends on the history size, window duration, event load, and whether function level partitioning is used. このセクションでは、これらの構成について説明し、1 秒あたり 1,000 件、5,000 件、1 万件のイベントの取り込み速度を維持する方法のサンプルを提供します。This section discusses these configurations and provides samples for how to sustain ingestion rates of 1K, 5K and 10K events per second.

  • 履歴のサイズ - これらのモデルは、履歴のサイズに対して直線性を有します。History size - These models perform linearly with history size. 履歴のサイズが大きくなるほど、モデルが新しいイベントをスコア付けするためにかかる時間が長くなります。The longer the history size, the longer the models take to score a new event. これは、モデルが新しいイベントと履歴バッファー内にある過去の各イベントを比較するためです。This is because the models compare the new event with each of the past events in the history buffer.
  • ウィンドウ期間 - ウィンドウ期間は、履歴のサイズで指定された数のイベントの受信にかかる時間を反映する必要があります。Window duration - The Window duration should reflect how long it takes to receive as many events as specified by the history size. ウィンドウ内に多くのイベントがない場合、Azure Stream Analytics は欠損値を補完します。Without that many events in the window, Azure Stream Analytics would impute missing values. そのため、CPU 使用量は履歴のサイズの関数となります。Hence, CPU consumption is a function of the history size.
  • イベント負荷 - イベント負荷が大きいほどモデルの作業量が増え、CPU 使用量に影響を与えます。Event load - The greater the event load, the more work that is performed by the models, which impacts CPU consumption. ビジネス ロジックで入力パーティションを増やせると仮定すれば、ジョブを驚異的並列にすることでスケール アウトできます。The job can be scaled out by making it embarrassingly parallel, assuming it makes sense for business logic to use more input partitions.
  • 関数レベルのパーティション分割 - 関数レベルのパーティション分割は、異常検出関数呼び出し内で PARTITION BY を使用して行われます。Function level partitioning - Function level partitioning is done by using PARTITION BY within the anomaly detection function call. この種類のパーティション分割は、同時に複数のモデルで状態を維持する必要があるため、オーバーヘッドが増加します。This type of partitioning adds an overhead, as state needs to be maintained for multiple models at the same time. 関数レベルのパーティション分割は、デバイス レベルのパーティション分割などのシナリオで使用されます。Function level partitioning is used in scenarios like device level partitioning.

リレーションシップRelationship

履歴のサイズ、ウィンドウ期間、およびイベント負荷の合計は、次のように関連します。The history size, window duration, and total event load are related in the following way:

ウィンドウ期間 (ミリ秒) = 1000 * 履歴のサイズ / (1 秒あたりの入力イベントの合計 / 入力パーティション数)windowDuration (in ms) = 1000 * historySize / (Total Input Events Per Sec / Input Partition Count)

関数を deviceId でパーティション分割する場合は、異常検出関数呼び出しに「PARTITION BY deviceId」を追加します。When partitioning the function by deviceId, add “PARTITION BY deviceId” to the anomaly detection function call.

観測値Observations

次の表には、パーティション分割されていない場合の 1 つのノード (6 SU) のスループット観測値が含まれています。The following table includes the throughput observations for a single node (6 SU) for the non-partitioned case:

履歴のサイズ (イベント)History size (events) ウィンドウ期間 (ミリ秒)Window duration (ms) 1 秒あたりの入力イベントの合計Total input events per sec
6060 5555 2,2002,200
600600 728728 1,6501,650
6,0006,000 10,91010,910 1,1001,100

次の表には、パーティション分割されている場合の 1 つのノード (6 SU) のスループット観測値が含まれています。The following table includes the throughput observations for a single node (6 SU) for the partitioned case:

履歴のサイズ (イベント)History size (events) ウィンドウ期間 (ミリ秒)Window duration (ms) 1 秒あたりの入力イベントの合計Total input events per sec デバイス数Device count
6060 1,0911,091 1,1001,100 1010
600600 10,91010,910 1,1001,100 1010
6,0006,000 218,182218,182 <550<550 1010
6060 21,81921,819 550550 100100
600600 218,182218,182 550550 100100
6,0006,000 2,181,8192,181,819 <550<550 100100

上記のパーティション分割されていない構成を実行するサンプル コードは、Azure サンプルの streaming-at-scale リポジトリにあります。Sample code to run the non-partitioned configurations above is located in the Streaming At Scale repo of Azure Samples. このコードでは、関数レベルのパーティション分割が行われていないストリーム分析ジョブを作成し、入力および出力として Event Hub を使用しています。The code creates a stream analytics job with no function level partitioning, which uses Event Hub as input and output. 入力の負荷は、テスト クライアントを使用して生成されます。The input load is generated using test clients. 各入力イベントは、1 KB の json ドキュメントです。Each input event is a 1KB json document. イベントは、JSON データを送信する IoT デバイスをシミュレートします (最大 1,000 台のデバイス)。Events simulate an IoT device sending JSON data (for up to 1K devices). 履歴のサイズ、ウィンドウ期間、およびイベント負荷の合計は、2 つの入力パーティションで異なります。The history size, window duration, and total event load are varied over 2 input partitions.

注意

見積もりの精度を高めるには、ご使用のシナリオに合わせてサンプルをカスタマイズしてください。For a more accurate estimate, customize the samples to fit your scenario.

ボトルネックの特定Identifying bottlenecks

Azure Stream Analytics ジョブの [メトリックス] ウィンドウを使用して、パイプラインのボトルネックを特定します。Use the Metrics pane in your Azure Stream Analytics job to identify bottlenecks in your pipeline. スループットについての [Input/Output Events](入出力イベント) および [透かしの遅延] または [Backlogged Events](バックログされたイベント) を確認して、ジョブが入力速度に対応しているかどうかを確認します。Review Input/Output Events for throughput and "Watermark Delay" or Backlogged Events to see if the job is keeping up with the input rate. イベント ハブのメトリックスについては、 [Throttled Requests] (スロットルされた要求数) を検索し、必要に応じてしきい値ユニットを調整します。For Event Hub metrics, look for Throttled Requests and adjust the Threshold Units accordingly. Cosmos DB メトリックスについては、スループットの下の [パーティション キーの範囲ごとの使用された最大 RU/秒] を確認して、パーティション キーの範囲が均一に消費されていることを確認します。For Cosmos DB metrics, review Max consumed RU/s per partition key range under Throughput to ensure your partition key ranges are uniformly consumed. Azure SQL DB については、 [ログ IO] および [CPU] を監視します。For Azure SQL DB, monitor Log IO and CPU.

Azure Stream Analytics で機械学習を使用した異常検出Anomaly detection using machine learning in Azure Stream Analytics

次の動画では、Azure Stream Analytics の機械学習機能を使ってリアルタイムで異常を検出する方法を示します。The following video demonstrates how to detect an anomaly in real time using machine learning functions in Azure Stream Analytics.

次の手順Next steps