Create and run a machine learning pipeline by using Azure Machine Learning SDK

In this article, you learn how to create, publish, run, and track a machine learning pipeline by using the Azure Machine Learning SDK. These pipelines help create and manage the workflows that stitch together various machine learning phases. Each phase of a pipeline, such as data preparation and model training, can include one or more steps.

The pipelines you create are visible to the members of your Azure Machine Learning service workspace.

Pipelines use remote compute targets for computation and the storage of the intermediate and final data associated with that pipeline. Pipelines can read and write data to and from supported Azure Storage locations.

Prerequisites

  • An Azure subscription. If you don’t have one, create a free account before you begin. Try the free or paid version of Azure Machine Learning service.

  • An Azure Machine Learning service workspace.
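The code in this article assumes the workspace has been loaded into a Workspace object named ws. A minimal sketch, assuming you have saved the workspace's config.json in your working directory:

from azureml.core import Workspace

# Load the workspace from the config.json in the current directory
ws = Workspace.from_config()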

Set up machine learning resources

Create the resources required to run a pipeline:

  • Set up a datastore used to access the data needed in the pipeline steps.

  • Configure a DataReference object to point to data that lives in, or is accessible in, a datastore.

  • Set up the compute targets on which your pipeline steps will run.

Set up a datastore

A datastore stores the data for the pipeline to access. Each workspace has a default datastore. You can register additional datastores.

When you create your workspace, Azure Files and Azure Blob storage are attached to the workspace by default. Azure Files is the default datastore for a workspace, but you can also use Blob storage as a datastore. To learn more, see Deciding when to use Azure Files, Azure Blobs, or Azure Disks.

from azureml.core import Datastore

# Default datastore (Azure file storage)
def_data_store = ws.get_default_datastore()

# The above call is equivalent to this
def_data_store = Datastore(ws, "workspacefilestore")

# Get Blob storage associated with the workspace
def_blob_store = Datastore(ws, "workspaceblobstore")
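If your data lives in a storage account that isn't attached by default, you can register it as an additional datastore. The following is a minimal sketch for a Blob container; the datastore name, container, account name, and key are placeholders you would replace with your own values:

# Register an additional Blob storage container as a datastore (placeholder values)
custom_blob_store = Datastore.register_azure_blob_container(
    workspace=ws,
    datastore_name="custom_blob_store",
    container_name="<container_name>",
    account_name="<storage_account_name>",
    account_key="<storage_account_key>")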

Upload data files or directories to the datastore so they're accessible from your pipelines. This example uses the Blob storage version of the datastore:

def_blob_store.upload_files(
    ["./data/20news.pkl"],
    target_path="20newsgroups", 
    overwrite=True)

A pipeline consists of one or more steps. A step is a unit run on a compute target. Steps might consume data sources and produce “intermediate” data. A step can create data such as a model, a directory with model and dependent files, or temporary data. This data is then available for other steps later in the pipeline.

Configure data reference

You just created a data source that can be referenced in a pipeline as an input to a step. A data source in a pipeline is represented by a DataReference object. The DataReference object points to data that lives in or is accessible from a datastore.

from azureml.data.data_reference import DataReference

blob_input_data = DataReference(
    datastore=def_blob_store,
    data_reference_name="test_data",
    path_on_datastore="20newsgroups/20news.pkl")

Intermediate data (or output of a step) is represented by a PipelineData object. output_data1 is produced as the output of a step, and used as the input of one or more future steps. PipelineData introduces a data dependency between steps, and creates an implicit execution order in the pipeline.

from azureml.pipeline.core import PipelineData

output_data1 = PipelineData(
    "output_data1",
    datastore=def_blob_store,
    output_name="output_data1")

Set up compute target

In Azure Machine Learning, the term compute (or compute target) refers to the machines or clusters that perform the computational steps in your machine learning pipeline. See compute targets for model training for a full list of compute targets and for how to create and attach them to your workspace. The process for creating or attaching a compute target is the same whether you're training a model or running a pipeline step. After you create and attach your compute target, use the ComputeTarget object in your pipeline step.

Important

Performing management operations on compute targets is not supported from inside remote jobs. Since machine learning pipelines are submitted as a remote job, do not use management operations on compute targets from inside the pipeline.

Below are examples of creating and attaching compute targets for:

  • Azure Machine Learning Compute
  • Azure Databricks
  • Azure Data Lake Analytics

Azure Machine Learning Compute

You can create an Azure Machine Learning compute for running your steps.

compute_name = "aml-compute"
 if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and type(compute_target) is AmlCompute:
        print('Found compute target: ' + compute_name)
else:
    print('Creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size, # NC6 is GPU-enabled
                                                                min_nodes = 1, 
                                                                max_nodes = 4)
     # create the compute target
    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)

    # Can poll for a minimum number of nodes and for a specific timeout. 
    # If no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

     # For a more detailed view of current cluster status, use the 'status' property    
    print(compute_target.status.serialize())

Azure Databricks

Azure Databricks is an Apache Spark-based environment in the Azure cloud. It can be used as a compute target with an Azure Machine Learning pipeline.

Create an Azure Databricks workspace before using it. To create this resource, see the Run a Spark job on Azure Databricks document.

To attach Azure Databricks as a compute target, provide the following information:

  • Databricks compute name: The name you want to assign to this compute resource.
  • Databricks workspace name: The name of the Azure Databricks workspace.
  • Databricks resource group: The name of the resource group that contains the Azure Databricks workspace.
  • Databricks access token: The access token used to authenticate to Azure Databricks. To generate an access token, see the Authentication document.

The following code demonstrates how to attach Azure Databricks as a compute target with the Azure Machine Learning SDK:

import os
from azureml.core.compute import ComputeTarget, DatabricksCompute
from azureml.exceptions import ComputeTargetException

databricks_compute_name = os.environ.get("AML_DATABRICKS_COMPUTE_NAME", "<databricks_compute_name>")
databricks_workspace_name = os.environ.get("AML_DATABRICKS_WORKSPACE", "<databricks_workspace_name>")
databricks_resource_group = os.environ.get("AML_DATABRICKS_RESOURCE_GROUP", "<databricks_resource_group>")
databricks_access_token = os.environ.get("AML_DATABRICKS_ACCESS_TOKEN", "<databricks_access_token>")

try:
    databricks_compute = ComputeTarget(workspace=ws, name=databricks_compute_name)
    print('Compute target already exists')
except ComputeTargetException:
    print('compute not found')
    print('databricks_compute_name {}'.format(databricks_compute_name))
    print('databricks_workspace_name {}'.format(databricks_workspace_name))
    print('databricks_access_token {}'.format(databricks_access_token))

    # Create attach config
    attach_config = DatabricksCompute.attach_configuration(resource_group = databricks_resource_group,
                                                           workspace_name = databricks_workspace_name,
                                                           access_token = databricks_access_token)
    databricks_compute = ComputeTarget.attach(
             ws,
             databricks_compute_name,
             attach_config
         )

    databricks_compute.wait_for_completion(True)

Azure Data Lake Analytics

Azure Data Lake Analytics is a big data analytics platform in the Azure cloud. It can be used as a compute target with an Azure Machine Learning pipeline.

Create an Azure Data Lake Analytics account before using it. To create this resource, see the Get started with Azure Data Lake Analytics document.

To attach Data Lake Analytics as a compute target, you must use the Azure Machine Learning SDK and provide the following information:

  • Compute name: The name you want to assign to this compute resource.
  • Resource Group: The resource group that contains the Data Lake Analytics account.
  • Account name: The Data Lake Analytics account name.

The following code demonstrates how to attach Data Lake Analytics as a compute target:

import os
from azureml.core.compute import ComputeTarget, AdlaCompute
from azureml.exceptions import ComputeTargetException


adla_compute_name = os.environ.get("AML_ADLA_COMPUTE_NAME", "<adla_compute_name>")
adla_resource_group = os.environ.get("AML_ADLA_RESOURCE_GROUP", "<adla_resource_group>")
adla_account_name = os.environ.get("AML_ADLA_ACCOUNT_NAME", "<adla_account_name>")

try:
    adla_compute = ComputeTarget(workspace=ws, name=adla_compute_name)
    print('Compute target already exists')
except ComputeTargetException:
    print('compute not found')
    print('adla_compute_name {}'.format(adla_compute_name))
    print('adla_resource_group {}'.format(adla_resource_group))
    print('adla_account_name {}'.format(adla_account_name))
    # create attach config
    attach_config = AdlaCompute.attach_configuration(resource_group = adla_resource_group,
                                                     account_name = adla_account_name)
    # Attach ADLA
    adla_compute = ComputeTarget.attach(
             ws,
             adla_compute_name,
             attach_config
         )

    adla_compute.wait_for_completion(True)

Tip

Azure Machine Learning pipelines can only work with data stored in the default data store of the Data Lake Analytics account. If the data you need to work with is in a non-default store, you can use a DataTransferStep to copy the data before training.
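A minimal sketch of such a copy step follows. The DataReference objects (adls_source for the non-default store and adla_default_store_ref for the account's default store) and the attached Azure Data Factory compute target (data_factory_compute) are assumptions for illustration; DataTransferStep runs on an Azure Data Factory compute.

from azureml.pipeline.steps import DataTransferStep

# Sketch only: 'adls_source', 'adla_default_store_ref', and 'data_factory_compute'
# are placeholders assumed to be defined elsewhere.
transfer_step = DataTransferStep(
    name="copy_to_default_store",
    source_data_reference=adls_source,
    destination_data_reference=adla_default_store_ref,
    compute_target=data_factory_compute)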

Construct your pipeline steps

After you create and attach a compute target to your workspace, you're ready to define a pipeline step. Many built-in steps are available through the Azure Machine Learning SDK. The most basic of these is a PythonScriptStep, which runs a Python script on a specified compute target.

from azureml.pipeline.steps import PythonScriptStep

trainStep = PythonScriptStep(
    script_name="train.py",
    arguments=["--input", blob_input_data, "--output", output_data1],
    inputs=[blob_input_data],
    outputs=[output_data1],
    compute_target=compute_target,
    source_directory=project_folder
)

After you define your steps, you build the pipeline by using some or all of those steps.

Note

No file or data is uploaded to the Azure Machine Learning service when you define the steps or build the pipeline.

from azureml.pipeline.core import Pipeline

# List of steps to run
compareModels = [trainStep, extractStep, compareStep]

# Build the pipeline
pipeline1 = Pipeline(workspace=ws, steps=compareModels)

The following example uses the Azure Databricks compute target created earlier:

from azureml.pipeline.steps import DatabricksStep

dbStep = DatabricksStep(
    name="databricksmodule",
    inputs=[step_1_input],
    outputs=[step_1_output],
    num_workers=1,
    notebook_path=notebook_path,
    notebook_params={'myparam': 'testparam'},
    run_name='demo run name',
    compute_target=databricks_compute,
    allow_reuse=False
)
# List of steps to run
steps = [dbStep]

# Build the pipeline
pipeline1 = Pipeline(workspace=ws, steps=steps)
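Before submitting, you can optionally check the pipeline for problems such as unconnected inputs. A short sketch using the Pipeline object's validate method, which returns a list of any issues found:

# Optional: validate the pipeline and report how many issues were found
validation_errors = pipeline1.validate()
print("Validation found {} issues".format(len(validation_errors)))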

Submit the pipeline

When you submit the pipeline, Azure Machine Learning service checks the dependencies for each step and uploads a snapshot of the source directory you specified. If no source directory is specified, the current local directory is uploaded.

from azureml.core import Experiment

# Submit the pipeline to be run
pipeline_run1 = Experiment(ws, 'Compare_Models_Exp').submit(pipeline1)
pipeline_run1.wait_for_completion()
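You can also track the individual step runs from the SDK while or after the pipeline executes. A minimal sketch using the pipeline_run1 object from above:

# Print the status of each step run in the pipeline
for step_run in pipeline_run1.get_steps():
    print("{}: {}".format(step_run.id, step_run.get_status()))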

When you first run a pipeline, Azure Machine Learning:

  • Downloads the project snapshot to the compute target from the Blob storage associated with the workspace.
  • Builds a Docker image corresponding to each step in the pipeline.
  • Downloads the Docker image for each step to the compute target from the container registry.
  • Mounts the datastore, if a DataReference object is specified in a step. If mount is not supported, the data is instead copied to the compute target.
  • Runs the step in the compute target specified in the step definition.
  • Creates artifacts, such as logs, stdout and stderr, metrics, and output specified by the step. These artifacts are then uploaded and kept in the user’s default datastore.

[Diagram: running an experiment as a pipeline]

Publish a pipeline

You can publish a pipeline to run it with different inputs later. For the REST endpoint of an already published pipeline to accept parameters, you must parameterize the pipeline before publishing.

  1. To create a pipeline parameter, use a PipelineParameter object with a default value.

    from azureml.pipeline.core.graph import PipelineParameter

    pipeline_param = PipelineParameter(
      name="pipeline_arg",
      default_value=10)
    
  2. Add this PipelineParameter object as a parameter to any of the steps in the pipeline as follows:

    compareStep = PythonScriptStep(
      script_name="compare.py",
      arguments=["--comp_data1", comp_data1, "--comp_data2", comp_data2, "--output_data", out_data3, "--param1", pipeline_param],
      inputs=[comp_data1, comp_data2],
      outputs=[out_data3],
      compute_target=compute_target,
      source_directory=project_folder)
    
  3. Publish the pipeline, which will accept the parameter when invoked.

    published_pipeline1 = pipeline1.publish(
        name="My_Published_Pipeline", 
        description="My Published Pipeline Description")
    

Run a published pipeline

All published pipelines have a REST endpoint. This endpoint lets external systems, such as non-Python clients, invoke a run of the pipeline. It enables "managed repeatability" in batch scoring and retraining scenarios.

To invoke the run of the preceding pipeline, you need an Azure Active Directory authentication header token, as described in the AzureCliAuthentication class reference and in the Authentication in Azure Machine Learning notebook.
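For example, if you're signed in with the Azure CLI, a minimal sketch for building that header looks like this:

from azureml.core.authentication import AzureCliAuthentication

# Build the Authorization header from the current Azure CLI sign-in
cli_auth = AzureCliAuthentication()
aad_token = cli_auth.get_authentication_header()

With the header in hand, post to the endpoint: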

import requests

response = requests.post(published_pipeline1.endpoint,
    headers=aad_token,
    json={"ExperimentName": "My_Pipeline",
        "ParameterAssignments": {"pipeline_arg": 20}})

View results

See the list of all your pipelines and their run details:

  1. Sign in to the Azure portal.

  2. View your workspace to find the list of pipelines.

  3. Select a specific pipeline to see the run results.

Next steps

Learn how to run notebooks by following the article Use Jupyter notebooks to explore this service.