
Data Factory scheduling and execution

Note

This article applies to version 1 of Data Factory. If you are using the current version of the Data Factory service, see the pipeline execution and triggers article.

This article explains the scheduling and execution aspects of the Azure Data Factory application model. It assumes that you understand the basics of Data Factory application model concepts, including activities, pipelines, linked services, and datasets. For the basic concepts of Azure Data Factory, see the introductory Data Factory articles.

Start and end times of a pipeline

A pipeline is active only between its start time and end time; it is not executed before the start time or after the end time. If the pipeline is paused, it is not executed at all, regardless of its start and end times. For a pipeline to run, it should not be paused. You find these settings (start, end, paused) in the pipeline definition:

"start": "2017-04-01T08:00:00Z",
"end": "2017-04-01T11:00:00Z"
"isPaused": false

For more information about these properties, see the Create pipelines article.

Specify a schedule for an activity

It is not the pipeline itself that is executed; it is the activities in the pipeline that are executed in the overall context of the pipeline. You can specify a recurring schedule for an activity by using the scheduler section of the activity JSON. For example, you can schedule an activity to run hourly as follows:

"scheduler": {
    "frequency": "Hour",
    "interval": 1
},

As shown in the following diagram, specifying a schedule for an activity creates a series of tumbling windows within the pipeline start and end times. Tumbling windows are a series of fixed-size, non-overlapping, contiguous time intervals. These logical tumbling windows for an activity are called activity windows.

(Diagram: activity scheduler example)

The scheduler property for an activity is optional. If you do specify it, it must match the cadence that you specify in the definition of the output dataset for the activity. Currently, the output dataset is what drives the schedule, so you must create an output dataset even if the activity does not produce any output.
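
If an activity does not actually write any data, one way to satisfy this requirement is a placeholder output dataset whose only purpose is to drive the schedule. The following is a minimal sketch under that assumption; the dataset name and folder path are illustrative placeholders rather than names taken from this article, and the availability cadence must match the activity's scheduler section:

{
    "name": "PlaceholderOutput",
    "properties": {
        "type": "AzureBlob",
        "linkedServiceName": "StorageLinkedService",
        "typeProperties": {
            "folderPath": "adfcontainer/placeholderoutput/"
        },
        "availability": {
            "frequency": "Hour",
            "interval": 1
        }
    }
}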

Specify a schedule for a dataset

An activity in a Data Factory pipeline can take zero or more input datasets and produce one or more output datasets. For an activity, you specify the cadence at which the input data is available or the output data is produced by using the availability section in the dataset definitions.

Frequency in the availability section specifies the time unit. The allowed values for frequency are Minute, Hour, Day, Week, and Month. The interval property in the availability section specifies a multiplier for frequency. For example, if frequency is set to Day and interval is set to 1 for an output dataset, the output data is produced daily. If you specify the frequency as Minute, we recommend that you set the interval to no less than 15.

In the following example, the input data is available hourly and the output data is produced hourly ("frequency": "Hour", "interval": 1).

Input dataset:

{
    "name": "AzureSqlInput",
    "properties": {
        "published": false,
        "type": "AzureSqlTable",
        "linkedServiceName": "AzureSqlLinkedService",
        "typeProperties": {
            "tableName": "MyTable"
        },
        "availability": {
            "frequency": "Hour",
            "interval": 1
        },
        "external": true,
        "policy": {}
    }
}

Output dataset:

{
    "name": "AzureBlobOutput",
    "properties": {
        "published": false,
        "type": "AzureBlob",
        "linkedServiceName": "StorageLinkedService",
        "typeProperties": {
            "folderPath": "mypath/{Year}/{Month}/{Day}/{Hour}",
            "format": {
                "type": "TextFormat"
            },
            "partitionedBy": [
                { "name": "Year", "value": { "type": "DateTime", "date": "SliceStart", "format": "yyyy" } },
                { "name": "Month", "value": { "type": "DateTime", "date": "SliceStart", "format": "MM" } },
                { "name": "Day", "value": { "type": "DateTime", "date": "SliceStart", "format": "dd" } },
                { "name": "Hour", "value": { "type": "DateTime", "date": "SliceStart", "format": "HH" }}
            ]
        },
        "availability": {
            "frequency": "Hour",
            "interval": 1
        }
    }
}

Currently, the output dataset drives the schedule. In other words, the schedule specified for the output dataset is used to run an activity at run time. Therefore, you must create an output dataset even if the activity does not produce any output. If the activity doesn't take any input, you can skip creating the input dataset.

In the following pipeline definition, the scheduler property is used to specify a schedule for the activity. This property is optional. Currently, the schedule for the activity must match the schedule specified for the output dataset.

{
    "name": "SamplePipeline",
    "properties": {
        "description": "copy activity",
        "activities": [
            {
                "type": "Copy",
                "name": "AzureSQLtoBlob",
                "description": "copy activity",
                "typeProperties": {
                    "source": {
                        "type": "SqlSource",
                        "sqlReaderQuery": "$$Text.Format('select * from MyTable where timestampcolumn >= \\'{0:yyyy-MM-dd HH:mm}\\' AND timestampcolumn < \\'{1:yyyy-MM-dd HH:mm}\\'', WindowStart, WindowEnd)"
                    },
                    "sink": {
                        "type": "BlobSink",
                        "writeBatchSize": 100000,
                        "writeBatchTimeout": "00:05:00"
                    }
                },
                "inputs": [
                    {
                        "name": "AzureSQLInput"
                    }
                ],
                "outputs": [
                    {
                        "name": "AzureBlobOutput"
                    }
                ],
                "scheduler": {
                    "frequency": "Hour",
                    "interval": 1
                }
            }
        ],
        "start": "2017-04-01T08:00:00Z",
        "end": "2017-04-01T11:00:00Z"
    }
}

In this example, the activity runs hourly between the start and end times of the pipeline. The output data is produced hourly, for three one-hour windows (8 AM - 9 AM, 9 AM - 10 AM, and 10 AM - 11 AM).

Each unit of data consumed or produced by an activity run is called a data slice. The following diagram shows an example of an activity with one input dataset and one output dataset:

(Diagram: availability scheduler)

The diagram shows the hourly data slices for the input and output datasets. It also shows three input slices that are ready for processing. The 10-11 AM activity is in progress, producing the 10-11 AM output slice.

You can access the time interval associated with the current slice in the dataset JSON by using the SliceStart and SliceEnd variables. Similarly, you can access the time interval associated with an activity window by using WindowStart and WindowEnd. The schedule of an activity must match the schedule of the output dataset for the activity, so the SliceStart and SliceEnd values are the same as the WindowStart and WindowEnd values, respectively. For more information on these variables, see the Data Factory functions and system variables article.

You can use these variables for different purposes in your activity JSON. For example, you can use them to select data from input and output datasets representing time-series data (for example, 8 AM to 9 AM). The preceding example also uses WindowStart and WindowEnd to select the relevant data for an activity run and copy it to a blob with the appropriate folderPath. The folderPath is parameterized to have a separate folder for every hour.

In the preceding example, the schedule specified for the input and output datasets is the same (hourly). If the input dataset for the activity is available at a different frequency, say every 15 minutes, the activity that produces the output dataset still runs once an hour, because the output dataset is what drives the activity schedule. For more information, see Model datasets with different frequencies.
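
For instance, an input dataset that becomes available every 15 minutes would declare its availability as in the following sketch, while the output dataset would keep its hourly availability, so the activity would still run once an hour (the values shown are illustrative):

"availability": {
    "frequency": "Minute",
    "interval": 15
}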

Dataset availability and policies

You have seen the usage of the frequency and interval properties in the availability section of a dataset definition. A few other properties also affect the scheduling and execution of an activity.

Dataset availability

You can use the following properties in the availability section:

frequency
Description: Specifies the time unit for dataset slice production. Supported values: Minute, Hour, Day, Week, and Month.
Required: Yes. Default: NA.

interval
Description: Specifies a multiplier for frequency. "Frequency x interval" determines how often the slice is produced. For example, if you need the dataset to be sliced on an hourly basis, set frequency to Hour and interval to 1. Note: If you specify frequency as Minute, we recommend that you set the interval to no less than 15.
Required: Yes. Default: NA.

style
Description: Specifies whether the slice should be produced at the start or end of the interval (StartOfInterval or EndOfInterval). If frequency is set to Month and style is set to EndOfInterval, the slice is produced on the last day of the month; if style is set to StartOfInterval, it is produced on the first day of the month. If frequency is set to Day and style is set to EndOfInterval, the slice is produced in the last hour of the day. If frequency is set to Hour and style is set to EndOfInterval, the slice is produced at the end of the hour. For example, for a slice for the 1 PM - 2 PM period, the slice is produced at 2 PM.
Required: No. Default: EndOfInterval.

anchorDateTime
Description: Defines the absolute position in time used by the scheduler to compute dataset slice boundaries. Note: If anchorDateTime has date parts that are more granular than the frequency, the more granular parts are ignored. For example, if the interval is hourly (frequency: Hour and interval: 1) and anchorDateTime contains minutes and seconds, the minutes and seconds parts of anchorDateTime are ignored.
Required: No. Default: 01/01/0001.

offset
Description: Timespan by which the start and end of all dataset slices are shifted. Note: If both anchorDateTime and offset are specified, the result is the combined shift.
Required: No. Default: NA.

offset example

By default, daily slices ("frequency": "Day", "interval": 1) start at 12 AM UTC (midnight). If you want the start time to be 6 AM UTC instead, set the offset as shown in the following snippet:

"availability":
{
    "frequency": "Day",
    "interval": 1,
    "offset": "06:00:00"
}

anchorDateTime example

In the following example, the dataset is produced once every 23 hours. The first slice starts at the time specified by anchorDateTime, which is set to 2017-04-19T08:00:00 (UTC).

"availability":    
{    
    "frequency": "Hour",        
    "interval": 23,    
    "anchorDateTime":"2017-04-19T08:00:00"    
}

offset/style example

The following dataset is a monthly dataset that is produced on the third day of every month at 8:00 AM (3.08:00:00):

"availability": {
    "frequency": "Month",
    "interval": 1,
    "offset": "3.08:00:00", 
    "style": "StartOfInterval"
}

Dataset policy

A dataset can have a validation policy that specifies how the data generated by a slice execution can be validated before it is ready for consumption. In such cases, after the slice has finished executing, the output slice status is changed to Waiting with a substatus of Validation. After the slices are validated, the slice status changes to Ready. If a data slice has been produced but did not pass the validation, activity runs for downstream slices that depend on this slice are not processed. Monitor and manage pipelines covers the various states of data slices in Data Factory.

The policy section in the dataset definition defines the criteria or condition that the dataset slices must fulfill. You can use the following properties in the policy section:

minimumSizeMB
Description: Validates that the data in an Azure blob meets the minimum size requirements (in megabytes).
Applies to: Azure Blob. Required: No. Default: NA.

minimumRows
Description: Validates that the data in Azure SQL Database or an Azure table contains the minimum number of rows.
Applies to: Azure SQL Database, Azure Table. Required: No. Default: NA.

Examples

minimumSizeMB:

"policy":

{
    "validation":
    {
        "minimumSizeMB": 10.0
    }
}

minimumRows:

"policy":
{
    "validation":
    {
        "minimumRows": 100
    }
}

For more information about these properties and examples, see the Create datasets article.

Activity policies

Policies affect the run-time behavior of an activity, specifically when the slice of a table is processed. You can use the following properties in the policy section of an activity:

concurrency
Permitted values: Integer. Maximum value: 10.
Default value: 1
Description: The number of concurrent executions of the activity. It determines the number of parallel activity executions that can happen on different slices. For example, if an activity needs to go through a large set of available data, a larger concurrency value speeds up the data processing.

executionPriorityOrder
Permitted values: NewestFirst, OldestFirst
Default value: OldestFirst
Description: Determines the ordering of the data slices that are being processed. For example, suppose you have two slices (one for 4 PM and another for 5 PM) and both are pending execution. If you set executionPriorityOrder to NewestFirst, the 5 PM slice is processed first. If you set executionPriorityOrder to OldestFirst, the 4 PM slice is processed first.

retry
Permitted values: Integer. Maximum value: 10.
Default value: 0
Description: The number of retries before the data processing for the slice is marked as Failure. Activity execution for a data slice is retried up to the specified retry count. The retry is done as soon as possible after the failure.

timeout
Permitted values: TimeSpan
Default value: 00:00:00
Description: The timeout for the activity. Example: 00:10:00 (implies a timeout of 10 minutes). If a value is not specified or is 0, the timeout is infinite. If the data processing time on a slice exceeds the timeout value, the processing is canceled and the system attempts to retry it. The number of retries depends on the retry property. When a timeout occurs, the status is set to TimedOut.

delay
Permitted values: TimeSpan
Default value: 00:00:00
Description: The delay before data processing of the slice starts. The execution of the activity for a data slice is started after the delay is past the expected execution time. Example: 00:10:00 (implies a delay of 10 minutes).

longRetry
Permitted values: Integer. Maximum value: 10.
Default value: 1
Description: The number of long retry attempts before the slice execution is failed. longRetry attempts are spaced by longRetryInterval, so if you need to specify a time between retry attempts, use longRetry. If both retry and longRetry are specified, each longRetry attempt includes retry attempts, and the maximum number of attempts is retry * longRetry.

For example, suppose the activity policy has the following settings:
retry: 3
longRetry: 2
longRetryInterval: 01:00:00

Assume there is only one slice to execute (status is Waiting) and the activity execution fails every time. Initially, there would be 3 consecutive execution attempts. After each attempt, the slice status would be Retry. After the first 3 attempts are over, the slice status would be LongRetry. After an hour (that is, longRetryInterval's value), there would be another set of 3 consecutive execution attempts. After that, the slice status would be Failed, and no more retries would be attempted. Hence, 6 attempts were made overall. If any execution succeeds, the slice status would be Ready and no more retries are attempted.

longRetry can be used in situations where dependent data arrives at non-deterministic times, or where the overall environment in which data processing occurs is flaky. In such cases, doing retries one after another may not help, whereas retrying after an interval of time produces the desired output. Word of caution: do not set high values for longRetry or longRetryInterval. Typically, higher values imply other systemic issues.

longRetryInterval
Permitted values: TimeSpan
Default value: 00:00:00
Description: The delay between long retry attempts.
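
Putting several of these properties together, the policy section of an activity might look like the following sketch. The specific values here are illustrative assumptions, not recommendations from this article:

"policy": {
    "concurrency": 2,
    "executionPriorityOrder": "OldestFirst",
    "retry": 3,
    "timeout": "01:00:00",
    "delay": "00:10:00",
    "longRetry": 2,
    "longRetryInterval": "01:00:00"
}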

For more information, see the Pipelines article.

Parallel processing of data slices

You can set the start date for the pipeline in the past. When you do so, Data Factory automatically calculates (backfills) all the data slices in the past and begins processing them. For example, suppose you create a pipeline with a start date of 2017-04-01 and the current date is 2017-04-10. If the cadence of the output dataset is daily, Data Factory starts processing all the slices from 2017-04-01 to 2017-04-09 immediately, because the start date is in the past. The slice for 2017-04-10 is not processed yet, because the value of the style property in the availability section is EndOfInterval by default. The oldest slice is processed first, because the default value of executionPriorityOrder is OldestFirst. For a description of the style property, see the dataset availability section. For a description of executionPriorityOrder, see the activity policies section.

You can configure backfilled data slices to be processed in parallel by setting the concurrency property in the policy section of the activity JSON. This property determines the number of parallel activity executions that can happen on different slices. The default value for the concurrency property is 1, so one slice is processed at a time by default. The maximum value is 10. When a pipeline needs to go through a large set of available data, a larger concurrency value speeds up the data processing.
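
For example, to process up to five backfilled slices at a time while still handling the oldest slices first, the policy section of the activity could be set as in this sketch (the value 5 is an illustrative assumption):

"policy": {
    "concurrency": 5,
    "executionPriorityOrder": "OldestFirst"
}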

Rerun a failed data slice

When an error occurs while processing a data slice, you can find out why the processing failed by using the Azure portal blades or the Monitor and Manage app. For details, see Monitoring and managing pipelines using Azure portal blades or Monitoring and Management app.

Consider the following example, which shows two activities: Activity1 and Activity2. Activity1 consumes a slice of Dataset1 and produces a slice of Dataset2, which is consumed as an input by Activity2 to produce a slice of the final dataset.

(Diagram: failed slice)

The diagram shows that, of the three recent slices, there was a failure producing the 9-10 AM slice for Dataset2. Data Factory automatically tracks dependencies for the time-series dataset, so it does not start the activity run for the 9-10 AM downstream slice.

The Data Factory monitoring and management tools allow you to drill into the diagnostic logs for the failed slice to easily find the root cause of the issue and fix it. After you have fixed the issue, you can easily start the activity run to produce the failed slice. For more information on how to rerun slices and understand their state transitions, see Monitoring and managing pipelines using Azure portal blades or Monitoring and Management app.

After you rerun the 9-10 AM slice for Dataset2, Data Factory starts the run for the dependent 9-10 AM slice on the final dataset.

(Diagram: rerun a failed slice)

Multiple activities in a pipeline

You can have more than one activity in a pipeline. If you have multiple activities in a pipeline and the output of one activity is not an input of another, the activities may run in parallel if the input data slices for the activities are ready.

You can chain two activities (run one activity after another) by setting the output dataset of one activity as the input dataset of the other. The activities can be in the same pipeline or in different pipelines. The second activity executes only when the first one finishes successfully.

For example, consider the following case, in which a pipeline has two activities:

  1. Activity A1 requires external input dataset D1 and produces output dataset D2.
  2. Activity A2 requires input from dataset D2 and produces output dataset D3.

In this scenario, activities A1 and A2 are in the same pipeline. Activity A1 runs when the external data is available and the scheduled availability frequency is reached. Activity A2 runs when the scheduled slices from D2 become available and the scheduled availability frequency is reached. If there is an error in one of the slices in dataset D2, A2 does not run for that slice until it becomes available.

With both activities in the same pipeline, the Diagram view would look like the following diagram:

(Diagram: chaining activities in the same pipeline)

As mentioned earlier, the activities could also be in different pipelines. In that scenario, the Diagram view would look like the following diagram:

(Diagram: chaining activities in two pipelines)

For an example, see the copy sequentially section in the appendix.

Model datasets with different frequencies

In the preceding samples, the frequencies for the input and output datasets and the activity schedule window were the same. Some scenarios require the ability to produce output at a frequency different from the frequencies of one or more inputs. Data Factory supports modeling these scenarios.

Sample 1: Produce a daily output report for input data that is available every hour

Consider a scenario in which you have input measurement data from sensors available every hour in Azure Blob storage. You want to produce a daily aggregate report with statistics such as mean, maximum, and minimum for the day by using a Data Factory hive activity.

Here is how you can model this scenario with Data Factory:

Input dataset

The hourly input files are dropped in the folder for the given day. Availability for the input is set at Hour (frequency: Hour, interval: 1).

{
  "name": "AzureBlobInput",
  "properties": {
    "type": "AzureBlob",
    "linkedServiceName": "StorageLinkedService",
    "typeProperties": {
      "folderPath": "mycontainer/myfolder/{Year}/{Month}/{Day}/",
      "partitionedBy": [
        { "name": "Year", "value": {"type": "DateTime","date": "SliceStart","format": "yyyy"}},
        { "name": "Month","value": {"type": "DateTime","date": "SliceStart","format": "MM"}},
        { "name": "Day","value": {"type": "DateTime","date": "SliceStart","format": "dd"}}
      ],
      "format": {
        "type": "TextFormat"
      }
    },
    "external": true,
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

Output dataset

One output file is created every day in the day's folder. Availability of the output is set at Day (frequency: Day, interval: 1).

{
  "name": "AzureBlobOutput",
  "properties": {
    "type": "AzureBlob",
    "linkedServiceName": "StorageLinkedService",
    "typeProperties": {
      "folderPath": "mycontainer/myfolder/{Year}/{Month}/{Day}/",
      "partitionedBy": [
        { "name": "Year", "value": {"type": "DateTime","date": "SliceStart","format": "yyyy"}},
        { "name": "Month","value": {"type": "DateTime","date": "SliceStart","format": "MM"}},
        { "name": "Day","value": {"type": "DateTime","date": "SliceStart","format": "dd"}}
      ],
      "format": {
        "type": "TextFormat"
      }
    },
    "availability": {
      "frequency": "Day",
      "interval": 1
    }
  }
}

Activity: hive activity in a pipeline

The hive script receives the appropriate DateTime information as parameters that use the WindowStart variable, as shown in the following snippet. The hive script uses this variable to load the data from the correct folder for the day and run the aggregation to generate the output.

{  
    "name":"SamplePipeline",
    "properties":{  
    "start":"2015-01-01T08:00:00",
    "end":"2015-01-01T11:00:00",
    "description":"hive activity",
    "activities": [
        {
            "name": "SampleHiveActivity",
            "inputs": [
                {
                    "name": "AzureBlobInput"
                }
            ],
            "outputs": [
                {
                    "name": "AzureBlobOutput"
                }
            ],
            "linkedServiceName": "HDInsightLinkedService",
            "type": "HDInsightHive",
            "typeProperties": {
                "scriptPath": "adftutorial\\hivequery.hql",
                "scriptLinkedService": "StorageLinkedService",
                "defines": {
                    "Year": "$$Text.Format('{0:yyyy}',WindowStart)",
                    "Month": "$$Text.Format('{0:MM}',WindowStart)",
                    "Day": "$$Text.Format('{0:dd}',WindowStart)"
                }
            },
            "scheduler": {
                "frequency": "Day",
                "interval": 1
            },            
            "policy": {
                "concurrency": 1,
                "executionPriorityOrder": "OldestFirst",
                "retry": 2,
                "timeout": "01:00:00"
            }
         }
     ]
   }
}

The following diagram shows the scenario from a data-dependency point of view.

(Diagram: data dependencies)

The output slice for each day depends on 24 hourly slices from the input dataset. Data Factory computes these dependencies automatically by figuring out the input data slices that fall in the same time period as the output slice to be produced. If any of the 24 input slices is not available, Data Factory waits for the input slice to be ready before starting the daily activity run.

Sample 2: Specify dependency with expressions and Data Factory functions

Let's consider another scenario. Suppose you have a hive activity that processes two input datasets. One of them gets new data daily, but the other gets new data every week. Suppose you want to do a join across the two inputs and produce an output every day.

The simple approach, in which Data Factory automatically figures out the right input slices to process by aligning them to the output data slice's time period, does not work here.

You must specify that, for every activity run, Data Factory should use last week's data slice for the weekly input dataset. You use Azure Data Factory functions, as shown in the following snippet, to implement this behavior.

Input1: Azure blob

The first input is the Azure blob that is updated daily.

{
  "name": "AzureBlobInputDaily",
  "properties": {
    "type": "AzureBlob",
    "linkedServiceName": "StorageLinkedService",
    "typeProperties": {
      "folderPath": "mycontainer/myfolder/{Year}/{Month}/{Day}/",
      "partitionedBy": [
        { "name": "Year", "value": {"type": "DateTime","date": "SliceStart","format": "yyyy"}},
        { "name": "Month","value": {"type": "DateTime","date": "SliceStart","format": "MM"}},
        { "name": "Day","value": {"type": "DateTime","date": "SliceStart","format": "dd"}}
      ],
      "format": {
        "type": "TextFormat"
      }
    },
    "external": true,
    "availability": {
      "frequency": "Day",
      "interval": 1
    }
  }
}

Input2: Azure blob

Input2 is the Azure blob that is updated weekly.

{
  "name": "AzureBlobInputWeekly",
  "properties": {
    "type": "AzureBlob",
    "linkedServiceName": "StorageLinkedService",
    "typeProperties": {
      "folderPath": "mycontainer/myfolder/{Year}/{Month}/{Day}/",
      "partitionedBy": [
        { "name": "Year", "value": {"type": "DateTime","date": "SliceStart","format": "yyyy"}},
        { "name": "Month","value": {"type": "DateTime","date": "SliceStart","format": "MM"}},
        { "name": "Day","value": {"type": "DateTime","date": "SliceStart","format": "dd"}}
      ],
      "format": {
        "type": "TextFormat"
      }
    },
    "external": true,
    "availability": {
      "frequency": "Day",
      "interval": 7
    }
  }
}

Output: Azure blob

One output file is created every day in the folder for the day. Availability of the output is set to Day (frequency: Day, interval: 1).

{
  "name": "AzureBlobOutputDaily",
  "properties": {
    "type": "AzureBlob",
    "linkedServiceName": "StorageLinkedService",
    "typeProperties": {
      "folderPath": "mycontainer/myfolder/{Year}/{Month}/{Day}/",
      "partitionedBy": [
        { "name": "Year", "value": {"type": "DateTime","date": "SliceStart","format": "yyyy"}},
        { "name": "Month","value": {"type": "DateTime","date": "SliceStart","format": "MM"}},
        { "name": "Day","value": {"type": "DateTime","date": "SliceStart","format": "dd"}}
      ],
      "format": {
        "type": "TextFormat"
      }
    },
    "availability": {
      "frequency": "Day",
      "interval": 1
    }
  }
}

Activity: hive activity in a pipeline

The hive activity takes the two inputs and produces an output slice every day. You can specify that every day's output slice depends on the previous week's input slice for the weekly input, as follows.

{  
    "name":"SamplePipeline",
    "properties":{  
    "start":"2015-01-01T08:00:00",
    "end":"2015-01-01T11:00:00",
    "description":"hive activity",
    "activities": [
      {
        "name": "SampleHiveActivity",
        "inputs": [
          {
            "name": "AzureBlobInputDaily"
          },
          {
            "name": "AzureBlobInputWeekly",
            "startTime": "Date.AddDays(SliceStart, - Date.DayOfWeek(SliceStart))",
            "endTime": "Date.AddDays(SliceEnd,  -Date.DayOfWeek(SliceEnd))"  
          }
        ],
        "outputs": [
          {
            "name": "AzureBlobOutputDaily"
          }
        ],
        "linkedServiceName": "HDInsightLinkedService",
        "type": "HDInsightHive",
        "typeProperties": {
          "scriptPath": "adftutorial\\hivequery.hql",
          "scriptLinkedService": "StorageLinkedService",
          "defines": {
            "Year": "$$Text.Format('{0:yyyy}',WindowStart)",
            "Month": "$$Text.Format('{0:MM}',WindowStart)",
            "Day": "$$Text.Format('{0:dd}',WindowStart)"
          }
        },
        "scheduler": {
          "frequency": "Day",
          "interval": 1
        },            
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 2,  
          "timeout": "01:00:00"
        }
       }
     ]
   }
}

See Data Factory functions and system variables for a list of the functions and system variables that Data Factory supports.

Appendix

Example: copy sequentially

It is possible to run multiple copy operations one after another in a sequential, ordered manner. For example, you might have two copy activities in a pipeline (CopyActivity1 and CopyActivity2) with the following input and output datasets:

CopyActivity1

Input: Dataset1. Output: Dataset2.

CopyActivity2

Input: Dataset2. Output: Dataset3.

CopyActivity2 runs only if CopyActivity1 has run successfully and Dataset2 is available.

Here is the sample pipeline JSON:

{
    "name": "ChainActivities",
    "properties": {
        "description": "Run activities in sequence",
        "activities": [
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "BlobSource"
                    },
                    "sink": {
                        "type": "BlobSink",
                        "copyBehavior": "PreserveHierarchy",
                        "writeBatchSize": 0,
                        "writeBatchTimeout": "00:00:00"
                    }
                },
                "inputs": [
                    {
                        "name": "Dataset1"
                    }
                ],
                "outputs": [
                    {
                        "name": "Dataset2"
                    }
                ],
                "policy": {
                    "timeout": "01:00:00"
                },
                "scheduler": {
                    "frequency": "Hour",
                    "interval": 1
                },
                "name": "CopyFromBlob1ToBlob2",
                "description": "Copy data from a blob to another"
            },
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "BlobSource"
                    },
                    "sink": {
                        "type": "BlobSink",
                        "writeBatchSize": 0,
                        "writeBatchTimeout": "00:00:00"
                    }
                },
                "inputs": [
                    {
                        "name": "Dataset2"
                    }
                ],
                "outputs": [
                    {
                        "name": "Dataset3"
                    }
                ],
                "policy": {
                    "timeout": "01:00:00"
                },
                "scheduler": {
                    "frequency": "Hour",
                    "interval": 1
                },
                "name": "CopyFromBlob2ToBlob3",
                "description": "Copy data from a blob to another"
            }
        ],
        "start": "2016-08-25T01:00:00Z",
        "end": "2016-08-25T01:00:00Z",
        "isPaused": false
    }
}

Notice that, in the example, the output dataset of the first copy activity (Dataset2) is specified as the input for the second activity. Therefore, the second activity runs only when the output dataset from the first activity is ready.

In the example, CopyActivity2 can have a different input, such as Dataset3, but because Dataset2 is specified as an input to CopyActivity2, the activity does not run until CopyActivity1 finishes. For example:

CopyActivity1

Input: Dataset1. Output: Dataset2.

CopyActivity2

Inputs: Dataset3, Dataset2. Output: Dataset4.

{
    "name": "ChainActivities",
    "properties": {
        "description": "Run activities in sequence",
        "activities": [
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "BlobSource"
                    },
                    "sink": {
                        "type": "BlobSink",
                        "copyBehavior": "PreserveHierarchy",
                        "writeBatchSize": 0,
                        "writeBatchTimeout": "00:00:00"
                    }
                },
                "inputs": [
                    {
                        "name": "Dataset1"
                    }
                ],
                "outputs": [
                    {
                        "name": "Dataset2"
                    }
                ],
                "policy": {
                    "timeout": "01:00:00"
                },
                "scheduler": {
                    "frequency": "Hour",
                    "interval": 1
                },
                "name": "CopyFromBlobToBlob",
                "description": "Copy data from a blob to another"
            },
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "BlobSource"
                    },
                    "sink": {
                        "type": "BlobSink",
                        "writeBatchSize": 0,
                        "writeBatchTimeout": "00:00:00"
                    }
                },
                "inputs": [
                    {
                        "name": "Dataset3"
                    },
                    {
                        "name": "Dataset2"
                    }
                ],
                "outputs": [
                    {
                        "name": "Dataset4"
                    }
                ],
                "policy": {
                    "timeout": "01:00:00"
                },
                "scheduler": {
                    "frequency": "Hour",
                    "interval": 1
                },
                "name": "CopyFromBlob3ToBlob4",
                "description": "Copy data from a blob to another"
            }
        ],
        "start": "2017-04-25T01:00:00Z",
        "end": "2017-04-25T01:00:00Z",
        "isPaused": false
    }
}

Notice that, in the example, two input datasets are specified for the second copy activity. When multiple inputs are specified, only the first input dataset is used for copying data; the other datasets are used as dependencies. CopyActivity2 starts only after the following conditions are met:

  • CopyActivity1 has successfully completed, and Dataset2 is available. This dataset is not used when copying data to Dataset4; it only acts as a scheduling dependency for CopyActivity2.
  • Dataset3 is available. This dataset represents the data that is copied to the destination.