Pipelines and activities in Azure Data Factory

This article helps you understand pipelines and activities in Azure Data Factory and use them to construct end-to-end data-driven workflows for your data movement and data processing scenarios.

Note

This article applies to version 2 of Data Factory, which is currently in preview. If you are using version 1 of the Data Factory service, which is generally available (GA), see Pipelines in Data Factory version 1.

This article assumes that you have gone through Introduction to Azure Data Factory and the quickstart tutorial.

Overview

A data factory can have one or more pipelines. A pipeline is a logical grouping of activities that together perform a task. For example, a pipeline could contain a set of activities that ingest and clean log data, and then kick of a Spark job on an HDInsight cluster to analyze the log data. The beauty of this is that the pipeline allows you to manage the activities as a set instead of each one individually. For example, you can deploy and schedule the pipeline, instead of the activities independently.

The activities in a pipeline define actions to perform on your data. For example, you may use a copy activity to copy data from an on-premises SQL Server to an Azure Blob Storage. Then, use a Hive activity that runs a Hive script on an Azure HDInsight cluster to process/transform data from the blob storage to produce output data. Finally, use a second copy activity to copy the output data to an Azure SQL Data Warehouse on top of which business intelligence (BI) reporting solutions are built.

Data Factory supports three types of activities: data movement activities, data transformation activities, and control activities. An activity can take zero or more input datasets and produce one or more output datasets. The following diagram shows the relationship between pipeline, activity, and dataset in Data Factory:

Relationship between dataset, activity, and pipeline

An input dataset represents the input for an activity in the pipeline and an output dataset represents the output for the activity. Datasets identify data within different data stores, such as tables, files, folders, and documents. After you create a dataset, you can use it with activities in a pipeline. For example, a dataset can be an input/output dataset of a Copy Activity or an HDInsightHive Activity. For more information about datasets, see Datasets in Azure Data Factory article.

Data movement activities

Copy Activity in Data Factory copies data from a source data store to a sink data store. Data Factory supports the data stores listed in the table in this section. Data from any source can be written to any sink. Click a data store to learn how to copy data to and from that store.

Category Data store Supported as a source Supported as a sink Supported by Azure IR Supported by Self-hosted IR
Azure Azure Blob Storage
  Azure Cosmos DB
  Azure Data Lake Store
  Azure Database for MySQL
  Azure Database for PostgreSQL
  Azure File Storage
  Azure SQL Database
  Azure SQL Data Warehouse
  Azure Search Index
  Azure Table Storage
Database Amazon Redshift
  DB2
  Drill (Beta)
  Google BigQuery (Beta)
  Greenplum (Beta)
  HBase
  Hive
  Apache Impala (Beta)
  Informix
  MariaDB
  Microsoft Access
  MySQL
  Oracle
  Phoenix
  PostgreSQL
  Presto (Beta)
  SAP Business Warehouse
  SAP HANA
  Spark
  SQL Server
  Sybase
  Teradata
NoSQL Cassandra
  Couchbase (Beta)
  MongoDB
File Amazon S3
  File System
  FTP
  HDFS
  SFTP
Generic protocol Generic HTTP
  Generic OData
  Generic ODBC
Services and apps Amazon Marketplace Web Service (Beta)
  Concur (Beta)
  Dynamics 365
  Dynamics CRM
  GE Historian
  HubSpot (Beta)
  Jira (Beta)
  Magento (Beta)
  Marketo (Beta)
  Oracle Eloqua (Beta)
  Paypal (Beta)
  QuickBooks (Beta)
  Salesforce
  Salesforce Service Cloud
  SAP Cloud for Customer (C4C)
  ServiceNow (Beta)
  Shopify (Beta)
  Square (Beta)
  Web Table (HTML table)
  Xero (Beta)
  Zoho (Beta)

Note

Any connector marked as Beta means that you can try it out and give us feedback. If you want to take a dependency on Beta connectors in your solution, please contact Azure support.

For more information, see Copy Activity - Overview article.

Data transformation activities

Azure Data Factory supports the following transformation activities that can be added to pipelines either individually or chained with another activity.

Data transformation activity Compute environment
Hive HDInsight [Hadoop]
Pig HDInsight [Hadoop]
MapReduce HDInsight [Hadoop]
Hadoop Streaming HDInsight [Hadoop]
Spark HDInsight [Hadoop]
Machine Learning activities: Batch Execution and Update Resource Azure VM
Stored Procedure Azure SQL, Azure SQL Data Warehouse, or SQL Server
U-SQL Azure Data Lake Analytics

For more information, see the data transformation activities article.

Control activities

The following control flow activities are supported:

Control activity Description
Execute Pipeline Activity Execute Pipeline activity allows a Data Factory pipeline to invoke another pipeline.
ForEachActivity ForEach Activity defines a repeating control flow in your pipeline. This activity is used to iterate over a collection and executes specified activities in a loop. The loop implementation of this activity is similar to Foreach looping structure in programming languages.
WebActivity Web Activity can be used to call a custom REST endpoint from a Data Factory pipeline. You can pass datasets and linked services to be consumed and accessed by the activity.
Lookup Activity Lookup Activity can be used to read or look up a record/ table name/ value from any external source. This output can further be referenced by succeeding activities.
Get Metadata Activity GetMetadata activity can be used to retrieve metadata of any data in Azure Data Factory.
Until Activity Implements Do-Until loop that is similar to Do-Until looping structure in programming languages. It executes a set of activities in a loop until the condition associated with the activity evaluates to true. You can specify a timeout value for the until activity in Data Factory.
If Condition Activity The If Condition can be used to branch based on condition that evaluates to true or false. The If Condition activity provides the same functionality that an if statement provides in programming languages. It evaluates a set of activities when the condition evaluates to true and another set of activities when the condition evaluates to false.
Wait Activity When you use a Wait activity in a pipeline, the pipeline waits for the specified period of time before continuing with execution of subsequent activities.

Pipeline JSON

Here is how a pipeline is defined in JSON format:

{
    "name": "PipelineName",
    "properties": 
    {
        "description": "pipeline description",
        "activities":
        [
        ],
        "parameters": {
         }
    }
}
Tag Description Type Required
name Name of the pipeline. Specify a name that represents the action that the pipeline performs.
  • Maximum number of characters: 260
  • Must start with a letter number, or an underscore (_)
  • • Following characters are not allowed: “.”, “+”, “?”, “/”, “<”,”>”,”*”,”%”,”&”,”:”,”\”
String Yes
description Specify the text describing what the pipeline is used for. String No
activities The activities section can have one or more activities defined within it. See the Activity JSON section for details about the activities JSON element. Array Yes
parameters The parameters section can have one or more parameters defined within the pipeline, making your pipeline flexible for reuse. List No

Activity JSON

The activities section can have one or more activities defined within it. There are two main types of activities: Execution and Control Activities.

Execution activities

Execution activities include data movement and data transformation activities. They have the following top-level structure:

{
    "name": "Execution Activity Name",
    "description": "description", 
    "type": "<ActivityType>",
    "typeProperties":
    {
    },
    "linkedServiceName": "MyLinkedService",
    "policy":
    {
    },
    "dependsOn":
    {
    }
}

Following table describes properties in the activity JSON definition:

Tag Description Required
name Name of the activity. Specify a name that represents the action that the activity performs.
  • Maximum number of characters: 260
  • Must start with a letter number, or an underscore (_)
  • Following characters are not allowed: “.”, “+”, “?”, “/”, “<”,”>”,”*”,”%”,”&”,”:”,”\”
Yes
description Text describing what the activity or is used for Yes
type Type of the activity. See the Data Movement Activities, Data Transformation Activities, and Control Activities sections for different types of activities. Yes
linkedServiceName Name of the linked service used by the activity.

An activity may require that you specify the linked service that links to the required compute environment.
Yes for HDInsight Activity, Azure Machine Learning Batch Scoring Activity, Stored Procedure Activity.

No for all others
typeProperties Properties in the typeProperties section depend on each type of activity. To see type properties for an activity, click links to the activity in the previous section. No
policy Policies that affect the run-time behavior of the activity. This property includes timeout and retry behavior. If it is not specified, default values are used. For more information, see Activity policy section. No
dependsOn This property is used to define activity dependencies, and how subsequent activities depend on previous activities. For more information, see Activity dependency No

Activity policy

Policies affect the run-time behavior of an activity, giving configurability options. Activity Policies are only available for execution activities.

Activity policy JSON definition

{
    "name": "MyPipelineName",
    "properties": {
      "activities": [
        {
          "name": "MyCopyBlobtoSqlActivity"
          "type": "Copy",
          "typeProperties": {
            ...
          },
         "policy": {
            "timeout": "00:10:00",
            "retry": 1,
            "retryIntervalInSeconds": 60
         }
        }
      ],
        "parameters": {
           ...
        }
    }
}
JSON name Description Allowed Values Required
timeout Specifies the timeout for the activity to run. Timespan No. Default timeout is 7 days.
retry Maximum retry attempts Integer No. Default is 0
retryIntervalInSeconds The delay between retry attempts in seconds Integer No. Default is 20 seconds

Control activity

Control activities have the following top-level structure:

{
    "name": "Control Activity Name",
    "description": "description", 
    "type": "<ActivityType>",
    "typeProperties":
    {
    },
    "dependsOn":
    {
    }
}
Tag Description Required
name Name of the activity. Specify a name that represents the action that the activity performs.
  • Maximum number of characters: 260
  • Must start with a letter number, or an underscore (_)
  • Following characters are not allowed: “.”, “+”, “?”, “/”, “<”,”>”,”*”,”%”,”&”,”:”,”\”
Yes
    description Text describing what the activity or is used for Yes
    type Type of the activity. See the data movement activities, data transformation activities, and control activities sections for different types of activities. Yes
    typeProperties Properties in the typeProperties section depend on each type of activity. To see type properties for an activity, click links to the activity in the previous section. No
    dependsOn This property is used to define Activity Dependency, and how subsequent activities depend on previous activities. For more information, see activity dependency. No

    Activity dependency

    Activity Dependency defines how subsequent activities depend on previous activities, determining the condition whether to continue executing the next task. An activity can depend on one or multiple previous activities with different dependency conditions.

    The different dependency conditions are: Succeeded, Failed, Skipped, Completed.

    For example, if a pipeline has Activity A -> Activity B, the different scenarios that can happen are:

    • Activity B has dependency condition on Activity A with succeeded: Activity B only runs if Activity A has a final status of succeeded
    • Activity B has dependency condition on Activity A with failed: Activity B only runs if Activity A has a final status of failed
    • Activity B has dependency condition on Activity A with completed: Activity B runs if Activity A has a final status of succeeded or failed
    • Activity B has dependency condition on Activity A with skipped: Activity B runs if Activity A has a final status of skipped. Skipped occurs in the scenario of Activity X -> Activity Y -> Activity Z, where each activity runs only if the previous activity succeeds. If Activity X fails, then Activity Y has a status of “Skipped” because it never executes. Similarly, Activity Z has a status of “Skipped” as well.

    Example: Activity 2 depends on the Activity 1 succeeding

    {
        "name": "PipelineName",
        "properties": 
        {
            "description": "pipeline description",
            "activities": [
             {
                "name": "MyFirstActivity",
                "type": "Copy",
                "typeProperties": {
                },
                "linkedServiceName": {
                }
            },
            {
                "name": "MySecondActivity",
                "type": "Copy",
                "typeProperties": {
                },
                "linkedServiceName": {
                },
                "dependsOn": [
                {
                    "activity": "MyFirstActivity",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
              ]
            }
          ],
          "parameters": {
           }
        }
    }
    

    Sample copy pipeline

    In the following sample pipeline, there is one activity of type Copy in the activities section. In this sample, the copy activity copies data from an Azure Blob storage to an Azure SQL database.

    {
      "name": "CopyPipeline",
      "properties": {
        "description": "Copy data from a blob to Azure SQL table",
        "activities": [
          {
            "name": "CopyFromBlobToSQL",
            "type": "Copy",
            "inputs": [
              {
                "name": "InputDataset"
              }
            ],
            "outputs": [
              {
                "name": "OutputDataset"
              }
            ],
            "typeProperties": {
              "source": {
                "type": "BlobSource"
              },
              "sink": {
                "type": "SqlSink",
                "writeBatchSize": 10000,
                "writeBatchTimeout": "60:00:00"
              }
            },
            "policy": {
              "retry": 2,
              "timeout": "01:00:00"
            }
          }
        ]
      }
    } 
    

    Note the following points:

    • In the activities section, there is only one activity whose type is set to Copy.
    • Input for the activity is set to InputDataset and output for the activity is set to OutputDataset. See Datasets article for defining datasets in JSON.
    • In the typeProperties section, BlobSource is specified as the source type and SqlSink is specified as the sink type. In the data movement activities section, click the data store that you want to use as a source or a sink to learn more about moving data to/from that data store.

    For a complete walkthrough of creating this pipeline, see Quickstart: create a data factory.

    Sample transformation pipeline

    In the following sample pipeline, there is one activity of type HDInsightHive in the activities section. In this sample, the HDInsight Hive activity transforms data from an Azure Blob storage by running a Hive script file on an Azure HDInsight Hadoop cluster.

    {
        "name": "TransformPipeline",
        "properties": {
            "description": "My first Azure Data Factory pipeline",
            "activities": [
                {
                    "type": "HDInsightHive",
                    "typeProperties": {
                        "scriptPath": "adfgetstarted/script/partitionweblogs.hql",
                        "scriptLinkedService": "AzureStorageLinkedService",
                        "defines": {
                            "inputtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.windows.net/inputdata",
                            "partitionedtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.windows.net/partitioneddata"
                        }
                    },
                    "inputs": [
                        {
                            "name": "AzureBlobInput"
                        }
                    ],
                    "outputs": [
                        {
                            "name": "AzureBlobOutput"
                        }
                    ],
                    "policy": {
                        "retry": 3
                    },
                    "name": "RunSampleHiveActivity",
                    "linkedServiceName": "HDInsightOnDemandLinkedService"
                }
            ]
        }
    }
    

    Note the following points:

    • In the activities section, there is only one activity whose type is set to HDInsightHive.
    • The Hive script file, partitionweblogs.hql, is stored in the Azure storage account (specified by the scriptLinkedService, called AzureStorageLinkedService), and in script folder in the container adfgetstarted.
    • The defines section is used to specify the runtime settings that are passed to the hive script as Hive configuration values (for example, ${hiveconf:inputtable}, ${hiveconf:partitionedtable}).

    The typeProperties section is different for each transformation activity. To learn about type properties supported for a transformation activity, click the transformation activity in the Data transformation activities.

    For a complete walkthrough of creating this pipeline, see Tutorial: transform data using Spark.

    Multiple activities in a pipeline

    The previous two sample pipelines have only one activity in them. You can have more than one activity in a pipeline. If you have multiple activities in a pipeline and subsequent activities are not dependent on previous activities, the activities may run in parallel.

    You can chain two activities by using activity dependency, which defines how subsequent activities depend on previous activities, determining the condition whether to continue executing the next task. An activity can defend on one or more previous activities with different dependency conditions.

    Scheduling pipelines

    Pipelines are scheduled by triggers. There are different types of triggers (scheduler trigger, which allows pipelines to be triggered on a wall-clock schedule, as well as manual trigger, which triggers pipelines on-demand). For more information about triggers, see pipeline execution and triggers article.

    To have your trigger kick off a pipeline run, you must include a pipeline reference of the particular pipeline in the trigger definition. Pipelines & triggers have an n-m relationship. Multiple triggers can kick off a single pipeline and the same trigger can kick off multiple pipelines. Once the trigger is defined, you must start the trigger to have it start triggering the pipeline. For more information about triggers, see pipeline execution and triggers article.

    For example, say you have a scheduler trigger, “Trigger A” that I wish to kick off my pipeline, “MyCopyPipeline.” You define the trigger as shown in the following example:

    Trigger A definition

    {
      "name": "TriggerA",
      "properties": {
        "type": "ScheduleTrigger",
        "typeProperties": {
          ...
          }
        },
        "pipeline": {
          "pipelineReference": {
            "type": "PipelineReference",
            "referenceName": "MyCopyPipeline"
          },
          "parameters": {
            "copySourceName": "FileSource"
          }
        }
      }
    }
    

    Next steps

    See the following tutorials for step-by-step instructions for creating pipelines with activities: