
Run batch inference on large amounts of data by using Azure Machine Learning

APPLIES TO: Basic edition, Enterprise edition (Upgrade to Enterprise edition)

Learn how to process large amounts of data asynchronously and in parallel by using Azure Machine Learning. The ParallelRunStep capability described here is in public preview. It's a high-performance and high-throughput way to generate inferences and process data, and it provides asynchronous capabilities out of the box.

With ParallelRunStep, it's straightforward to scale offline inference to large clusters of machines over terabytes of production data, resulting in improved productivity and optimized cost.

In this article, you learn the following tasks:

  • Create a remote compute resource.
  • Write a custom inference script.
  • Create a machine learning pipeline to register a pre-trained image classification model based on the MNIST dataset.
  • Use the model to run batch inference on sample images available in your Azure Blob storage account.

Prerequisites

  • If you don't have an Azure subscription, create a free account before you begin. Try the free or paid version of Azure Machine Learning.

  • For a guided quickstart, complete the setup tutorial if you don't already have an Azure Machine Learning workspace or notebook virtual machine.

  • To manage your own environment and dependencies, see the how-to guide on configuring your own environment. Run pip install azureml-sdk[notebooks] azureml-pipeline-core azureml-contrib-pipeline-steps in your environment to download the necessary dependencies.

Set up machine learning resources

The following actions set up the resources that you need to run a batch inference pipeline:

  • Create a datastore that points to a blob container that has the images to run inference on.
  • Set up data references as inputs and outputs for the batch inference pipeline step.
  • Set up a compute cluster to run the batch inference step.

Create a datastore with sample images

Get the MNIST evaluation set from the public blob container sampledata on an account named pipelinedata. Create a datastore named mnist_datastore that points to this container. In the following call to register_azure_blob_container, setting the overwrite flag to True overwrites any datastore that was previously created with that name.

You can change this step to point to your own blob container by providing your own values for datastore_name, container_name, and account_name.

from azureml.core import Datastore
from azureml.core import Workspace

# Load workspace authorization details from config.json
ws = Workspace.from_config()

mnist_blob = Datastore.register_azure_blob_container(ws, 
                      datastore_name="mnist_datastore", 
                      container_name="sampledata", 
                      account_name="pipelinedata",
                      overwrite=True)

Next, specify the workspace default datastore as the output datastore. You'll use it for inference output.

When you create your workspace, Azure Files and Blob storage are attached to the workspace by default. Azure Files is the default datastore for a workspace, but you can also use Blob storage as a datastore. For more information, see Azure storage options.

def_data_store = ws.get_default_datastore()

Configure data inputs and outputs

Now you need to configure data inputs and outputs, including:

  • The directory that contains the input images.
  • The directory where the pre-trained model is stored.
  • The directory that contains the labels.
  • The directory for output.

Dataset is a class for exploring, transforming, and managing data in Azure Machine Learning. This class has two types: TabularDataset and FileDataset. In this example, you'll use FileDataset as the input to the batch inference pipeline step.

Note

FileDataset support in batch inference is currently restricted to Azure Blob storage.

You can also reference other datasets in your custom inference script. For example, you can use Dataset.register and Dataset.get_by_name to access labels in your script for labeling images.

For more information about Azure Machine Learning datasets, see Create and access datasets (preview).

PipelineData objects are used for transferring intermediate data between pipeline steps. In this example, you use one for the inference output.

from azureml.core.dataset import Dataset
from azureml.pipeline.core import PipelineData

mnist_ds_name = 'mnist_sample_data'

path_on_datastore = mnist_blob.path('mnist/')
input_mnist_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)
registered_mnist_ds = input_mnist_ds.register(ws, mnist_ds_name, create_new_version=True)
named_mnist_ds = registered_mnist_ds.as_named_input(mnist_ds_name)

output_dir = PipelineData(name="inferences", 
                          datastore=def_data_store, 
                          output_path_on_compute="mnist/results")

Set up a compute target

In Azure Machine Learning, compute (or compute target) refers to the machines or clusters that perform the computational steps in your machine learning pipeline. Run the following code to create a CPU-based AmlCompute target.

import os

from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException

# choose a name for your cluster
compute_name = os.environ.get("AML_COMPUTE_CLUSTER_NAME", "cpu-cluster")
compute_min_nodes = os.environ.get("AML_COMPUTE_CLUSTER_MIN_NODES", 0)
compute_max_nodes = os.environ.get("AML_COMPUTE_CLUSTER_MAX_NODES", 4)

# This example uses CPU VM. For using GPU VM, set SKU to STANDARD_NC6
vm_size = os.environ.get("AML_COMPUTE_CLUSTER_SKU", "STANDARD_D2_V2")


if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and type(compute_target) is AmlCompute:
        print('found compute target. just use it. ' + compute_name)
else:
    print('creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size,
                                                                min_nodes = compute_min_nodes, 
                                                                max_nodes = compute_max_nodes)

    # create the cluster
    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)
    
    # can poll for a minimum number of nodes and for a specific timeout. 
    # if no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
    # For a more detailed view of current AmlCompute status, use get_status()
    print(compute_target.get_status().serialize())

Prepare the model

Download the pre-trained image classification model, and then extract it to the models directory.

import os
import tarfile
import urllib.request

model_dir = 'models'
if not os.path.isdir(model_dir):
    os.mkdir(model_dir)

url="https://pipelinedata.blob.core.windows.net/mnist-model/mnist-tf.tar.gz"
urllib.request.urlretrieve(url, "model.tar.gz")
with tarfile.open("model.tar.gz", "r:gz") as tar:
    tar.extractall(model_dir)

Then register the model with your workspace so that it's available to your remote compute resource.

from azureml.core.model import Model

# Register the downloaded model 
model = Model.register(model_path="models/",
                       model_name="mnist",
                       tags={'pretrained': "mnist"},
                       description="Mnist trained tensorflow model",
                       workspace=ws)

Write your inference script

Warning

The following code is only a sample, used by the sample notebook. You'll need to create your own script for your scenario.

The script must contain two functions:

  • init(): Use this function for any costly or common preparation for later inference. For example, use it to load the model into a global object. This function will be called only once, at the beginning of the process.
  • run(mini_batch): The function will run for each mini_batch instance.
    • mini_batch: Parallel run step will invoke the run method and pass either a list or a Pandas DataFrame as an argument to the method. Each entry in mini_batch will be a file path if the input is a FileDataset, or a Pandas DataFrame if the input is a TabularDataset.
    • response: The run() method should return a Pandas DataFrame or an array. For the append_row output_action, these returned elements are appended into the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful run of an input element in the input mini-batch. Make sure that enough data is included in the run result to map each input to a result. Run output is written to the output file and isn't guaranteed to be in order, so use some key in the output to map it back to the input.
# Snippets from a sample script.
# Refer to the accompanying digit_identification.py
# (https://aka.ms/batch-inference-notebooks)
# for the implementation script.

import os
import numpy as np
import tensorflow as tf
from PIL import Image
from azureml.core import Model


def init():
    global g_tf_sess

    # Pull down the model from the workspace
    model_path = Model.get_model_path("mnist")

    # Construct a graph to execute
    tf.reset_default_graph()
    saver = tf.train.import_meta_graph(os.path.join(model_path, 'mnist-tf.model.meta'))
    g_tf_sess = tf.Session()
    saver.restore(g_tf_sess, os.path.join(model_path, 'mnist-tf.model'))


def run(mini_batch):
    print(f'run method start: {__file__}, run({mini_batch})')
    resultList = []
    in_tensor = g_tf_sess.graph.get_tensor_by_name("network/X:0")
    output = g_tf_sess.graph.get_tensor_by_name("network/output/MatMul:0")

    for image in mini_batch:
        # Prepare each image
        data = Image.open(image)
        np_im = np.array(data).reshape((1, 784))
        # Perform inference
        inference_result = output.eval(feed_dict={in_tensor: np_im}, session=g_tf_sess)
        # Find the best probability, and add it to the result list
        best_result = np.argmax(inference_result)
        resultList.append("{}: {}".format(os.path.basename(image), best_result))

    return resultList

How to access other files in the source directory in entry_script

If you have another file or folder in the same directory as your entry script, you can reference it by resolving the directory that contains the script.

import os

script_dir = os.path.realpath(os.path.join(__file__, '..'))
file_path = os.path.join(script_dir, "<file_name>")

Build and run the pipeline containing ParallelRunStep

Now you have everything you need to build the pipeline.

Prepare the run environment

First, specify the dependencies for your script. You'll use this object later, when you create the pipeline step.

from azureml.core.environment import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_GPU_IMAGE

batch_conda_deps = CondaDependencies.create(pip_packages=["tensorflow==1.13.1", "pillow"])

batch_env = Environment(name="batch_environment")
batch_env.python.conda_dependencies = batch_conda_deps
batch_env.docker.enabled = True
batch_env.docker.base_image = DEFAULT_GPU_IMAGE
batch_env.spark.precache_packages = False

Specify the parameters for your batch inference pipeline step

ParallelRunConfig is the major configuration for the newly introduced batch inference ParallelRunStep instance within the Azure Machine Learning pipeline. You use it to wrap your script and configure the necessary parameters, including all of the following:

  • entry_script: A user script as a local file path that will be run in parallel on multiple nodes. If source_directory is present, use a relative path. Otherwise, use any path that's accessible on the machine.
  • mini_batch_size: The size of the mini-batch passed to a single run() call. (Optional; the default value is 10 files for FileDataset and 1MB for TabularDataset.)
    • For FileDataset, it's the number of files, with a minimum value of 1. You can combine multiple files into one mini-batch.
    • For TabularDataset, it's the size of the data. Example values are 1024, 1024KB, 10MB, and 1GB. The recommended value is 1MB. A mini-batch from a TabularDataset will never cross file boundaries. For example, suppose you have .csv files of various sizes, where the smallest file is 100 KB and the largest is 10 MB. If you set mini_batch_size = 1MB, files smaller than 1 MB will each be treated as one mini-batch, and files larger than 1 MB will be split into multiple mini-batches.
  • error_threshold: The number of record failures for TabularDataset and file failures for FileDataset that should be ignored during processing. If the error count for the entire input goes above this value, the job will be aborted. The error threshold is for the entire input, not for individual mini-batches sent to the run() method. The range is [-1, int.max]. A value of -1 indicates that all failures should be ignored during processing.
  • output_action: One of the following values, indicating how the output will be organized:
    • summary_only: The user script will store the output. ParallelRunStep will use the output only for the error threshold calculation.
    • append_row: For all input files, only one file will be created in the output folder, with all outputs appended and separated by line. The file name will be parallel_run_step.txt.
  • source_directory: The path to the folder that contains all files to execute on the compute target (optional).
  • compute_target: Only AmlCompute is supported.
  • node_count: The number of compute nodes to be used for running the user script.
  • process_count_per_node: The number of processes per node.
  • environment: The Python environment definition. You can configure it to use an existing Python environment or to set up a temporary environment for the experiment. The definition is also responsible for setting the required application dependencies (optional).
  • logging_level: Log verbosity. Values in increasing verbosity are WARNING, INFO, and DEBUG. (Optional; the default value is INFO.)
  • run_invocation_timeout: The run() method invocation timeout in seconds. (Optional; the default value is 60.)
from azureml.contrib.pipeline.steps import ParallelRunConfig

parallel_run_config = ParallelRunConfig(
    source_directory=scripts_folder,
    entry_script="digit_identification.py",
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    compute_target=compute_target,
    node_count=4)
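
The mini_batch_size splitting rule for TabularDataset described above can be sketched in plain Python. This is an illustrative simulation of the documented behavior (a mini-batch never crosses file boundaries; a file at or under the threshold forms one mini-batch, and a larger file is split), not the SDK's actual partitioning code:

```python
def partition_by_size(file_sizes_kb, mini_batch_kb=1024):
    """Simulate mini-batch formation for TabularDataset: a mini-batch
    never crosses file boundaries; files at or under the threshold become
    one mini-batch each, larger files are split into threshold-size chunks."""
    batches = []
    for name, size_kb in file_sizes_kb:
        # Peel off full-size chunks from large files
        while size_kb > mini_batch_kb:
            batches.append((name, mini_batch_kb))
            size_kb -= mini_batch_kb
        # The remainder (or an entire small file) is one mini-batch
        batches.append((name, size_kb))
    return batches

# A 100 KB file stays whole; a 2500 KB file becomes 1024 + 1024 + 452 KB chunks
print(partition_by_size([("small.csv", 100), ("large.csv", 2500)]))
```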

Create the pipeline step

Create the pipeline step by using the script, environment configuration, and parameters. Specify the compute target that you already attached to your workspace as the target of execution for the script. Use ParallelRunStep to create the batch inference pipeline step, which takes all of the following parameters:

  • name: The name of the step, with the following naming restrictions: unique, 3-32 characters, and matching the regex ^[a-z]([-a-z0-9]*[a-z0-9])?$.
  • models: Zero or more model names already registered in the Azure Machine Learning model registry.
  • parallel_run_config: A ParallelRunConfig object, as defined earlier.
  • inputs: One or more single-typed Azure Machine Learning datasets.
  • output: A PipelineData object that corresponds to the output directory.
  • arguments: A list of arguments passed to the user script (optional).
  • allow_reuse: Whether the step should reuse previous results when run with the same settings/inputs. If this parameter is False, a new run will always be generated for this step during pipeline execution. (Optional; the default value is True.)
from azureml.contrib.pipeline.steps import ParallelRunStep

parallelrun_step = ParallelRunStep(
    name="batch-mnist",
    models=[model],
    parallel_run_config=parallel_run_config,
    inputs=[named_mnist_ds],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)

Note

The step above depends on azureml-contrib-pipeline-steps, as described in Prerequisites.

Submit the pipeline

Now, run the pipeline. First, create a Pipeline object by using your workspace reference and the pipeline step that you created. The steps parameter is an array of steps. In this case, there's only one step, for batch scoring. To build pipelines that have multiple steps, place the steps in order in this array.

Next, use the Experiment.submit() function to submit the pipeline for execution.

from azureml.pipeline.core import Pipeline
from azureml.core.experiment import Experiment

pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
pipeline_run = Experiment(ws, 'digit_identification').submit(pipeline)

Monitor the parallel run job

A batch inference job can take a long time to finish. This example monitors progress by using a Jupyter widget. You can also manage the job's progress by using:

  • Azure Machine Learning studio.
  • Console output from the PipelineRun object.
from azureml.widgets import RunDetails
RunDetails(pipeline_run).show()

pipeline_run.wait_for_completion(show_output=True)
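
When the run completes with the append_row output action, all results land in a single parallel_run_step.txt on the output datastore. Lines aren't guaranteed to be in order, so the key each result carries (the file name, in the sample script above) is what maps predictions back to inputs. A minimal parsing sketch, assuming lines of the form "<filename>: <digit>" as produced by the sample script:

```python
def parse_predictions(lines):
    """Parse append_row output lines of the form '<filename>: <prediction>'
    into a dict keyed by file name; line order doesn't matter."""
    results = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        name, _, pred = line.partition(": ")
        results[name] = int(pred)
    return results

# Lines as they might be read from a downloaded parallel_run_step.txt
print(parse_predictions(["img_2.png: 7", "img_1.png: 3"]))
```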

Next steps

To see this process working end to end, try the batch inference notebook.

For debugging and troubleshooting guidance for ParallelRunStep, see the how-to guide.

For debugging and troubleshooting guidance for pipelines, see the how-to guide.

To learn how to run notebooks, follow the article Use Jupyter notebooks to explore this service.