Share via


Spark işlerinde MLflow modellerini dağıtma ve çalıştırma

Bu makalede, büyük miktarda veriden çıkarım yapmak veya veri düzenleme işlerinin bir parçası olarak MLflow modelinizi Spark işlerinde dağıtmayı ve çalıştırmayı öğrenin.

Bu örnek hakkında

Bu örnekte, azure machine learning'de kayıtlı bir MLflow modelini yönetilen Spark kümelerinde (önizleme), Azure Databricks'te veya Azure Synapse Analytics'te çalıştırılan Spark işlerine dağıtarak büyük miktarda veriden çıkarım gerçekleştirebileceğiniz gösterilmektedir.

Model, UCI Kalp Hastalığı Veri Kümesini temel alır. Veritabanı 76 öznitelik içeriyor, ancak bunların 14'ünün alt kümesini kullanıyoruz. Model, bir hastada kalp hastalığının varlığını tahmin etmeye çalışır. 0 'dan (iletişim durumu yok) 1'e (iletişim durumu) değerlenen tamsayıdır. Sınıflandırıcı kullanılarak XGBBoost eğitilmiştir ve gerekli tüm ön işleme işlem scikit-learn hattı olarak paketlenmiştir ve bu da modeli ham verilerden tahminlere giden uçtan uca bir işlem hattı haline getirmiştir.

Bu makaledeki bilgiler, azureml-examples deposunda yer alan kod örneklerini temel alır. Dosyaları kopyalamak/yapıştırmak zorunda kalmadan komutları yerel olarak çalıştırmak için depoyu kopyalayın ve dizinleri olarak sdk/using-mlflow/deploydeğiştirin.

git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy

Önkoşullar

Bu makaledeki adımları takip etmeden önce aşağıdaki önkoşullara sahip olduğunuzdan emin olun:

  • MLflow SDK paketini mlflow ve MLflow azureml-mlflowiçin Azure Machine Learning eklentisini yükleyin.

    pip install mlflow azureml-mlflow
    

    İpucu

    SQL depolama, sunucu, kullanıcı arabirimi veya veri bilimi bağımlılıkları olmadan basit bir MLflow paketi olan paketini kullanabilirsiniz mlflow-skinny . mlflow-skinny dağıtımlar da dahil olmak üzere tüm özellik paketini içeri aktarmadan öncelikle MLflow'un izleme ve günlüğe kaydetme özelliklerine ihtiyaç duyan kullanıcılar için önerilir.

  • Azure Machine Learning çalışma alanı. Makine öğrenmesi kaynakları oluşturma öğreticisini izleyerek bir tane oluşturabilirsiniz.

  • Uzaktan izleme (yani Azure Machine Learning dışında çalışan izleme denemeleri) gerçekleştiriyorsanız, MLflow'ı Azure Machine Learning çalışma alanınızın izleme URI'sine işaret eden şekilde yapılandırın. MLflow'u çalışma alanınıza bağlama hakkında daha fazla bilgi için bkz . Azure Machine Learning için MLflow'u yapılandırma.

  • Çalışma alanınızda kayıtlı bir MLflow modeli olmalıdır. Özellikle bu örnekte Diabetes veri kümesi için eğitilmiş bir model kaydedilecektir.

Çalışma alanınıza bağlanma

İlk olarak, modelinizin kayıtlı olduğu Azure Machine Learning çalışma alanına bağlanalım.

İzleme sizin için zaten yapılandırılmış. MLflow ile çalışırken varsayılan kimlik bilgileriniz de kullanılır.

Modeli kaydetme

Çıkarım gerçekleştirmek için Azure Machine Learning kayıt defterine kaydedilmiş bir modele ihtiyacımız var. Bu durumda, depoda modelin yerel bir kopyası zaten var, bu nedenle modeli yalnızca çalışma alanında kayıt defterinde yayımlamamız gerekiyor. Dağıtmaya çalıştığınız model zaten kayıtlıysa bu adımı atlayabilirsiniz.

model_name = 'heart-classifier'
model_local_path = "model"

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version

Alternatif olarak, modeliniz çalıştırmanın içinde günlüğe kaydedildiyse doğrudan kaydedebilirsiniz.

İpucu

Modeli kaydetmek için modelin depolandığı konumu bilmeniz gerekir. MLflow özelliğini kullanıyorsanız autolog yol, kullanılan modelin türüne ve çerçevesine bağlıdır. Bu klasörün adını belirlemek için işlerin çıkışını denetlemenizi öneririz. adlı MLModelbir dosya içeren klasörü arayabilirsiniz. kullanarak modellerinizi el ile log_modelgünlüğe günlüğe geçiriyorsanız yol, bu tür bir yönteme geçirdiğiniz bağımsız değişkendir. Örneğin, kullanarak mlflow.sklearn.log_model(my_model, "classifier")modeli günlüğe kaydederseniz, modelin depolandığı yol olur classifier.

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

Not

Yol MODEL_PATH , modelin çalıştırmada depolandığı konumdur.


Puan almak için giriş verilerini alma

Üzerinde çalışmak veya işleri yapmak için bazı giriş verilerine ihtiyacımız olacak. Bu örnekte, örnek verileri İnternet'ten indireceğiz ve Spark kümesi tarafından kullanılan paylaşılan bir depolama alanına yerleştireceğiz.

import urllib

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")

Verileri kümenin tamamında kullanılabilen bağlı bir depolama hesabına taşıyın.

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

Önemli

Önceki kod, dbutilsAzure Databricks kümesinde kullanılabilen bir araçtır. Kullandığınız platforma bağlı olarak uygun aracı kullanın.

Giriş verileri daha sonra aşağıdaki klasöre yerleştirilir:

input_data_path = "dbfs:/data"

Spark kümelerinde modeli çalıştırma

Aşağıdaki bölümde, Spark işlerinde Azure Machine Learning'de kayıtlı MLflow modellerini çalıştırma işlemi açıklanmaktadır.

  1. Kümede aşağıdaki kitaplıkların yüklü olduğundan emin olun:

    - mlflow<3,>=2.1
    - cloudpickle==2.2.0
    - scikit-learn==1.2.0
    - xgboost==1.7.2
    
  2. Azure Machine Learning'de kayıtlı bir MLflow modeliyle puanlama yordamı oluşturmayı göstermek için bir not defteri kullanacağız. Not defteri oluşturun ve pyspark'ı varsayılan dil olarak kullanın.

  3. Gerekli ad alanlarını içeri aktarın:

    import mlflow
    import pyspark.sql.functions as f
    
  4. Model URI'sini yapılandırın. Aşağıdaki URI, en son sürümünde adlı heart-classifier bir model getirir.

    model_uri = "models:/heart-classifier/latest"
    
  5. Modeli UDF işlevi olarak yükleyin. Kullanıcı tanımlı işlev (UDF), kullanıcı tarafından tanımlanan ve özel mantığın kullanıcı ortamında yeniden kullanılmasına olanak sağlayan bir işlevdir.

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    İpucu

    İşlev tarafından döndürülen türü denetlemek için bağımsız değişkenini result_typepredict() kullanın.

  6. Puanlamak istediğiniz verileri okuyun:

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    Bizim örneğimizde giriş verileri biçimindedir CSV ve klasörüne dbfs:/data/yerleştirilir. Bu veri kümesi tahmin etmek için hedef değişkeni içerdiğinden sütunu target da bırakıyoruz. Üretim senaryolarında verileriniz bu sütuna sahip olmaz.

  7. İşlevi predict_function çalıştırın ve tahminleri yeni bir sütuna yerleştirin. Bu örnekte tahminleri sütununa predictionsyerleştiriyoruz.

    df.withColumn("predictions", score_function(*df.columns))
    

    İpucu

    , predict_function gerekli sütunları bağımsız değişken olarak alır. Bizim örneğimizde, veri çerçevesinin tüm sütunları model tarafından beklenir ve bu nedenle df.columns kullanılır. Modeliniz sütunların bir alt kümesini gerektiriyorsa bunları el ile tanıtabilirsiniz. Modeliniz bir imzaya sahipse, türlerin girişlerle beklenen türler arasında uyumlu olması gerekir.

  8. Tahminlerinizi depolama alanına geri yazabilirsiniz:

    scored_data_path = "dbfs:/scored-data"
    scored_data.to_csv(scored_data_path)
    

Azure Machine Learning'de modeli tek başına bir Spark işinde çalıştırma

Azure Machine Learning, tek başına spark işi oluşturmayı ve Azure Machine Learning işlem hatlarında kullanılabilecek yeniden kullanılabilir bir Spark bileşeni oluşturmayı destekler. Bu örnekte, Azure Machine Learning tek başına Spark işinde çalışan ve çıkarım yapmak için bir MLflow modeli çalıştıran bir puanlama işi dağıtacağız.

Not

Azure Machine Learning'deki Spark işleri hakkında daha fazla bilgi edinmek için bkz . Azure Machine Learning'de Spark işleri gönderme (önizleme).

  1. Spark işi, bağımsız değişkenleri alan bir Python betiği gerektirir. Puanlama betiği oluşturma:

    score.py

    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Load the model as an UDF function
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", score_function(*df.columns))
    
    # Save the predictions
    scored_data.to_csv(args.scored_data)
    

    Yukarıdaki betik üç bağımsız değişken --model--input_data alır ve --scored_data. İlk ikisi girişlerdir ve çalıştırmak istediğimiz modeli ve giriş verilerini temsil eder; sonuncusu bir çıkıştır ve tahminlerin yerleştirileceği çıkış klasörüdür.

    İpucu

    Python paketlerinin yüklenmesi: Önceki puanlama betiği, MLflow modelini bir UDF işlevine yükler, ancak parametresini env_manager="conda"gösterir. Bu parametre ayarlandığında, MLflow yalnızca UDF işlevinin çalıştığı yalıtılmış bir ortamda model tanımında belirtilen gerekli paketleri geri yükler. Daha fazla ayrıntı için belgelere bakın mlflow.pyfunc.spark_udf .

  2. İş tanımı oluşturma:

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.2"
    

    İpucu

    Ekli bir Synapse Spark havuzu kullanmak için, özellik yerine resources yukarıda gösterilen örnek YAML belirtim dosyasında özelliği tanımlayıncompute.

  3. Yukarıda gösterilen YAML dosyaları komutunda az ml job create parametresiyle birlikte --file , gösterildiği gibi tek başına bir Spark işi oluşturmak için kullanılabilir:

    az ml job create -f mlflow-score-spark-job.yml
    

Sonraki adımlar