Создание конвейера пакетного вывода

Завершено

Чтобы создать конвейер пакетного вывода, выполните следующие задачи.

1. Регистрация модели

Чтобы использовать обученную модель в конвейере пакетного вывода, ее необходимо зарегистрировать в рабочей области Машинного обучения Azure.

Чтобы зарегистрировать модель из локального файла, можно использовать метод register объекта Model, как показано в следующем примере кода.

from azureml.core import Model

classification_model = Model.register(workspace=your_workspace,
                                      model_name='classification_model',
                                      model_path='model.pkl', # local path
                                      description='A classification model')

Кроме того, если есть ссылка на объект Run, использовавшийся для обучения модели, можно применить его метод register_model, как показано в следующем примере кода:

run.register_model( model_name='classification_model',
                    model_path='outputs/model.pkl', # run outputs path
                    description='A classification model')

2. Создание сценария оценки

Службе пакетного вывода требуется сценарий оценки для загрузки модели и использования ее для прогнозирования новых значений. Он должен включать две функции:

  • init(): вызывается при инициализации конвейера;
  • run(mini_batch): вызывается для каждого пакета данных, подлежащих обработке.

Как правило, для загрузки модели из реестра модели используется функция init, а для создания прогнозов на основе каждого пакета данных и возврата результатов — функция run. Это показано в следующем примере скрипта:

import os
import numpy as np
from azureml.core import Model
import joblib

def init():
    # Runs when the pipeline step is initialized
    global model

    # load the model
    model_path = Model.get_model_path('classification_model')
    model = joblib.load(model_path)

def run(mini_batch):
    # This runs for each batch
    resultList = []

    # process each file in the batch
    for f in mini_batch:
        # Read comma-delimited data into an array
        data = np.genfromtxt(f, delimiter=',')
        # Reshape into a 2-dimensional array for model input
        prediction = model.predict(data.reshape(1, -1))
        # Append prediction to results
        resultList.append("{}: {}".format(os.path.basename(f), prediction[0]))
    return resultList

3. Создание конвейера с помощью ParallelRunStep

Машинное обучение Azure предоставляет тип шага конвейера, предназначенный специально для параллельного вывода пакетов. С помощью класса ParallelRunStep можно считывать пакеты файлов из набора данных File и записывать выходные данные обработки в OutputFileDatasetConfig. Кроме того, можно задать параметр output_action для шага append_row, который гарантирует, что результаты всех экземпляров шага, выполняемых параллельно, будут объединены в один выходной файл с именем parallel_run_step.txt:

from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.core import Pipeline

# Get the batch dataset for input
batch_data_set = ws.datasets['batch-data']

# Set the output location
default_ds = ws.get_default_datastore()
output_dir = OutputFileDatasetConfig(name='inferences')

# Define the parallel run step step configuration
parallel_run_config = ParallelRunConfig(
    source_directory='batch_scripts',
    entry_script="batch_scoring_script.py",
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    compute_target=aml_cluster,
    node_count=4)

# Create the parallel run step
parallelrun_step = ParallelRunStep(
    name='batch-score',
    parallel_run_config=parallel_run_config,
    inputs=[batch_data_set.as_named_input('batch_data')],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)
# Create the pipeline
pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])

4. Запуск конвейера и получение выходных данных шага

После определения конвейера его можно запустить и дождаться его завершения. Затем можно получить файл parallel_run_step.txt из выходных данных шага, чтобы просмотреть результаты, как показано в следующем примере кода.

from azureml.core import Experiment

# Run the pipeline as an experiment
pipeline_run = Experiment(ws, 'batch_prediction_pipeline').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

# Get the outputs from the first (and only) step
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='results')

# Find the parallel_run_step.txt file
for root, dirs, files in os.walk('results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# Load and display the results
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]
print(df)