Κοινή χρήση μέσω


Εντοπισμός πολλαπλών ανωμαλιών με δάσος απομόνωσης

Αυτό το άρθρο δείχνει πώς μπορείτε να χρησιμοποιήσετε το SynapseML στο Apache Spark για τον εντοπισμό πολλαπλών ανωμαλιών. Ο εντοπισμός πολλαπλών ανωμαλιών επιτρέπει τον εντοπισμό ανωμαλιών μεταξύ πολλών μεταβλητών ή χρονικών ακολουθιών, λαμβάνοντας υπόψη όλες τις αλληλεπιδράσεις και τις εξαρτήσεις μεταξύ των διαφορετικών μεταβλητών. Σε αυτό το σενάριο, χρησιμοποιούμε το SynapseML για την εκπαίδευση ενός μοντέλου Isolation Forest για τον εντοπισμό πολλαπλών ανωμαλιών και, στη συνέχεια, χρησιμοποιούμε το στο εκπαιδευμένο μοντέλο για να συνάγουμε πολυμεταβλημένες ανωμαλίες μέσα σε ένα σύνολο δεδομένων που περιέχει συνθετικές μετρήσεις από τρεις αισθητήρες IoT.

Για να μάθετε περισσότερα σχετικά με το μοντέλο Isolation Forest, ανατρέξτε στην αρχική εφημερίδα του Liu et al..

Προαπαιτούμενα στοιχεία

  • Επισυνάψτε το σημειωματάριό σας σε ένα lakehouse. Στην αριστερή πλευρά, επιλέξτε Προσθήκη για να προσθέσετε μια υπάρχουσα λίμνη ή να δημιουργήσετε μια λίμνη.

Εισαγωγές βιβλιοθήκης

from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline

from synapse.ml.isolationforest import *

from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *

if running_on_synapse():
    shell = TerminalInteractiveShell.instance()
    shell.define_macro("foo", """a,b=10,20""")

Δεδομένα εισόδου

# Table inputs
timestampColumn = "timestamp"  # str: the name of the timestamp column in the table
inputCols = [
    "sensor_1",
    "sensor_2",
    "sensor_3",
]  # list(str): the names of the input variables

# Training Start time, and number of days to use for training:
trainingStartTime = (
    "2022-02-24T06:00:00Z"  # datetime: datetime for when to start the training
)
trainingEndTime = (
    "2022-03-08T23:55:00Z"  # datetime: datetime for when to end the training
)
inferenceStartTime = (
    "2022-03-09T09:30:00Z"  # datetime: datetime for when to start the training
)
inferenceEndTime = (
    "2022-03-20T23:55:00Z"  # datetime: datetime for when to end the training
)

# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0

Ανάγνωση δεδομένων

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
    )
)

μετατροπή στηλών σε κατάλληλους τύπους δεδομένων

df = (
    df.orderBy(timestampColumn)
    .withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    .withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
    .drop("_c5")
)

display(df)

Προετοιμασία δεδομένων εκπαίδευσης

# filter to data with timestamps within the training window
df_train = df.filter(
    (F.col(timestampColumn) >= trainingStartTime)
    & (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)

Δοκιμή προετοιμασίας δεδομένων

# filter to data with timestamps within the inference window
df_test = df.filter(
    (F.col(timestampColumn) >= inferenceStartTime)
    & (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)

Μοντέλο δάσους απομόνωσης εκπαίδευσης

isolationForest = (
    IsolationForest()
    .setNumEstimators(num_estimators)
    .setBootstrap(False)
    .setMaxSamples(max_samples)
    .setMaxFeatures(max_features)
    .setFeaturesCol("features")
    .setPredictionCol("predictedLabel")
    .setScoreCol("outlierScore")
    .setContamination(contamination)
    .setContaminationError(0.01 * contamination)
    .setRandomSeed(1)
)

Στη συνέχεια, δημιουργούμε μια διοχέτευση εκμάθησης μηχανής για την εκπαίδευση του μοντέλου Δάσος απομόνωσης. Επιδεικνύουμε επίσης πώς μπορείτε να δημιουργήσετε ένα πείραμα MLflow και να καταχωρήσετε το εκπαιδευμένο μοντέλο.

Η καταχώρηση του μοντέλου MLflow απαιτείται αυστηρά μόνο εάν αποκτήσετε πρόσβαση στο εκπαιδευμένο μοντέλο αργότερα. Για την εκπαίδευση του μοντέλου και την εκτέλεση συμπερασματών στο ίδιο σημειωματάριο, το μοντέλο αντικειμένου μοντέλου επαρκεί.

va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)

Εκτέλεση συμπερασματολόγησης

Φόρτωση του εκπαιδευμένου μοντέλου δάσους απομόνωσης

Εκτέλεση συμπερασματολόγησης

df_test_pred = model.transform(df_test)
display(df_test_pred)

Προκατασκευασίες Πρόγραμμα εντοπισμού ανωμαλιών

Πρόγραμμα εντοπισμού ανωμαλιών AI του Azure

  • Κατάσταση ανωμαλίας του πιο πρόσφατου σημείου: Δημιουργεί ένα μοντέλο χρησιμοποιώντας προηγούμενα σημεία και προσδιορίζει εάν το πιο πρόσφατο σημείο είναι ανώμαλα (Scala, Python)
  • Εύρεση ανωμαλιών: δημιουργεί ένα μοντέλο χρησιμοποιώντας μια ολόκληρη σειρά και εντοπίζει ανωμαλίες στη σειρά (Scala, Python)