Build & use an Azure Machine Learning pipeline for batch scoring

In this tutorial, you use a pipeline in Azure Machine Learning to run a batch scoring job. The example uses the pretrained Inception-V3 convolutional neural network TensorFlow model to classify unlabeled images. After you build and publish a pipeline, you configure a REST endpoint that you can use to trigger the pipeline from any HTTP library on any platform.

Machine learning pipelines optimize your workflow with speed, portability, and reuse, so you can focus on your expertise - machine learning - instead of on infrastructure and automation. Learn more about machine learning pipelines.

In this tutorial, you complete the following tasks:

  • Configure workspace and download sample data
  • Create data objects to fetch and output data
  • Download, prepare, and register the model in your workspace
  • Provision compute targets and create a scoring script
  • Build, run, and publish a pipeline
  • Enable a REST endpoint for the pipeline

If you don’t have an Azure subscription, create a free account before you begin. Try the free or paid version of Azure Machine Learning today.

Prerequisites

  • If you don't already have an Azure Machine Learning workspace or notebook virtual machine, complete Part 1 of the setup tutorial.
  • When you finish the setup tutorial, use the same notebook server to open the tutorials/tutorial-pipeline-batch-scoring-classification.ipynb notebook.

If you want to run the setup tutorial in your own local environment, you can access the tutorial on GitHub. Run pip install azureml-sdk[notebooks] azureml-pipeline-core azureml-pipeline-steps pandas requests to get the required packages.

Configure workspace and create a datastore

Create a workspace object from the existing Azure Machine Learning workspace.

  • A workspace is a class that accepts your Azure subscription and resource information. The workspace also creates a cloud resource you can use to monitor and track your model runs.
  • Workspace.from_config() reads the config.json file and then loads the authentication details into an object named ws. The ws object is used in the code throughout this tutorial.

from azureml.core import Workspace
ws = Workspace.from_config()

Create a datastore for sample images

On the pipelinedata account, get the ImageNet evaluation public data sample from the sampledata public blob container. Call register_azure_blob_container() to make the data available to the workspace under the name images_datastore. Then, set the workspace default datastore as the output datastore. Use the output datastore to store scoring output from the pipeline.

from azureml.core.datastore import Datastore

batchscore_blob = Datastore.register_azure_blob_container(ws, 
                      datastore_name="images_datastore", 
                      container_name="sampledata", 
                      account_name="pipelinedata", 
                      overwrite=True)

def_data_store = ws.get_default_datastore()

Create data objects

When you build a pipeline, a DataReference object reads data from the workspace datastore. A PipelineData object transfers intermediate data between pipeline steps.

Important

The batch scoring example in this tutorial uses only one pipeline step. In use cases that have multiple steps, the typical flow will include these steps:

  1. Use DataReference objects as inputs to fetch raw data, perform some transformation, and then output a PipelineData object.

  2. Use the PipelineData output object from the preceding step as an input to the next step. Repeat this pattern for subsequent steps, as shown in the sketch after this list.
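
A minimal sketch of that two-step pattern follows. The step names, scripts, and data objects here (raw_data, prepped_data, prep.py, score.py) are hypothetical, and the sketch assumes the compute target and run configuration that you create later in this tutorial.

from azureml.data.data_reference import DataReference
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep

# hypothetical raw input data read from the datastore
raw_data = DataReference(datastore=batchscore_blob,
                         data_reference_name="raw_data",
                         path_on_datastore="batchscoring/raw")

# intermediate data produced by the first step and consumed by the second
prepped_data = PipelineData(name="prepped_data", datastore=def_data_store)

prep_step = PythonScriptStep(name="prep",
                             script_name="prep.py",
                             arguments=["--input_dir", raw_data, "--output_dir", prepped_data],
                             inputs=[raw_data],
                             outputs=[prepped_data],
                             compute_target=compute_target,
                             runconfig=amlcompute_run_config)

score_step = PythonScriptStep(name="score",
                              script_name="score.py",
                              arguments=["--input_dir", prepped_data],
                              inputs=[prepped_data],
                              compute_target=compute_target,
                              runconfig=amlcompute_run_config)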

In this scenario, you create DataReference objects that correspond to the datastore directories for both the input images and the classification labels (y-test values). You also create a PipelineData object for the batch scoring output data.

from azureml.data.data_reference import DataReference
from azureml.pipeline.core import PipelineData

input_images = DataReference(datastore=batchscore_blob, 
                             data_reference_name="input_images",
                             path_on_datastore="batchscoring/images",
                             mode="download"
                            )

label_dir = DataReference(datastore=batchscore_blob, 
                          data_reference_name="input_labels",
                          path_on_datastore="batchscoring/labels",
                          mode="download"                          
                         )

output_dir = PipelineData(name="scores", 
                          datastore=def_data_store, 
                          output_path_on_compute="batchscoring/results")

Download and register the model

Download the pretrained TensorFlow model to use it for batch scoring in a pipeline. First, create a local directory where you store the model. Then, download and extract the model.

import os
import tarfile
import urllib.request

if not os.path.isdir("models"):
    os.mkdir("models")
    
urllib.request.urlretrieve("http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz", "model.tar.gz")
with tarfile.open("model.tar.gz", "r:gz") as tar:
    tar.extractall("models")

Next, register the model to your workspace, so you can easily retrieve the model in the pipeline process. In the register() static function, the model_name parameter is the key you use to locate your model throughout the SDK.

from azureml.core.model import Model
 
model = Model.register(model_path="models/inception_v3.ckpt",
                       model_name="inception",
                       tags={"pretrained": "inception"},
                       description="Imagenet trained tensorflow inception",
                       workspace=ws)
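
After registration, the model_name value is all you need to locate the model elsewhere in the SDK. As a quick illustration (the scoring script later in this tutorial uses Model.get_model_path() with the same name):

# look up the registered model by the name passed to register()
registered_model = Model(ws, name="inception")
print(registered_model.name, registered_model.version)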

Create and attach the remote compute target

Machine learning pipelines can't be run locally, so you run them on cloud resources or remote compute targets. A remote compute target is a reusable virtual compute environment where you run experiments and machine learning workflows.

Run the following code to create a GPU-enabled AmlCompute target, and then attach it to your workspace. For more information about compute targets, see the conceptual article.

from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.exceptions import ComputeTargetException
compute_name = "gpu-cluster"

# check whether the compute target already exists in the workspace; if not, create it
try:
    compute_target = ComputeTarget(workspace=ws, name=compute_name)
except ComputeTargetException:
    config = AmlCompute.provisioning_configuration(vm_size="STANDARD_NC6",
                                                   vm_priority="lowpriority", 
                                                   min_nodes=0, 
                                                   max_nodes=1)

    compute_target = ComputeTarget.create(workspace=ws, name=compute_name, provisioning_configuration=config)
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

Write a scoring script

To do the scoring, create a batch scoring script called batch_scoring.py, and then write it to the current directory. The script takes input images, applies the classification model, and then outputs the predictions to a results file.

The batch_scoring.py script takes the following parameters, which are passed from the pipeline step that you create later in this tutorial:

  • --model_name: The name of the model being used.
  • --label_dir: The directory that holds the labels.txt file.
  • --dataset_path: The directory that contains the input images.
  • --output_dir: The output directory for the result-labels.txt file after the script runs the model on the data.
  • --batch_size: The batch size used in running the model.

The pipeline infrastructure uses the ArgumentParser class to pass parameters into pipeline steps. For example, in the following code, the first argument --model_name is given the property identifier model_name. In the main() function, Model.get_model_path(args.model_name) is used to access this property.

%%writefile batch_scoring.py

import os
import argparse
import datetime
import time
import tensorflow as tf
from math import ceil
import numpy as np
import shutil
from tensorflow.contrib.slim.python.slim.nets import inception_v3
from azureml.core.model import Model

slim = tf.contrib.slim

parser = argparse.ArgumentParser(description="Start a tensorflow model serving")
parser.add_argument('--model_name', dest="model_name", required=True)
parser.add_argument('--label_dir', dest="label_dir", required=True)
parser.add_argument('--dataset_path', dest="dataset_path", required=True)
parser.add_argument('--output_dir', dest="output_dir", required=True)
parser.add_argument('--batch_size', dest="batch_size", type=int, required=True)

args = parser.parse_args()

image_size = 299
num_channel = 3

# create output directory if it does not exist
os.makedirs(args.output_dir, exist_ok=True)


def get_class_label_dict(label_file):
    label = []
    proto_as_ascii_lines = tf.gfile.GFile(label_file).readlines()
    for l in proto_as_ascii_lines:
        label.append(l.rstrip())
    return label


# Simple helper that lists the image files in a directory and builds a
# TensorFlow input pipeline over them.
class DataIterator:
    def __init__(self, data_dir):
        self.file_paths = []
        image_list = os.listdir(data_dir)
        self.file_paths = [data_dir + '/' + file_name.rstrip() for file_name in image_list]
        self.labels = [1 for file_name in self.file_paths]  # placeholder labels; scoring doesn't use real labels

    @property
    def size(self):
        return len(self.labels)

    def input_pipeline(self, batch_size):
        images_tensor = tf.convert_to_tensor(self.file_paths, dtype=tf.string)
        labels_tensor = tf.convert_to_tensor(self.labels, dtype=tf.int64)
        input_queue = tf.train.slice_input_producer([images_tensor, labels_tensor], shuffle=False)
        labels = input_queue[1]
        images_content = tf.read_file(input_queue[0])

        image_reader = tf.image.decode_jpeg(images_content, channels=num_channel, name="jpeg_reader")
        float_caster = tf.cast(image_reader, tf.float32)
        new_size = tf.constant([image_size, image_size], dtype=tf.int32)
        images = tf.image.resize_images(float_caster, new_size)
        images = tf.divide(tf.subtract(images, [0]), [255])

        image_batch, label_batch = tf.train.batch([images, labels], batch_size=batch_size, capacity=5 * batch_size)
        return image_batch


def main(_):
    # load the class names used to map model predictions to human-readable labels
    label_file_name = os.path.join(args.label_dir, "labels.txt")
    label_dict = get_class_label_dict(label_file_name)
    classes_num = len(label_dict)
    test_feeder = DataIterator(data_dir=args.dataset_path)
    total_size = len(test_feeder.labels)
    count = 0

    # get model from model registry
    model_path = Model.get_model_path(args.model_name)

    with tf.Session() as sess:
        test_images = test_feeder.input_pipeline(batch_size=args.batch_size)
        with slim.arg_scope(inception_v3.inception_v3_arg_scope()):
            input_images = tf.placeholder(tf.float32, [args.batch_size, image_size, image_size, num_channel])
            logits, _ = inception_v3.inception_v3(input_images,
                                                  num_classes=classes_num,
                                                  is_training=False)
            probabilities = tf.argmax(logits, 1)

        sess.run(tf.global_variables_initializer())
        sess.run(tf.local_variables_initializer())
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        saver = tf.train.Saver()
        saver.restore(sess, model_path)
        out_filename = os.path.join(args.output_dir, "result-labels.txt")
        with open(out_filename, "w") as result_file:
            i = 0
            while count < total_size and not coord.should_stop():
                test_images_batch = sess.run(test_images)
                file_names_batch = test_feeder.file_paths[i * args.batch_size:
                                                          min(test_feeder.size, (i + 1) * args.batch_size)]
                results = sess.run(probabilities, feed_dict={input_images: test_images_batch})
                new_add = min(args.batch_size, total_size - count)
                count += new_add
                i += 1
                for j in range(new_add):
                    result_file.write(os.path.basename(file_names_batch[j]) + ": " + label_dict[results[j]] + "\n")
                result_file.flush()
            coord.request_stop()
            coord.join(threads)

        shutil.copy(out_filename, "./outputs/")

if __name__ == "__main__":
    tf.app.run()

Tip

The pipeline in this tutorial has only one step, and it writes the output to a file. For multi-step pipelines, you also use ArgumentParser to define a directory to write output data for input to subsequent steps. For an example of passing data between multiple pipeline steps by using the ArgumentParser design pattern, see the notebook.
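
As a hedged sketch of that design pattern, a hypothetical upstream step's script (prep.py, not part of this tutorial) could accept an --output_dir argument that the pipeline binds to a PipelineData object, and write its intermediate results there for the next step to read:

# prep.py (hypothetical): write intermediate data to the directory that the
# pipeline binds to a PipelineData object through the --output_dir argument
import argparse
import os

parser = argparse.ArgumentParser()
parser.add_argument("--input_dir", dest="input_dir", required=True)
parser.add_argument("--output_dir", dest="output_dir", required=True)
args = parser.parse_args()

os.makedirs(args.output_dir, exist_ok=True)
with open(os.path.join(args.output_dir, "prepped.txt"), "w") as f:
    f.write("intermediate output consumed by the next pipeline step\n")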

Build and run the pipeline

Before you run the pipeline, create an object that defines the Python environment and the dependencies that your batch_scoring.py script requires. The main dependency is TensorFlow, but you also install azureml-defaults from the SDK for background processes. Create a RunConfiguration object by using the dependencies. Also, specify Docker and Docker-GPU support.

from azureml.core.runconfig import DEFAULT_GPU_IMAGE
from azureml.core.runconfig import CondaDependencies, RunConfiguration

cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.13.1", "azureml-defaults"])

amlcompute_run_config = RunConfiguration(conda_dependencies=cd)
amlcompute_run_config.environment.docker.enabled = True
amlcompute_run_config.environment.docker.base_image = DEFAULT_GPU_IMAGE
amlcompute_run_config.environment.spark.precache_packages = False

Parameterize the pipeline

Define a custom parameter for the pipeline to control the batch size. After the pipeline is published and exposed via a REST endpoint, any configured parameters are also exposed. You can specify custom parameters in the JSON payload when you rerun the pipeline via an HTTP request.

Create a PipelineParameter object to enable this behavior and to define a name and default value.

from azureml.pipeline.core.graph import PipelineParameter
batch_size_param = PipelineParameter(name="param_batch_size", default_value=20)

Create the pipeline step

A pipeline step is an object that encapsulates everything you need to run a pipeline, including:

  • Environment and dependency settings
  • The compute resource to run the pipeline on
  • Input and output data, and any custom parameters
  • Reference to a script or SDK logic to run during the step

Multiple classes inherit from the parent class PipelineStep. You can choose a class that uses a specific framework or stack to build a step. In this example, you use the PythonScriptStep class to define your step logic by using a custom Python script. If an argument to your script is either an input to the step or an output of the step, the argument must be defined both in the arguments array and in either the inputs or the outputs parameter, respectively.

In scenarios where there is more than one step, an object reference in the outputs array becomes available as an input for a subsequent pipeline step.

from azureml.pipeline.steps import PythonScriptStep

batch_score_step = PythonScriptStep(
    name="batch_scoring",
    script_name="batch_scoring.py",
    arguments=["--dataset_path", input_images, 
               "--model_name", "inception",
               "--label_dir", label_dir, 
               "--output_dir", output_dir, 
               "--batch_size", batch_size_param],
    compute_target=compute_target,
    inputs=[input_images, label_dir],
    outputs=[output_dir],
    runconfig=amlcompute_run_config
)

For a list of all the classes you can use for different step types, see the steps package.

Run the pipeline

Now, run the pipeline. First, create a Pipeline object by using your workspace reference and the pipeline step you created. The steps parameter is an array of steps. In this case, there's only one step for batch scoring. To build pipelines that have multiple steps, place the steps in order in this array.

Next, use the Experiment.submit() function to submit the pipeline for execution. You also specify the custom parameter param_batch_size. The wait_for_completion function outputs logs during the pipeline build process. You can use the logs to see current progress.

Important

The first pipeline run takes roughly 15 minutes. All dependencies must be downloaded, a Docker image is created, and the Python environment is provisioned and created. Running the pipeline again takes significantly less time because those resources are reused instead of created. However, total run time for the pipeline depends on the workload of your scripts and the processes that are running in each pipeline step.

from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, 'batch_scoring').submit(pipeline, pipeline_parameters={"param_batch_size": 20})
pipeline_run.wait_for_completion(show_output=True)

Download and review output

Run the following code to download the output file that's created from the batch_scoring.py script. Then, explore the scoring results.

import pandas as pd

step_run = list(pipeline_run.get_children())[0]
step_run.download_file("./outputs/result-labels.txt")

df = pd.read_csv("result-labels.txt", delimiter=":", header=None)
df.columns = ["Filename", "Prediction"]
df.head(10)

   Filename                      Prediction
0  ILSVRC2012_val_00000102.JPEG  Rhodesian Ridgeback
1  ILSVRC2012_val_00000103.JPEG  tripod
2  ILSVRC2012_val_00000104.JPEG  typewriter keyboard
3  ILSVRC2012_val_00000105.JPEG  silky terrier
4  ILSVRC2012_val_00000106.JPEG  Windsor tie
5  ILSVRC2012_val_00000107.JPEG  harvestman
6  ILSVRC2012_val_00000108.JPEG  violin
7  ILSVRC2012_val_00000109.JPEG  loudspeaker
8  ILSVRC2012_val_00000110.JPEG  apron
9  ILSVRC2012_val_00000111.JPEG  American lobster

Publish and run from a REST endpoint

Run the following code to publish the pipeline to your workspace. In your workspace in the Azure portal, you can see metadata for the pipeline, including run history and durations. You can also run the pipeline manually from the portal.

Publishing the pipeline enables a REST endpoint that you can use to run the pipeline from any HTTP library on any platform.

published_pipeline = pipeline_run.publish_pipeline(
    name="Inception_v3_scoring", description="Batch scoring using Inception v3 model", version="1.0")

published_pipeline

To run the pipeline from the REST endpoint, you need an OAuth2 Bearer-type authentication header. The following example uses interactive authentication (for illustration purposes), but for most production scenarios that require automated or headless authentication, use service principal authentication as described in this notebook.

Service principal authentication involves creating an App Registration in Azure Active Directory. First, you generate a client secret, and then you grant your service principal role-based access to your machine learning workspace. Use the ServicePrincipalAuthentication class to manage your authentication flow.

Both InteractiveLoginAuthentication and ServicePrincipalAuthentication inherit from AbstractAuthentication. In both cases, use the get_authentication_header() function in the same way to fetch the header:

from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
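
If you instead use a service principal, the flow might look like the following sketch. The tenant ID, application (client) ID, and client secret are placeholders from your App Registration; store the secret securely rather than hard-coding it.

from azureml.core.authentication import ServicePrincipalAuthentication

# placeholder values from your Azure Active Directory App Registration
sp_auth = ServicePrincipalAuthentication(
    tenant_id="<tenant-id>",
    service_principal_id="<application-client-id>",
    service_principal_password="<client-secret>")

auth_header = sp_auth.get_authentication_header()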

Get the REST URL from the endpoint property of the published pipeline object. You can also find the REST URL in your workspace in the Azure portal.

Build an HTTP POST request to the endpoint. Specify your authentication header in the request. Add a JSON payload object that has the experiment name and the batch size parameter. As noted earlier in the tutorial, param_batch_size is passed through to your batch_scoring.py script because you defined it as a PipelineParameter object in the step configuration.

Make the request to trigger the run. Include code to access the Id key from the response dictionary to get the value of the run ID.

import requests

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": "batch_scoring",
                               "ParameterAssignments": {"param_batch_size": 50}})
run_id = response.json()["Id"]

Use the run ID to monitor the status of the new run. The new run takes another 10 to 15 minutes to finish.

The new run looks similar to the pipeline run you started earlier in the tutorial. You can choose not to view the full output.

from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments["batch_scoring"], run_id)
RunDetails(published_pipeline_run).show()

Clean up resources

Don't complete this section if you plan to run other Azure Machine Learning tutorials.

Stop the notebook VM

If you used a cloud notebook server, to reduce costs, stop the VM when you're not using it:

  1. In your workspace, select Notebook VMs.
  2. In the list of VMs, select the VM you want to stop.
  3. Select Stop.
  4. When you're ready to use the server again, select Start.

Delete everything

If you don't plan to use the resources you created, delete them, so you don't incur any charges:

  1. In the Azure portal, in the left menu, select Resource groups.
  2. In the list of resource groups, select the resource group you created.
  3. Select Delete resource group.
  4. Enter the resource group name. Then, select Delete.

You can also keep the resource group but delete a single workspace. Display the workspace properties, and then select Delete.

Next steps

In this machine learning pipelines tutorial, you did the following tasks:

  • Built a pipeline with environment dependencies to run on a remote GPU compute resource.
  • Created a scoring script to run batch predictions by using a pretrained TensorFlow model.
  • Published a pipeline and enabled it to be run from a REST endpoint.

For more examples of how to build pipelines by using the machine learning SDK, see the notebook repository.