How to use pipeline component to build nested pipeline job (V2)

APPLIES TO: Azure CLI ml extension v2 (current) Python SDK azure-ai-ml v2 (current)

When developing a complex machine learning pipeline, it's common to have sub-pipelines that use multi-step to perform tasks such as data preprocessing and model training. These sub-pipelines can be developed and tested standalone. Pipeline component groups multi-step as a component that can be used as a single step to create complex pipelines. Which will help you share your work and better collaborate with team members.

By using a pipeline component, the author can focus on developing sub-tasks and easily integrate them with the entire pipeline job. Furthermore, a pipeline component has a well-defined interface in terms of inputs and outputs, which means that user of the pipeline component doesn't need to know the implementation details of the component.

In this article, you'll learn how to use pipeline component in Azure Machine Learning pipeline.

Prerequisites

The difference between pipeline job and pipeline component

In general, pipeline components are similar to pipeline jobs because they both contain a group of jobs/components.

Here are some main differences you need to be aware of when defining pipeline components:

  • Pipeline component only defines the interface of inputs/outputs, which means when defining a pipeline component you need to explicitly define the type of inputs/outputs instead of directly assigning values to them.
  • Pipeline component can't have runtime settings, you can't hard-code compute, or data node in the pipeline component. Instead you need to promote them as pipeline level inputs and assign values during runtime.
  • Pipeline level settings such as default_datastore and default_compute are also runtime settings. They aren't part of pipeline component definition.

CLI v2

The example used in this article can be found in azureml-example repo. Navigate to azureml-examples/cli/jobs/pipelines-with-components/pipeline_with_pipeline_component to check the example.

You can use multi-components to build a pipeline component. Similar to how you built pipeline job with component. This is two step pipeline component.

$schema: https://azuremlschemas.azureedge.net/latest/pipelineComponent.schema.json
type: pipeline

name: train_pipeline_component
display_name: train_pipeline_component
description: Dummy train-score-eval pipeline component with local components

inputs:
  training_data: 
    type: uri_folder  # default/path is not supported for data type
  test_data: 
    type: uri_folder  # default/path is not supported for data type
  training_max_epochs:
    type: integer
  training_learning_rate: 
    type: number
  learning_rate_schedule:
    type: string
    default: 'time-based'
  train_node_compute: # example to show how to promote compute as input
    type: string

outputs: 
  trained_model:
    type: uri_folder
  evaluation_report:
    type: uri_folder

jobs:
  train_job:
    type: command
    component: ./train/train.yml
    inputs:
      training_data: ${{parent.inputs.training_data}}
      max_epochs: ${{parent.inputs.training_max_epochs}}
      learning_rate: ${{parent.inputs.training_learning_rate}}
      learning_rate_schedule: ${{parent.inputs.learning_rate_schedule}}
      
    outputs:
      model_output: ${{parent.outputs.trained_model}}
    compute: ${{parent.inputs.train_node_compute}}
  
  score_job:
    type: command
    component: ./score/score.yml
    inputs:
      model_input: ${{parent.jobs.train_job.outputs.model_output}}
      test_data: ${{parent.inputs.test_data}}
    outputs:
      score_output: 
        mode: upload

  evaluate_job:
    type: command
    component: ./eval/eval.yml
    inputs:
      scoring_result: ${{parent.jobs.score_job.outputs.score_output}}
    outputs:
      eval_output: ${{parent.outputs.evaluation_report}}

When reference pipeline component to define child job in a pipeline job, just like reference other type of component. You can provide runtime settings such as default_datastore, default_compute in pipeline job level, any parameter you want to change during run time need promote as pipeline job inputs, otherwise, they'll be hard-code in next pipeline component. We're support to promote compute as pipeline component input to support heterogenous pipeline, which may need different compute target in different steps.

$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json

display_name: pipeline_with_pipeline_component
experiment_name: pipeline_with_pipeline_component
description: Select best model trained with different learning rate
type: pipeline

inputs:
  pipeline_job_training_data: 
    type: uri_folder
    path: ./data
  pipeline_job_test_data: 
    type: uri_folder
    path: ./data
  pipeline_job_training_learning_rate1: 0.1
  pipeline_job_training_learning_rate2: 0.01
  compute_train_node: cpu-cluster
  compute_compare_node: cpu-cluster

outputs: 
  pipeline_job_best_model:
    mode: upload
  pipeline_job_best_result:
    mode: upload

settings:
  default_datastore: azureml:workspaceblobstore
  default_compute: azureml:cpu-cluster
  continue_on_step_failure: false

jobs:
  train_and_evaluate_model1:
    type: pipeline
    component: ./components/train_pipeline_component.yml
    inputs:
      training_data: ${{parent.inputs.pipeline_job_training_data}}
      test_data: ${{parent.inputs.pipeline_job_test_data}}
      training_max_epochs: 20
      training_learning_rate: ${{parent.inputs.pipeline_job_training_learning_rate1}}
      train_node_compute: ${{parent.inputs.compute_train_node}}

  train_and_evaluate_model2:
    type: pipeline
    component: ./components/train_pipeline_component.yml
    inputs:
      training_data: ${{parent.inputs.pipeline_job_training_data}}
      test_data: ${{parent.inputs.pipeline_job_test_data}}
      training_max_epochs: 20
      training_learning_rate: ${{parent.inputs.pipeline_job_training_learning_rate2}}
      train_node_compute: ${{parent.inputs.compute_train_node}}

  compare:
    type: command
    component: ./components/compare2/compare2.yml
    compute: ${{parent.inputs.compute_compare_node}} # example to show how to promote compute as pipeline level inputs
    inputs:
      model1: ${{parent.jobs.train_and_evaluate_model1.outputs.trained_model}}
      eval_result1: ${{parent.jobs.train_and_evaluate_model1.outputs.evaluation_report}}
      model2: ${{parent.jobs.train_and_evaluate_model2.outputs.trained_model}}
      eval_result2: ${{parent.jobs.train_and_evaluate_model2.outputs.evaluation_report}}
    outputs: 
      best_model: ${{parent.outputs.pipeline_job_best_model}}
      best_result: ${{parent.outputs.pipeline_job_best_result}}

Python SDK

The python SDK example can be found in azureml-example repo. Navigate to azureml-examples/sdk/python/jobs/pipelines/1j_pipeline_with_pipeline_component/pipeline_with_train_eval_pipeline_component to check the example.

You can define a pipeline component using a Python function, which is similar to defining a pipeline job using a function. You can also promote the compute of some step to be used as inputs for the pipeline component.

@pipeline()
def train_pipeline_component(
    training_input: Input,
    test_input: Input,
    training_learning_rate: float,
    train_compute: str,
    training_max_epochs: int = 20,
    learning_rate_schedule: str = "time-based",
):
    """E2E dummy train-score-eval pipeline with components defined via yaml."""
    # Call component obj as function: apply given inputs & parameters to create a node in pipeline
    train_with_sample_data = train_model(
        training_data=training_input,
        max_epochs=training_max_epochs,
        learning_rate=training_learning_rate,
        learning_rate_schedule=learning_rate_schedule,
    )
    train_with_sample_data.compute = train_compute

    score_with_sample_data = score_data(
        model_input=train_with_sample_data.outputs.model_output, test_data=test_input
    )
    score_with_sample_data.outputs.score_output.mode = "upload"

    eval_with_sample_data = eval_model(
        scoring_result=score_with_sample_data.outputs.score_output
    )

    # Return: pipeline outputs
    return {
        "trained_model": train_with_sample_data.outputs.model_output,
        "evaluation_report": eval_with_sample_data.outputs.eval_output,
    }

You can use pipeline component as a step like other components in pipeline job.

# Construct pipeline
@pipeline
def pipeline_with_pipeline_component(
    training_input,
    test_input,
    compute_train_node,
    training_learning_rate1=0.1,
    training_learning_rate2=0.01,
):
    # Create two training pipeline component with different learning rate
    # Use anonymous pipeline function for step1
    train_and_evaluate_model1 = train_pipeline_component(
        training_input=training_input,
        test_input=test_input,
        training_learning_rate=training_learning_rate1,
        train_compute=compute_train_node,
    )
    # Use registered pipeline function for step2
    train_and_evaluate_model2 = registered_pipeline_component(
        training_input=training_input,
        test_input=test_input,
        training_learning_rate=training_learning_rate2,
        train_compute=compute_train_node,
    )

    compare2_models = compare2(
        model1=train_and_evaluate_model1.outputs.trained_model,
        eval_result1=train_and_evaluate_model1.outputs.evaluation_report,
        model2=train_and_evaluate_model2.outputs.trained_model,
        eval_result2=train_and_evaluate_model2.outputs.evaluation_report,
    )
    # Return: pipeline outputs
    return {
        "best_model": compare2_models.outputs.best_model,
        "best_result": compare2_models.outputs.best_result,
    }


pipeline_job = pipeline_with_pipeline_component(
    training_input=Input(type="uri_folder", path="./data/"),
    test_input=Input(type="uri_folder", path="./data/"),
    compute_train_node="cpu-cluster",
)

# set pipeline level compute
pipeline_job.settings.default_compute = "cpu-cluster"

Pipeline job with pipeline component in studio

You can use az ml component create or ml_client.components.create_or_update to register pipeline component as a registered component. After that you can view the component in asset library and component list page.

Using pipeline component to build pipeline job

After you register the pipeline component, you can drag and drop the pipeline component into the designer canvas and use the UI to build pipeline job.

Screenshot of the designer canvas page to build pipeline job with pipeline component.

View pipeline job using pipeline component

After submitted pipeline job, you can go to pipeline job detail page to change pipeline component status, you can also drill down to child component in pipeline component to debug specific component.

Screenshot of view pipeline component on the pipeline job detail page.

Sample notebooks

Next steps