Pipeline Class

Represents a collection of steps which can be executed as a reusable Azure Machine Learning workflow.

Use a Pipeline to create and manage workflows that stitch together various machine learning phases. Each machine learning phase, such as data preparation and model training, can consist of one or more steps in a Pipeline.

For an overview of why and when to use Pipelines, see https://aka.ms/pl-concept.

For an overview on constructing a Pipeline, see https://aka.ms/pl-first-pipeline.

Initialize Pipeline.

Inheritance
builtins.object
Pipeline

Constructor

Pipeline(workspace, steps, description=None, default_datastore=None, default_source_directory=None, resolve_closure=True, _workflow_provider=None, _service_endpoint=None, **kwargs)

Parameters

Name Description
workspace
Required

The workspace to submit the Pipeline on.

steps
Required

The list of steps to execute as part of a Pipeline.

description
str

The description of the Pipeline.

default value: None

default_datastore

The default datastore to use for data connections.

default value: None

default_source_directory
str

The default script directory for steps which execute a script.

default value: None

resolve_closure

Whether to resolve closure, that is, automatically bring in dependent steps.

default value: True

_workflow_provider
azureml.pipeline.core._aeva_provider._AevaWorkflowProvider

The workflow provider. If None, one is created.

default value: None

_service_endpoint
str

The service endpoint. If None, it is determined using the workspace.

default value: None

kwargs

Custom keyword arguments, reserved for future development.

Remarks

A pipeline is created with a list of steps and a workspace. Several step types can be used in a pipeline; select the step type based on your machine learning scenario.

  • Azure Machine Learning Pipelines provides built-in steps for common scenarios. Pre-built steps derived from PipelineStep are steps that are used in one pipeline. For examples, see the steps package and the AutoMLStep class.

  • If your machine learning workflow calls for creating steps that can be versioned and used across different pipelines, then use the functionality in the Module module.

Submit a pipeline using submit. When submit is called, a PipelineRun is created which in turn creates StepRun objects for each step in the workflow. Use these objects to monitor the run execution.

An example to submit a Pipeline is as follows:


   from azureml.pipeline.core import Pipeline

   pipeline = Pipeline(workspace=ws, steps=steps)
   pipeline_run = experiment.submit(pipeline)

A number of optional settings for a Pipeline can be specified at submission, as keyword arguments to submit.

  • continue_on_step_failure: Whether to continue pipeline execution if a step fails; the default is False. If True, only steps that have no dependency on the output of the failed step will continue execution.

  • regenerate_outputs: Whether to force regeneration of all step outputs and disallow data reuse for this run; the default is False.

  • pipeline_parameters: Parameters to pipeline execution, dictionary of {name: value}. See PipelineParameter for more details.

  • parent_run_id: You can supply a run id to set the parent run of this pipeline run, which is reflected in RunHistory. The parent run must belong to the same experiment as this pipeline is being submitted to.

An example to submit a Pipeline using these settings is as follows:


   from azureml.pipeline.core import Pipeline

   pipeline = Pipeline(workspace=ws, steps=steps)
   pipeline_run = experiment.submit(pipeline,
                                    continue_on_step_failure=True,
                                    regenerate_outputs=True,
                                    pipeline_parameters={"param1": "value1"},
                                    parent_run_id="<run_id>")
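The effect of continue_on_step_failure can be pictured with a small, purely local model. The sketch below is not how the service schedules steps; it is a toy illustration, and the function name steps_that_still_run, the dependencies mapping, and the step names are all hypothetical. Given which steps consume the output of which other steps, it lists the steps that have no dependency, direct or indirect, on the failed step.

```python
from collections import deque


def steps_that_still_run(dependencies, failed_step):
    """Return the steps that do not depend on the output of `failed_step`.

    `dependencies` maps each step name to the list of steps whose output
    it consumes. This is a toy model, not the Azure ML scheduler.
    """
    # Invert the mapping: for each step, which steps consume its output?
    consumers = {step: [] for step in dependencies}
    for step, upstream in dependencies.items():
        for parent in upstream:
            consumers.setdefault(parent, []).append(step)

    # Breadth-first walk from the failed step marks everything downstream as blocked.
    blocked = {failed_step}
    queue = deque([failed_step])
    while queue:
        current = queue.popleft()
        for child in consumers.get(current, []):
            if child not in blocked:
                blocked.add(child)
                queue.append(child)

    return sorted(step for step in consumers if step not in blocked)


# Hypothetical five-step workflow with two independent branches.
example = {
    "prepare": [],
    "featurize": [],
    "train": ["prepare"],
    "evaluate": ["train"],
    "report": ["featurize"],
}
print(steps_that_still_run(example, "prepare"))
```

In this model, a failure in prepare blocks train and evaluate, while the featurize and report branch is unaffected. With continue_on_step_failure left at False, the documented behavior is that pipeline execution does not continue.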

Methods

load_yaml

Load a Pipeline from the specified YAML file.

A YAML file can be used to describe a Pipeline consisting of ModuleSteps.

publish

Publish a pipeline and make it available for rerunning.

Once a Pipeline is published, it can be submitted without the Python code which constructed the Pipeline. Returns the created PublishedPipeline.

service_endpoint

Get the service endpoint associated with the pipeline.

submit

Submit a pipeline run. This is equivalent to using Experiment.submit.

Returns the submitted PipelineRun. Use this object to monitor and view details of the run.

validate

Validate a pipeline and identify potential errors, such as unconnected inputs.

load_yaml

Load a Pipeline from the specified YAML file.

A YAML file can be used to describe a Pipeline consisting of ModuleSteps.

static load_yaml(workspace, filename, _workflow_provider=None, _service_endpoint=None)

Parameters

Name Description
workspace
Required

The workspace to submit the Pipeline on.

filename
Required
str

The YAML file which describes the Pipeline.

_workflow_provider
azureml.pipeline.core._aeva_provider._AevaWorkflowProvider

The workflow provider.

default value: None
_service_endpoint
str

The service endpoint. If None, it is determined using the workspace.

default value: None

Returns

Type Description

The constructed Pipeline.

Remarks

See below for an example YAML file. The YAML contains a name, a default_compute, and lists of parameters, data references, and steps for the Pipeline. Each step should specify the module, the compute, and the parameter, input, and output bindings. Additionally, a step runconfig and arguments can be specified if necessary.

Sample YAML file:


   pipeline:
       description: SamplePipelineFromYaml
       parameters:
           NumIterationsParameter:
               type: int
               default: 40
           DataPathParameter:
               type: datapath
               default:
                   datastore: workspaceblobstore
                   path_on_datastore: sample2.txt
           NodeCountParameter:
               type: int
               default: 4
       data_references:
           DataReference:
               datastore: workspaceblobstore
               path_on_datastore: testfolder/sample.txt
           Dataset:
               dataset_name: 'titanic'
       default_compute: aml-compute
       steps:
           PrepareStep:
               type: ModuleStep
               name: "TestModule"
               compute: aml-compute2
               runconfig: 'D:\.azureml\default_runconfig.yml'
               arguments:
               - '--input1'
               - input:in1
               - '--input2'
               - input:in2
               - '--input3'
               - input:in3
               - '--output'
               - output:output_data
               - '--param'
               - parameter:NUM_ITERATIONS
               parameters:
                   NUM_ITERATIONS:
                       source: NumIterationsParameter
               inputs:
                   in1:
                       source: Dataset
                       bind_mode: mount
                   in2:
                       source: DataReference
                   in3:
                       source: DataPathParameter
               outputs:
                   output_data:
                       destination: Output1
                       datastore: workspaceblobstore
                       bind_mode: mount
           TrainStep:
               type: ModuleStep
               name: "TestModule2"
               version: "2"
               runconfig: 'D:\.azureml\default_runconfig.yml'
               arguments:
               - '--input'
               - input:train_input
               - '--output'
               - output:result
               - '--param'
               - parameter:NUM_ITERATIONS
               parameters:
                   NUM_ITERATIONS: 10
               runconfig_parameters:
                   NodeCount:
                       source: NodeCountParameter
               inputs:
                   train_input:
                       source: Output1
                       bind_mode: mount
               outputs:
                   result:
                       destination: Output2
                       datastore: workspaceblobstore
                       bind_mode: mount
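In the sample, entries in a step's arguments list such as input:in1, output:output_data, and parameter:NUM_ITERATIONS appear to refer to names declared under that step's inputs, outputs, and parameters. The sketch below is a local pre-check built on that reading; it is an assumption drawn from the sample, not the SDK's own validation, and the helper name unresolved_argument_refs is hypothetical. The step is written as a plain dictionary mirroring PrepareStep, so no YAML parser is needed.

```python
def unresolved_argument_refs(step):
    """List argument entries of the form kind:name whose name is not declared.

    `step` is a dictionary shaped like one entry under `steps` in the sample
    YAML. Entries without a recognized prefix (for example '--input1') are
    treated as literal command-line arguments and ignored.
    """
    declared = {
        "input": set(step.get("inputs", {})),
        "output": set(step.get("outputs", {})),
        "parameter": set(step.get("parameters", {})),
    }
    missing = []
    for arg in step.get("arguments", []):
        kind, sep, name = arg.partition(":")
        if sep and kind in declared and name not in declared[kind]:
            missing.append(arg)
    return missing


# Dictionary form of PrepareStep from the sample YAML.
prepare_step = {
    "type": "ModuleStep",
    "name": "TestModule",
    "arguments": [
        "--input1", "input:in1",
        "--input2", "input:in2",
        "--input3", "input:in3",
        "--output", "output:output_data",
        "--param", "parameter:NUM_ITERATIONS",
    ],
    "parameters": {"NUM_ITERATIONS": {"source": "NumIterationsParameter"}},
    "inputs": {
        "in1": {"source": "Dataset", "bind_mode": "mount"},
        "in2": {"source": "DataReference"},
        "in3": {"source": "DataPathParameter"},
    },
    "outputs": {
        "output_data": {
            "destination": "Output1",
            "datastore": "workspaceblobstore",
            "bind_mode": "mount",
        }
    },
}
print(unresolved_argument_refs(prepare_step))
```

A check like this can catch a misspelled binding name before the file is passed to load_yaml, which remains the authority on whether the YAML is accepted.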

publish

Publish a pipeline and make it available for rerunning.

Once a Pipeline is published, it can be submitted without the Python code which constructed the Pipeline. Returns the created PublishedPipeline.

publish(name=None, description=None, version=None, continue_on_step_failure=None)

Parameters

Name Description
name
str

The name of the published pipeline.

default value: None
description
str

The description of the published pipeline.

default value: None
version
str

The version of the published pipeline.

default value: None
continue_on_step_failure

Indicates whether to continue execution of other steps in the PipelineRun if a step fails; the default is False. If True, only steps that have no dependency on the output of the failed step will continue execution.

default value: None

Returns

Type Description

Created published pipeline.
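A call to publish might be wrapped as in the sketch below. The helper name publish_and_describe is hypothetical, and the id, name, and endpoint attributes read from the returned object are taken from the SDK's PublishedPipeline class rather than from this section, so treat them as assumptions to verify against your SDK version. The helper accepts any object with a compatible publish method.

```python
def publish_and_describe(pipeline, name, version, description=None):
    """Publish `pipeline` and return a few identifying fields.

    `pipeline` is expected to be an azureml.pipeline.core.Pipeline, but any
    object with a compatible publish() method works.
    """
    published = pipeline.publish(
        name=name,
        description=description,
        version=version,
    )
    # Assumed attributes of PublishedPipeline: id, name, endpoint.
    return {
        "id": published.id,
        "name": published.name,
        "endpoint": published.endpoint,
    }


# Typical use, assuming `pipeline` was built as shown in the class Remarks:
# info = publish_and_describe(pipeline, name="nightly-training", version="1.0")
# print(info["id"])
```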

service_endpoint

Get the service endpoint associated with the pipeline.

service_endpoint()

Returns

Type Description
str

The service endpoint.

submit

Submit a pipeline run. This is equivalent to using Experiment.submit.

Returns the submitted PipelineRun. Use this object to monitor and view details of the run.

submit(experiment_name, pipeline_parameters=None, continue_on_step_failure=False, regenerate_outputs=False, parent_run_id=None, credential_passthrough=None, **kwargs)

Parameters

Name Description
experiment_name
Required
str

The name of the experiment to submit the pipeline on.

pipeline_parameters

Parameters to pipeline execution, dictionary of {name: value}. See PipelineParameter for more details.

default value: None
continue_on_step_failure

Indicates whether to continue pipeline execution if a step fails. If True, only steps that have no dependency on the output of the failed step will continue execution.

default value: False
regenerate_outputs

Indicates whether to force regeneration of all step outputs and disallow data reuse for this run. If False, this run may reuse results from previous runs and subsequent runs may reuse the results of this run.

default value: False
parent_run_id
str

Optional run ID to set for the parent run of this pipeline run, which is reflected in RunHistory. The parent run must belong to the same experiment as this pipeline is being submitted to.

default value: None
credential_passthrough

Optional. If this flag is enabled, the remote pipeline job uses the credentials of the user who initiated the job. This feature is only available in private preview.

default value: None

Returns

Type Description

The submitted pipeline run.
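As a rough sketch of calling submit directly on the Pipeline, the helper below submits by experiment name and then blocks until the run ends. The helper name run_pipeline and its argument names are hypothetical. wait_for_completion is a method of the returned PipelineRun in the SDK and is not described in this section, so its use here is an assumption to verify.

```python
def run_pipeline(pipeline, experiment_name, parameters=None, force_rerun=False):
    """Submit `pipeline` under `experiment_name` and wait for the run to end.

    Returns the run object together with the final status reported by
    wait_for_completion().
    """
    run = pipeline.submit(
        experiment_name,
        pipeline_parameters=parameters,   # dictionary of {name: value}
        regenerate_outputs=force_rerun,   # True disallows reuse of earlier outputs
    )
    # Assumed PipelineRun method; blocks until the run reaches a final state.
    status = run.wait_for_completion(show_output=False)
    return run, status


# Typical use, assuming `pipeline` was built as shown in the class Remarks:
# run, status = run_pipeline(pipeline, "my-experiment", {"param1": "value1"})
```

Leaving regenerate_outputs at False lets the run reuse results from previous runs, which is usually the faster choice while iterating on a single step.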

validate

Validate a pipeline and identify potential errors, such as unconnected inputs.

validate()

Returns

Type Description

A list of errors in the pipeline.

Remarks

Examples of validation errors include:

  • missing or unexpected pipeline datasources or step types

  • missing parameters or output definitions for a pipeline datasource or step

  • unconnected inputs

  • pipeline steps that form a loop or cycle

If validation passes (returns an empty list) and your pipeline still doesn't work, see Debug and troubleshoot machine learning pipelines.
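Because validate returns a list of errors, with an empty list meaning the pipeline passed, it can serve as a gate before submitting or publishing. The sketch below is one way to do that; the exception class PipelineValidationError and the helper ensure_valid are hypothetical names, and the errors are only formatted with str(), since their exact type is not described here.

```python
class PipelineValidationError(ValueError):
    """Raised when validate() reports one or more errors (hypothetical helper type)."""


def ensure_valid(pipeline):
    """Return `pipeline` unchanged if validate() reports no errors, else raise."""
    errors = pipeline.validate()
    if errors:
        details = "\n".join(f"  - {error}" for error in errors)
        raise PipelineValidationError(
            f"{len(errors)} validation error(s):\n{details}"
        )
    return pipeline


# Typical use, assuming `pipeline` was built as shown in the class Remarks:
# pipeline_run = ensure_valid(pipeline).submit("my-experiment")
```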

Attributes

graph

Get the graph associated with the pipeline. Steps and data inputs appear as nodes in the graph.

Returns

Type Description

The graph.