Pipeline Class

Represents a collection of steps which can be executed as a reusable Azure Machine Learning workflow.

Use a Pipeline to create and manage workflows that stitch together various machine learning phases. Each machine learning phase, such as data preparation and model training, can consist of one or more steps in a Pipeline.

For an overview of why and when to use Pipelines, see https://aka.ms/pl-concept.

For an overview on constructing a Pipeline, see https://aka.ms/pl-first-pipeline.

Initialize Pipeline.

Inheritance
builtins.object
Pipeline

Constructor

Pipeline(workspace, steps, description=None, default_datastore=None, default_source_directory=None, resolve_closure=True, _workflow_provider=None, _service_endpoint=None, **kwargs)

Parameters

workspace
Workspace
Required

The workspace to submit the Pipeline on.

steps
list
Required

The list of steps to execute as part of a Pipeline.

description
str
default value: None

The description of the Pipeline.

default_datastore
AbstractAzureStorageDatastore or AzureDataLakeDatastore
default value: None

The default datastore to use for data connections.

default_source_directory
str
default value: None

The default script directory for steps which execute a script.

resolve_closure
bool
default value: True

Whether to resolve closure, that is, automatically bring in dependent steps.

_workflow_provider
<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>
default value: None

The workflow provider. If None, one is created.

_service_endpoint
str
default value: None

The service endpoint. If None, it is determined using the workspace.

kwargs
dict
Optional

Custom keyword arguments, reserved for future development.

Remarks

A pipeline is created with a list of steps and a workspace. Several step types can be used in a pipeline. Select the step type based on your machine learning scenario.

  • Azure Machine Learning Pipelines provides built-in steps for common scenarios. Pre-built steps derived from PipelineStep are steps that are used in one pipeline. For examples, see the steps package and the AutoMLStep class.

  • If your machine learning workflow calls for creating steps that can be versioned and used across different pipelines, then use the functionality in the Module module.

Submit a pipeline using submit. When submit is called, a PipelineRun is created which in turn creates StepRun objects for each step in the workflow. Use these objects to monitor the run execution.

An example to submit a Pipeline is as follows:


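   from azureml.pipeline.core import Pipeline

   # Assumes ws (a Workspace), steps (a list of pipeline steps), and
   # experiment (an Experiment in ws) have already been created.
   pipeline = Pipeline(workspace=ws, steps=steps)
   pipeline_run = experiment.submit(pipeline)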

A Pipeline has several optional settings that can be specified at submission time in the call to submit.

  • continue_on_step_failure: Whether to continue pipeline execution if a step fails; the default is False. If True, only steps that have no dependency on the output of the failed step will continue execution.

  • regenerate_outputs: Whether to force regeneration of all step outputs and disallow data reuse for this run; the default is False.

  • pipeline_parameters: Parameters to pipeline execution, dictionary of {name: value}. See PipelineParameter for more details.

  • parent_run_id: A run ID to set as the parent run of this pipeline run, which is reflected in RunHistory. The parent run must belong to the same experiment that this pipeline is submitted to.

An example to submit a Pipeline using these settings is as follows:


   from azureml.pipeline.core import Pipeline

   pipeline = Pipeline(workspace=ws, steps=steps)
   pipeline_run = experiment.submit(pipeline,
                                    continue_on_step_failure=True,
                                    regenerate_outputs=True,
                                    pipeline_parameters={"param1": "value1"},
                                    parent_run_id="<run_id>")
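The PipelineRun returned on submission, and the StepRun objects it creates, can be inspected to monitor execution. The following is a minimal sketch rather than a prescribed pattern. The helper names collect_step_statuses and failed_steps are hypothetical, and the code assumes only that the run object exposes get_steps() and that each step run exposes name and get_status().

```python
def collect_step_statuses(pipeline_run):
    """Return {step name: status} for every step run in a pipeline run.

    Assumes pipeline_run behaves like a PipelineRun: get_steps() yields
    StepRun-like objects that expose .name and .get_status().
    """
    return {step_run.name: step_run.get_status()
            for step_run in pipeline_run.get_steps()}


def failed_steps(statuses):
    """Return the sorted names of steps whose status is 'Failed'."""
    return sorted(name for name, status in statuses.items()
                  if status == "Failed")
```

With a real run, a typical sequence would be to call pipeline_run.wait_for_completion(show_output=True) and then pass pipeline_run to collect_step_statuses.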

Methods

load_yaml

Load a Pipeline from the specified YAML file.

A YAML file can be used to describe a Pipeline consisting of ModuleSteps.

publish

Publish a pipeline and make it available for rerunning.

Once a Pipeline is published, it can be submitted without the Python code which constructed the Pipeline. Returns the created PublishedPipeline.

service_endpoint

Get the service endpoint associated with the pipeline.

submit

Submit a pipeline run. This is equivalent to using Experiment.submit.

Returns the submitted PipelineRun. Use this object to monitor and view details of the run.

validate

Validate a pipeline and identify potential errors, such as unconnected inputs.

load_yaml

Load a Pipeline from the specified YAML file.

A YAML file can be used to describe a Pipeline consisting of ModuleSteps.

static load_yaml(workspace, filename, _workflow_provider=None, _service_endpoint=None)

Parameters

workspace
Workspace
Required

The workspace to submit the Pipeline on.

filename
str
Required

The YAML file which describes the Pipeline.

_workflow_provider
<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>
default value: None

The workflow provider.

_service_endpoint
str
default value: None

The service endpoint, if None, it is determined using the workspace.

Returns

The constructed Pipeline.

Return type

Pipeline

Remarks

See below for an example YAML file. The YAML contains a name, default_compute, and lists of parameters, data references, and steps for the Pipeline. Each step should specify the module, the compute, and the parameter, input, and output bindings. Additionally, a step runconfig and arguments can be specified if necessary.

Sample YAML file:


   pipeline:
       description: SamplePipelineFromYaml
       parameters:
           NumIterationsParameter:
               type: int
               default: 40
           DataPathParameter:
               type: datapath
               default:
                   datastore: workspaceblobstore
                   path_on_datastore: sample2.txt
           NodeCountParameter:
               type: int
               default: 4
       data_references:
           DataReference:
               datastore: workspaceblobstore
               path_on_datastore: testfolder/sample.txt
           Dataset:
               dataset_name: 'titanic'
       default_compute: aml-compute
       steps:
           PrepareStep:
               type: ModuleStep
               name: "TestModule"
               compute: aml-compute2
               runconfig: 'D:\.azureml\default_runconfig.yml'
               arguments:
               - '--input1'
               - input:in1
               - '--input2'
               - input:in2
               - '--input3'
               - input:in3
               - '--output'
               - output:output_data
               - '--param'
               - parameter:NUM_ITERATIONS
               parameters:
                   NUM_ITERATIONS:
                       source: NumIterationsParameter
               inputs:
                   in1:
                       source: Dataset
                       bind_mode: mount
                   in2:
                       source: DataReference
                   in3:
                       source: DataPathParameter
               outputs:
                   output_data:
                       destination: Output1
                       datastore: workspaceblobstore
                       bind_mode: mount
           TrainStep:
               type: ModuleStep
               name: "TestModule2"
               version: "2"
               runconfig: 'D:\.azureml\default_runconfig.yml'
               arguments:
               - '--input'
               - input:train_input
               - '--output'
               - output:result
               - '--param'
               - parameter:NUM_ITERATIONS
               parameters:
                   NUM_ITERATIONS: 10
               runconfig_parameters:
                   NodeCount:
                       source: NodeCountParameter
               inputs:
                   train_input:
                       source: Output1
                       bind_mode: mount
               outputs:
                   result:
                       destination: Output2
                       datastore: workspaceblobstore
                       bind_mode: mount
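A YAML file like the sample above is passed to load_yaml together with a workspace. The sketch below is one possible wrapper, not part of the SDK. The function name load_pipeline_from_yaml is hypothetical, and the up-front path check is an added convenience that assumes the YAML file is on the local file system.

```python
import os


def load_pipeline_from_yaml(workspace, filename):
    """Load a Pipeline from a YAML definition, failing fast on a bad path.

    workspace is assumed to be an azureml.core.Workspace.
    """
    if not os.path.isfile(filename):
        raise FileNotFoundError("Pipeline YAML file not found: " + filename)

    # Imported here so the path check above runs even without the SDK installed.
    from azureml.pipeline.core import Pipeline

    return Pipeline.load_yaml(workspace, filename)
```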

publish

Publish a pipeline and make it available for rerunning.

Once a Pipeline is published, it can be submitted without the Python code which constructed the Pipeline. Returns the created PublishedPipeline.

publish(name=None, description=None, version=None, continue_on_step_failure=None)

Parameters

name
str
default value: None

The name of the published pipeline.

description
str
default value: None

The description of the published pipeline.

version
str
default value: None

The version of the published pipeline.

continue_on_step_failure
bool
default value: None

Indicates whether to continue execution of other steps in the PipelineRun if a step fails; the default is False. If True, only steps that have no dependency on the output of the failed step will continue execution.

Returns

Created published pipeline.

Return type

PublishedPipeline
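As an illustration of publish, the sketch below publishes a pipeline and collects a few identifying fields from the result. The helper name publish_and_describe is hypothetical, and the code assumes the returned object exposes id, name, and endpoint attributes, as PublishedPipeline is expected to.

```python
def publish_and_describe(pipeline, name, description=None, version=None):
    """Publish a pipeline and return a few identifying fields.

    Assumes pipeline behaves like a Pipeline and that the object returned
    by publish() exposes id, name, and endpoint.
    """
    published = pipeline.publish(name=name,
                                 description=description,
                                 version=version)
    return {"id": published.id,
            "name": published.name,
            "endpoint": published.endpoint}
```

The endpoint value can then be used to trigger the published pipeline without the Python code that constructed it.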

service_endpoint

Get the service endpoint associated with the pipeline.

service_endpoint()

Returns

The service endpoint.

Return type

str

submit

Submit a pipeline run. This is equivalent to using Experiment.submit.

Returns the submitted PipelineRun. Use this object to monitor and view details of the run.

submit(experiment_name, pipeline_parameters=None, continue_on_step_failure=False, regenerate_outputs=False, parent_run_id=None, credential_passthrough=None, **kwargs)

Parameters

experiment_name
str
Required

The name of the experiment to submit the pipeline on.

pipeline_parameters
dict
default value: None

Parameters to pipeline execution, dictionary of {name: value}. See PipelineParameter for more details.

continue_on_step_failure
bool
default value: False

Indicates whether to continue pipeline execution if a step fails. If True, only steps that have no dependency on the output of the failed step will continue execution.

regenerate_outputs
bool
default value: False

Indicates whether to force regeneration of all step outputs and disallow data reuse for this run. If False, this run may reuse results from previous runs and subsequent runs may reuse the results of this run.

parent_run_id
str
default value: None

Optional run ID to set as the parent run of this pipeline run, which is reflected in RunHistory. The parent run must belong to the same experiment that this pipeline is submitted to.

credential_passthrough
bool
default value: None

Optional. If this flag is enabled, the remote pipeline job uses the credentials of the user who initiated the job. This feature is only available in private preview.

Returns

The submitted pipeline run.

Return type

PipelineRun
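Calling submit on the Pipeline object takes an experiment name rather than an Experiment object. The sketch below shows one way to pass parameter overrides through; the helper name submit_with_overrides and its overrides and force_rerun arguments are hypothetical, and only the submit keyword arguments come from the signature above.

```python
def submit_with_overrides(pipeline, experiment_name, overrides=None,
                          force_rerun=False):
    """Submit a pipeline run, optionally overriding pipeline parameters.

    overrides maps PipelineParameter names to values and is passed as
    pipeline_parameters; force_rerun is passed as regenerate_outputs.
    """
    return pipeline.submit(
        experiment_name,
        pipeline_parameters=overrides or None,
        regenerate_outputs=force_rerun,
    )
```

For example, submit_with_overrides(pipeline, "my-experiment", {"param1": "value1"}) would submit a run that may reuse outputs from previous runs, because regenerate_outputs stays False.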

validate

Validate a pipeline and identify potential errors, such as unconnected inputs.

validate()

Returns

A list of errors in the pipeline.

Return type

list

Remarks

Examples of validation errors include:

  • missing or unexpected pipeline datasources or step types

  • missing parameters or output definitions for a pipeline datasource or step

  • unconnected inputs

  • pipeline steps that form a loop or cycle

If validation passes (returns an empty list) and your pipeline still doesn't work, see the article Debug and troubleshoot machine learning pipelines.
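Because validate returns a list of errors, it can be used as a gate before submission. The following is a minimal sketch under that assumption; the helper name submit_if_valid is hypothetical, and raising ValueError is an illustrative choice rather than SDK behavior.

```python
def submit_if_valid(pipeline, experiment_name):
    """Validate first; submit only when validate() returns no errors.

    Assumes pipeline behaves like a Pipeline: validate() returns a list
    of errors and submit() returns the submitted run.
    """
    errors = pipeline.validate()
    if errors:
        details = "; ".join(str(error) for error in errors)
        raise ValueError("Pipeline validation failed: " + details)
    return pipeline.submit(experiment_name)
```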

Attributes

graph

Get the graph associated with the pipeline. Steps and data inputs appear as nodes in the graph.

Returns

The graph.

Return type

Graph