Schedule machine learning pipelines with Azure Machine Learning SDK for Python

In this article, you'll learn how to programmatically schedule a pipeline to run on Azure. You can choose to create a schedule based on elapsed time or on file-system changes. Time-based schedules can be used to take care of routine tasks, such as monitoring for data drift. Change-based schedules can be used to react to irregular or unpredictable changes, such as new data being uploaded or old data being edited. After learning how to create schedules, you'll learn how to retrieve and deactivate them.

Prerequisites

Initialize the workspace & get data

To schedule a pipeline, you'll need a reference to your workspace, the identifier of your published pipeline, and the name of the experiment in which you wish to create the schedule. You can get these values with the following code:

import azureml.core
from azureml.core import Datastore, Workspace
from azureml.pipeline.core import Pipeline, PublishedPipeline, Schedule, ScheduleRecurrence
from azureml.core.experiment import Experiment

ws = Workspace.from_config()

experiments = Experiment.list(ws)
for experiment in experiments:
    print(experiment.name)

published_pipelines = PublishedPipeline.list(ws)
for published_pipeline in published_pipelines:
    print(f"{published_pipeline.name},'{published_pipeline.id}'")

experiment_name = "MyExperiment" 
pipeline_id = "aaaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" 

Create a schedule

To run a pipeline on a recurring basis, you'll create a schedule. A Schedule associates a pipeline, an experiment, and a trigger. The trigger can either be a ScheduleRecurrence that describes the wait between runs or a Datastore path that specifies a directory to watch for changes. In either case, you'll need the pipeline identifier and the name of the experiment in which to create the schedule.

Create a time-based schedule

The ScheduleRecurrence constructor has a required frequency argument that must be one of the following strings: "Minute", "Hour", "Day", "Week", or "Month". It also requires an integer interval argument specifying how many of the frequency units should elapse between schedule starts. Optional arguments allow you to be more specific about starting times, as detailed in the ScheduleRecurrence SDK docs.

Create a Schedule that begins a run every 15 minutes:

recurrence = ScheduleRecurrence(frequency="Minute", interval=15)
recurring_schedule = Schedule.create(ws, name="MyRecurringSchedule", 
                            description="Based on time",
                            pipeline_id=pipeline_id, 
                            experiment_name=experiment_name, 
                            recurrence=recurrence)
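
The optional arguments mentioned above let you pin a recurrence to specific days and times. The following is a minimal sketch of a weekly schedule; the schedule name, week_days list, and time_of_day value are illustrative choices, not requirements:

# Run every week on Monday and Wednesday at 08:30 (example values)
weekly_recurrence = ScheduleRecurrence(frequency="Week",
                                       interval=1,
                                       week_days=["Monday", "Wednesday"],
                                       time_of_day="8:30")
weekly_schedule = Schedule.create(ws, name="MyWeeklySchedule",
                            description="Runs Monday and Wednesday mornings",
                            pipeline_id=pipeline_id,
                            experiment_name=experiment_name,
                            recurrence=weekly_recurrence)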

Create a change-based schedule

Pipelines that are triggered by file changes may be more efficient than time-based schedules. For instance, you may want to perform a preprocessing step when a file is changed, or when a new file is added to a data directory. You can monitor any changes to a datastore or changes within a specific directory within the datastore. If you monitor a specific directory, changes within subdirectories of that directory will not trigger a run.

To create a file-reactive Schedule, you must set the datastore parameter in the call to Schedule.create. To monitor a folder, set the path_on_datastore argument.

The polling_interval argument allows you to specify, in minutes, the frequency at which the datastore is checked for changes.

If the pipeline was constructed with a DataPath PipelineParameter, you can set that variable to the name of the changed file by setting the data_path_parameter_name argument.

datastore = Datastore(workspace=ws, name="workspaceblobstore")

reactive_schedule = Schedule.create(ws, name="MyReactiveSchedule",
                            description="Based on input file change",
                            pipeline_id=pipeline_id,
                            experiment_name=experiment_name,
                            datastore=datastore,
                            data_path_parameter_name="input_data")
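
To watch only a specific folder rather than the whole datastore, add the path_on_datastore and polling_interval arguments described above. This is a sketch; the folder name and the five-minute polling interval are example values:

# Check the "training_data/" folder for changes every 5 minutes (example values)
folder_schedule = Schedule.create(ws, name="MyFolderSchedule",
                            description="Based on changes in one folder",
                            pipeline_id=pipeline_id,
                            experiment_name=experiment_name,
                            datastore=datastore,
                            path_on_datastore="training_data/",
                            polling_interval=5)

The data_path_parameter_name argument in the earlier example only has an effect if the pipeline was built with a matching DataPath PipelineParameter. As a minimal sketch of how such a parameter might be declared when the pipeline is constructed (the parameter name input_data matches the example above; the default path and mount mode are assumptions):

from azureml.data.datapath import DataPath, DataPathComputeBinding
from azureml.pipeline.core import PipelineParameter

# Hypothetical DataPath parameter: when the schedule fires, it sets this
# parameter to the path of the changed file, because the parameter's name
# ("input_data") matches the schedule's data_path_parameter_name.
default_path = DataPath(datastore=datastore, path_on_datastore="sample_data/")
data_path_param = PipelineParameter(name="input_data", default_value=default_path)
data_path_input = (data_path_param, DataPathComputeBinding(mode="mount"))
# data_path_input would then be passed as an input to one of the pipeline's steps.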

Optional arguments when creating a schedule

In addition to the arguments discussed previously, you may set the status argument to "Disabled" to create an inactive schedule. Finally, the continue_on_step_failure argument allows you to pass a Boolean that overrides the pipeline's default failure behavior.
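
As a sketch, overriding the failure behavior looks like the following; per the paragraph above, status="Disabled" can be added in the same way to create the schedule without activating it. The schedule name and the True value are illustrative:

# Keep running the remaining pipeline steps even if one step fails (example value);
# add status="Disabled", as described above, to create the schedule inactive.
tolerant_schedule = Schedule.create(ws, name="MyTolerantSchedule",
                            description="Continues on step failure",
                            pipeline_id=pipeline_id,
                            experiment_name=experiment_name,
                            recurrence=recurrence,
                            continue_on_step_failure=True)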

View your scheduled pipelines

In your web browser, navigate to Azure Machine Learning studio. From the Endpoints section of the navigation panel, choose Pipeline endpoints. This takes you to a list of the pipelines published in the workspace.

Screenshot: the pipelines page of Azure Machine Learning studio

On this page, you can see summary information about all the pipelines in the workspace: names, descriptions, statuses, and so forth. Drill in by selecting your pipeline. On the resulting page, you can see more details about your pipeline and drill down into individual runs.

Deactivate the pipeline

If you have a Pipeline that is published, but not scheduled, you can disable it with:

pipeline = PublishedPipeline.get(ws, id=pipeline_id)
pipeline.disable()

If the pipeline is scheduled, you must disable the schedule first. Retrieve the schedule's identifier from the portal or by running:

ss = Schedule.list(ws)
for s in ss:
    print(s)

Once you have the schedule_id you wish to disable, run:

def stop_by_schedule_id(ws, schedule_id):
    # Find the schedule with the matching identifier and disable it
    s = next(s for s in Schedule.list(ws) if s.id == schedule_id)
    s.disable()
    return s

stop_by_schedule_id(ws, schedule_id)

If you then run Schedule.list(ws) again, the disabled schedule will no longer appear; by default, Schedule.list returns only active schedules.

Next steps

In this article, you used the Azure Machine Learning SDK for Python to schedule a pipeline in two different ways. One schedule recurs based on elapsed clock time. The other schedule runs if a file is modified on a specified Datastore or within a directory on that store. You saw how to use the portal to examine the pipeline and individual runs. Finally, you learned how to disable a schedule so that the pipeline stops running.

For more information, see: