Tutorial: Create a pipeline with Copy Activity using .NET API

In this article, you learn how to use .NET API to create a data factory with a pipeline that copies data from an Azure blob storage to an Azure SQL database. If you are new to Azure Data Factory, read through the Introduction to Azure Data Factory article before doing this tutorial.

In this tutorial, you create a pipeline with one activity in it: Copy Activity. The copy activity copies data from a supported data store to a supported sink data store. For a list of data stores supported as sources and sinks, see supported data stores. The activity is powered by a globally available service that can copy data between various data stores in a secure, reliable, and scalable way. For more information about the Copy Activity, see Data Movement Activities.

A pipeline can have more than one activity. And, you can chain two activities (run one activity after another) by setting the output dataset of one activity as the input dataset of the other activity. For more information, see multiple activities in a pipeline.

Note

For complete documentation on .NET API for Data Factory, see Data Factory .NET API Reference.

The data pipeline in this tutorial copies data from a source data store to a destination data store. For a tutorial on how to transform data using Azure Data Factory, see Tutorial: Build a pipeline to transform data using Hadoop cluster.

Prerequisites

Create an application in Azure Active Directory

Create an Azure Active Directory application, create a service principal for the application, and assign it to the Data Factory Contributor role.

  1. Launch PowerShell.
  2. Run the following command and enter the user name and password that you use to sign in to the Azure portal.

    Login-AzureRmAccount
    
  3. Run the following command to view all the subscriptions for this account.

    Get-AzureRmSubscription
    
  4. Run the following command to select the subscription that you want to work with. Replace <NameOfAzureSubscription> with the name of your Azure subscription.

    Get-AzureRmSubscription -SubscriptionName <NameOfAzureSubscription> | Set-AzureRmContext
    

    Important

    Note down SubscriptionId and TenantId from the output of this command.

  5. Create an Azure resource group named ADFTutorialResourceGroup by running the following command in the PowerShell.

    New-AzureRmResourceGroup -Name ADFTutorialResourceGroup  -Location "West US"
    

    If the resource group already exists, you specify whether to update it (Y) or keep it as (N).

    If you use a different resource group, you need to use the name of your resource group in place of ADFTutorialResourceGroup in this tutorial.

  6. Create an Azure Active Directory application.

    $azureAdApplication = New-AzureRmADApplication -DisplayName "ADFCopyTutotiralApp" -HomePage "https://www.contoso.org" -IdentifierUris "https://www.adfcopytutorialapp.org/example" -Password "Pass@word1"
    

    If you get the following error, specify a different URL and run the command again.

    Another object with the same value for property identifierUris already exists.
    
  7. Create the AD service principal.

    New-AzureRmADServicePrincipal -ApplicationId $azureAdApplication.ApplicationId
    
  8. Add service principal to the Data Factory Contributor role.

    New-AzureRmRoleAssignment -RoleDefinitionName "Data Factory Contributor" -ServicePrincipalName $azureAdApplication.ApplicationId.Guid
    
  9. Get the application ID.

    $azureAdApplication 
    

    Note down the application ID (applicationID) from the output.

You should have following four values from these steps:

  • Tenant ID
  • Subscription ID
  • Application ID
  • Password (specified in the first command)

Walkthrough

  1. Using Visual Studio 2012/2013/2015, create a C# .NET console application.
    1. Launch Visual Studio 2012/2013/2015.
    2. Click File, point to New, and click Project.
    3. Expand Templates, and select Visual C#. In this walkthrough, you use C#, but you can use any .NET language.
    4. Select Console Application from the list of project types on the right.
    5. Enter DataFactoryAPITestApp for the Name.
    6. Select C:\ADFGetStarted for the Location.
    7. Click OK to create the project.
  2. Click Tools, point to NuGet Package Manager, and click Package Manager Console.
  3. In the Package Manager Console, do the following steps:
    1. Run the following command to install Data Factory package: Install-Package Microsoft.Azure.Management.DataFactories
    2. Run the following command to install Azure Active Directory package (you use Active Directory API in the code): Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory -Version 2.19.208020213
  4. Add the following appSetttings section to the App.config file. These settings are used by the helper method: GetAuthorizationHeader.

    Replace values for <Application ID>, <Password>, <Subscription ID>, and <tenant ID> with your own values.

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
        <appSettings>
            <add key="ActiveDirectoryEndpoint" value="https://login.microsoftonline.com/" />
            <add key="ResourceManagerEndpoint" value="https://management.azure.com/" />
            <add key="WindowsManagementUri" value="https://management.core.windows.net/" />
    
            <add key="ApplicationId" value="your application ID" />
            <add key="Password" value="Password you used while creating the AAD application" />
            <add key="SubscriptionId" value= "Subscription ID" />
            <add key="ActiveDirectoryTenantId" value="Tenant ID" />
        </appSettings>
    </configuration>
    
  5. Add the following using statements to the source file (Program.cs) in the project.

    using System.Configuration;
    using System.Collections.ObjectModel;
    using System.Threading;
    using System.Threading.Tasks;
    
    using Microsoft.Azure;
    using Microsoft.Azure.Management.DataFactories;
    using Microsoft.Azure.Management.DataFactories.Models;
    using Microsoft.Azure.Management.DataFactories.Common.Models;
    
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  6. Add the following code that creates an instance of DataPipelineManagementClient class to the Main method. You use this object to create a data factory, a linked service, input and output datasets, and a pipeline. You also use this object to monitor slices of a dataset at runtime.

    // create data factory management client
    string resourceGroupName = "ADFTutorialResourceGroup";
    string dataFactoryName = "APITutorialFactory";
    
    TokenCloudCredentials aadTokenCredentials = new TokenCloudCredentials(
            ConfigurationManager.AppSettings["SubscriptionId"],
            GetAuthorizationHeader().Result);
    
    Uri resourceManagerUri = new Uri(ConfigurationManager.AppSettings["ResourceManagerEndpoint"]);
    
    DataFactoryManagementClient client = new DataFactoryManagementClient(aadTokenCredentials, resourceManagerUri);
    

    Important

    Replace the value of resourceGroupName with the name of your Azure resource group.

    Update name of the data factory (dataFactoryName) to be unique. Name of the data factory must be globally unique. See Data Factory - Naming Rules topic for naming rules for Data Factory artifacts.

  7. Add the following code that creates a data factory to the Main method.

    // create a data factory
    Console.WriteLine("Creating a data factory");
    client.DataFactories.CreateOrUpdate(resourceGroupName,
        new DataFactoryCreateOrUpdateParameters()
        {
            DataFactory = new DataFactory()
            {
                Name = dataFactoryName,
                Location = "westus",
                Properties = new DataFactoryProperties()
            }
        }
    );
    

    A data factory can have one or more pipelines. A pipeline can have one or more activities in it. For example, a Copy Activity to copy data from a source to a destination data store and a HDInsight Hive activity to run a Hive script to transform input data to product output data. Let's start with creating the data factory in this step.

  8. Add the following code that creates an Azure Storage linked service to the Main method.

    Important

    Replace storageaccountname and accountkey with name and key of your Azure Storage account.

    // create a linked service for input data store: Azure Storage
    Console.WriteLine("Creating Azure Storage linked service");
    client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName,
        new LinkedServiceCreateOrUpdateParameters()
        {
            LinkedService = new LinkedService()
            {
                Name = "AzureStorageLinkedService",
                Properties = new LinkedServiceProperties
                (
                    new AzureStorageLinkedService("DefaultEndpointsProtocol=https;AccountName=<storageaccountname>;AccountKey=<accountkey>")
                )
            }
        }
    );
    

    You create linked services in a data factory to link your data stores and compute services to the data factory. In this tutorial, you don't use any compute service such as Azure HDInsight or Azure Data Lake Analytics. You use two data stores of type Azure Storage (source) and Azure SQL Database (destination).

    Therefore, you create two linked services named AzureStorageLinkedService and AzureSqlLinkedService of types: AzureStorage and AzureSqlDatabase.

    The AzureStorageLinkedService links your Azure storage account to the data factory. This storage account is the one in which you created a container and uploaded the data as part of prerequisites.

  9. Add the following code that creates an Azure SQL linked service to the Main method.

    Important

    Replace servername, databasename, username, and password with names of your Azure SQL server, database, user, and password.

    // create a linked service for output data store: Azure SQL Database
    Console.WriteLine("Creating Azure SQL Database linked service");
    client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName,
        new LinkedServiceCreateOrUpdateParameters()
        {
            LinkedService = new LinkedService()
            {
                Name = "AzureSqlLinkedService",
                Properties = new LinkedServiceProperties
                (
                    new AzureSqlDatabaseLinkedService("Data Source=tcp:<servername>.database.windows.net,1433;Initial Catalog=<databasename>;User ID=<username>;Password=<password>;Integrated Security=False;Encrypt=True;Connect Timeout=30")
                )
            }
        }
    );
    

    AzureSqlLinkedService links your Azure SQL database to the data factory. The data that is copied from the blob storage is stored in this database. You created the emp table in this database as part of prerequisites.

  10. Add the following code that creates input and output datasets to the Main method.

    // create input and output datasets
    Console.WriteLine("Creating input and output datasets");
    string Dataset_Source = "InputDataset";
    string Dataset_Destination = "OutputDataset";
    
    Console.WriteLine("Creating input dataset of type: Azure Blob");
    client.Datasets.CreateOrUpdate(resourceGroupName, dataFactoryName,
    
    new DatasetCreateOrUpdateParameters()
    {
        Dataset = new Dataset()
        {
            Name = Dataset_Source,
            Properties = new DatasetProperties()
            {
                Structure = new List<DataElement>()
                {
                    new DataElement() { Name = "FirstName", Type = "String" },
                    new DataElement() { Name = "LastName", Type = "String" }
                },
                LinkedServiceName = "AzureStorageLinkedService",
                TypeProperties = new AzureBlobDataset()
                {
                    FolderPath = "adftutorial/",
                    FileName = "emp.txt"
                },
                External = true,
                Availability = new Availability()
                {
                    Frequency = SchedulePeriod.Hour,
                    Interval = 1,
                },
    
                Policy = new Policy()
                {
                    Validation = new ValidationPolicy()
                    {
                        MinimumRows = 1
                    }
                }
            }
        }
    });
    
    Console.WriteLine("Creating output dataset of type: Azure SQL");
    client.Datasets.CreateOrUpdate(resourceGroupName, dataFactoryName,
        new DatasetCreateOrUpdateParameters()
        {
            Dataset = new Dataset()
            {
                Name = Dataset_Destination,
                Properties = new DatasetProperties()
                {
                    Structure = new List<DataElement>()
                    {
                        new DataElement() { Name = "FirstName", Type = "String" },
                        new DataElement() { Name = "LastName", Type = "String" }
                    },
                    LinkedServiceName = "AzureSqlLinkedService",
                    TypeProperties = new AzureSqlTableDataset()
                    {
                        TableName = "emp"
                    },
                    Availability = new Availability()
                    {
                        Frequency = SchedulePeriod.Hour,
                        Interval = 1,
                    },
                }
            }
        });
    

    In the previous step, you created linked services to link your Azure Storage account and Azure SQL database to your data factory. In this step, you define two datasets named InputDataset and OutputDataset that represent input and output data that is stored in the data stores referred by AzureStorageLinkedService and AzureSqlLinkedService respectively.

    The Azure storage linked service specifies the connection string that Data Factory service uses at run time to connect to your Azure storage account. And, the input blob dataset (InputDataset) specifies the container and the folder that contains the input data.

    Similarly, the Azure SQL Database linked service specifies the connection string that Data Factory service uses at run time to connect to your Azure SQL database. And, the output SQL table dataset (OututDataset) specifies the table in the database to which the data from the blob storage is copied.

    In this step, you create a dataset named InputDataset that points to a blob file (emp.txt) in the root folder of a blob container (adftutorial) in the Azure Storage represented by the AzureStorageLinkedService linked service. If you don't specify a value for the fileName (or skip it), data from all blobs in the input folder are copied to the destination. In this tutorial, you specify a value for the fileName.

    In this step, you create an output dataset named OutputDataset. This dataset points to a SQL table in the Azure SQL database represented by AzureSqlLinkedService.

  11. Add the following code that creates and activates a pipeline to the Main method. In this step, you create a pipeline with a copy activity that uses InputDataset as an input and OutputDataset as an output.

    // create a pipeline
    Console.WriteLine("Creating a pipeline");
    DateTime PipelineActivePeriodStartTime = new DateTime(2017, 5, 11, 0, 0, 0, 0, DateTimeKind.Utc);
    DateTime PipelineActivePeriodEndTime = new DateTime(2017, 5, 12, 0, 0, 0, 0, DateTimeKind.Utc);
    string PipelineName = "ADFTutorialPipeline";
    
    client.Pipelines.CreateOrUpdate(resourceGroupName, dataFactoryName,
        new PipelineCreateOrUpdateParameters()
        {
            Pipeline = new Pipeline()
            {
                Name = PipelineName,
                Properties = new PipelineProperties()
                {
                    Description = "Demo Pipeline for data transfer between blobs",
    
                    // Initial value for pipeline's active period. With this, you won't need to set slice status
                    Start = PipelineActivePeriodStartTime,
                    End = PipelineActivePeriodEndTime,
    
                    Activities = new List<Activity>()
                    {
                        new Activity()
                        {
                            Name = "BlobToAzureSql",
                            Inputs = new List<ActivityInput>()
                            {
                                new ActivityInput() {
                                    Name = Dataset_Source
                                }
                            },
                            Outputs = new List<ActivityOutput>()
                            {
                                new ActivityOutput()
                                {
                                    Name = Dataset_Destination
                                }
                            },
                            TypeProperties = new CopyActivity()
                            {
                                Source = new BlobSource(),
                                Sink = new BlobSink()
                                {
                                    WriteBatchSize = 10000,
                                    WriteBatchTimeout = TimeSpan.FromMinutes(10)
                                }
                            }
                        }
                    }
                }
            }
        });
    

    Note the following points:

    • In the activities section, there is only one activity whose type is set to Copy. For more information about the copy activity, see data movement activities. In Data Factory solutions, you can also use data transformation activities.
    • Input for the activity is set to InputDataset and output for the activity is set to OutputDataset.
    • In the typeProperties section, BlobSource is specified as the source type and SqlSink is specified as the sink type. For a complete list of data stores supported by the copy activity as sources and sinks, see supported data stores. To learn how to use a specific supported data store as a source/sink, click the link in the table.

    Currently, output dataset is what drives the schedule. In this tutorial, output dataset is configured to produce a slice once an hour. The pipeline has a start time and end time that are one day apart, which is 24 hours. Therefore, 24 slices of output dataset are produced by the pipeline.

  12. Add the following code to the Main method to get the status of a data slice of the output dataset. There is only slice expected in this sample.

    // Pulling status within a timeout threshold
    DateTime start = DateTime.Now;
    bool done = false;
    
    while (DateTime.Now - start < TimeSpan.FromMinutes(5) && !done)
    {
        Console.WriteLine("Pulling the slice status");        
        // wait before the next status check
        Thread.Sleep(1000 * 12);
    
        var datalistResponse = client.DataSlices.List(resourceGroupName, dataFactoryName, Dataset_Destination,
            new DataSliceListParameters()
            {
                DataSliceRangeStartTime = PipelineActivePeriodStartTime.ConvertToISO8601DateTimeString(),
                DataSliceRangeEndTime = PipelineActivePeriodEndTime.ConvertToISO8601DateTimeString()
            });
    
        foreach (DataSlice slice in datalistResponse.DataSlices)
        {
            if (slice.State == DataSliceState.Failed || slice.State == DataSliceState.Ready)
            {
                Console.WriteLine("Slice execution is done with status: {0}", slice.State);
                done = true;
                break;
            }
            else
            {
                Console.WriteLine("Slice status is: {0}", slice.State);
            }
        }
    }
    
  13. Add the following code to get run details for a data slice to the Main method.

    Console.WriteLine("Getting run details of a data slice");
    
    // give it a few minutes for the output slice to be ready
    Console.WriteLine("\nGive it a few minutes for the output slice to be ready and press any key.");
    Console.ReadKey();
    
    var datasliceRunListResponse = client.DataSliceRuns.List(
            resourceGroupName,
            dataFactoryName,
            Dataset_Destination,
            new DataSliceRunListParameters()
            {
                DataSliceStartTime = PipelineActivePeriodStartTime.ConvertToISO8601DateTimeString()
            }
        );
    
    foreach (DataSliceRun run in datasliceRunListResponse.DataSliceRuns)
    {
        Console.WriteLine("Status: \t\t{0}", run.Status);
        Console.WriteLine("DataSliceStart: \t{0}", run.DataSliceStart);
        Console.WriteLine("DataSliceEnd: \t\t{0}", run.DataSliceEnd);
        Console.WriteLine("ActivityId: \t\t{0}", run.ActivityName);
        Console.WriteLine("ProcessingStartTime: \t{0}", run.ProcessingStartTime);
        Console.WriteLine("ProcessingEndTime: \t{0}", run.ProcessingEndTime);
        Console.WriteLine("ErrorMessage: \t{0}", run.ErrorMessage);
    }
    
    Console.WriteLine("\nPress any key to exit.");
    Console.ReadKey();
    
  14. Add the following helper method used by the Main method to the Program class.

    Note

    When you copy and paste the following code, make sure that the copied code is at the same level as the Main method.

    public static async Task<string> GetAuthorizationHeader()
    {
        AuthenticationContext context = new AuthenticationContext(ConfigurationManager.AppSettings["ActiveDirectoryEndpoint"] + ConfigurationManager.AppSettings["ActiveDirectoryTenantId"]);
        ClientCredential credential = new ClientCredential(
            ConfigurationManager.AppSettings["ApplicationId"],
            ConfigurationManager.AppSettings["Password"]);
        AuthenticationResult result = await context.AcquireTokenAsync(
            resource: ConfigurationManager.AppSettings["WindowsManagementUri"],
            clientCredential: credential);
    
        if (result != null)
            return result.AccessToken;
    
        throw new InvalidOperationException("Failed to acquire token");
    }
    
  15. In the Solution Explorer, expand the project (DataFactoryAPITestApp), right-click References, and click Add Reference. Select check box for System.Configuration assembly. and click OK.

  16. Build the console application. Click Build on the menu and click Build Solution.
  17. Confirm that there is at least one file in the adftutorial container in your Azure blob storage. If not, create Emp.txt file in Notepad with the following content and upload it to the adftutorial container.

    John, Doe
    Jane, Doe
    
  18. Run the sample by clicking Debug -> Start Debugging on the menu. When you see the Getting run details of a data slice, wait for a few minutes, and press ENTER.
  19. Use the Azure portal to verify that the data factory APITutorialFactory is created with the following artifacts:
    • Linked service: LinkedService_AzureStorage
    • Dataset: InputDataset and OutputDataset.
    • Pipeline: PipelineBlobSample
  20. Verify that the two employee records are created in the emp table in the specified Azure SQL database.

Next steps

For complete documentation on .NET API for Data Factory, see Data Factory .NET API Reference.

In this tutorial, you used Azure blob storage as a source data store and an Azure SQL database as a destination data store in a copy operation. The following table provides a list of data stores supported as sources and destinations by the copy activity:

Category Data store Supported as a source Supported as a sink
Azure Azure Blob storage
  Azure Cosmos DB (DocumentDB API)
  Azure Data Lake Store
  Azure SQL Database
  Azure SQL Data Warehouse
  Azure Search Index
  Azure Table storage
Databases Amazon Redshift
  DB2*
  MySQL*
  Oracle*
  PostgreSQL*
  SAP Business Warehouse*
  SAP HANA*
  SQL Server*
  Sybase*
  Teradata*
NoSQL Cassandra*
  MongoDB*
File Amazon S3
  File System*
  FTP
  HDFS*
  SFTP
Others Generic HTTP
  Generic OData
  Generic ODBC*
  Salesforce
  Web Table (table from HTML)
  GE Historian*

To learn about how to copy data to/from a data store, click the link for the data store in the table.