Run batch predictions on large data sets with Azure Machine Learning service

In this article, you'll learn how to make predictions on large quantities of data asynchronously, by using the Azure Machine Learning service.

Batch prediction (or batch scoring) provides cost-effective inference with high throughput for asynchronous applications. Batch prediction pipelines can scale to perform inference on terabytes of production data, and they're optimized for fire-and-forget predictions over a large collection of data.

Tip

If your system requires low-latency processing (to process a single document or small set of documents quickly), use real-time scoring instead of batch prediction.

In the following steps, you create a machine learning pipeline to register a pretrained computer vision model (Inception-V3). Then you use the pretrained model to do batch scoring on images available in your Azure Blob storage account. The images used for scoring are unlabeled images from the ImageNet dataset.

Prerequisites

Set up machine learning resources

The following steps set up the resources you need to run a pipeline:

  • Access the datastore that already has the pretrained model, input labels, and images to score (this is already set up for you).
  • Set up a datastore to store your outputs.
  • Configure DataReference objects to point to the data in the preceding datastores.
  • Set up compute machines or clusters where the pipeline steps will run.

Access the datastores

First, access the datastore that has the model, labels, and images.

You'll use a public blob container, named sampledata, in the pipelinedata account that holds images from the ImageNet evaluation set. The datastore name for this public container is images_datastore. Register this datastore with your workspace:

from azureml.core import Datastore

# Public blob container details
account_name = "pipelinedata"
datastore_name = "images_datastore"
container_name = "sampledata"

# ws is the Workspace object for your Azure Machine Learning workspace
batchscore_blob = Datastore.register_azure_blob_container(ws,
                      datastore_name=datastore_name,
                      container_name=container_name,
                      account_name=account_name,
                      overwrite=True)

Next, use the workspace's default datastore for the outputs.

When you create your workspace, Azure Files and Blob storage are attached to the workspace by default. Azure Files is the default datastore for a workspace, but you can also use Blob storage as a datastore. For more information, see Azure storage options.

def_data_store = ws.get_default_datastore()

Configure data references

Now, reference the data in your pipeline as inputs to pipeline steps.

A data source in a pipeline is represented by a DataReference object. The DataReference object points to data that lives in, or is accessible from, a datastore. You need DataReference objects for the directory used for input images, the directory in which the pretrained model is stored, the directory for labels, and the output directory.

from azureml.data.data_reference import DataReference
from azureml.pipeline.core import PipelineData

# Input images to score
input_images = DataReference(datastore=batchscore_blob,
                             data_reference_name="input_images",
                             path_on_datastore="batchscoring/images",
                             mode="download")

# Directory in which the pretrained model is stored
model_dir = DataReference(datastore=batchscore_blob,
                          data_reference_name="input_model",
                          path_on_datastore="batchscoring/models",
                          mode="download")

# Directory for labels
label_dir = DataReference(datastore=batchscore_blob,
                          data_reference_name="input_labels",
                          path_on_datastore="batchscoring/labels",
                          mode="download")

# Output directory, stored on the default datastore
output_dir = PipelineData(name="scores",
                          datastore=def_data_store,
                          output_path_on_compute="batchscoring/results")

Set up compute target

In Azure Machine Learning, compute (or compute target) refers to the machines or clusters that perform the computational steps in your machine learning pipeline. For example, the following code creates an Azure Machine Learning compute cluster if a compute target with the given name doesn't already exist in the workspace.

from azureml.core.compute import AmlCompute, ComputeTarget

compute_name = "gpucluster"
compute_min_nodes = 0
compute_max_nodes = 4
vm_size = "STANDARD_NC6"

if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and isinstance(compute_target, AmlCompute):
        print('Found existing compute target: ' + compute_name)
else:
    print('Creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(
                     vm_size = vm_size, # NC6 is GPU-enabled
                     vm_priority = 'lowpriority', # optional
                     min_nodes = compute_min_nodes, 
                     max_nodes = compute_max_nodes)

    # create the cluster
    compute_target = ComputeTarget.create(ws, 
                        compute_name, 
                        provisioning_config)

    compute_target.wait_for_completion(
                     show_output=True, 
                     min_node_count=None, 
                     timeout_in_minutes=20)

Prepare the model

Before you can use the pretrained model, you'll need to download the model and register it with your workspace.

Download the pretrained model

Download the pretrained computer vision model (InceptionV3) from http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz. Then extract it to the models subfolder.

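import os
import tarfile
import urllib.request

# Use a separate name for the local folder so that the model_dir
# DataReference defined earlier isn't overwritten.
local_model_dir = 'models'
if not os.path.isdir(local_model_dir):
    os.mkdir(local_model_dir)

url = "http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz"
urllib.request.urlretrieve(url, "model.tar.gz")

# Extract the archive; the context manager closes the file afterward
with tarfile.open("model.tar.gz", "r:gz") as tar:
    tar.extractall(local_model_dir)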
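Rerunning the code above downloads the archive again each time. The following is a minimal sketch of a variant that skips the download when the archive is already on disk. The helper name fetch_and_extract and its parameters are hypothetical and aren't part of any SDK. The sketch also assumes that a previously downloaded archive is complete and trustworthy.

```python
import os
import tarfile
import urllib.request


def fetch_and_extract(url, archive_path, target_dir):
    """Download url to archive_path unless it already exists, then unpack it.

    Hypothetical helper for illustration only.
    Returns the sorted list of member names found in the archive.
    """
    os.makedirs(target_dir, exist_ok=True)

    # Skip the network round trip when the archive is already on disk.
    if not os.path.isfile(archive_path):
        urllib.request.urlretrieve(url, archive_path)

    # The context manager closes the archive even if extraction fails.
    with tarfile.open(archive_path, "r:gz") as tar:
        names = sorted(tar.getnames())
        tar.extractall(target_dir)
    return names
```

For the model in this article, a call might look like fetch_and_extract(url, "model.tar.gz", "models"), where url is the download address shown earlier.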

Register the model

Here's how to register the model:

import shutil
from azureml.core.model import Model

# register downloaded model 
model = Model.register(
        model_path = "models/inception_v3.ckpt",
        model_name = "inception", # This is the name of the registered model
        tags = {'pretrained': "inception"},
        description = "Imagenet trained tensorflow inception",
        workspace = ws)

Write your scoring script

Warning

The following code is only a sample of what is contained in the batch_score.py used by the sample notebook. You’ll need to create your own scoring script for your scenario.

The batch_score.py script takes input images from dataset_path, loads the registered model identified by model_name, reads the labels from label_dir, and writes result-labels.txt to output_dir.

# Snippets from a sample scoring script
# Refer to the accompanying batch-scoring Notebook
# https://github.com/Azure/MachineLearningNotebooks/blob/master/pipeline/pipeline-batch-scoring.ipynb
# for the implementation script

# Get labels
def get_class_label_dict(label_file):
    # Return the class labels, one per line of the label file
    labels = []
    proto_as_ascii_lines = tf.gfile.GFile(label_file).readlines()
    for line in proto_as_ascii_lines:
        labels.append(line.rstrip())
    return labels

class DataIterator:
    # Definition of the DataIterator here
    pass

def main(_):
    # Refer to batch-scoring Notebook for implementation.
    label_file_name = os.path.join(args.label_dir, "labels.txt")
    label_dict = get_class_label_dict(label_file_name)
    classes_num = len(label_dict)
    test_feeder = DataIterator(data_dir=args.dataset_path)
    total_size = len(test_feeder.labels)

    # get model from model registry
    model_path = Model.get_model_path(args.model_name)
    with tf.Session() as sess:
        test_images = test_feeder.input_pipeline(batch_size=args.batch_size)
        with slim.arg_scope(inception_v3.inception_v3_arg_scope()):
            input_images = tf.placeholder(tf.float32, [args.batch_size, image_size, image_size, num_channel])
            logits, _ = inception_v3.inception_v3(input_images,
                                                        num_classes=classes_num,
                                                        is_training=False)
            probabilities = tf.argmax(logits, 1)

        sess.run(tf.global_variables_initializer())
        sess.run(tf.local_variables_initializer())
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        saver = tf.train.Saver()
        saver.restore(sess, model_path)
        out_filename = os.path.join(args.output_dir, "result-labels.txt")

        # (The scoring loop that writes the predictions to out_filename
        # is omitted from this snippet.)

        # copy the file to artifacts
        shutil.copy(out_filename, "./outputs/")
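The snippet above reads its settings from an args object that it doesn't define. The following is a minimal sketch of how a scoring script could parse those settings with argparse. The flag names match the arguments that the pipeline step in this article passes to the script. The help text and the default batch size of 20 are assumptions, and the sample notebook's script might differ.

```python
import argparse


def parse_args(argv=None):
    """Parse the command-line arguments passed to the scoring script."""
    parser = argparse.ArgumentParser(description="Batch scoring (sketch)")
    parser.add_argument("--dataset_path", required=True,
                        help="Directory that holds the input images")
    parser.add_argument("--model_name", required=True,
                        help="Name of the registered model")
    parser.add_argument("--label_dir", required=True,
                        help="Directory that holds labels.txt")
    parser.add_argument("--output_dir", required=True,
                        help="Directory where result-labels.txt is written")
    # The default of 20 is an assumption made for this sketch.
    parser.add_argument("--batch_size", type=int, default=20,
                        help="Number of images scored per batch")
    return parser.parse_args(argv)


# Example invocation with an explicit argument list (placeholder paths).
args = parse_args(["--dataset_path", "images",
                   "--model_name", "inception",
                   "--label_dir", "labels",
                   "--output_dir", "results",
                   "--batch_size", "50"])
print(args.batch_size)
```

Passing an explicit list to parse_args keeps the function easy to try out in a notebook. When the script runs as a pipeline step, calling parse_args() with no arguments reads the values from the command line.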

Build and run the batch scoring pipeline

You have everything you need to build the pipeline, so now put it all together.

Prepare the run environment

Specify the conda dependencies for your script. You'll need this object later, when you create the pipeline step.

from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_GPU_IMAGE, RunConfiguration

cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.10.0", "azureml-defaults"])

# Runconfig
amlcompute_run_config = RunConfiguration(conda_dependencies=cd)
amlcompute_run_config.environment.docker.enabled = True
amlcompute_run_config.environment.docker.gpu_support = True
amlcompute_run_config.environment.docker.base_image = DEFAULT_GPU_IMAGE
amlcompute_run_config.environment.spark.precache_packages = False

Specify the parameter for your pipeline

Create a pipeline parameter by using a PipelineParameter object with a default value.

from azureml.pipeline.core import PipelineParameter

batch_size_param = PipelineParameter(
                    name="param_batch_size",
                    default_value=20)
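The batch size controls how many images the scoring script feeds to the model at a time. The following plain-Python sketch shows that kind of chunking. The iter_batches helper and the file names are hypothetical, and the sample script uses a TensorFlow input pipeline rather than this code.

```python
def iter_batches(items, batch_size):
    """Yield consecutive slices of items, each at most batch_size long."""
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# Hypothetical file names, for illustration only.
image_files = ["img_{:03d}.jpg".format(i) for i in range(45)]
batch_lengths = [len(batch) for batch in iter_batches(image_files, 20)]
print(batch_lengths)  # [20, 20, 5]
```

With 45 images and a batch size of 20, the last batch holds only the 5 remaining images. A larger batch size means fewer, larger batches for the same input.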

Create the pipeline step

Create the pipeline step by using the script, environment configuration, and parameters. Use PythonScriptStep to create the step, and specify the compute target that you set up earlier as the place where the script runs.

from azureml.pipeline.steps import PythonScriptStep

inception_model_name = "inception_v3.ckpt"

# scripts_folder is the local directory that contains batch_score.py
batch_score_step = PythonScriptStep(
    name="batch_scoring",
    script_name="batch_score.py",
    arguments=["--dataset_path", input_images,
               "--model_name", "inception",
               "--label_dir", label_dir,
               "--output_dir", output_dir,
               "--batch_size", batch_size_param],
    compute_target=compute_target,
    inputs=[input_images, label_dir],
    outputs=[output_dir],
    runconfig=amlcompute_run_config,
    source_directory=scripts_folder)

Run the pipeline

Now run the pipeline, and examine the output it produced. The output has a score corresponding to each input image.

from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# Run the pipeline
pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, 'batch_scoring').submit(pipeline, pipeline_params={"param_batch_size": 20})

# Wait for the run to finish (this might take several minutes)
pipeline_run.wait_for_completion(show_output=True)

# Download and review the output
step_run = list(pipeline_run.get_children())[0]
step_run.download_file("./outputs/result-labels.txt")

import pandas as pd
df = pd.read_csv("result-labels.txt", delimiter=":", header=None)
df.columns = ["Filename", "Prediction"]
df.head()
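If pandas isn't available, the same file can be read with the standard library. The following sketch assumes that each line has the form "filename: prediction", which matches the colon delimiter and the two column names used above. The read_predictions helper and the sample lines are hypothetical.

```python
import io
from collections import Counter


def read_predictions(lines):
    """Parse 'filename: label' lines into (filename, label) tuples."""
    predictions = []
    for line in lines:
        # Split on the first colon only, so a label may contain colons.
        filename, separator, label = line.partition(":")
        if separator:
            predictions.append((filename.strip(), label.strip()))
    return predictions


# Hypothetical sample content; real predictions depend on your images.
sample = io.StringIO(
    "img_001.jpg: tabby\n"
    "img_002.jpg: golden retriever\n"
    "img_003.jpg: tabby\n"
)
predictions = read_predictions(sample)
label_counts = Counter(label for _, label in predictions)
print(label_counts.most_common(1))  # [('tabby', 2)]
```

To read the downloaded file instead of the sample text, pass an open file object, for example read_predictions(open("result-labels.txt")).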

Publish the pipeline

After you're satisfied with the outcome of the run, publish the pipeline so you can run it with different input values later. When you publish a pipeline, you get a REST endpoint. You can call this endpoint to run the pipeline with new values for the parameters that you defined by using PipelineParameter.

published_pipeline = pipeline_run.publish_pipeline(
    name="Inception_v3_scoring", 
    description="Batch scoring using Inception v3 model", 
    version="1.0")

Rerun the pipeline by using the REST endpoint

To rerun the pipeline, you'll need an Azure Active Directory authentication header token, as described in AzureCliAuthentication class.

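import requests
from azureml.core.authentication import AzureCliAuthentication

# Get an Azure Active Directory authentication header
cli_auth = AzureCliAuthentication()
aad_token = cli_auth.get_authentication_header()

rest_endpoint = published_pipeline.endpoint
# specify batch size when running the pipeline
response = requests.post(rest_endpoint,
        headers=aad_token,
        json={"ExperimentName": "batch_scoring",
              "ParameterAssignments": {"param_batch_size": 50}})
# The response body contains the ID of the submitted run
run_id = response.json()["Id"]

# Monitor the run
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments["batch_scoring"], run_id)

RunDetails(published_pipeline_run).show()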
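The request body sent to the endpoint is a small JSON document that names the experiment and assigns values to pipeline parameters. The following sketch builds that body for any set of parameters. The build_rerun_body helper is hypothetical, and the two key names are taken from the request shown above.

```python
import json


def build_rerun_body(experiment_name, **parameter_assignments):
    """Build the JSON-serializable body for a published pipeline run."""
    return {
        "ExperimentName": experiment_name,
        "ParameterAssignments": dict(parameter_assignments),
    }


body = build_rerun_body("batch_scoring", param_batch_size=50)
print(json.dumps(body, sort_keys=True))
```

Each keyword argument must match the name given to a PipelineParameter when the pipeline was built, such as param_batch_size in this article.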

Next steps

To see this working end-to-end, try the batch scoring notebook in GitHub.

Learn how to run notebooks by following the article, Use Jupyter notebooks to explore this service.