Schedule class

Definition

Defines a schedule on which to submit a pipeline.

Once a Pipeline is published, a Schedule can be used to submit the Pipeline at a specified interval or when changes to a Blob storage location are detected.

Schedule(workspace, id, name, description, pipeline_id, status, recurrence, datastore_name, polling_interval, data_path_parameter_name, continue_on_step_failure, path_on_datastore, _schedule_provider=None)
Inheritance
builtins.object
Schedule

Parameters

workspace
Workspace

The workspace object this Schedule will belong to.

id
str

The ID of the Schedule.

name
str

The name of the Schedule.

description
str

The description of the schedule.

pipeline_id
str

The ID of the pipeline the schedule will submit.

status
str

The status of the schedule, either 'Active' or 'Disabled'.

recurrence
ScheduleRecurrence

The schedule recurrence for the pipeline.

datastore_name
str

The name of the datastore to monitor for modified/added blobs. Note: VNET Datastores are not supported.

polling_interval
int

How long, in minutes, between polling for modified/added blobs.

data_path_parameter_name
str

The name of the data path pipeline parameter to set with the changed blob path.

continue_on_step_failure
bool

Whether to continue execution of other steps in the submitted PipelineRun if a step fails. If provided, this will override the continue_on_step_failure setting for the Pipeline.

path_on_datastore
str

Optional. The path on the datastore to monitor for modified/added blobs. Note: the path_on_datastore will be under the container for the datastore, so the actual path the schedule will monitor will be container/path_on_datastore. If none, the datastore container is monitored. Additions/modifications made in a subfolder of the path_on_datastore are not monitored. Only supported for DataStore schedules.

_schedule_provider
_AevaScheduleProvider object

The schedule provider.

Remarks

Two types of schedules are supported. The first uses time recurrence to submit a Pipeline on a given schedule. The second monitors an AzureBlobDatastore for added or modified blobs and submits a Pipeline when changes are detected.

To create a Schedule that submits a Pipeline on a recurring basis, pass a ScheduleRecurrence when creating the Schedule, as follows:


   from azureml.pipeline.core import Schedule, ScheduleRecurrence

   recurrence = ScheduleRecurrence(frequency="Hour", interval=12)
   schedule = Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id",
                              experiment_name="helloworld", recurrence=recurrence)

This Schedule will submit the provided PublishedPipeline every 12 hours. The submitted Pipeline will be created under the Experiment with the name "helloworld".

To create a Schedule which will trigger PipelineRuns on modifications to a Blob storage location, specify a Datastore and related data info when creating the Schedule.


   from azureml.pipeline.core import Schedule
   from azureml.core.datastore import Datastore

   datastore = Datastore(workspace=workspace, name="workspaceblobstore")

   schedule = Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id",
                              experiment_name="helloworld", datastore=datastore,
                              polling_interval=5, path_on_datastore="file/path")

Note that the polling_interval and path_on_datastore parameters are optional. The polling_interval specifies how often to poll for modifications to the Datastore, and by default is 5 minutes. path_on_datastore can be used to specify which folder on the Datastore to monitor for changes. If None, the Datastore container is monitored. Note: blob additions/modifications in sub-folders of the path_on_datastore or the Datastore container (if no path_on_datastore is specified) are not detected.
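
The monitoring rule described above can be modeled in a few lines of plain Python. This is only an illustrative sketch of the documented behavior, not the service's implementation; the helper names monitored_prefix and is_detected are hypothetical and are not part of the SDK.

```python
from typing import Optional


def monitored_prefix(container: str, path_on_datastore: Optional[str]) -> str:
    """Return the folder the schedule watches: container[/path_on_datastore]."""
    if not path_on_datastore:
        return container
    return f"{container}/{path_on_datastore.strip('/')}"


def is_detected(blob_path: str, container: str,
                path_on_datastore: Optional[str] = None) -> bool:
    """Return True only for blobs that sit directly in the monitored folder."""
    prefix = monitored_prefix(container, path_on_datastore) + "/"
    if not blob_path.startswith(prefix):
        return False
    remainder = blob_path[len(prefix):]
    # A "/" in the remainder means the blob lives in a subfolder, which is not monitored.
    return remainder != "" and "/" not in remainder
```

With path_on_datastore="file/path", a blob at container/file/path/a.csv counts as a change, while container/file/path/sub/a.csv does not.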

Additionally, if the Pipeline was constructed to use a DataPath PipelineParameter to describe a step input, use the data_path_parameter_name parameter when creating a Datastore-triggered Schedule to set the input to the changed file when a PipelineRun is submitted by the Schedule.

In the following example, when the Schedule triggers the PipelineRun, the value of the "input_data" PipelineParameter will be set as the file which was modified/added:


   from azureml.pipeline.core import Schedule
   from azureml.core.datastore import Datastore

   datastore = Datastore(workspace=workspace, name="workspaceblobstore")

   schedule = Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id",
                              experiment_name="helloworld", datastore=datastore,
                              data_path_parameter_name="input_data")

For more information on Schedules, see: https://aka.ms/pl-schedule.

Methods

create(workspace, name, pipeline_id, experiment_name, recurrence=None, description=None, pipeline_parameters=None, wait_for_provisioning=False, wait_timeout=3600, datastore=None, polling_interval=5, data_path_parameter_name=None, continue_on_step_failure=None, path_on_datastore=None, _workflow_provider=None, _service_endpoint=None)

Create a schedule for a pipeline.

Specify recurrence for a time-based schedule or specify a Datastore, (optional) polling_interval, and (optional) data_path_parameter_name to create a schedule which will monitor the Datastore location for modifications/additions.

disable(wait_for_provisioning=False, wait_timeout=3600)

Set the schedule to 'Disabled' and unavailable to run.

enable(wait_for_provisioning=False, wait_timeout=3600)

Set the schedule to 'Active' and available to run.

get(workspace, id, _workflow_provider=None, _service_endpoint=None)

Get the schedule with the given id.

get_all(workspace, active_only=True, pipeline_id=None, _workflow_provider=None, _service_endpoint=None)

Get all schedules in the current workspace.

DEPRECATED: This method is being deprecated in favor of the list(workspace, active_only=True, pipeline_id=None, _workflow_provider=None, _service_endpoint=None) method.

get_last_pipeline_run()

Fetch the last pipeline run submitted by the schedule. Returns None if no runs have been submitted.

get_pipeline_runs()

Fetch the pipeline runs that were generated from the schedule.

get_schedules_for_pipeline_id(workspace, pipeline_id, _workflow_provider=None, _service_endpoint=None)

Get all schedules for the given pipeline id.

list(workspace, active_only=True, pipeline_id=None, _workflow_provider=None, _service_endpoint=None)

Get all schedules in the current workspace.

load_yaml(workspace, filename, _workflow_provider=None, _service_endpoint=None)

Load and read a YAML file to get schedule parameters.

A YAML file is another way to pass Schedule parameters when creating a schedule.

update(name=None, description=None, recurrence=None, pipeline_parameters=None, status=None, wait_for_provisioning=False, wait_timeout=3600, datastore=None, polling_interval=None, data_path_parameter_name=None, continue_on_step_failure=None, path_on_datastore=None)

Update the schedule.

create(workspace, name, pipeline_id, experiment_name, recurrence=None, description=None, pipeline_parameters=None, wait_for_provisioning=False, wait_timeout=3600, datastore=None, polling_interval=5, data_path_parameter_name=None, continue_on_step_failure=None, path_on_datastore=None, _workflow_provider=None, _service_endpoint=None)

Create a schedule for a pipeline.

Specify recurrence for a time-based schedule or specify a Datastore, (optional) polling_interval, and (optional) data_path_parameter_name to create a schedule which will monitor the Datastore location for modifications/additions.
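
Because a recurrence and a datastore cannot be combined, it can help to check the trigger arguments before calling create. The following is a minimal sketch under stated assumptions: build_trigger_kwargs is a hypothetical helper, not an SDK function, and the check that at least one trigger is supplied is an assumption of this sketch rather than documented behavior.

```python
def build_trigger_kwargs(recurrence=None, datastore=None, polling_interval=None,
                         data_path_parameter_name=None, path_on_datastore=None):
    """Return trigger keyword arguments for Schedule.create after basic checks."""
    if recurrence is not None and datastore is not None:
        raise ValueError("Pass either recurrence or datastore, not both.")
    if recurrence is None and datastore is None:
        # Assumption of this sketch: a schedule needs one of the two triggers.
        raise ValueError("Pass a recurrence or a datastore.")

    # These options are documented as supported only for datastore schedules.
    datastore_only = {
        "polling_interval": polling_interval,
        "data_path_parameter_name": data_path_parameter_name,
        "path_on_datastore": path_on_datastore,
    }
    given = {key: value for key, value in datastore_only.items() if value is not None}

    if recurrence is not None:
        if given:
            raise ValueError(f"{sorted(given)} only apply to datastore schedules.")
        return {"recurrence": recurrence}
    return {"datastore": datastore, **given}
```

The result can be unpacked into the call, for example Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id", experiment_name="helloworld", **build_trigger_kwargs(recurrence=recurrence)).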

Parameters

workspace
Workspace

The workspace object this Schedule will belong to.

name
str

The name of the Schedule.

pipeline_id
str

The ID of the pipeline the schedule will submit.

experiment_name
str

The name of the experiment the schedule will submit runs on.

recurrence
ScheduleRecurrence

The schedule recurrence of the pipeline.

default value: None
description
str

The description of the schedule.

default value: None
pipeline_parameters
dict

A dictionary of parameters to assign new values {param name, param value}.

default value: None
wait_for_provisioning
bool

Whether to wait for provisioning of the schedule to complete.

default value: False
wait_timeout
int

The number of seconds to wait before timing out.

default value: 3600
datastore
AzureBlobDatastore

The Datastore to monitor for modified/added blobs. Note: VNET Datastores are not supported. Cannot be used with a recurrence.

default value: None
polling_interval
int

How long, in minutes, between polling for modified/added blobs. Default is 5 minutes. Only supported for DataStore schedules.

default value: 5
data_path_parameter_name
str

The name of the data path pipeline parameter to set with the changed blob path. Only supported for DataStore schedules.

default value: None
continue_on_step_failure
bool

Whether to continue execution of other steps in the submitted PipelineRun if a step fails. If provided, this will override the continue_on_step_failure setting for the Pipeline.

default value: None
path_on_datastore
str

Optional. The path on the datastore to monitor for modified/added blobs. Note: the path_on_datastore will be under the container for the datastore, so the actual path the schedule will monitor will be container/path_on_datastore. If none, the datastore container is monitored. Additions/modifications made in a subfolder of the path_on_datastore are not monitored. Only supported for DataStore schedules.

default value: None
_workflow_provider
_AevaWorkflowProvider object

The workflow provider.

default value: None
_service_endpoint
str

The service endpoint.

default value: None

Returns

The created schedule.

Return type

Schedule

disable(wait_for_provisioning=False, wait_timeout=3600)

Set the schedule to 'Disabled' and unavailable to run.

Parameters

wait_for_provisioning
bool

Whether to wait for provisioning of the schedule to complete.

default value: False
wait_timeout
int

Number of seconds to wait before timing out.

default value: 3600

enable(wait_for_provisioning=False, wait_timeout=3600)

Set the schedule to 'Active' and available to run.

Parameters

wait_for_provisioning
bool

Whether to wait for provisioning of the schedule to complete.

default value: False
wait_timeout
int

Number of seconds to wait before timing out.

default value: 3600
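
The enable and disable methods can be wrapped in a single toggle. This is a minimal sketch: set_schedule_active is a hypothetical helper name, the schedule argument is assumed to be a Schedule object obtained elsewhere (for example from Schedule.get), and the function simply returns whatever status the object reports afterwards.

```python
def set_schedule_active(schedule, active, timeout_seconds=600):
    """Enable or disable a schedule and wait for provisioning to complete."""
    if active:
        schedule.enable(wait_for_provisioning=True, wait_timeout=timeout_seconds)
    else:
        schedule.disable(wait_for_provisioning=True, wait_timeout=timeout_seconds)
    # Report the status the object holds after the call: 'Active' or 'Disabled'.
    return schedule.status
```

Passing wait_for_provisioning=True makes the call wait until provisioning has completed or the timeout elapses, which is convenient in scripts that act on the schedule immediately afterwards.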

get(workspace, id, _workflow_provider=None, _service_endpoint=None)

Get the schedule with the given id.

Parameters

workspace
Workspace

The workspace the schedule was created on.

id
str

The ID of the schedule.

_workflow_provider
_AevaWorkflowProvider object

The workflow provider.

default value: None
_service_endpoint
str

The service endpoint.

default value: None

Returns

Schedule object

Return type

Schedule

get_all(workspace, active_only=True, pipeline_id=None, _workflow_provider=None, _service_endpoint=None)

Get all schedules in the current workspace.

DEPRECATED: This method is being deprecated in favor of the list(workspace, active_only=True, pipeline_id=None, _workflow_provider=None, _service_endpoint=None) method.

Parameters

workspace
Workspace

The workspace.

active_only
bool

If true, only return schedules which are currently active. Only applies if no pipeline id is provided.

default value: True
pipeline_id
str

If provided, only return schedules for the pipeline with the given id.

default value: None
_workflow_provider
_AevaWorkflowProvider object

The workflow provider.

default value: None
_service_endpoint
str

The service endpoint.

default value: None

Returns

A list of Schedule.

Return type

list[Schedule]

get_last_pipeline_run()

Fetch the last pipeline run submitted by the schedule. Returns None if no runs have been submitted.

Returns

The last pipeline run.

Return type

PipelineRun

get_pipeline_runs()

Fetch the pipeline runs that were generated from the schedule.
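
A short sketch of how the run-related methods might be used to inspect what a schedule has submitted. The helper names summarize_runs and last_run_status are hypothetical, and the sketch assumes each returned run exposes id and get_status(), as azureml.core.Run objects do.

```python
def summarize_runs(schedule):
    """Return (run id, status) pairs for the runs a schedule has submitted."""
    return [(run.id, run.get_status()) for run in schedule.get_pipeline_runs()]


def last_run_status(schedule):
    """Return the status of the most recent run, or None if nothing has run yet."""
    last_run = schedule.get_last_pipeline_run()
    # get_last_pipeline_run returns None when no runs have been submitted.
    return None if last_run is None else last_run.get_status()
```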

Returns

A list of PipelineRun.

Return type

list[PipelineRun]

get_schedules_for_pipeline_id(workspace, pipeline_id, _workflow_provider=None, _service_endpoint=None)

Get all schedules for the given pipeline id.

Parameters

workspace
Workspace

The workspace.

pipeline_id
str

The pipeline id.

_workflow_provider
_AevaWorkflowProvider object

The workflow provider.

default value: None
_service_endpoint
str

The service endpoint.

default value: None

Returns

A list of Schedule.

Return type

list[Schedule]

list(workspace, active_only=True, pipeline_id=None, _workflow_provider=None, _service_endpoint=None)

Get all schedules in the current workspace.
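
As an illustration, the returned schedules can be grouped by their status. This is a sketch only: schedules_by_status is a hypothetical helper that relies on the documented name and status attributes.

```python
def schedules_by_status(schedules):
    """Group schedule names by status, e.g. {'Active': [...], 'Disabled': [...]}."""
    grouped = {}
    for schedule in schedules:
        grouped.setdefault(schedule.status, []).append(schedule.name)
    return grouped
```

A typical call would be schedules_by_status(Schedule.list(workspace, active_only=False)). Because active_only defaults to True, disabled schedules only appear when it is set to False.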

Parameters

workspace
Workspace

The workspace.

active_only
bool

If true, only return schedules which are currently active. Only applies if no pipeline id is provided.

default value: True
pipeline_id
str

If provided, only return schedules for the pipeline with the given id.

default value: None
_workflow_provider
_AevaWorkflowProvider object

The workflow provider.

default value: None
_service_endpoint
str

The service endpoint.

default value: None

Returns

A list of Schedule.

Return type

list[Schedule]

load_yaml(workspace, filename, _workflow_provider=None, _service_endpoint=None)

Load and read a YAML file to get schedule parameters.

A YAML file is another way to pass Schedule parameters when creating a schedule.

Parameters

workspace
Workspace

The workspace.

filename
str

The yaml filename with location.

_workflow_provider
_AevaWorkflowProvider object

The workflow provider.

default value: None
_service_endpoint
str

The service endpoint.

default value: None

Returns

A dictionary of Schedule parameters and values.

Return type

dict

Remarks

Two types of YAML files are supported for Schedules. The first supplies recurrence information for creating a time-based Schedule. The second supplies datastore information for creating a Schedule that is triggered by changes to a datastore.

The following example creates a Schedule that submits a Pipeline on a recurrence:


   from azureml.pipeline.core import Schedule

   schedule_info = Schedule.load_yaml(workspace=workspace,
                                      filename='./yaml/test_schedule_with_recurrence.yaml')
   schedule = Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id",
                              experiment_name="helloworld", recurrence=schedule_info.get("recurrence"),
                              description=schedule_info.get("description"))

Sample Yaml file test_schedule_with_recurrence.yaml:


   Schedule:
       description: "Test create with recurrence"
       recurrence:
           frequency: Week # Can be "Minute", "Hour", "Day", "Week", or "Month".
           interval: 1 # How often the schedule fires, in units of frequency.
           start_time: 2019-06-07T10:50:00
           time_zone: UTC
           hours:
           - 1
           minutes:
           - 0
           time_of_day: null
           week_days:
           - Friday
       pipeline_parameters: {'a':1}
       wait_for_provisioning: True
       wait_timeout: 3600
       datastore_name: ~
       polling_interval: ~
       data_path_parameter_name: ~
       continue_on_step_failure: ~
       path_on_datastore: ~

The following example creates a Schedule that submits a Pipeline when changes to a datastore are detected:


   from azureml.pipeline.core import Schedule

   schedule_info = Schedule.load_yaml(workspace=workspace,
                                      filename='./yaml/test_schedule_with_datastore.yaml')
   schedule = Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id",
                              experiment_name="helloworld", datastore=schedule_info.get("datastore_name"),
                              polling_interval=schedule_info.get("polling_interval"),
                              data_path_parameter_name=schedule_info.get("data_path_parameter_name"),
                              continue_on_step_failure=schedule_info.get("continue_on_step_failure"),
                              path_on_datastore=schedule_info.get("path_on_datastore"))
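
Rather than passing each value by hand, the loaded dictionary can be filtered down to the entries that are actually set. The sketch below is hedged: create_kwargs_from_info and CREATE_KEYS are hypothetical names, and it assumes the dictionary returned by load_yaml uses the same keys as the fields in the sample YAML file. The datastore_name entry is deliberately left out, because create documents its datastore parameter as an AzureBlobDatastore and resolving the name is left to the caller.

```python
# Keys that share a name with a Schedule.create parameter (assumed to match the YAML fields).
CREATE_KEYS = (
    "description", "recurrence", "pipeline_parameters", "wait_for_provisioning",
    "wait_timeout", "polling_interval", "data_path_parameter_name",
    "continue_on_step_failure", "path_on_datastore",
)


def create_kwargs_from_info(schedule_info):
    """Keep only the create parameters that are present and not None."""
    return {key: schedule_info[key] for key in CREATE_KEYS
            if schedule_info.get(key) is not None}
```

The result could then be unpacked into the call, for example Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id", experiment_name="helloworld", **create_kwargs_from_info(schedule_info)).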

update(name=None, description=None, recurrence=None, pipeline_parameters=None, status=None, wait_for_provisioning=False, wait_timeout=3600, datastore=None, polling_interval=None, data_path_parameter_name=None, continue_on_step_failure=None, path_on_datastore=None)

Update the schedule.
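
Since every parameter of update defaults to None, a thin wrapper can forward only the fields that should change. This is a minimal sketch under stated assumptions: update_schedule and UPDATABLE_FIELDS are hypothetical names, and the field list is copied from the update signature shown on this page.

```python
# Field names taken from the update signature documented on this page.
UPDATABLE_FIELDS = {
    "name", "description", "recurrence", "pipeline_parameters", "status",
    "datastore", "polling_interval", "data_path_parameter_name",
    "continue_on_step_failure", "path_on_datastore",
}


def update_schedule(schedule, wait_timeout=3600, **changes):
    """Forward only the supplied, non-None fields to schedule.update."""
    unknown = set(changes) - UPDATABLE_FIELDS
    if unknown:
        raise TypeError(f"Not updatable: {sorted(unknown)}")
    changes = {key: value for key, value in changes.items() if value is not None}
    if not changes:
        return False  # Nothing to update, so no call is made.
    schedule.update(wait_for_provisioning=True, wait_timeout=wait_timeout, **changes)
    return True
```

For example, update_schedule(schedule, polling_interval=10) changes only the polling interval of a datastore-triggered schedule and waits for provisioning to complete.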

Parameters

name
str

The new name of the Schedule.

default value: None
recurrence
ScheduleRecurrence

The new schedule recurrence of the pipeline.

default value: None
description
str

The new description of the schedule.

default value: None
pipeline_parameters
dict

A dictionary of parameters to assign new values {param name, param value}.

default value: None
status
str

The new status of the schedule: 'Active' or 'Disabled'.

default value: None
wait_for_provisioning
bool

Whether to wait for provisioning of the schedule to complete.

default value: False
wait_timeout
int

The number of seconds to wait before timing out.

default value: 3600
datastore
AzureBlobDatastore

The Datastore to monitor for modified/added blobs. Note: VNET Datastores are not supported.

default value: None
polling_interval
int

How long, in minutes, between polling for modified/added blobs. Default is 5 minutes.

default value: None
data_path_parameter_name
str

The name of the data path pipeline parameter to set with the changed blob path.

default value: None
continue_on_step_failure
bool

Whether to continue execution of other steps in the submitted PipelineRun if a step fails. If provided, this will override the continue_on_step_failure setting for the Pipeline.

default value: None
path_on_datastore
str

Optional. The path on the datastore to monitor for modified/added blobs. Note: the path_on_datastore will be under the container for the datastore, so the actual path the schedule will monitor will be container/path_on_datastore. If none, the datastore container is monitored. Additions/modifications made in a subfolder of the path_on_datastore are not monitored. Only supported for DataStore schedules.

default value: None

Attributes

continue_on_step_failure

Get the value of the continue_on_step_failure setting.

Returns

The value of the continue_on_step_failure setting.

Return type

bool

data_path_parameter_name

Get the name of the data path pipeline parameter to set with the changed blob path.

Returns

The data path parameter name.

Return type

str

datastore_name

Get the name of the Datastore used for the schedule.

Returns

The Datastore name.

Return type

str

description

Get the description of the schedule.

Returns

The description of the schedule.

Return type

str

id

Get the ID for the schedule.

Returns

The ID.

Return type

str

name

Get the name of the schedule.

Returns

The name.

Return type

str

path_on_datastore

Get the path on the datastore that the schedule monitors.

Returns

The path on datastore.

Return type

str

pipeline_id

Get the ID of the pipeline the schedule submits.

Returns

The ID.

Return type

str

polling_interval

Get how long, in minutes, between polling for modified/added blobs.

Returns

The polling interval.

Return type

int

recurrence

Get the schedule recurrence.

Returns

The schedule recurrence.

Return type

ScheduleRecurrence

status

Get the status of the schedule.

Returns

The status of the schedule.

Return type

str
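
The attributes above can be collected into a small summary for logging. This is a sketch only: describe_schedule is a hypothetical helper, and it assumes datastore_name is empty or None for a time-based schedule, which this page does not state explicitly.

```python
def describe_schedule(schedule):
    """Collect the documented attributes of a schedule into a plain dictionary."""
    # Assumption: datastore_name is unset for schedules that use a recurrence.
    trigger = "datastore" if schedule.datastore_name else "recurrence"
    return {
        "id": schedule.id,
        "name": schedule.name,
        "status": schedule.status,
        "pipeline_id": schedule.pipeline_id,
        "trigger": trigger,
    }
```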