Share via


Distribuire ed eseguire modelli MLflow in processi Spark

Questo articolo illustra come distribuire ed eseguire un modello MLflow in processi Spark per eseguire l'inferenza per grandi quantità di dati o nell’ambito di processi di data wrangling.

Informazioni su questo esempio

Questo esempio illustra come distribuire un modello MLflow registrato in Azure Machine Learning nei processi Spark in esecuzione in cluster Spark gestiti (anteprima), Azure Databricks o Azure Synapse Analytics per eseguire l'inferenza su grandi quantità di dati.

Il modello si basa sul set di dati UCI Heart Disease. Il database contiene 76 attributi ma viene usato un subset di 14 attributi. Il modello cerca di prevedere la presenza di una malattia cardiaca in un paziente. È un valore intero compreso tra 0 (assenza) e 1 (presenza). È stato eseguito il training usando un classificatore XGBBoost e tutta la pre-elaborazione necessaria è stata inserita in un pacchetto come pipeline scikit-learn, rendendo questo modello una pipeline end-to-end che va dai dati non elaborati alle previsioni.

Le informazioni contenute in questo articolo si basano sugli esempi di codice contenuti nel repository azureml-examples. Per eseguire i comandi in locale senza dover copiare/incollare file, è possibile clonare il repository e modificare le directory in sdk/using-mlflow/deploy.

git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy

Prerequisiti

Prima di seguire la procedura descritta in questo articolo, accertarsi di disporre dei prerequisiti seguenti:

  • Installare il pacchetto SDK di MLflow mlflow e il plug-in di Azure Machine Learning per MLflow azureml-mlflow.

    pip install mlflow azureml-mlflow
    

    Suggerimento

    È possibile usare il pacchetto mlflow-skinny, un pacchetto di MLflow leggero senza risorse di archiviazione SQL, server, interfaccia utente o dipendenze di data science. mlflow-skinny è consigliabile per gli utenti a cui occorrono principalmente le funzionalità di rilevamento e registrazione di MLflow senza l’importazione dell’intera suite di funzionalità incluse le distribuzioni.

  • Un'area di lavoro di Azure Machine Learning. È possibile crearne una seguendo l'esercitazione Creare risorse di Machine Learning.

  • Se si esegue il rilevamento remoto, cioè si rilevano esperimenti in esecuzione all'esterno di Azure Machine Learning, configurare MLflow in modo che punti all'URI di rilevamento dell'area di lavoro di Azure Machine Learning. Per altre informazioni su come connettere MLflow all'area di lavoro, vedere Configurare MLflow per Azure Machine Learning.

  • È necessario avere un modello MLflow registrato nella propria area di lavoro. In particolare, questo esempio registrerà un modello sottoposto a training per il set di dati Diabetes.

Connettersi all'area di lavoro

Connettersi innanzitutto all'area di lavoro di Azure Machine Learning in cui è registrato il modello.

Il rilevamento è già configurato automaticamente. Le credenziali predefinite verranno usate anche quando si usa MLflow.

Registrazione del modello

Per eseguire l'inferenza è necessario un modello registrato nel registro di Azure Machine Learning. In questo caso è già presente una copia locale del modello nel repository, per cui basta pubblicare il modello nel registro dell'area di lavoro. È possibile ignorare questo passaggio se il modello di cui si tenta la distribuzione è già registrato.

model_name = 'heart-classifier'
model_local_path = "model"

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version

Se il modello è stato registrato all'interno di un'esecuzione, invece, è possibile registrarlo direttamente.

Suggerimento

Per registrare il modello è necessario conoscere la posizione in cui è stato archiviato. Se si usa la funzionalità autolog di MLflow, il percorso dipende dal tipo e dal framework del modello in uso. È consigliabile controllare l'output dei processi per identificare il nome di questa cartella. È possibile cercare la cartella contenente un file denominato MLModel. Se i modelli vengono registrati manualmente usando log_model, il percorso è l'argomento passato a tale metodo. Ad esempio, se si registra il modello usando mlflow.sklearn.log_model(my_model, "classifier"), il percorso in cui viene archiviato il modello è classifier.

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

Nota

Il percorso MODEL_PATH è la posizione in cui il modello è stato archiviato nell'esecuzione.


Ottenere i dati di input a cui assegnare il punteggio

Sono necessari alcuni dati di input per l'esecuzione o i processi. In questo esempio verranno scaricati dati di esempio da Internet e verranno inseriti in una risorsa di archiviazione condivisa usata dal cluster Spark.

import urllib

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")

Spostare i dati in un account di archiviazione montato disponibile per l'intero cluster.

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

Importante

Il codice precedente usa dbutils, uno strumento disponibile nel cluster Azure Databricks. Usare lo strumento appropriato a seconda della piattaforma in uso.

I dati di input, quindi, vengono collocati nella cartella seguente:

input_data_path = "dbfs:/data"

Eseguire il modello in cluster Spark

La sezione seguente illustra come eseguire modelli MLflow registrati in Azure Machine Learning in processi Spark.

  1. Accertarsi che nel cluster siano installate le librerie seguenti:

    - mlflow<3,>=2.1
    - cloudpickle==2.2.0
    - scikit-learn==1.2.0
    - xgboost==1.7.2
    
  2. Verrà usato un notebook per illustrare come creare una routine di assegnazione dei punteggi con un modello MLflow registrato in Azure Machine Learning. Creare un notebook e usare PySpark come linguaggio predefinito.

  3. Importare gli spazi dei nomi necessari:

    import mlflow
    import pyspark.sql.functions as f
    
  4. Configurare l’URI del modello. L'URI seguente conduce a un modello denominato heart-classifier nella versione più recente.

    model_uri = "models:/heart-classifier/latest"
    
  5. Caricare il modello come funzione UDF. Una funzione UDF (User-Defined Function) è una funzione definita da un utente che consente di riutilizzare la logica personalizzata nell'ambiente dell’utente.

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    Suggerimento

    Usare l'argomento result_type per controllare il tipo restituito dalla funzione predict().

  6. Leggere i dati a cui assegnare il punteggio:

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    In questo caso, i dati di input sono in formato CSV e sono collocati nella cartella dbfs:/data/. Si elimina anche la colonna target perché questo set di dati contiene la variabile di destinazione da stimare. Negli scenari di produzione i dati non avranno questa colonna.

  7. Eseguire la funzione predict_function e collocare le stime in una nuova colonna. In questo caso, le stime vengono collocate nella colonna predictions.

    df.withColumn("predictions", score_function(*df.columns))
    

    Suggerimento

    predict_function riceve come argomenti le colonne necessarie. In questo caso, tutte le colonne del frame di dati sono previste dal modello, per cui si usa df.columns. Se il modello richiede un subset delle colonne, è possibile introdurle manualmente. Se il modello ha una firma, i tipi devono essere compatibili tra gli input e i tipi previsti.

  8. È possibile riscrivere le stime nella risorsa di archiviazione:

    scored_data_path = "dbfs:/scored-data"
    scored_data.to_csv(scored_data_path)
    

Eseguire il modello in un processo Spark autonomo in Azure Machine Learning

Azure Machine Learning supporta la creazione di un processo Spark autonomo e la creazione di un componente Spark riutilizzabile che può essere usato in pipeline di Azure Machine Learning. In questo esempio verrà distribuito un processo di assegnazione dei punteggi eseguito nel processo Spark autonomo di Azure Machine Learning che esegue un modello MLflow per eseguire l'inferenza.

Nota

Per altre informazioni su processi Spark in Azure Machine Learning, vedere Inviare processi Spark in Azure Machine Learning (anteprima).

  1. Un processo Spark richiede uno script Python che acquisisce argomenti. Creare uno script di assegnazione dei punteggi:

    score.py

    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Load the model as an UDF function
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", score_function(*df.columns))
    
    # Save the predictions
    scored_data.to_csv(args.scored_data)
    

    Lo script precedente acquisisce tre argomenti: --model, --input_data e --scored_data. I primi due sono input e rappresentano il modello da eseguire e i dati di input, l'ultimo è un output ed è la cartella di output in cui verranno collocate le stime.

    Suggerimento

    Installazione di pacchetti Python: lo script di assegnazione dei punteggi precedente carica il modello MLflow in una funzione UDF, ma indica il parametro env_manager="conda". Quando questo parametro è impostato, MLflow ripristinerà i pacchetti necessari come specificato nella definizione del modello in un ambiente isolato in cui viene eseguita solo la funzione UDF. Per altri dettagli, consultare la documentazione di mlflow.pyfunc.spark_udf.

  2. Creare una definizione di processo:

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.2"
    

    Suggerimento

    Per usare un pool di Spark Synapse collegato, definire la proprietà compute nel file di specifica YAML di esempio mostrato sopra anziché la proprietà resources.

  3. Il file YAML mostrato sopra può essere usato nel comando az ml job create con il parametro --file per creare un processo Spark autonomo, come illustrato di seguito:

    az ml job create -f mlflow-score-spark-job.yml
    

Passaggi successivi