Share via


Spark 작업에서 MLflow 모델 배포 및 실행

이 문서에서는 Spark 작업에서 MLflow 모델을 배포하고 실행하여 대량의 데이터 또는 데이터 랭글링 작업의 일부로 유추를 수행하는 방법을 알아봅니다.

이 예에 대해

이 예제에서는 Azure Machine Learning에 등록된 MLflow 모델을 관리형 Spark 클러스터(미리 보기), Azure Databricks 또는 Azure Synapse Analytics에서 실행되는 Spark 작업에 배포하여 많은 양의 데이터에 대한 유추를 수행하는 방법을 보여 줍니다.

이 모델은 UCI 심장 질환 데이터 집합을 기반으로 합니다. 데이터베이스에는 76개의 특성이 포함되어 있지만, 여기서는 그 중 14개만 사용합니다. 이 모델은 환자의 심장병 유무를 예측하려고 시도합니다. 값은 0(심장병 없음)에서 1(심장병 있음) 사이의 정수입니다. 분류자를 사용하여 XGBBoost 학습되었으며 필요한 모든 전처리가 파이프라인으로 scikit-learn 패키지되어 이 모델을 원시 데이터에서 예측으로 가는 엔드 투 엔드 파이프라인으로 만듭니다.

이 문서의 정보는 azureml-examples 리포지토리에 포함된 코드 샘플을 기반으로 합니다. 파일을 복사/붙여넣지 않고 로컬로 명령을 실행하려면 리포지토리를 복제한 다음 디렉터리를 로 변경합니다 sdk/using-mlflow/deploy.

git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy

필수 조건

이 문서의 단계를 수행하기 전에 다음과 같은 필수 구성 요소가 있는지 확인합니다.

  • MLflow SDK 패키지 mlflow 및 MLflow azureml-mlflow용 Azure Machine Learning 플러그 인을 설치합니다.

    pip install mlflow azureml-mlflow
    

    SQL Storage, 서버, UI 또는 데이터 과학 종속성이 없는 경량 MLflow 패키지인 패키지를 사용할 mlflow-skinny 수 있습니다. mlflow-skinny 는 배포를 비롯한 전체 기능 모음을 가져오지 않고 MLflow의 추적 및 로깅 기능이 주로 필요한 사용자에게 권장됩니다.

  • Azure Machine Learning 작업 영역 기계 학습 리소스 만들기 자습서따라 만들 수 있습니다.

    • 작업 영역에서 MLflow 작업을 수행하는 데 필요한 액세스 권한을 확인합니다.
  • 원격 추적(즉, Azure Machine Learning 외부에서 실행되는 실험 추적)을 수행하는 경우 Azure Machine Learning 작업 영역의 추적 URI를 가리키도록 MLflow를 구성합니다. MLflow를 작업 영역에 연결하는 방법에 대한 자세한 내용은 Azure Machine Learning용 MLflow 구성을 참조하세요.

작업 영역에 연결

먼저 모델이 등록된 Azure Machine Learning 작업 영역에 연결해 보겠습니다.

추적이 이미 구성되어 있습니다. MLflow로 작업할 때도 기본 자격 증명이 사용됩니다.

모델 등록

유추를 수행하려면 Azure Machine Learning 레지스트리에 등록된 모델이 필요합니다. 이 예제의 경우 리포지토리에 모델의 로컬 복사본이 이미 있으므로 작업 영역의 레지스트리에만 모델을 게시하면 됩니다. 배포하려는 모델이 이미 등록된 경우 이 단계를 건너뛸 수 있습니다.

model_name = 'heart-classifier'
model_local_path = "model"

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version

또는 모델이 실행 내부에 기록된 경우 직접 등록할 수 있습니다.

모델을 등록하려면 모델이 저장된 위치를 알아야 합니다. MLflow의 autolog 기능을 사용하는 경우 경로는 사용 중인 모델의 형식 및 프레임워크에 따라 달라집니다. 작업 출력을 검사 이 폴더의 이름을 식별하는 것이 좋습니다. MLModel이라는 파일이 포함된 폴더를 찾을 수 있습니다. log_model을 사용하여 모델을 수동으로 로깅하는 경우 경로는 해당 메서드에 전달하는 인수입니다. 예를 들어 모델을 사용하여 mlflow.sklearn.log_model(my_model, "classifier")로그하는 경우 모델이 저장되는 경로는 다음과 같습니다 classifier.

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

참고 항목

경로 MODEL_PATH는 모델이 실행에 저장된 위치입니다.


점수를 매기기 위한 입력 데이터 가져오기

실행하거나 작업을 실행하려면 몇 가지 입력 데이터가 필요합니다. 이 예제에서는 인터넷에서 샘플 데이터를 다운로드하여 Spark 클러스터에서 사용하는 공유 스토리지에 배치합니다.

import urllib

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")

데이터를 전체 클러스터에서 사용할 수 있는 탑재된 스토리지 계정으로 이동합니다.

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

Important

이전 코드에서는 Azure Databricks 클러스터에서 사용할 수 있는 도구인 이 도구를 사용합니다 dbutils. 사용 중인 플랫폼에 따라 적절한 도구를 사용합니다.

그런 다음 입력 데이터가 다음 폴더에 배치됩니다.

input_data_path = "dbfs:/data"

Spark 클러스터에서 모델 실행

다음 섹션에서는 Spark 작업의 Azure Machine Learning에 등록된 MLflow 모델을 실행하는 방법을 설명합니다.

  1. 클러스터에 다음 라이브러리가 설치되어 있는지 확인합니다.

    - mlflow<3,>=2.1
    - cloudpickle==2.2.0
    - scikit-learn==1.2.0
    - xgboost==1.7.2
    
  2. Notebook을 사용하여 Azure Machine Learning에 등록된 MLflow 모델을 사용하여 점수 매기기 루틴을 만드는 방법을 보여 줍니다. Notebook을 만들고 PySpark를 기본 언어로 사용합니다.

  3. 필요한 네임스페이스를 가져옵니다.

    import mlflow
    import pyspark.sql.functions as f
    
  4. 모델 URI를 구성합니다. 다음 URI는 최신 버전에서 명명된 heart-classifier 모델을 제공합니다.

    model_uri = "models:/heart-classifier/latest"
    
  5. 모델을 UDF 함수로 로드합니다. UDF(사용자 정의 함수)는 사용자 환경에서 사용자 지정 논리를 다시 사용할 수 있도록 사용자가 정의한 함수입니다.

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    인수 result_type 를 사용하여 함수에서 반환된 형식을 제어합니다 predict() .

  6. 채점할 데이터를 읽습니다.

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    이 경우 입력 데이터는 형식이며 CSV 폴더 dbfs:/data/에 배치됩니다. 또한 이 데이터 세트에 예측할 대상 변수가 포함되어 있으므로 열을 target 삭제합니다. 프로덕션 시나리오에서는 데이터에 이 열이 없습니다.

  7. 함수 predict_function 를 실행하고 새 열에 예측을 배치합니다. 이 경우 열 predictions에 예측을 배치합니다.

    df.withColumn("predictions", score_function(*df.columns))
    

    predict_function 필요한 열을 인수로 받습니다. 이 경우 데이터 프레임의 모든 열이 모델에 df.columns 의해 예상되므로 사용됩니다. 모델에 열의 하위 집합이 필요한 경우 수동으로 도입할 수 있습니다. 모델에 서명이 있는 경우 형식은 입력과 예상 형식 간에 호환되어야 합니다.

  8. 예측을 스토리지에 다시 쓸 수 있습니다.

    scored_data_path = "dbfs:/scored-data"
    scored_data.to_csv(scored_data_path)
    

Azure Machine Learning에서 독립 실행형 Spark 작업에서 모델 실행

Azure Machine Learning은 독립 실행형 Spark 작업 만들기 및 Azure Machine Learning 파이프라인에서 사용할 수 있는 재사용 가능한 Spark 구성 요소 만들기를 지원합니다. 이 예제에서는 Azure Machine Learning 독립 실행형 Spark 작업에서 실행되고 MLflow 모델을 실행하여 유추를 수행하는 채점 작업을 배포합니다.

참고 항목

Azure Machine Learning의 Spark 작업에 대한 자세한 내용은 Azure Machine Learning에서 Spark 작업 제출(미리 보기)을 참조하세요.

  1. Spark 작업에는 인수를 사용하는 Python 스크립트가 필요합니다. 다음 채점 스크립트를 만듭니다.

    score.py

    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Load the model as an UDF function
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", score_function(*df.columns))
    
    # Save the predictions
    scored_data.to_csv(args.scored_data)
    

    위의 스크립트는 세 개의 인수 --model--input_data--scored_data사용합니다. 처음 두 가지는 입력이며 실행하려는 모델과 입력 데이터를 나타내며, 마지막 입력은 출력이며 예측이 배치되는 출력 폴더입니다.

    Python 패키지 설치: 이전 점수 매기기 스크립트는 MLflow 모델을 UDF 함수에 로드하지만 매개 변수 env_manager="conda"를 나타냅니다. 이 매개 변수가 설정되면 MLflow는 UDF 함수만 실행되는 격리된 환경에서 모델 정의에 지정된 대로 필요한 패키지를 복원합니다. 자세한 내용은 설명서를 참조 mlflow.pyfunc.spark_udf 하세요.

  2. 작업 정의를 만듭니다.

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.2"
    

    연결된 Synapse Spark 풀을 사용하려면 속성 대신 위에 표시된 샘플 YAML 사양 파일에서 resources 속성을 정의 compute 합니다.

  3. 위에 표시된 YAML 파일을 az ml job create 명령에 --file 매개 변수와 함께 사용하여 다음과 같이 독립 실행형 Spark 작업을 만들 수 있습니다.

    az ml job create -f mlflow-score-spark-job.yml
    

다음 단계