
Run batch predictions on large data sets with Azure Machine Learning service

In this article, you'll learn how to make predictions on large quantities of data asynchronously by using the Azure Machine Learning service.

Batch prediction (or batch scoring) provides cost-effective inference, with unparalleled throughput for asynchronous applications. Batch prediction pipelines can scale to perform inference on terabytes of production data. Batch prediction is optimized for high-throughput, fire-and-forget predictions over large collections of data.

Tip

If your system requires low-latency processing (to process a single document or a small set of documents quickly), use real-time scoring instead of batch prediction.

In the following steps, you create a machine learning pipeline to register a pretrained computer vision model (Inception-V3). Then you use the pretrained model to do batch scoring on images available in your Azure Blob storage account. The images used for scoring are unlabeled images from the ImageNet dataset.

Prerequisites

Set up machine learning resources

The following steps set up the resources you need to run a pipeline:

  • Access the datastore that already has the pretrained model, input labels, and images to score (this is already set up for you).
  • Set up a datastore to store your outputs.
  • Configure DataReference objects to point to the data in the preceding datastores.
  • Set up the compute machines or clusters where the pipeline steps will run.

Access the datastores

First, access the datastore that has the model, labels, and images.

You'll use a public blob container, named sampledata, in the pipelinedata account, which holds images from the ImageNet evaluation set. The datastore name for this public container is images_datastore. Register this datastore with your workspace:

from azureml.core import Datastore

# Public blob container details
account_name = "pipelinedata"
datastore_name = "images_datastore"
container_name = "sampledata"

batchscore_blob = Datastore.register_azure_blob_container(ws,
                      datastore_name=datastore_name,
                      container_name=container_name,
                      account_name=account_name,
                      overwrite=True)

Next, set up the default datastore for the outputs.

When you create your workspace, Azure Files and Blob storage are attached to the workspace by default. Azure Files is the default datastore for a workspace, but you can also use Blob storage as a datastore. For more information, see Azure storage options.

def_data_store = ws.get_default_datastore()
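
If you'd rather write the outputs to the attached Blob storage instead, here's a minimal sketch; the workspaceblobstore name assumes the default registration a new workspace gets:

from azureml.core import Datastore

# "workspaceblobstore" is the name the attached Blob storage is
# registered under by default; adjust it if your workspace differs.
def_blob_store = Datastore.get(ws, "workspaceblobstore")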

Configure data references

Now, reference the data in your pipeline as inputs to pipeline steps.

A data source in a pipeline is represented by a DataReference object. The DataReference object points to data that lives in, or is accessible from, a datastore. You need DataReference objects for the directory of input images, the directory in which the pretrained model is stored, the directory for labels, and the output directory.

from azureml.data.data_reference import DataReference
from azureml.pipeline.core import PipelineData

input_images = DataReference(datastore=batchscore_blob,
                             data_reference_name="input_images",
                             path_on_datastore="batchscoring/images",
                             mode="download")

model_dir = DataReference(datastore=batchscore_blob,
                          data_reference_name="input_model",
                          path_on_datastore="batchscoring/models",
                          mode="download")

label_dir = DataReference(datastore=batchscore_blob,
                          data_reference_name="input_labels",
                          path_on_datastore="batchscoring/labels",
                          mode="download")

output_dir = PipelineData(name="scores",
                          datastore=def_data_store,
                          output_path_on_compute="batchscoring/results")

Set up compute target

In Azure Machine Learning, compute (or compute target) refers to the machines or clusters that perform the computational steps in your machine learning pipeline. For example, you can create an Azure Machine Learning compute.

compute_name = "gpucluster"
compute_min_nodes = 0
compute_max_nodes = 4
vm_size = "STANDARD_NC6"

if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and type(compute_target) is AmlCompute:
        print('Found compute target. just use it. ' + compute_name)
else:
    print('Creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(
                     vm_size = vm_size, # NC6 is GPU-enabled
                     vm_priority = 'lowpriority', # optional
                     min_nodes = compute_min_nodes, 
                     max_nodes = compute_max_nodes)

    # create the cluster
    compute_target = ComputeTarget.create(ws, 
                        compute_name, 
                        provisioning_config)
    
    compute_target.wait_for_completion(
                     show_output=True, 
                     min_node_count=None, 
                     timeout_in_minutes=20)

Prepare the model

Before you can use the pretrained model, you'll need to download the model and register it with your workspace.

Download the pretrained model

http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz 下载预先训练的计算机视觉模型 (InceptionV3)。Download the pretrained computer vision model (InceptionV3) from http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz. 然后将其提取到 models 子文件夹。Then extract it to the models subfolder.

import os
import tarfile
import urllib.request

model_dir = 'models'
if not os.path.isdir(model_dir):
    os.mkdir(model_dir)

url = "http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz"
urllib.request.urlretrieve(url, "model.tar.gz")
with tarfile.open("model.tar.gz", "r:gz") as tar:
    tar.extractall(model_dir)

Register the model

Here's how to register the model:

from azureml.core.model import Model

# Register the downloaded model
model = Model.register(
        model_path="models/inception_v3.ckpt",
        model_name="inception",  # This is the name of the registered model
        tags={'pretrained': "inception"},
        description="ImageNet-trained TensorFlow Inception",
        workspace=ws)

Write your scoring script

Warning

The following code is only a sample of what is contained in the batch_score.py used by the sample notebook. You'll need to create your own scoring script for your scenario.

The batch_score.py script takes input images from dataset_path and the pretrained model from model_dir, and writes result-labels.txt to output_dir.

# Snippets from a sample scoring script
# Refer to the accompanying batch-scoring Notebook
# https://github.com/Azure/MachineLearningNotebooks/blob/master/pipeline/pipeline-batch-scoring.ipynb
# for the implementation script

# Get labels
def get_class_label_dict(label_file):
    label = []
    proto_as_ascii_lines = tf.gfile.GFile(label_file).readlines()
    for l in proto_as_ascii_lines:
        label.append(l.rstrip())
    return label

class DataIterator:
    # Definition of the DataIterator goes here
    ...
  
def main(_):
    # Refer to batch-scoring Notebook for implementation.
    label_file_name = os.path.join(args.label_dir, "labels.txt")
    label_dict = get_class_label_dict(label_file_name)
    classes_num = len(label_dict)
    test_feeder = DataIterator(data_dir=args.dataset_path)
    total_size = len(test_feeder.labels)

    # get model from model registry
    model_path = Model.get_model_path(args.model_name)
    with tf.Session() as sess:
        test_images = test_feeder.input_pipeline(batch_size=args.batch_size)
        with slim.arg_scope(inception_v3.inception_v3_arg_scope()):
            input_images = tf.placeholder(tf.float32, [args.batch_size, image_size, image_size, num_channel])
            logits, _ = inception_v3.inception_v3(input_images,
                                                        num_classes=classes_num,
                                                        is_training=False)
            probabilities = tf.argmax(logits, 1)

        sess.run(tf.global_variables_initializer())
        sess.run(tf.local_variables_initializer())
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        saver = tf.train.Saver()
        saver.restore(sess, model_path)
        out_filename = os.path.join(args.output_dir, "result-labels.txt")
            
        # copy the file to artifacts
        shutil.copy(out_filename, "./outputs/")
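
The snippet above references an args object that the full script builds from its command-line arguments. A minimal sketch of that parsing, assuming the same argument names the pipeline step passes below (the exact parsing in the sample notebook may differ):

import argparse

parser = argparse.ArgumentParser(description="Batch scoring with Inception-V3")
parser.add_argument("--dataset_path", required=True,
                    help="Directory containing the input images")
parser.add_argument("--model_name", required=True,
                    help="Name of the registered model")
parser.add_argument("--label_dir", required=True,
                    help="Directory containing labels.txt")
parser.add_argument("--output_dir", required=True,
                    help="Directory to write result-labels.txt to")
parser.add_argument("--batch_size", type=int, default=20,
                    help="Number of images scored per batch")
args = parser.parse_args()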

Build and run the batch scoring pipeline

You have everything you need to build the pipeline, so now put it all together.

Prepare the run environment

Specify the conda dependencies for your script. You'll need this object later, when you create the pipeline step.

from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_GPU_IMAGE, RunConfiguration

cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.10.0", "azureml-defaults"])

# Run configuration
amlcompute_run_config = RunConfiguration(conda_dependencies=cd)
amlcompute_run_config.environment.docker.enabled = True
amlcompute_run_config.environment.docker.gpu_support = True
amlcompute_run_config.environment.docker.base_image = DEFAULT_GPU_IMAGE
amlcompute_run_config.environment.spark.precache_packages = False

Specify the parameter for your pipeline

Create a pipeline parameter by using a PipelineParameter object with a default value.

from azureml.pipeline.core import PipelineParameter

batch_size_param = PipelineParameter(
                    name="param_batch_size",
                    default_value=20)

Create the pipeline step

Create the pipeline step by using the script, environment configuration, and parameters. Specify the compute target you already attached to your workspace as the target of execution for the script. Use PythonScriptStep to create the pipeline step.

inception_model_name = "inception_v3.ckpt"

batch_score_step = PythonScriptStep(
    name="batch_scoring",
    script_name="batch_score.py",
    arguments=["--dataset_path", input_images, 
               "--model_name", "inception",
               "--label_dir", label_dir, 
               "--output_dir", output_dir, 
               "--batch_size", batch_size_param],
    compute_target=compute_target,
    inputs=[input_images, label_dir],
    outputs=[output_dir],
    runconfig=amlcompute_run_config,
    source_directory=scripts_folder

Run the pipeline

Now run the pipeline, and examine the output it produces. The output has a score corresponding to each input image.

from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# Run the pipeline
pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, 'batch_scoring').submit(
    pipeline, pipeline_params={"param_batch_size": 20})

# Wait for the run to finish (this might take several minutes)
pipeline_run.wait_for_completion(show_output=True)

# Download and review the output
step_run = list(pipeline_run.get_children())[0]
step_run.download_file("./outputs/result-labels.txt")

import pandas as pd

df = pd.read_csv("result-labels.txt", delimiter=":", header=None)
df.columns = ["Filename", "Prediction"]
df.head()

Publish the pipeline

After you're satisfied with the outcome of the run, publish the pipeline so you can run it with different input values later. When you publish a pipeline, you get a REST endpoint. This endpoint lets you invoke the pipeline with the set of parameters you've already incorporated by using PipelineParameter.

published_pipeline = pipeline_run.publish_pipeline(
    name="Inception_v3_scoring", 
    description="Batch scoring using Inception v3 model", 
    version="1.0")

Rerun the pipeline by using the REST endpoint

To rerun the pipeline, you'll need an Azure Active Directory authentication header token, as described in the AzureCliAuthentication class.
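
A minimal sketch of retrieving that token with the AzureCliAuthentication class, assuming you're already signed in through the Azure CLI:

from azureml.core.authentication import AzureCliAuthentication

# Build the Authorization header from your Azure CLI sign-in
cli_auth = AzureCliAuthentication()
aad_token = cli_auth.get_authentication_header()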

import requests

rest_endpoint = published_pipeline.endpoint
# Specify the batch size when running the pipeline
response = requests.post(rest_endpoint,
        headers=aad_token,
        json={"ExperimentName": "batch_scoring",
              "ParameterAssignments": {"param_batch_size": 50}})
# The response body carries the ID of the newly submitted run
run_id = response.json()["Id"]

# Monitor the run
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments["batch_scoring"], run_id)
RunDetails(published_pipeline_run).show()

Next steps

To see this working end-to-end, try the batch scoring notebook in GitHub.

To learn how to run notebooks, see the article Use Jupyter notebooks to explore this service.