Trigger machine learning pipelines

In this article, you'll learn how to programmatically schedule a pipeline to run on Azure. You can create a schedule based on elapsed time or on file-system changes. Time-based schedules can be used to take care of routine tasks, such as monitoring for data drift. Change-based schedules can be used to react to irregular or unpredictable changes, such as new data being uploaded or old data being edited. After learning how to create schedules, you'll learn how to retrieve and deactivate them. Finally, you'll learn how to use other Azure services, Azure Logic App and Azure Data Factory, to run pipelines. An Azure Logic App allows for more complex triggering logic or behavior. Azure Data Factory pipelines allow you to call a machine learning pipeline as part of a larger data orchestration pipeline.

Prerequisites

Trigger pipelines with Azure Machine Learning SDK for Python

To schedule a pipeline, you'll need a reference to your workspace, the identifier of your published pipeline, and the name of the experiment in which you wish to create the schedule. You can get these values with the following code:

import azureml.core
from azureml.core import Workspace
from azureml.pipeline.core import Pipeline, PublishedPipeline
from azureml.core.experiment import Experiment

ws = Workspace.from_config()

experiments = Experiment.list(ws)
for experiment in experiments:
    print(experiment.name)

published_pipelines = PublishedPipeline.list(ws)
for published_pipeline in  published_pipelines:
    print(f"{published_pipeline.name},'{published_pipeline.id}'")

experiment_name = "MyExperiment" 
pipeline_id = "aaaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" 

Create a schedule

To run a pipeline on a recurring basis, you'll create a schedule. A Schedule associates a pipeline, an experiment, and a trigger. The trigger can either be aScheduleRecurrence that describes the wait between runs or a Datastore path that specifies a directory to watch for changes. In either case, you'll need the pipeline identifier and the name of the experiment in which to create the schedule.

At the top of your python file, import the Schedule and ScheduleRecurrence classes:


from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

Create a time-based schedule

The ScheduleRecurrence constructor has a required frequency argument that must be one of the following strings: "Minute", "Hour", "Day", "Week", or "Month". It also requires an integer interval argument specifying how many of the frequency units should elapse between schedule starts. Optional arguments allow you to be more specific about starting times, as detailed in the ScheduleRecurrence SDK docs.

Create a Schedule that begins a run every 15 minutes:

recurrence = ScheduleRecurrence(frequency="Minute", interval=15)
recurring_schedule = Schedule.create(ws, name="MyRecurringSchedule", 
                            description="Based on time",
                            pipeline_id=pipeline_id, 
                            experiment_name=experiment_name, 
                            recurrence=recurrence)

Create a change-based schedule

Pipelines that are triggered by file changes may be more efficient than time-based schedules. When you want to do something before a file is changed, or when a new file is added to a data directory, you can preprocess that file. You can monitor any changes to a datastore or changes within a specific directory within the datastore. If you monitor a specific directory, changes within subdirectories of that directory will not trigger a run.

To create a file-reactive Schedule, you must set the datastore parameter in the call to Schedule.create. To monitor a folder, set the path_on_datastore argument.

The polling_interval argument allows you to specify, in minutes, the frequency at which the datastore is checked for changes.

If the pipeline was constructed with a DataPath PipelineParameter, you can set that variable to the name of the changed file by setting the data_path_parameter_name argument.

datastore = Datastore(workspace=ws, name="workspaceblobstore")

reactive_schedule = Schedule.create(ws, name="MyReactiveSchedule", description="Based on input file change.",
                            pipeline_id=pipeline_id, experiment_name=experiment_name, datastore=datastore, data_path_parameter_name="input_data")

Optional arguments when creating a schedule

In addition to the arguments discussed previously, you may set the status argument to "Disabled" to create an inactive schedule. Finally, the continue_on_step_failure allows you to pass a Boolean that will override the pipeline's default failure behavior.

View your scheduled pipelines

In your Web browser, navigate to Azure Machine Learning. From the Endpoints section of the navigation panel, choose Pipeline endpoints. This takes you to a list of the pipelines published in the Workspace.

Pipelines page of AML

In this page you can see summary information about all the pipelines in the Workspace: names, descriptions, status, and so forth. Drill in by clicking in your pipeline. On the resulting page, there are more details about your pipeline and you may drill down into individual runs.

Deactivate the pipeline

If you have a Pipeline that is published, but not scheduled, you can disable it with:

pipeline = PublishedPipeline.get(ws, id=pipeline_id)
pipeline.disable()

If the pipeline is scheduled, you must cancel the schedule first. Retrieve the schedule's identifier from the portal or by running:

ss = Schedule.list(ws)
for s in ss:
    print(s)

Once you have the schedule_id you wish to disable, run:

def stop_by_schedule_id(ws, schedule_id):
    s = next(s for s in Schedule.list(ws) if s.id == schedule_id)
    s.disable()
    return s

stop_by_schedule_id(ws, schedule_id)

If you then run Schedule.list(ws) again, you should get an empty list.

Use Azure Logic Apps for complex triggers

More complex trigger rules or behavior can be created using an Azure Logic App.

To use an Azure Logic App to trigger a Machine Learning pipeline, you'll need the REST endpoint for a published Machine Learning pipeline. Create and publish your pipeline. Then find the REST endpoint of your PublishedPipeline by using the pipeline ID:

# You can find the pipeline ID in Azure Machine Learning studio

published_pipeline = PublishedPipeline.get(ws, id="<pipeline-id-here>")
published_pipeline.endpoint 

Create a Logic App

Now create an Azure Logic App instance. If you wish, use an integration service environment (ISE) and set up a customer-managed key for use by your Logic App.

Once your Logic App has been provisioned, use these steps to configure a trigger for your pipeline:

  1. Create a system-assigned managed identity to give the app access to your Azure Machine Learning Workspace.

  2. Navigate to the Logic App Designer view and select the Blank Logic App template.

    Blank template

  3. In the Designer, search for blob. Select the When a blob is added or modified (properties only) trigger and add this trigger to your Logic App.

    Add trigger

  4. Fill in the connection info for the Blob storage account you wish to monitor for blob additions or modifications. Select the Container to monitor.

    Choose the Interval and Frequency to poll for updates that work for you.

    Note

    This trigger will monitor the selected Container but won't monitor subfolders.

  5. Add an HTTP action that will run when a new or modified blob is detected. Select + New Step, then search for and select the HTTP action.

Search for HTTP action

Use the following settings to configure your action:

Setting Value
HTTP action POST
URI the endpoint to the published pipeline that you found as a Prerequisite
Authentication mode Managed Identity
  1. Set up your schedule to set the value of any DataPath PipelineParameters you may have:

    {
      "DataPathAssignments": {
        "input_datapath": {
          "DataStoreName": "<datastore-name>",
          "RelativePath": "@{triggerBody()?['Name']}" 
        }
      },
      "ExperimentName": "MyRestPipeline",
      "ParameterAssignments": {
        "input_string": "sample_string3"
      },
      "RunSource": "SDK"
    }
    

    Use the DataStoreName you added to your workspace as a Prerequisite.

    HTTP settings

  2. Select Save and your schedule is now ready.

Important

If you are using Azure role-based access control (Azure RBAC) to manage access to your pipeline, set the permissions for your pipeline scenario (training or scoring).

Call machine learning pipelines from Azure Data Factory pipelines

In an Azure Data Factory pipeline, the Machine Learning Execute Pipeline activity runs an Azure Machine Learning pipeline. You can find this activity in the Data Factory's authoring page under the Machine Learning category:

Screenshot showing the ML pipeline activity in the Azure Data Factory authoring environment

Next steps

In this article, you used the Azure Machine Learning SDK for Python to schedule a pipeline in two different ways. One schedule recurs based on elapsed clock time. The other schedule runs if a file is modified on a specified Datastore or within a directory on that store. You saw how to use the portal to examine the pipeline and individual runs. You learned how to disable a schedule so that the pipeline stops running. Finally, you created an Azure Logic App to trigger a pipeline.

For more information, see: