Run batch predictions on large data sets with Azure Machine Learning pipelines

In this article, you learn how to make predictions on large quantities of data asynchronously by using ML pipelines in Azure Machine Learning.

Batch prediction (or batch scoring) provides cost-effective inference, with unparalleled throughput for asynchronous applications. Batch prediction pipelines can scale to perform inference on terabytes of production data. Batch prediction is optimized for high-throughput, fire-and-forget predictions for a large collection of data.

Tip

If your system requires low-latency processing (to process a single document or a small set of documents quickly), use real-time scoring instead of batch prediction.

In the following steps, you create a machine learning pipeline to register a pretrained computer vision model (Inception-V3). Then you use the pretrained model to do batch scoring on images available in your Azure Blob storage account. The images used for scoring are unlabeled images from the ImageNet dataset.

Prerequisites

Set up machine learning resources

The following steps set up the resources you need to run a pipeline:

  • Access the datastore that already has the pretrained model, input labels, and images to score (this is already set up for you).
  • Set up a datastore to store your outputs.
  • Configure DataReference objects to point to the data in the preceding datastores.
  • Set up the compute machines or clusters where the pipeline steps will run.

Access the datastores

First, access the datastore that has the model, labels, and images.

Use a public blob container, named sampledata, in the pipelinedata account that holds images from the ImageNet evaluation set. The datastore name for this public container is images_datastore. Register this datastore with your workspace:

from azureml.core import Datastore

account_name = "pipelinedata"
datastore_name = "images_datastore"
container_name = "sampledata"

batchscore_blob = Datastore.register_azure_blob_container(ws,
                                                          datastore_name=datastore_name,
                                                          container_name=container_name,
                                                          account_name=account_name,
                                                          overwrite=True)

Next, set up the workspace's default datastore for the outputs.

When you create your workspace, Azure Files and Blob storage are attached to the workspace by default. Azure Files is the default datastore for a workspace, but you can also use Blob storage as a datastore. For more information, see Azure storage options.

def_data_store = ws.get_default_datastore()

Configure data references

Now, reference the data in your pipeline as inputs to pipeline steps.

A data source in a pipeline is represented by a DataReference object. The DataReference object points to data that lives in, or is accessible from, a datastore. You need DataReference objects for the directory of input images, the directory in which the pretrained model is stored, and the directory for labels, plus a PipelineData object for the output directory.

from azureml.data.data_reference import DataReference
from azureml.pipeline.core import PipelineData

input_images = DataReference(datastore=batchscore_blob,
                             data_reference_name="input_images",
                             path_on_datastore="batchscoring/images",
                             mode="download")

model_dir = DataReference(datastore=batchscore_blob,
                          data_reference_name="input_model",
                          path_on_datastore="batchscoring/models",
                          mode="download")

label_dir = DataReference(datastore=batchscore_blob,
                          data_reference_name="input_labels",
                          path_on_datastore="batchscoring/labels",
                          mode="download")

output_dir = PipelineData(name="scores",
                          datastore=def_data_store,
                          output_path_on_compute="batchscoring/results")

Set up compute target

In Azure Machine Learning, compute (or compute target) refers to the machines or clusters that perform the computational steps in your machine learning pipeline. For example, you can create Azure Machine Learning compute with the AmlCompute class.

from azureml.core.compute import AmlCompute
from azureml.core.compute import ComputeTarget

compute_name = "gpucluster"
compute_min_nodes = 0
compute_max_nodes = 4
vm_size = "STANDARD_NC6"

if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and isinstance(compute_target, AmlCompute):
        print('Found existing compute target: ' + compute_name)
else:
    print('Creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=vm_size,  # NC6 is GPU-enabled
        vm_priority='lowpriority',  # optional
        min_nodes=compute_min_nodes,
        max_nodes=compute_max_nodes)

    # create the cluster
    compute_target = ComputeTarget.create(ws,
                                          compute_name,
                                          provisioning_config)

    compute_target.wait_for_completion(
        show_output=True,
        min_node_count=None,
        timeout_in_minutes=20)

Prepare the model

Before you can use the pretrained model, you'll need to download the model and register it with your workspace.

Download the pretrained model

http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz 下載預先定型的電腦視覺模型 (InceptionV3)。Download the pretrained computer vision model (InceptionV3) from http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz. 然後,將其解壓縮到 models 子資料夾。Then extract it to the models subfolder.

import os
import tarfile
import urllib.request

model_dir = 'models'
if not os.path.isdir(model_dir):
    os.mkdir(model_dir)

url = "http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz"
urllib.request.urlretrieve(url, "model.tar.gz")
with tarfile.open("model.tar.gz", "r:gz") as tar:
    tar.extractall(model_dir)

Register the model

Here's how to register the model:

from azureml.core.model import Model

# register downloaded model
model = Model.register(
    model_path="models/inception_v3.ckpt",
    model_name="inception",  # This is the name of the registered model
    tags={'pretrained': "inception"},
    description="Imagenet trained tensorflow inception",
    workspace=ws)

Write your scoring script

Warning

The following code is only a sample of what is contained in the batch_score.py file used by the sample notebook. You'll need to create your own scoring script for your scenario.

The batch_score.py script takes input images in dataset_path, the pretrained model in model_dir, and outputs result-labels.txt to output_dir.

# Snippets from a sample scoring script
# Refer to the accompanying batch-scoring Notebook
# https://github.com/Azure/MachineLearningNotebooks/blob/master/pipeline/pipeline-batch-scoring.ipynb
# for the implementation script

# Get labels
def get_class_label_dict(label_file):
    label = []
    proto_as_ascii_lines = tf.gfile.GFile(label_file).readlines()
    for line in proto_as_ascii_lines:
        label.append(line.rstrip())
    return label


class DataIterator:
    # Definition of the DataIterator here
    pass

def main(_):
    # Refer to batch-scoring Notebook for implementation.
    label_file_name = os.path.join(args.label_dir, "labels.txt")
    label_dict = get_class_label_dict(label_file_name)
    classes_num = len(label_dict)
    test_feeder = DataIterator(data_dir=args.dataset_path)
    total_size = len(test_feeder.labels)

    # get model from model registry
    model_path = Model.get_model_path(args.model_name)
    with tf.Session() as sess:
        test_images = test_feeder.input_pipeline(batch_size=args.batch_size)
        with slim.arg_scope(inception_v3.inception_v3_arg_scope()):
            input_images = tf.placeholder(tf.float32, [args.batch_size, image_size, image_size, num_channel])
            logits, _ = inception_v3.inception_v3(input_images,
                                                  num_classes=classes_num,
                                                  is_training=False)
            probabilities = tf.argmax(logits, 1)

        sess.run(tf.global_variables_initializer())
        sess.run(tf.local_variables_initializer())
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        saver = tf.train.Saver()
        saver.restore(sess, model_path)
        out_filename = os.path.join(args.output_dir, "result-labels.txt")
        # ... scoring loop that writes predictions to out_filename
        # (see the batch-scoring notebook for the full implementation) ...

        # copy the file to artifacts
        shutil.copy(out_filename, "./outputs/")
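The pipeline step created later passes flags such as --dataset_path, --model_name, --label_dir, --output_dir, and --batch_size to this script. A hypothetical argparse setup that would consume those flags might look like the following sketch (the actual batch_score.py may parse its arguments differently):

```python
import argparse

# Hypothetical parser matching the flags the pipeline step passes to
# batch_score.py; the real script may define these differently.
parser = argparse.ArgumentParser(description="batch scoring arguments")
parser.add_argument("--dataset_path", help="directory containing input images")
parser.add_argument("--model_name", help="name of the registered model")
parser.add_argument("--label_dir", help="directory containing labels.txt")
parser.add_argument("--output_dir", help="directory for result-labels.txt")
parser.add_argument("--batch_size", type=int, default=20,
                    help="number of images scored per batch")

# Example invocation with the same flag names used in the pipeline step
args = parser.parse_args(["--dataset_path", "batchscoring/images",
                          "--model_name", "inception",
                          "--batch_size", "50"])
```

On the compute target, the script would call parser.parse_args() with no arguments so that the values supplied by PythonScriptStep are read from the command line.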

Build and run the batch scoring pipeline

Prepare the run environment

Specify the conda dependencies for your script. You'll need this object later, when you create the pipeline step.

from azureml.core.runconfig import DEFAULT_GPU_IMAGE
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies

cd = CondaDependencies.create(
    pip_packages=["tensorflow-gpu==1.10.0", "azureml-defaults"])

# Runconfig
amlcompute_run_config = RunConfiguration(conda_dependencies=cd)
amlcompute_run_config.environment.docker.enabled = True
amlcompute_run_config.environment.docker.gpu_support = True
amlcompute_run_config.environment.docker.base_image = DEFAULT_GPU_IMAGE
amlcompute_run_config.environment.spark.precache_packages = False

Specify the parameter for your pipeline

Create a pipeline parameter by using a PipelineParameter object with a default value.

from azureml.pipeline.core.graph import PipelineParameter
batch_size_param = PipelineParameter(
    name="param_batch_size",
    default_value=20)

Create the pipeline step

Create the pipeline step by using the script, environment configuration, and parameters. Specify the compute target that you already attached to your workspace as the target of execution for the script. Use PythonScriptStep to create the pipeline step.

from azureml.pipeline.steps import PythonScriptStep
inception_model_name = "inception_v3.ckpt"

batch_score_step = PythonScriptStep(
    name="batch_scoring",
    script_name="batch_score.py",
    arguments=["--dataset_path", input_images,
               "--model_name", "inception",
               "--label_dir", label_dir,
               "--output_dir", output_dir,
               "--batch_size", batch_size_param],
    compute_target=compute_target,
    inputs=[input_images, label_dir],
    outputs=[output_dir],
    runconfig=amlcompute_run_config,
    source_directory=scripts_folder)  # folder that contains batch_score.py

Run the pipeline

Now run the pipeline, and examine the output it produces. The output has a score corresponding to each input image.

import pandas as pd
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# Run the pipeline
pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, 'batch_scoring').submit(
    pipeline, pipeline_params={"param_batch_size": 20})

# Wait for the run to finish (this might take several minutes)
pipeline_run.wait_for_completion(show_output=True)

# Download and review the output
step_run = list(pipeline_run.get_children())[0]
step_run.download_file("./outputs/result-labels.txt")

df = pd.read_csv("result-labels.txt", delimiter=":", header=None)
df.columns = ["Filename", "Prediction"]
df.head()

Publish the pipeline

After you're satisfied with the outcome of the run, publish the pipeline so you can run it with different input values later. When you publish a pipeline, you get a REST endpoint. This endpoint lets you invoke the pipeline with the set of parameters you already incorporated by using PipelineParameter.

published_pipeline = pipeline_run.publish_pipeline(
    name="Inception_v3_scoring",
    description="Batch scoring using Inception v3 model",
    version="1.0")

Rerun the pipeline by using the REST endpoint

To rerun the pipeline, you'll need an Azure Active Directory authentication header token, as described in the AzureCliAuthentication class.
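The header and JSON body for that REST call can be sketched as follows. This is a minimal sketch: the Bearer token is a placeholder, and in practice you would obtain the header from an authentication class such as AzureCliAuthentication (for example, AzureCliAuthentication().get_authentication_header()).

```python
import json

# Placeholder authentication header; in a real run, obtain it from an
# Azure authentication class rather than hard-coding a token.
aad_token = {"Authorization": "Bearer <token>"}

# JSON body for the POST request to the published pipeline's REST endpoint:
# the experiment name plus assignments for any PipelineParameter values.
payload = {"ExperimentName": "batch_scoring",
           "ParameterAssignments": {"param_batch_size": 50}}
body = json.dumps(payload)
```

The requests call in the next snippet sends exactly this header and body; passing the payload via the json= argument serializes it the same way.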

import requests
from azureml.pipeline.core.run import PipelineRun
from azureml.pipeline.core import PublishedPipeline
from azureml.widgets import RunDetails

rest_endpoint = published_pipeline.endpoint
# specify batch size when running the pipeline
response = requests.post(rest_endpoint,
                         headers=aad_token,
                         json={"ExperimentName": "batch_scoring",
                               "ParameterAssignments": {"param_batch_size": 50}})
run_id = response.json()["Id"]

# Monitor the run
published_pipeline_run = PipelineRun(ws.experiments["batch_scoring"], run_id)

RunDetails(published_pipeline_run).show()

Next steps

To see this working end to end, try the batch scoring notebook on GitHub, or go to the Azure Architecture Center to see a sample solution architecture.