Bereitstellen und Ausführen von MLflow-Modellen in Spark-Aufträgen

In diesem Artikel erfahren Sie, wie Sie Ihr MLflow-Modell in Spark-Aufträgen bereitstellen und ausführen, um Rückschlüsse zu großen Datenmengen oder als Teil von Data Wrangling-Aufträgen zu ermöglichen.

Über dieses Beispiel

Dieses Beispiel zeigt, wie Sie ein in Azure Machine Learning registriertes MLflow-Modell für Spark-Aufträge bereitstellen können, die in verwalteten Spark-Clustern (Vorschau), Azure Databricks oder Azure Synapse Analytics ausgeführt werden, um Rückschlüsse bei großen Datenmengen zu ermöglichen.

Das Modell basiert auf dem UCI Heart Disease Data Set. Die Datenbank enthält 76 Attribute, von denen wir eine Teilmenge von 14 verwenden. Das Modell versucht, das Vorhandensein einer Herzerkrankung bei einem Patienten vorherzusagen. Der Wert ist eine ganze Zahl, entweder 0 (nicht vorhanden) oder 1 (vorhanden). Es wurde mit einem XGBBoost-Klassifizierer trainiert, und alle erforderlichen Vorverarbeitungen wurden in einer scikit-learn-Pipeline zusammengefasst. Somit handelt es sich bei diesem Modell um eine End-to-End-Pipeline von den Rohdaten bis zu den Vorhersagen.

Die Informationen in diesem Artikel basieren auf Codebeispielen, die im Repository azureml-examples enthalten sind. Klonen Sie das Repository, und wechseln Sie dann in das Verzeichnis sdk/using-mlflow/deploy, um die Befehle lokal auszuführen, ohne Dateien kopieren/einfügen zu müssen.

git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy

Voraussetzungen

Stellen Sie vor dem Ausführen der Schritte in diesem Artikel sicher, dass Sie über die folgenden erforderlichen Komponenten verfügen:

  • Installieren Sie das Mlflow SDK-Paket mlflow und das Azure Machine Learning-Plug-In für MLflow azureml-mlflow.

    pip install mlflow azureml-mlflow
    

    Tipp

    Sie können auch das Paket mlflow-skinny verwenden. Dabei handelt es sich um ein abgespecktes MLflow-Paket ohne SQL-Speicher, Server, Benutzeroberfläche oder Data Science-Abhängigkeiten. mlflow-skinny wird für diejenigen empfohlen, die in erster Linie die Funktionen für Nachverfolgung und Protokollierung benötigen und nicht sämtliche Funktionen von MLflow (einschließlich Bereitstellungen) importieren möchten.

  • Ein Azure Machine Learning-Arbeitsbereich. Sie können eines erstellen, indem Sie das Tutorial „Erstellen von Ressourcen für maschinelles Lernen“ ausführen.

  • Wenn Sie eine Remote-Nachverfolgung durchführen (d.h. Nachverfolgung von Experimenten, die außerhalb von Azure Machine Learning ausgeführt werden), konfigurieren Sie MLflow so, dass es auf den Nachverfolgungs-URI Ihres Azure Machine Learning-Arbeitsbereichs verweist. Weitere Informationen zum Herstellen einer Verbindung zwischen MLflow und dem Arbeitsbereich finden Sie unter Konfigurieren von MLflow für Azure Machine Learning.

  • Sie müssen ein MLflow-Modell in Ihrem Arbeitsbereich registriert haben. In diesem Beispiel wird insbesondere ein Modell registriert, das für das Dataset zu Diabetes trainiert wurde.

Herstellen einer Verbindung mit Ihrem Arbeitsbereich

Zunächst stellen wir eine Verbindung mit dem Azure Machine Learning-Arbeitsbereich her, in dem Ihr Modell registriert ist.

Die Nachverfolgung ist bereits für Sie konfiguriert. Ihre Standardanmeldeinformationen werden auch beim Arbeiten mit MLflow verwendet.

Registrieren des Modells

Wir benötigen ein Modell, das in der Azure Machine Learning-Registrierung registriert ist, um Rückschlüsse zu ziehen. In diesem Fall ist bereits eine lokale Kopie des Modells im Repository vorhanden, daher muss das Modell nur in der Registrierung im Arbeitsbereich veröffentlicht werden. Sie können diesen Schritt überspringen, wenn das Modell, das Sie bereitstellen möchten, bereits registriert ist.

model_name = 'heart-classifier'
model_local_path = "model"

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version

Wenn Ihr Modell innerhalb einer Ausführung protokolliert wurde, können Sie es auch direkt registrieren.

Tipp

Um das Modell zu registrieren, müssen Sie den Speicherort kennen, an dem das Modell gespeichert wurde. Wenn Sie das Feature autolog von MLflow verwenden, hängt der Pfad vom Typ und Framework des verwendeten Modells ab. Wir empfehlen, die Ausgabe des Auftrags zu überprüfen, um den Namen dieses Ordners zu ermitteln. Sie können nach dem Ordner suchen, der eine Datei mit dem Namen MLModel enthält. Wenn Sie Ihre Modelle manuell mit log_model protokollieren, ist der Pfad das Argument, das Sie an diese Methode übergeben. Wenn Sie das Modell beispielsweise mit mlflow.sklearn.log_model(my_model, "classifier") protokollieren, ist classifier der Pfad, in dem das Modell gespeichert ist.

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

Hinweis

Der Pfad MODEL_PATH ist der Speicherort, an dem das Modell in der Ausführung gespeichert wurde.


Abrufen zu bewertender Eingabedaten

Wir benötigen einige Eingabedaten, auf die Aufträge angewendet werden sollen. In diesem Beispiel laden wir Beispieldaten aus dem Internet herunter und legen sie in einem freigegebenen Speicher ab, der vom Spark-Cluster verwendet wird.

import urllib

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")

Verschieben Sie die Daten in ein eingebundenes Speicherkonto, das für den gesamten Cluster verfügbar ist.

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

Wichtig

Im vorherige Code wird mit dbutils ein Tool verwendet, das im Azure Databricks-Cluster verfügbar ist. Verwenden Sie das entsprechende Tool für die von Ihnen verwendete Plattform.

Die Eingabedaten werden dann im folgenden Ordner abgelegt:

input_data_path = "dbfs:/data"

Ausführen des Modells in Spark-Clustern

Im folgenden Abschnitt wird erläutert, wie Sie MLflow-Modelle ausführen, die in Azure Machine Learning in Spark-Aufträgen registriert sind.

  1. Stellen Sie sicher, dass die folgenden Bibliotheken im Cluster installiert sind:

    - mlflow<3,>=2.1
    - cloudpickle==2.2.0
    - scikit-learn==1.2.0
    - xgboost==1.7.2
    
  2. Wir verwenden ein Notebook, um zu veranschaulichen, wie Sie mit einem in Azure Machine Learning registrierten MLflow-Modell eine Bewertungsroutine erstellen. Erstellen Sie ein Notebook, und verwenden Sie als Standardsprache PySpark.

  3. Importieren Sie die erforderlichen Namespaces:

    import mlflow
    import pyspark.sql.functions as f
    
  4. Konfigurieren Sie den Modell-URI. Der folgende URI stellt ein Modell mit dem Namen heart-classifier in der neuesten Version bereit.

    model_uri = "models:/heart-classifier/latest"
    
  5. Laden Sie das Modell als eine benutzerdefinierte Funktion. Eine benutzerdefinierte Funktion (User-Defined Function, UDF) ist eine vom Benutzer definierte Funktion, mit der benutzerdefinierte Logik in der Benutzerumgebung wiederverwendet werden kann.

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    Tipp

    Verwenden Sie das Argument result_type, um den von der Funktion predict() zurückgegebenen Typ zu steuern.

  6. Lesen Sie die Daten, die Sie bewerten möchten:

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    In unserem Fall liegen die Eingabedaten im Format CSV vor und befinden sich im Ordner dbfs:/data/. Wir lassen auch die Spalte target weg, da dieses Dataset die vorherzusagende Zielvariable enthält. In Produktionsszenarien weisen Ihre Daten diese Spalte nicht auf.

  7. Führen Sie die Funktion predict_function aus, und legen Sie die Vorhersagen in einer neuen Spalte ab. In diesem Fall legen wir die Vorhersagen in der Spalte predictions ab.

    df.withColumn("predictions", score_function(*df.columns))
    

    Tipp

    predict_function empfängt die erforderlichen Spalten als Argumente. In unserem Fall werden alle Spalten des Dataframes von dem Modell erwartet, weshalb df.columns verwendet wird. Wenn Ihr Modell eine Teilmenge der Spalten erfordert, können Sie sie manuell einführen. Wenn das Modell über eine Signatur verfügt, müssen Typen zwischen Eingaben und erwarteten Typen kompatibel sein.

  8. Sie können Ihre Vorhersagen zurück in den Speicher schreiben:

    scored_data_path = "dbfs:/scored-data"
    scored_data.to_csv(scored_data_path)
    

Ausführen des Modells in einem eigenständigen Spark-Auftrag in Azure Machine Learning

Azure Machine Learning unterstützt die Erstellung eines eigenständigen Spark-Auftrags und die Erstellung einer wiederverwendbaren Spark-Komponente, die in Azure Machine Learning-Pipelines verwendet werden kann. In diesem Beispiel stellen wir einen Bewertungsauftrag bereit, der in Azure Machine Learning als eigenständiger Spark-Auftrag ausgeführt wird und ein MLflow-Modell für Rückschlüsse einsetzt.

Hinweis

Weitere Informationen zu Spark-Aufträgen in Azure Machine Learning finden Sie unter Übermitteln von Spark-Aufträgen in Azure Machine Learning (Vorschau).

  1. Ein Spark-Auftrag erfordert ein Python-Skript, das Argumente akzeptiert. Erstellen Sie ein Bewertungsskript:

    score.py

    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Load the model as an UDF function
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", score_function(*df.columns))
    
    # Save the predictions
    scored_data.to_csv(args.scored_data)
    

    Das obige Skript akzeptiert die drei Argumente --model, --input_data und --scored_data. Die ersten beiden sind Eingaben und stellen das Modell dar, das wir ausführen möchten, sowie die Eingabedaten. Die letzte ist eine Ausgabe und der Ausgabeordner, in dem die Vorhersagen gespeichert werden.

    Tipp

    Installation von Python-Paketen: Das vorherige Bewertungsskript lädt das MLflow-Modell in eine UDF-Funktion, gibt jedoch den Parameter env_manager="conda" an. Wenn dieser Parameter festgelegt ist, stellt MLflow die erforderlichen Pakete wie in der Modelldefinition in einer isolierten Umgebung wieder her, in der nur die UDF-Funktion ausgeführt wird. Weitere Informationen finden Sie in der Dokumentation mlflow.pyfunc.spark_udf.

  2. Erstellen Sie eine Auftragsdefinition:

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.2"
    

    Tipp

    Wenn Sie einen angefügten Synapse Spark-Pool verwenden möchten, definieren Sie im oben gezeigten Beispiel einer YAML-Spezifikationsdatei die Eigenschaft compute anstelle der Eigenschaft resources.

  3. Die oben gezeigten YAML-Dateien können im Befehl az ml job create mit dem Parameter --file verwendet werden, um einen eigenständigen Spark-Auftrag wie dargestellt zu erstellen:

    az ml job create -f mlflow-score-spark-job.yml
    

Nächste Schritte