Create and run machine learning pipelines with Azure Machine Learning SDK

In this article, you learn how to create, publish, run, and track a machine learning pipeline by using the Azure Machine Learning SDK. Use ML pipelines to create a workflow that stitches together various ML phases, and then publish that pipeline into your Azure Machine Learning workspace to access later or share with others. ML pipelines are ideal for batch scoring scenarios, using various computes, reusing steps instead of rerunning them, and sharing ML workflows with others.

While you can use a different kind of pipeline called an Azure Pipeline for CI/CD automation of ML tasks, that type of pipeline is never stored inside your workspace. Compare these different pipelines.

Each phase of an ML pipeline, such as data preparation and model training, can include one or more steps.

The ML pipelines you create are visible to the members of your Azure Machine Learning workspace.

ML pipelines use remote compute targets for computation and the storage of the intermediate and final data associated with that pipeline. They can read and write data to and from supported Azure Storage locations.

If you don’t have an Azure subscription, create a free account before you begin. Try the free or paid version of Azure Machine Learning.

Prerequisites

Start by attaching your workspace:

import azureml.core
from azureml.core import Workspace, Datastore

ws = Workspace.from_config()

Set up machine learning resources

Create the resources required to run an ML pipeline:

  • Set up a datastore used to access the data needed in the pipeline steps.

  • Configure a DataReference object to point to data that lives in, or is accessible in, a datastore.

  • Set up the compute targets on which your pipeline steps will run.

Set up a datastore

A datastore stores the data for the pipeline to access. Each workspace has a default datastore. You can register additional datastores.

When you create your workspace, Azure Files and Azure Blob storage are attached to the workspace. A default datastore is registered to connect to the Azure Blob storage. To learn more, see Deciding when to use Azure Files, Azure Blobs, or Azure Disks.

# Default datastore 
def_data_store = ws.get_default_datastore()

# Get the blob storage associated with the workspace
def_blob_store = Datastore(ws, "workspaceblobstore")

# Get file storage associated with the workspace
def_file_store = Datastore(ws, "workspacefilestore")
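
If your data is in a storage account that isn't attached by default, you can register it as an additional datastore. The following is a minimal sketch for an Azure Blob container; the container, account name, and key values are placeholders for your own storage account:

# Register an additional Blob container as a datastore (placeholder values)
custom_blob_store = Datastore.register_azure_blob_container(
    workspace=ws,
    datastore_name="custom_blob_store",
    container_name="<container_name>",
    account_name="<storage_account_name>",
    account_key="<storage_account_key>",
    overwrite=True)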

Upload data files or directories to the datastore for them to be accessible from your pipelines. This example uses the Blob storage as the datastore:

def_blob_store.upload_files(
    ["./data/20news.pkl"],
    target_path="20newsgroups",
    overwrite=True)

A pipeline consists of one or more steps. A step is a unit run on a compute target. Steps might consume data sources and produce “intermediate” data. A step can create data such as a model, a directory with model and dependent files, or temporary data. This data is then available for other steps later in the pipeline.

Configure data reference

You just created a data source that can be referenced in a pipeline as an input to a step. A data source in a pipeline is represented by a DataReference object. The DataReference object points to data that lives in or is accessible from a datastore.

from azureml.data.data_reference import DataReference

blob_input_data = DataReference(
    datastore=def_blob_store,
    data_reference_name="test_data",
    path_on_datastore="20newsgroups/20news.pkl")

Intermediate data (or output of a step) is represented by a PipelineData object. output_data1 is produced as the output of a step, and used as the input of one or more future steps. PipelineData introduces a data dependency between steps, and creates an implicit execution order in the pipeline.

from azureml.pipeline.core import PipelineData

output_data1 = PipelineData(
    "output_data1",
    datastore=def_blob_store,
    output_name="output_data1")

Set up compute target

In Azure Machine Learning, the term compute (or compute target) refers to the machines or clusters that perform the computational steps in your machine learning pipeline. See compute targets for model training for a full list of compute targets and for how to create and attach them to your workspace. The process for creating and attaching a compute target is the same whether you are training a model or running a pipeline step. After you create and attach your compute target, use the ComputeTarget object in your pipeline step.

Important

Performing management operations on compute targets is not supported from inside remote jobs. Since machine learning pipelines are submitted as a remote job, do not use management operations on compute targets from inside the pipeline.

Below are examples of creating and attaching compute targets for:

  • Azure Machine Learning Compute
  • Azure Databricks
  • Azure Data Lake Analytics

Azure Machine Learning compute

You can create an Azure Machine Learning compute for running your steps.

from azureml.core.compute import ComputeTarget, AmlCompute

compute_name = "aml-compute"
vm_size = "STANDARD_NC6"
if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and type(compute_target) is AmlCompute:
        print('Found compute target: ' + compute_name)
else:
    print('Creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size=vm_size,  # STANDARD_NC6 is GPU-enabled
                                                                min_nodes=0,
                                                                max_nodes=4)
    # create the compute target
    compute_target = ComputeTarget.create(
        ws, compute_name, provisioning_config)

    # Can poll for a minimum number of nodes and for a specific timeout.
    # If no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(
        show_output=True, min_node_count=None, timeout_in_minutes=20)

    # For a more detailed view of current cluster status, use the 'status' property
    print(compute_target.status.serialize())

Azure Databricks

Azure Databricks is an Apache Spark-based environment in the Azure cloud. It can be used as a compute target with an Azure Machine Learning pipeline.

Create an Azure Databricks workspace before using it. To create a workspace resource, see the Run a Spark job on Azure Databricks document.

To attach Azure Databricks as a compute target, provide the following information:

  • Databricks compute name: The name you want to assign to this compute resource.
  • Databricks workspace name: The name of the Azure Databricks workspace.
  • Databricks resource group: The name of the resource group that contains the Azure Databricks workspace.
  • Databricks access token: The access token used to authenticate to Azure Databricks. To generate an access token, see the Authentication document.

The following code demonstrates how to attach Azure Databricks as a compute target with the Azure Machine Learning SDK:

import os
from azureml.core.compute import ComputeTarget, DatabricksCompute
from azureml.exceptions import ComputeTargetException

databricks_compute_name = os.environ.get(
    "AML_DATABRICKS_COMPUTE_NAME", "<databricks_compute_name>")
databricks_workspace_name = os.environ.get(
    "AML_DATABRICKS_WORKSPACE", "<databricks_workspace_name>")
databricks_resource_group = os.environ.get(
    "AML_DATABRICKS_RESOURCE_GROUP", "<databricks_resource_group>")
databricks_access_token = os.environ.get(
    "AML_DATABRICKS_ACCESS_TOKEN", "<databricks_access_token>")

try:
    databricks_compute = ComputeTarget(
        workspace=ws, name=databricks_compute_name)
    print('Compute target already exists')
except ComputeTargetException:
    print('compute not found')
    print('databricks_compute_name {}'.format(databricks_compute_name))
    print('databricks_workspace_name {}'.format(databricks_workspace_name))
    print('databricks_access_token {}'.format(databricks_access_token))

    # Create attach config
    attach_config = DatabricksCompute.attach_configuration(resource_group=databricks_resource_group,
                                                           workspace_name=databricks_workspace_name,
                                                           access_token=databricks_access_token)
    databricks_compute = ComputeTarget.attach(
        ws,
        databricks_compute_name,
        attach_config
    )

    databricks_compute.wait_for_completion(True)

For a more detailed example, see an example notebook on GitHub.

Azure Data Lake Analytics

Azure Data Lake Analytics is a big data analytics platform in the Azure cloud. It can be used as a compute target with an Azure Machine Learning pipeline.

Create an Azure Data Lake Analytics account before using it. To create this resource, see the Get started with Azure Data Lake Analytics document.

To attach Data Lake Analytics as a compute target, you must use the Azure Machine Learning SDK and provide the following information:

  • Compute name: The name you want to assign to this compute resource.
  • Resource Group: The resource group that contains the Data Lake Analytics account.
  • Account name: The Data Lake Analytics account name.

The following code demonstrates how to attach Data Lake Analytics as a compute target:

import os
from azureml.core.compute import ComputeTarget, AdlaCompute
from azureml.exceptions import ComputeTargetException


adla_compute_name = os.environ.get(
    "AML_ADLA_COMPUTE_NAME", "<adla_compute_name>")
adla_resource_group = os.environ.get(
    "AML_ADLA_RESOURCE_GROUP", "<adla_resource_group>")
adla_account_name = os.environ.get(
    "AML_ADLA_ACCOUNT_NAME", "<adla_account_name>")

try:
    adla_compute = ComputeTarget(workspace=ws, name=adla_compute_name)
    print('Compute target already exists')
except ComputeTargetException:
    print('compute not found')
    print('adla_compute_name {}'.format(adla_compute_name))
    print('adla_resource_group {}'.format(adla_resource_group))
    print('adla_account_name {}'.format(adla_account_name))
    # create attach config
    attach_config = AdlaCompute.attach_configuration(resource_group=adla_resource_group,
                                                     account_name=adla_account_name)
    # Attach ADLA
    adla_compute = ComputeTarget.attach(
        ws,
        adla_compute_name,
        attach_config
    )

    adla_compute.wait_for_completion(True)

For a more detailed example, see an example notebook on GitHub.

Tip

Azure Machine Learning pipelines can only work with data stored in the default data store of the Data Lake Analytics account. If the data you need to work with is in a non-default store, you can use a DataTransferStep to copy the data before training.
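
The following sketch shows how a DataTransferStep might copy data from the Blob datastore into a Data Lake datastore before training. The names adls_datastore and data_factory_compute are placeholders for a registered Azure Data Lake datastore and an attached Azure Data Factory compute target that performs the copy:

from azureml.data.data_reference import DataReference
from azureml.pipeline.steps import DataTransferStep

# Placeholder: a previously registered Azure Data Lake datastore
adls_datastore = Datastore(ws, "adls_datastore")

transfer_source = DataReference(
    datastore=def_blob_store,
    data_reference_name="transfer_source",
    path_on_datastore="20newsgroups/20news.pkl")

transfer_destination = DataReference(
    datastore=adls_datastore,
    data_reference_name="transfer_destination",
    path_on_datastore="training-data/20news.pkl")

# Placeholder: data_factory_compute is an attached Azure Data Factory compute target
transfer_step = DataTransferStep(
    name="copy-data-before-training",
    source_data_reference=transfer_source,
    destination_data_reference=transfer_destination,
    compute_target=data_factory_compute)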

Construct your pipeline steps

Once you create and attach a compute target to your workspace, you are ready to define a pipeline step. There are many built-in steps available via the Azure Machine Learning SDK. The most basic of these steps is a PythonScriptStep, which runs a Python script on a specified compute target:

from azureml.pipeline.steps import PythonScriptStep

trainStep = PythonScriptStep(
    script_name="train.py",
    arguments=["--input", blob_input_data, "--output", output_data1],
    inputs=[blob_input_data],
    outputs=[output_data1],
    compute_target=compute_target,
    source_directory=project_folder  # local folder that contains train.py
)

Reuse of previous results (allow_reuse) is key when using pipelines in a collaborative environment, since eliminating unnecessary reruns offers agility. Reuse is the default behavior when the script_name, inputs, and parameters of a step remain the same. When the output of a step is reused, the job is not submitted to the compute; instead, the results from the previous run are immediately available to the next step's run. If allow_reuse is set to False, a new run is always generated for the step during pipeline execution.

After you define your steps, you build the pipeline by using some or all of those steps.

Note

No file or data is uploaded to Azure Machine Learning when you define the steps or build the pipeline.

# list of steps to run
compareModels = [trainStep, extractStep, compareStep]

from azureml.pipeline.core import Pipeline

# Build the pipeline
pipeline1 = Pipeline(workspace=ws, steps=[compareModels])

The following example uses the Azure Databricks compute target created earlier:

from azureml.pipeline.steps import DatabricksStep

dbStep = DatabricksStep(
    name="databricksmodule",
    inputs=[step_1_input],
    outputs=[step_1_output],
    num_workers=1,
    notebook_path=notebook_path,
    notebook_params={'myparam': 'testparam'},
    run_name='demo run name',
    compute_target=databricks_compute,
    allow_reuse=False
)
# List of steps to run
steps = [dbStep]

# Build the pipeline
pipeline1 = Pipeline(workspace=ws, steps=steps)

For more information, see the azureml-pipeline-steps package and the Pipeline class reference.

Submit the pipeline

When you submit the pipeline, Azure Machine Learning checks the dependencies for each step and uploads a snapshot of the source directory you specified. If no source directory is specified, the current local directory is uploaded. The snapshot is also stored as part of the experiment in your workspace.

Important

To prevent files from being included in the snapshot, create a .gitignore or .amlignore file in the directory and add the files to it. The .amlignore file uses the same syntax and patterns as the .gitignore file. If both files exist, the .amlignore file takes precedence.

For more information, see Snapshots.
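
For example, a minimal .amlignore sketch (the patterns are hypothetical) that keeps local data, logs, and notebook checkpoints out of the snapshot:

# Patterns follow .gitignore syntax
data/
logs/
.ipynb_checkpoints/
*.pkl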

from azureml.core import Experiment

# Submit the pipeline to be run
pipeline_run1 = Experiment(ws, 'Compare_Models_Exp').submit(pipeline1)
pipeline_run1.wait_for_completion()

When you first run a pipeline, Azure Machine Learning:

  • Downloads the project snapshot to the compute target from the Blob storage associated with the workspace.
  • Builds a Docker image corresponding to each step in the pipeline.
  • Downloads the Docker image for each step to the compute target from the container registry.
  • Mounts the datastore if a DataReference object is specified in a step. If mount is not supported, the data is instead copied to the compute target.
  • Runs the step in the compute target specified in the step definition.
  • Creates artifacts, such as logs, stdout and stderr, metrics, and output specified by the step. These artifacts are then uploaded and kept in the user’s default datastore.
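
You can inspect the individual step runs and their artifacts from the SDK. The following sketch uses the generic Run APIs and assumes pipeline_run1 from the submission above:

# Enumerate the step runs that make up the pipeline run
for step_run in pipeline_run1.get_children():
    print(step_run.id, step_run.get_status())
    # List the log files and outputs recorded for this step
    print(step_run.get_file_names())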

Diagram of running an experiment as a pipeline

For more information, see the Experiment class reference.

GitHub tracking and integration

When you start a training run where the source directory is a local Git repository, information about the repository is stored in the run history. For more information, see Git integration for Azure Machine Learning.

Publish a pipeline

You can publish a pipeline to run it with different inputs later. For the REST endpoint of an already published pipeline to accept parameters, you must parameterize the pipeline before publishing.

  1. To create a pipeline parameter, use a PipelineParameter object with a default value.

    from azureml.pipeline.core.graph import PipelineParameter
    
    pipeline_param = PipelineParameter(
      name="pipeline_arg",
      default_value=10)
    
  2. Add this PipelineParameter object as a parameter to any of the steps in the pipeline as follows:

    compareStep = PythonScriptStep(
      script_name="compare.py",
      arguments=["--comp_data1", comp_data1, "--comp_data2", comp_data2, "--output_data", out_data3, "--param1", pipeline_param],
      inputs=[ comp_data1, comp_data2],
      outputs=[out_data3],
      compute_target=compute_target,
      source_directory=project_folder)
    
  3. Publish the pipeline. It will accept the parameter when invoked.

    published_pipeline1 = pipeline_run1.publish_pipeline(
         name="My_Published_Pipeline",
         description="My Published Pipeline Description",
         version="1.0")
    

Run a published pipeline

All published pipelines have a REST endpoint. This endpoint lets external systems, such as non-Python clients, invoke a run of the pipeline, and it enables "managed repeatability" in batch scoring and retraining scenarios.

To invoke a run of the preceding pipeline, you need an Azure Active Directory authentication header token, as described in the AzureCliAuthentication class reference, or see the Authentication in Azure Machine Learning notebook for more details.
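
For example, one way to obtain the header is with the InteractiveLoginAuthentication class; other authentication classes in azureml.core.authentication expose the same get_authentication_header method:

from azureml.core.authentication import InteractiveLoginAuthentication

# Prompts an interactive sign-in and returns an Authorization header dictionary
interactive_auth = InteractiveLoginAuthentication()
aad_token = interactive_auth.get_authentication_header()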

from azureml.pipeline.core import PublishedPipeline
import requests

response = requests.post(published_pipeline1.endpoint,
                         headers=aad_token,
                         json={"ExperimentName": "My_Pipeline",
                               "ParameterAssignments": {"pipeline_arg": 20}})

View results of a published pipeline

See the list of all your published pipelines and their run details:

  1. Sign in to the Azure portal.

  2. View your workspace to find the list of pipelines.

  3. Select a specific pipeline to see the run results.

These results are also available in your workspace landing page (preview).
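
You can also list published pipelines from the SDK; a minimal sketch:

from azureml.pipeline.core import PublishedPipeline

# List the active published pipelines in the workspace
for published in PublishedPipeline.list(ws):
    print(published.name, published.id, published.endpoint)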

Disable a published pipeline

To hide a pipeline from your list of published pipelines, you disable it:

# Get the pipeline by using its ID from the Azure portal
p = PublishedPipeline.get(ws, id="068f4885-7088-424b-8ce2-eeb9ba5381a6")
p.disable()

You can enable it again with p.enable(). For more information, see PublishedPipeline class reference.

Caching & reuse

To optimize and customize the behavior of your pipelines, you can adjust a few settings around caching and reuse. For example, you can choose to:

  • Turn off the default reuse of the step run output by setting allow_reuse=False in the step definition. Reuse is key when using pipelines in a collaborative environment, because eliminating unnecessary reruns offers agility, but you can opt out of it.
  • Extend hashing beyond the script to include other files and directories, specified as absolute paths or as paths relative to source_directory, by using hash_paths=['<file or directory>'].
  • Force output regeneration for all steps in a run with pipeline_run = exp.submit(pipeline, regenerate_outputs=True).

By default, allow_reuse is enabled for steps and only the main script file is hashed. So, if the script_name, inputs, and parameters of a step remain the same, the output of the previous step run is reused, the job is not submitted to the compute, and the results from the previous run are immediately available to the next step.

step = PythonScriptStep(name="Hello World",
                        script_name="hello_world.py",
                        compute_target=aml_compute,
                        source_directory=source_directory,
                        allow_reuse=False,
                        hash_paths=['hello_world.ipynb'])

Next steps

Learn how to run notebooks by following the article, Use Jupyter notebooks to explore this service.