Schedule Class

Defines a schedule on which to submit a pipeline.

Once a Pipeline is published, a Schedule can be used to submit the Pipeline at a specified interval or when changes to a Blob storage location are detected.

Inheritance
builtins.object
Schedule

Constructor

Schedule(workspace, id, name, description, pipeline_id, status, recurrence, datastore_name, polling_interval, data_path_parameter_name, continue_on_step_failure, path_on_datastore, _schedule_provider=None, pipeline_endpoint_id=None)

Parameters

workspace
Workspace

The workspace object this Schedule will belong to.

id
str

The ID of the Schedule.

name
str

The name of the Schedule.

description
str

The description of the schedule.

pipeline_id
str

The ID of the pipeline the schedule will submit.

status
str

The status of the schedule, either 'Active' or 'Disabled'.

recurrence
ScheduleRecurrence

The schedule recurrence for the pipeline.

datastore_name
str

The name of the datastore to monitor for modified/added blobs. Note: VNET Datastores are not supported.

polling_interval
int

How long, in minutes, between polling for modified/added blobs.

data_path_parameter_name
str

The name of the data path pipeline parameter to set with the changed blob path.

continue_on_step_failure
bool

Whether to continue execution of other steps in the submitted PipelineRun if a step fails. If provided, this will override the continue_on_step_failure setting for the Pipeline.

path_on_datastore
str

Optional. The path on the datastore to monitor for modified/added blobs. Note: the path_on_datastore will be under the container for the datastore, so the actual path the schedule will monitor will be container/path_on_datastore. If none, the datastore container is monitored. Additions/modifications made in a subfolder of the path_on_datastore are not monitored. Only supported for DataStore schedules.

_schedule_provider
<xref:azureml.pipeline.core._aeva_provider._AevaScheduleProvider>

The schedule provider.

Remarks

Two types of schedules are supported. The first uses time recurrence to submit a Pipeline on a given schedule. The second monitors an AzureBlobDatastore for added or modified blobs and submits a Pipeline when changes are detected.

To create a Schedule which will submit a Pipeline on a recurring schedule, use the ScheduleRecurrence when creating the Schedule.

A ScheduleRecurrence is used when creating a Schedule for a Pipeline as follows:


   from azureml.pipeline.core import Schedule, ScheduleRecurrence

   recurrence = ScheduleRecurrence(frequency="Hour", interval=12)
   schedule = Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id",
                              experiment_name="helloworld", recurrence=recurrence)

This Schedule will submit the provided PublishedPipeline every 12 hours. The submitted Pipeline will be created under the Experiment with the name "helloworld".

To create a Schedule which will trigger PipelineRuns on modifications to a Blob storage location, specify a Datastore and related data info when creating the Schedule.


   from azureml.pipeline.core import Schedule
   from azureml.core.datastore import Datastore

   datastore = Datastore(workspace=ws, name="workspaceblobstore")

   schedule = Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id"
                              experiment_name="helloworld", datastore=datastore,
                              polling_interval=5, path_on_datastore="file/path")

Note that the polling_interval and path_on_datastore parameters are optional. The polling_interval specifies how often to poll for modifications to the Datastore, and by default is 5 minutes. path_on_datastore can be used to specify which folder on the Datastore to monitor for changes. If None, the Datastore container is monitored. Note: blob additions/modifications in sub-folders of the path_on_datastore or the Datastore container (if no path_on_datastore is specified) are not detected.

Additionally, if the Pipeline was constructed to use a DataPath PipelineParameter to describe a step input, use the data_path_parameter_name parameter when creating a Datastore-triggered Schedule to set the input to the changed file when a PipelineRun is submitted by the Schedule.

In the following example, when the Schedule triggers the PipelineRun, the value of the "input_data" PipelineParameter will be set as the file which was modified/added:


   from azureml.pipeline.core import Schedule
   from azureml.core.datastore import Datastore

   datastore = Datastore(workspace=ws, name="workspaceblobstore")

   schedule = Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id",
                              experiment_name="helloworld", datastore=datastore,
                              data_path_parameter_name="input_data")

For more information on Schedules, see: https://aka.ms/pl-schedule.

Methods

create

Create a schedule for a pipeline.

Specify recurrence for a time-based schedule or specify a Datastore, (optional) polling_interval, and (optional) data_path_parameter_name to create a schedule which will monitor the Datastore location for modifications/additions.

create_for_pipeline_endpoint

Create a schedule for a pipeline endpoint.

Specify recurrence for a time-based schedule or specify a Datastore, (optional) polling_interval, and (optional) data_path_parameter_name to create a schedule which will monitor the Datastore location for modifications/additions.

disable

Set the schedule to 'Disabled' and unavailable to run.

enable

Set the schedule to 'Active' and available to run.

get

Get the schedule with the given ID.

get_all

Get all schedules in the current workspace.

DEPRECATED: This method is being deprecated in favor of the list method.

get_last_pipeline_run

Fetch the last pipeline run submitted by the schedule. Returns None if no runs have been submitted.

get_pipeline_runs

Fetch the pipeline runs that were generated from the schedule.

get_schedules_for_pipeline_endpoint_id

Get all schedules for the given pipeline endpoint id.

get_schedules_for_pipeline_id

Get all schedules for the given pipeline id.

list

Get all schedules in the current workspace.

load_yaml

Load and read the YAML file to get schedule parameters.

YAML file is one more way to pass Schedule parameters to create schedule.

update

Update the schedule.

create

Create a schedule for a pipeline.

Specify recurrence for a time-based schedule or specify a Datastore, (optional) polling_interval, and (optional) data_path_parameter_name to create a schedule which will monitor the Datastore location for modifications/additions.

create(workspace, name, pipeline_id, experiment_name, recurrence=None, description=None, pipeline_parameters=None, wait_for_provisioning=False, wait_timeout=3600, datastore=None, polling_interval=5, data_path_parameter_name=None, continue_on_step_failure=None, path_on_datastore=None, _workflow_provider=None, _service_endpoint=None)

Parameters

workspace
Workspace

The workspace object this Schedule will belong to.

name
str

The name of the Schedule.

pipeline_id
str

The ID of the pipeline the schedule will submit.

experiment_name
str

The name of the experiment the schedule will submit runs on.

recurrence
ScheduleRecurrence
default value: None

The schedule recurrence of the pipeline.

description
str
default value: None

The description of the schedule.

pipeline_parameters
dict
default value: None

A dictionary of parameters to assign new values {param name, param value}

wait_for_provisioning
bool
default value: False

Whether to wait for provisioning of the schedule to complete.

wait_timeout
int
default value: 3600

The number of seconds to wait before timing out.

datastore
AzureBlobDatastore
default value: None

The Datastore to monitor for modified/added blobs. Note: VNET Datastores are not supported. Can not use with a Recurrence.

polling_interval
int
default value: 5

How long, in minutes, between polling for modified/added blobs. Default is 5 minutes. Only supported for DataStore schedules.

data_path_parameter_name
str
default value: None

The name of the data path pipeline parameter to set with the changed blob path. Only supported for DataStore schedules.

continue_on_step_failure
bool
default value: None

Whether to continue execution of other steps in the submitted PipelineRun if a step fails. If provided, this will override the continue_on_step_failure setting for the Pipeline.

path_on_datastore
str
default value: None

Optional. The path on the datastore to monitor for modified/added blobs. Note: the path_on_datastore will be under the container for the datastore, so the actual path the schedule will monitor will be container/path_on_datastore. If none, the datastore container is monitored. Additions/modifications made in a subfolder of the path_on_datastore are not monitored. Only supported for DataStore schedules.

_workflow_provider
<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>
default value: None

The workflow provider.

_service_endpoint
str
default value: None

The service endpoint.

Returns

The created schedule.

Return type

create_for_pipeline_endpoint

Create a schedule for a pipeline endpoint.

Specify recurrence for a time-based schedule or specify a Datastore, (optional) polling_interval, and (optional) data_path_parameter_name to create a schedule which will monitor the Datastore location for modifications/additions.

create_for_pipeline_endpoint(workspace, name, pipeline_endpoint_id, experiment_name, recurrence=None, description=None, pipeline_parameters=None, wait_for_provisioning=False, wait_timeout=3600, datastore=None, polling_interval=5, data_path_parameter_name=None, continue_on_step_failure=None, path_on_datastore=None, _workflow_provider=None, _service_endpoint=None)

Parameters

workspace
Workspace

The workspace object this Schedule will belong to.

name
str

The name of the Schedule.

pipeline_endpoint_id
str

The ID of the pipeline endpoint the schedule will submit.

experiment_name
str

The name of the experiment the schedule will submit runs on.

recurrence
ScheduleRecurrence
default value: None

The schedule recurrence of the pipeline.

description
str
default value: None

The description of the schedule.

pipeline_parameters
dict
default value: None

A dictionary of parameters to assign new values {param name, param value}

wait_for_provisioning
bool
default value: False

Whether to wait for provisioning of the schedule to complete.

wait_timeout
int
default value: 3600

The number of seconds to wait before timing out.

datastore
AzureBlobDatastore
default value: None

The Datastore to monitor for modified/added blobs. Note: VNET Datastores are not supported. Can not use with a Recurrence.

polling_interval
int
default value: 5

How long, in minutes, between polling for modified/added blobs. Default is 5 minutes. Only supported for DataStore schedules.

data_path_parameter_name
str
default value: None

The name of the data path pipeline parameter to set with the changed blob path. Only supported for DataStore schedules.

continue_on_step_failure
bool
default value: None

Whether to continue execution of other steps in the submitted PipelineRun if a step fails. If provided, this will override the continue_on_step_failure setting for the Pipeline.

path_on_datastore
str
default value: None

Optional. The path on the datastore to monitor for modified/added blobs. Note: the path_on_datastore will be under the container for the datastore, so the actual path the schedule will monitor will be container/path_on_datastore. If none, the datastore container is monitored. Additions/modifications made in a subfolder of the path_on_datastore are not monitored. Only supported for DataStore schedules.

_workflow_provider
<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>
default value: None

The workflow provider.

_service_endpoint
str
default value: None

The service endpoint.

Returns

The created schedule.

Return type

disable

Set the schedule to 'Disabled' and unavailable to run.

disable(wait_for_provisioning=False, wait_timeout=3600)

Parameters

wait_for_provisioning
bool
default value: False

Whether to wait for provisioning of the schedule to complete.

wait_timeout
int
default value: 3600

Number of seconds to wait before timing out.

enable

Set the schedule to 'Active' and available to run.

enable(wait_for_provisioning=False, wait_timeout=3600)

Parameters

wait_for_provisioning
bool
default value: False

Whether to wait for provisioning of the schedule to complete.

wait_timeout
int
default value: 3600

Number of seconds to wait before timing out.

get

Get the schedule with the given ID.

get(workspace, id, _workflow_provider=None, _service_endpoint=None)

Parameters

workspace
Workspace

The workspace the schedule was created on.

id
str

ID of the schedule.

_workflow_provider
<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>
default value: None

The workflow provider.

_service_endpoint
str
default value: None

The service endpoint.

Returns

Schedule object

Return type

get_all

Get all schedules in the current workspace.

DEPRECATED: This method is being deprecated in favor of the list method.

get_all(workspace, active_only=True, pipeline_id=None, pipeline_endpoint_id=None, _workflow_provider=None, _service_endpoint=None)

Parameters

workspace
Workspace

The workspace.

active_only
bool
default value: True

If true, only return schedules which are currently active. Only applies if no pipeline id is provided.

pipeline_id
str
default value: None

If provided, only return schedules for the pipeline with the given id.

pipeline_endpoint_id
str
default value: None

If provided, only return schedules for the pipeline endpoint with the given id.

_workflow_provider
<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>
default value: None

The workflow provider.

_service_endpoint
str
default value: None

The service endpoint.

Returns

A list of Schedule.

Return type

get_last_pipeline_run

Fetch the last pipeline run submitted by the schedule. Returns None if no runs have been submitted.

get_last_pipeline_run()

Returns

The last pipeline run.

Return type

get_pipeline_runs

Fetch the pipeline runs that were generated from the schedule.

get_pipeline_runs()

Returns

A list of PipelineRun.

Return type

get_schedules_for_pipeline_endpoint_id

Get all schedules for the given pipeline endpoint id.

get_schedules_for_pipeline_endpoint_id(workspace, pipeline_endpoint_id, _workflow_provider=None, _service_endpoint=None)

Parameters

workspace
Workspace

The workspace.

pipeline_endpoint_id
str

The pipeline endpoint id.

_workflow_provider
<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>
default value: None

The workflow provider.

_service_endpoint
str
default value: None

The service endpoint.

Returns

A list of Schedule.

Return type

get_schedules_for_pipeline_id

Get all schedules for the given pipeline id.

get_schedules_for_pipeline_id(workspace, pipeline_id, _workflow_provider=None, _service_endpoint=None)

Parameters

workspace
Workspace

The workspace.

pipeline_id
str

The pipeline id.

_workflow_provider
<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>
default value: None

The workflow provider.

_service_endpoint
str
default value: None

The service endpoint.

Returns

A list of Schedule.

Return type

list

Get all schedules in the current workspace.

list(workspace, active_only=True, pipeline_id=None, pipeline_endpoint_id=None, _workflow_provider=None, _service_endpoint=None)

Parameters

workspace
Workspace

The workspace.

active_only
bool
default value: True

If true, only return schedules which are currently active. Only applies if no pipeline id is provided.

pipeline_id
str
default value: None

If provided, only return schedules for the pipeline with the given id.

pipeline_endpoint_id
str
default value: None

If provided, only return schedules for the pipeline endpoint with the given id.

_workflow_provider
<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>
default value: None

The workflow provider.

_service_endpoint
str
default value: None

The service endpoint.

Returns

A list of Schedule.

Return type

load_yaml

Load and read the YAML file to get schedule parameters.

YAML file is one more way to pass Schedule parameters to create schedule.

load_yaml(workspace, filename, _workflow_provider=None, _service_endpoint=None)

Parameters

workspace
Workspace

The workspace.

filename
str

The YAML filename with location.

_workflow_provider
<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>
default value: None

The workflow provider.

_service_endpoint
str
default value: None

The service endpoint.

Returns

A dictionary of Schedule parameters and values.

Return type

Remarks

Two types of YAML are supported for Schedules. The first reads and loads recurrence info for schedule create to trigger pipeline. The second reads and loads datastore info for schedule create to trigger pipeline.

Example to create a Schedule which will submit a Pipeline on a recurrence, as follows:


   from azureml.pipeline.core import Schedule

   schedule_info = Schedule.load_yaml(workspace=workspace,
                                      filename='./yaml/test_schedule_with_recurrence.yaml')
   schedule = Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id",
                              experiment_name="helloworld", recurrence=schedule_info.get("recurrence"),
                              description=schedule_info.get("description"))

Sample YAML file test_schedule_with_recurrence.yaml:


   Schedule:
       description: "Test create with recurrence"
       recurrence:
           frequency: Week # Can be "Minute", "Hour", "Day", "Week", or "Month".
           interval: 1 # how often fires
           start_time: 2019-06-07T10:50:00
           time_zone: UTC
           hours:
           - 1
           minutes:
           - 0
           time_of_day: null
           week_days:
           - Friday
       pipeline_parameters: {'a':1}
       wait_for_provisioning: True
       wait_timeout: 3600
       datastore_name: ~
       polling_interval: ~
       data_path_parameter_name: ~
       continue_on_step_failure: None
       path_on_datastore: ~

Example to create a Schedule which will submit a Pipeline on a datastore, as follows:


   from azureml.pipeline.core import Schedule

   schedule_info = Schedule.load_yaml(workspace=workspace,
                                      filename='./yaml/test_schedule_with_datastore.yaml')
   schedule = Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id",
                              experiment_name="helloworld",datastore=schedule_info.get("datastore_name"),
                              polling_interval=schedule_info.get("polling_interval"),
                              data_path_parameter_name=schedule_info.get("data_path_parameter_name"),
                              continue_on_step_failure=schedule_info.get("continue_on_step_failure"),
                              path_on_datastore=schedule_info.get("path_on_datastore"))

update

Update the schedule.

update(name=None, description=None, recurrence=None, pipeline_parameters=None, status=None, wait_for_provisioning=False, wait_timeout=3600, datastore=None, polling_interval=None, data_path_parameter_name=None, continue_on_step_failure=None, path_on_datastore=None)

Parameters

name
str
default value: None

The new name of the Schedule.

recurrence
ScheduleRecurrence
default value: None

The new schedule recurrence of the pipeline.

description
str
default value: None

The new description of the schedule.

pipeline_parameters
dict
default value: None

A dictionary of parameters to assign new values {param name, param value}.

status
str
default value: None

The new status of the schedule: 'Active' or 'Disabled'.

wait_for_provisioning
bool
default value: False

Whether to wait for provisioning of the schedule to complete.

wait_timeout
int
default value: 3600

The number of seconds to wait before timing out.

datastore
AzureBlobDatastore
default value: None

The Datastore to monitor for modified/added blobs. Note: VNET Datastores are not supported.

polling_interval
int
default value: None

How long, in minutes, between polling for modified/added blobs. Default is 5 minutes.

data_path_parameter_name
str
default value: None

The name of the data path pipeline parameter to set with the changed blob path.

continue_on_step_failure
bool
default value: None

Whether to continue execution of other steps in the submitted PipelineRun if a step fails. If provided, this will override the continue_on_step_failure setting for the Pipeline.

path_on_datastore
str
default value: None

Optional. The path on the datastore to monitor for modified/added blobs. Note: the path_on_datastore will be under the container for the datastore, so the actual path the schedule will monitor will be container/path_on_datastore. If none, the datastore container is monitored. Additions/modifications made in a subfolder of the path_on_datastore are not monitored. Only supported for DataStore schedules.

Attributes

continue_on_step_failure

Get the value of the continue_on_step_failure setting.

Returns

The value of the continue_on_step_failure setting

Return type

data_path_parameter_name

Get the name of the data path pipeline parameter to set with the changed blob path.

Returns

The data path parameter name.

Return type

str

datastore_name

Get the name of the Datastore used for the schedule.

Returns

The Datastore name.

Return type

str

description

Get the description of the schedule.

Returns

The description of the schedule.

Return type

str

id

Get the ID for the schedule.

Returns

The ID.

Return type

str

name

Get the name of the schedule.

Returns

The name.

Return type

str

path_on_datastore

Get the path on the datastore that the schedule monitors.

Returns

The path on datastore.

Return type

str

pipeline_endpoint_id

Get the ID of the pipeline endpoint the schedule submits.

Returns

The ID.

Return type

str

pipeline_id

Get the ID of the pipeline the schedule submits.

Returns

The ID.

Return type

str

polling_interval

Get how long, in minutes, between polling for modified/added blobs.

Returns

The polling interval.

Return type

int

recurrence

Get the schedule recurrence.

Returns

The schedule recurrence.

Return type

status

Get the status of the schedule.

Returns

The status of the schedule.

Return type

str