Share via


Esercitazione: Usare Rilevamento anomalie multivariate in Azure Synapse Analytics

Importante

A partire dal 20 settembre 2023 non sarà possibile creare nuove risorse Rilevamento anomalie. Il servizio Rilevamento anomalie viene ritirato il 1° ottobre 2026.

Usare questa esercitazione per rilevare anomalie tra più variabili in Azure Synapse Analytics in set di dati e database molto grandi. Questa soluzione è perfetta per scenari come la manutenzione predittiva delle apparecchiature. La potenza sottostante deriva dall'integrazione con SynapseML, una libreria open source che mira a semplificare la creazione di pipeline di Machine Learning con scalabilità massiccia. Può essere installato e usato in qualsiasi infrastruttura Spark 3, tra cui il computer locale, Databricks, Synapse Analytics e altri.

Questa esercitazione illustra come:

  • Usare Azure Synapse Analytics per rilevare anomalie tra più variabili in Synapse Analytics.
  • Eseguire il training di un modello di rilevamento anomalie multivariato e inferenza in notebook separati in Synapse Analytics.
  • Ottenere il risultato del rilevamento anomalie e l'analisi della causa radice per ogni anomalia.

Prerequisiti

In questa sezione verranno create le risorse seguenti nel portale di Azure:

  • Una risorsa Rilevamento anomalie per ottenere l'accesso alla funzionalità multivariate Rilevamento anomalie.
  • Una risorsa di Azure Synapse Analytics per l'uso di Synapse Studio.
  • Un account Archiviazione per caricare i dati per il training del modello e il rilevamento delle anomalie.
  • Risorsa dell'insieme di credenziali delle chiavi per contenere la chiave di Rilevamento anomalie e il stringa di connessione dell'account Archiviazione.

Creare Rilevamento anomalie e risorse di Azure Synapse Analytics

Creare una risorsa dell'account di archiviazione

  • Creare una risorsa dell'account di archiviazione nel portale di Azure. Dopo aver compilato l'account di archiviazione, creare un contenitore per archiviare i dati intermedi, poiché SynapseML trasformerà i dati originali in uno schema supportato da Multivariate Rilevamento anomalie. (Fare riferimento a Schema di input multivariato Rilevamento anomalie)

    Nota

    Ai fini di questo esempio viene impostata solo la sicurezza nel contenitore per consentire l'accesso in lettura anonimo per contenitori e BLOB, perché conterrà solo i dati csv di esempio. Per scopi diversi dalla demo, questa opzione non è consigliata.

    A screenshot of the creating a container in a storage account.

Creare un insieme di credenziali delle chiavi per contenere Rilevamento anomalie account di archiviazione e chiave stringa di connessione

  • Creare un insieme di credenziali delle chiavi e configurare segreti e accesso

    1. Creare un insieme di credenziali delle chiavi nel portale di Azure.

    2. Passare a Criteri di accesso di Key Vault > e concedere all'area di lavoro di Azure Synapse l'autorizzazione per leggere i segreti da Azure Key Vault.

      A screenshot of granting permission to Synapse.

  • Creare un segreto in Key Vault per contenere la chiave Rilevamento anomalie

    1. Passare alla risorsa Rilevamento anomalie, Rilevamento anomalie> Chiavi ed Endpoint. Copiare quindi uno dei due tasti negli Appunti.

    2. Passare a Key Vault Secret (Segreto dell'insieme di>credenziali delle chiavi) per creare un nuovo segreto. Specificare il nome del segreto e quindi incollare la chiave del passaggio precedente nel campo Valore . Infine, selezionare Crea.

      A screenshot of the creating a secret.

  • Creare un segreto in Key Vault per contenere Connessione stringa di Archiviazione account

    1. Passare alla risorsa dell'account Archiviazione, selezionare Chiavi di accesso per copiare una delle stringhe di Connessione ion.

      A screenshot of copying connection string.

    2. Passare quindi a Key Vault>Secret (Segreto dell'insieme di credenziali delle chiavi) per creare un nuovo segreto. Specificare il nome del segreto (ad esempio myconnectionstring) e quindi incollare la stringa di Connessione ion del passaggio precedente nel campo Valore. Infine, selezionare Crea.

Uso di un notebook per eseguire il rilevamento anomalie multivariato in Synapse Analytics

Creare un notebook e un pool di Spark

  1. Accedere ad Azure Synapse Analytics e creare un nuovo notebook per la codifica.

    A screenshot of creating notebook in Synapse.

  2. Selezionare Gestisci pool nella pagina del notebook per creare un nuovo pool di Apache Spark, se non ne è disponibile uno.

    A screenshot of creating spark pool.

Scrittura di codice nel notebook

  1. Installare la versione più recente di SynapseML con i modelli Spark di rilevamento anomalie. È anche possibile installare SynapseML in pacchetti Spark, Databricks, Docker e così via. Fare riferimento alla home page di SynapseML.

    Se si usa Spark 3.1, usare il codice seguente:

    %%configure -f
    {
      "name": "synapseml",
      "conf": {
          "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.9.5-13-d1b51517-SNAPSHOT",
          "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
          "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12",
          "spark.yarn.user.classpath.first": "true"
      }
    }
    

    Se si usa Spark 3.2, usare il codice seguente:

    %%configure -f
    {
      "name": "synapseml",
      "conf": {
          "spark.jars.packages": " com.microsoft.azure:synapseml_2.12:0.9.5 ",
          "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
          "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,io.netty:netty-tcnative-boringssl-static",
          "spark.yarn.user.classpath.first": "true"
      }
    }
    
  2. Importare i moduli e le librerie necessari.

    from synapse.ml.cognitive import *
    from notebookutils import mssparkutils
    import numpy as np
    import pandas as pd
    import pyspark
    from pyspark.sql.functions import col
    from pyspark.sql.functions import lit
    from pyspark.sql.types import DoubleType
    import synapse.ml
    
  3. Caricare i dati. Comporre i dati nel formato seguente e caricarli in un archivio cloud supportato da Spark come un account Archiviazione di Azure. La colonna timestamp deve essere in ISO8601 formato e le colonne delle funzionalità devono essere string di tipo .

        df = spark.read.format("csv").option("header", True).load("wasbs://[container_name]@[storage_account_name].blob.core.windows.net/[csv_file_name].csv")
    
        df = df.withColumn("sensor_1", col("sensor_1").cast(DoubleType())) \
            .withColumn("sensor_2", col("sensor_2").cast(DoubleType())) \
            .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))
    
        df.show(10)
    

    A screenshot of raw data.

  4. Eseguire il training di un modello di rilevamento anomalie multivariato.

    A screenshot of training parameter.

        #Input your key vault name and anomaly key name in key vault.
        anomalyKey = mssparkutils.credentials.getSecret("[key_vault_name]","[anomaly_key_secret_name]")
        #Input your key vault name and connection string name in key vault.
        connectionString = mssparkutils.credentials.getSecret("[key_vault_name]", "[connection_string_secret_name]")
    
        #Specify information about your data.
        startTime = "2021-01-01T00:00:00Z"
        endTime = "2021-01-02T09:18:00Z"
        timestampColumn = "timestamp"
        inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
        #Specify the container you created in Storage account, you could also initialize a new name here, and Synapse will help you create that container automatically.
        containerName = "[container_name]"
        #Set a folder name in Storage account to store the intermediate data.
        intermediateSaveDir = "intermediateData"
    
        simpleMultiAnomalyEstimator = (FitMultivariateAnomaly()
            .setSubscriptionKey(anomalyKey)
        #In .setLocation, specify the region of your Anomaly Detector resource, use lowercase letter like: eastus.
            .setLocation("[anomaly_detector_region]")
            .setStartTime(startTime)
            .setEndTime(endTime)
            .setContainerName(containerName)
            .setIntermediateSaveDir(intermediateSaveDir)
            .setTimestampCol(timestampColumn)
            .setInputCols(inputColumns)
            .setSlidingWindow(200)
            .setConnectionString(connectionString))
    

    Attivare il processo di training tramite questi codici.

    model = simpleMultiAnomalyEstimator.fit(df)
    type(model)
    
  5. Processo di inferenza del trigger.

    startInferenceTime = "2021-01-02T09:19:00Z"
    endInferenceTime = "2021-01-03T01:59:00Z"
    result = (model
          .setStartTime(startInferenceTime)
          .setEndTime(endInferenceTime)
          .setOutputCol("results")
          .setErrorCol("errors")
          .setTimestampCol(timestampColumn)
          .setInputCols(inputColumns)
          .transform(df))
    
  6. Ottenere i risultati dell'inferenza.

    rdf = (result.select("timestamp",*inputColumns, "results.contributors", "results.isAnomaly", "results.severity").orderBy('timestamp', ascending=True).filter(col('timestamp') >= lit(startInferenceTime)).toPandas())
    
    def parse(x):
        if type(x) is list:
            return dict([item[::-1] for item in x])
        else:
            return {'series_0': 0, 'series_1': 0, 'series_2': 0}
    
    rdf['contributors'] = rdf['contributors'].apply(parse)
    rdf = pd.concat([rdf.drop(['contributors'], axis=1), pd.json_normalize(rdf['contributors'])], axis=1)
    rdf
    

    I risultati dell'inferenza verranno visualizzati come seguiti. severity è un numero compreso tra 0 e 1, che mostra il grado grave di anomalia. Le ultime tre colonne indicano l'oggetto contribution score di ogni sensore, maggiore è il numero, più anomalo è il sensore. A screenshot of inference result.

Pulire i dati intermedi (facoltativo)

Per impostazione predefinita, il rilevamento anomalie caricherà automaticamente i dati in un account di archiviazione in modo che il servizio possa elaborare i dati. Per pulire i dati intermedi, è possibile eseguire i codici seguenti.

simpleMultiAnomalyEstimator.cleanUpIntermediateData()
model.cleanUpIntermediateData()

Usare il modello sottoposto a training in un altro notebook con ID modello (facoltativo)

Se è necessario eseguire codice di training e codice di inferenza in notebook separati in Synapse, è prima possibile ottenere l'ID modello e usare tale ID per caricare il modello in un altro notebook creando un nuovo oggetto.

  1. Ottenere l'ID del modello nel notebook di training.

    model.getModelId()
    
  2. Caricare il modello nel notebook di inferenza.

    retrievedModel = (DetectMultivariateAnomaly()
        .setSubscriptionKey(anomalyKey)
        .setLocation("eastus")
        .setOutputCol("result")
        .setStartTime(startTime)
        .setEndTime(endTime)
        .setContainerName(containerName)
        .setIntermediateSaveDir(intermediateSaveDir)
        .setTimestampCol(timestampColumn)
        .setInputCols(inputColumns)
        .setConnectionString(connectionString)
        .setModelId('5bXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXe9'))
    

Altre informazioni

Informazioni su Rilevamento anomalie

Informazioni su Synapse