Verzweigen und Verketten von Aktivitäten in einer Data Factory-Pipeline

GILT FÜR: Azure Data Factory Azure Synapse Analytics

Tipp

Testen Sie Data Factory in Microsoft Fabric, eine All-in-One-Analyselösung für Unternehmen. Microsoft Fabric deckt alle Aufgaben ab, von der Datenverschiebung bis hin zu Data Science, Echtzeitanalysen, Business Intelligence und Berichterstellung. Erfahren Sie, wie Sie kostenlos eine neue Testversion starten!

In diesem Tutorial erstellen Sie eine Data Factory-Pipeline, die einige Ablaufsteuerungsfunktionen vorstellt. Diese Pipeline führt eine Kopieraktivität aus einem Container in Azure Blob Storage in einen anderen Container im selben Speicherkonto durch. Wenn die Kopieraktivität erfolgreich ist, versendet die Pipeline eine E-Mail mit Details zum Kopiervorgang. Dies umfasst beispielsweise die Menge der geschriebenen Daten. Wenn die Kopieraktivität nicht erfolgreich ist, versendet die Pipeline eine E-Mail mit Fehlerdetails, etwa der Fehlermeldung. In diesem Tutorial erfahren Sie, wie Sie Parameter übergeben.

Diese Grafik bietet einen Überblick über das Szenario:

Diagram shows Azure Blob Storage, which is the target of a copy, which, on success, sends an email with details or, on failure, sends an email with error details.

Dieses Tutorial beschreibt, wie die folgenden Aufgaben ausgeführt werden:

  • Erstellen einer Data Factory
  • Erstellen eines verknüpften Azure Storage-Diensts
  • Erstellen eines Azure-Blobdatasets.
  • Erstellen einer Pipeline, die eine Kopieraktivität und eine Webaktivität enthält.
  • Senden von Aktivitätsausgaben an nachfolgende Aktivitäten.
  • Verwenden von Parameterübergabe und Systemvariablen
  • Starten einer Pipelineausführung.
  • Überwachen der Pipeline- und Aktivitätsausführungen.

Dieses Tutorial verwendet .NET SDK. Sie können andere Mechanismen zur Interaktion mit Azure Data Factory verwenden. Data Factory-Schnellstarts finden Sie unter Schnellstarts.

Wenn Sie kein Azure-Abonnement besitzen, können Sie ein kostenloses Konto erstellen, bevor Sie beginnen.

Voraussetzungen

  • Azure Storage-Konto. Sie verwenden Blob Storage als Quelldatenspeicher. Falls Sie noch kein Azure-Speicherkonto haben, lesen Sie den Artikel Erstellen eines Speicherkontos.
  • Azure Storage-Explorer. Informationen zum Installieren des Tools finden Sie unter Azure Storage-Explorer.
  • Azure SQL-Datenbank. Sie verwenden die Datenbank als Senkendatenspeicher. Wenn Sie in Azure SQL-Datenbank keine Datenbank haben, lesen Sie Erstellen einer Datenbank in Azure SQL-Datenbank.
  • Visual Studio. In diesem Artikel wird Visual Studio 2019 verwendet.
  • Azure .NET SDK. Laden Sie das Azure .NET SDK herunter, und installieren Sie es.

Eine Liste der Azure-Regionen, in denen Data Factory derzeit verfügbar ist, finden Sie unter Verfügbare Produkte nach Region. Die Datenspeicher und Computeressourcen befinden sich unter Umständen in anderen Regionen. Die Speicher umfassen Azure Storage und Azure SQL-Datenbank. Die Computeressourcen umfassen HDInsight (von Data Factory verwendet).

Erstellen Sie eine Anwendung, wie in "Erstellen einer Microsoft Entra-Anwendung" beschrieben. Weisen Sie die Anwendung der Rolle Mitwirkender zu, indem Sie die Anweisungen im gleichen Artikel befolgen. Später in diesem Tutorial benötigen Sie verschiedene Werte, wie z. B. Anwendungs-ID (Client) und Verzeichnis-ID (Mandant) .

Erstellen einer Blobtabelle

  1. Öffnen Sie einen Text-Editor. Kopieren Sie den folgenden Text, und speichern Sie ihn lokal unter dem Namen input.txt.

    Ethel|Berg
    Tamika|Walsh
    
  2. Öffnen Sie den Azure Storage-Explorer. Erweitern Sie Ihr Speicherkonto. Klicken Sie mit der rechten Maustaste auf Blobcontainer, und wählen Sie Blobcontainer erstellen aus.

  3. Geben Sie dem neuen Container den Namen adfv2branch, und wählen Sie Hochladen aus, um die Datei input.txt zum Container hinzuzufügen.

Erstellen eines Visual Studio-Projekts

Erstellen Sie eine C# .NET-Konsolenanwendung:

  1. Starten Sie Visual Studio, und wählen Sie Neues Projekt erstellen aus.
  2. Wählen Sie unter Neues Projekt erstellen die Option Konsolen-App (.NET Framework) für C# und dann Weiter aus.
  3. Nennen Sie das Projekt ADFv2BranchTutorial.
  4. Wählen Sie .NET Version 4.5.2 oder höher aus, und wählen Sie dann Erstellen aus.

Installieren von NuGet-Paketen

  1. Klicken Sie auf Extras>NuGet-Paket-Manager>Paket-Manager-Konsole.

  2. Führen Sie in der Paket-Manager-Konsole die folgenden Befehle zum Installieren von Paketen aus. Ausführliche Informationen finden Sie auf der Seite zum Microsoft.Azure.Management.DataFactory-NuGet-Paket.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

Erstellen eines Data Factory-Clients

  1. Öffnen Sie Program.cs, und fügen Sie die folgenden Anweisungen hinzu:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. Fügen Sie diese statischen Variablen zur Klasse Program hinzu. Ersetzen Sie Platzhalter durch Ihre eigenen Werte.

    // Set variables
    static string tenantID = "<tenant ID>";
    static string applicationId = "<application ID>";
    static string authenticationKey = "<Authentication key for your application>";
    static string subscriptionId = "<Azure subscription ID>";
    static string resourceGroup = "<Azure resource group name>";
    
    static string region = "East US";
    static string dataFactoryName = "<Data factory name>";
    
    // Specify the source Azure Blob information
    static string storageAccount = "<Azure Storage account name>";
    static string storageKey = "<Azure Storage account key>";
    // confirm that you have the input.txt file placed in th input folder of the adfv2branch container.
    static string inputBlobPath = "adfv2branch/input";
    static string inputBlobName = "input.txt";
    static string outputBlobPath = "adfv2branch/output";
    static string emailReceiver = "<specify email address of the receiver>";
    
    static string storageLinkedServiceName = "AzureStorageLinkedService";
    static string blobSourceDatasetName = "SourceStorageDataset";
    static string blobSinkDatasetName = "SinkStorageDataset";
    static string pipelineName = "Adfv2TutorialBranchCopy";
    
    static string copyBlobActivity = "CopyBlobtoBlob";
    static string sendFailEmailActivity = "SendFailEmailActivity";
    static string sendSuccessEmailActivity = "SendSuccessEmailActivity";
    
  3. Fügen Sie der Main -Methode den folgenden Code hinzu. Dieser Code erstellt eine Instanz der Klasse DataFactoryManagementClient. Sie verwenden dieses Objekt anschließend, um eine Data Factory, einen verknüpften Dienst, Datasets und eine Pipeline zu erstellen. Sie können dieses Objekt auch zum Überwachen der Ausführungsdetails der Pipeline verwenden.

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

Erstellen einer Data Factory

  1. Fügen Sie der Datei Program.cs eine CreateOrUpdateDataFactory-Methode hinzu:

    static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating data factory " + dataFactoryName + "...");
        Factory resource = new Factory
        {
            Location = region
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
    
        Factory response;
        {
            response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource);
        }
    
        while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
        {
            System.Threading.Thread.Sleep(1000);
        }
        return response;
    }
    
  2. Fügen Sie der Methode Main die folgenden Codezeile hinzu, die eine Data Factory erstellt:

    Factory df = CreateOrUpdateDataFactory(client);
    

Erstellen eines verknüpften Azure Storage-Diensts

  1. Fügen Sie der Datei Program.cs eine StorageLinkedServiceDefinition-Methode hinzu:

    static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client)
    {
       Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
       AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService
       {
           ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey)
       };
       Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
       LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName);
       return linkedService;
    }
    
  2. Fügen Sie der Methode Main die folgende Codezeile hinzu, die einen verknüpften Azure Storage-Dienst erstellt:

    client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
    

Weitere Informationen zu unterstützten Eigenschaften und Details finden Sie unter Eigenschaften des verknüpften Diensts.

Erstellen von Datasets

In diesem Abschnitt erstellen Sie zwei Datasets: eines für die Quelle und eines für die Senke.

Erstellen eines Datasets für ein Azure-Quellblob

Fügen Sie eine Methode hinzu, die ein Azure-Blobdataset erstellt. Weitere Informationen zu unterstützten Eigenschaften und Details finden Sie unter Dataset-Eigenschaften.

Fügen Sie der Datei Program.cs eine SourceBlobDatasetDefinition-Methode hinzu:

static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
    Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
    AzureBlobDataset blobDataset = new AzureBlobDataset
    {
        FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
        FileName = inputBlobName,
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        }
    };
    Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
    DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
    return dataset;
}

Sie definieren ein Dataset, das die Quelldaten im Azure-Blob darstellt. Dieses Blobdataset verweist auf den verknüpften Azure Storage-Dienst, den Sie im vorherigen Schritt unterstützt haben. Das Blobdataset beschreibt den Speicherort des Blobs, aus dem kopiert werden soll: FolderPath (Ordnerpfad) und FileName (Dateiname).

Beachten Sie die Verwendung von Parametern für FolderPath. sourceBlobContainer ist der Name des Parameters, und der Ausdruck wird durch die Werte ersetzt, die während der Pipelineausführung übergeben werden. Die Syntax zum Definieren von Parametern ist @pipeline().parameters.<parameterName>.

Erstellen eines Datasets für ein Azure-Senkenblob

  1. Fügen Sie der Datei Program.cs eine SourceBlobDatasetDefinition-Methode hinzu:

    static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating dataset " + blobSinkDatasetName + "...");
        AzureBlobDataset blobDataset = new AzureBlobDataset
        {
            FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" },
            LinkedServiceName = new LinkedServiceReference
            {
                ReferenceName = storageLinkedServiceName
            }
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
        DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName);
        return dataset;
    }
    
  2. Fügen Sie der Methode Main den folgenden Code hinzu, der sowohl Azure Blob-Quelldatasets als auch -Senkendatasets erstellt.

    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
    
    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
    

Erstellen einer C#-Klasse: EmailRequest

Erstellen Sie in Ihrem C#-Projekt eine Klasse mit dem Namen EmailRequest. Diese Klasse definiert, welche Eigenschaften die Pipeline in der Textanforderung beim Versenden einer E-Mail sendet. In diesem Tutorial sendet die Pipeline vier Eigenschaften von der Pipeline an die E-Mail:

  • Message (Nachricht): Der Text der E-Mail. Bei einer erfolgreichen Kopie enthält diese Eigenschaft die Menge der geschriebenen Daten. Bei einer fehlerhaften Kopie enthält diese Eigenschaft die Fehlerdetails.
  • Data Factory-Name. Der Name der Data Factory
  • Pipelinename. Name der Pipeline.
  • Empfänger. Der Parameter, der übergeben wird. Diese Eigenschaft gibt den Empfänger der E-Mail an.
    class EmailRequest
    {
        [Newtonsoft.Json.JsonProperty(PropertyName = "message")]
        public string message;

        [Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
        public string dataFactoryName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
        public string pipelineName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
        public string receiver;

        public EmailRequest(string input, string df, string pipeline, string receiverName)
        {
            message = input;
            dataFactoryName = df;
            pipelineName = pipeline;
            receiver = receiverName;
        }
    }

Erstellen von E-Mail-Workflow-Endpunkten

Um das Senden einer E-Mail auszulösen, definieren Sie den Workflow mithilfe von Azure Logic Apps. Weitere Informationen finden Sie unter Erstellen eines verbrauchsbasierten Logik-App-Beispielworkflows.

Erfolgs-E-Mail-Workflow

Erstellen Sie im Microsoft Azure-Portal einen leeren Logik-App-Workflow namens CopySuccessEmail. Wählen Sie den Anforderungstrigger namens Beim Empfang einer HTTP-Anforderung aus. Füllen Sie im Anforderungstrigger das Feld JSON-Schema für Anforderungstext mit dem folgenden JSON-Code aus:

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

Ihr Workflow sollte in etwa wie im folgenden Beispiel aussehen:

Success email workflow

Dieser JSON-Inhalt entspricht der Klasse EmailRequest, die Sie im vorherigen Abschnitt erstellt haben.

Fügen Sie die Office 365 Outlook-Aktion namens E-Mail senden hinzu. Passen Sie für die Aktion die Formatierungseinstellungen der E-Mail an Ihre Bedürfnisse an, indem Sie die Eigenschaften nutzen, die im Text des JSON-Anforderungsschemas übergeben wurden. Hier sehen Sie ein Beispiel:

Workflow designer with the action named Send an email.

Nachdem Sie den Workflow gespeichert haben, kopieren Sie den Wert HTTP-POST-URL aus dem Trigger, und speichern Sie ihn.

Fehler-E-Mail-Workflow

Klonen Sie den CopySuccessEmail-Logik-App-Workflow in einen neuen Workflow namens CopyFailEmail. Im Anforderungstrigger ist das JSON-Schema des Anforderungstexts identisch. Ändern Sie das Format Ihrer E-Mail (beispielsweise Subject), um eine E-Mail für einen nicht erfolgreichen Vorgang zu erhalten. Beispiel:

Workflow designer and the fail email workflow.

Nachdem Sie den Workflow gespeichert haben, kopieren Sie den Wert HTTP-POST-URL aus dem Trigger, und speichern Sie ihn.

Sie verfügen nun über zwei Workflow-URLs, wie in den folgenden Beispielen gezeigt:

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

Erstellen einer Pipeline

Kehren Sie zu Ihrem Projekt in Visual Studio zurück. Wir fügen nun den Code hinzu, mit dem eine Pipeline mit einer Kopieraktivität und der Eigenschaft DependsOn erstellt wird. In diesem Tutorial enthält die Pipeline eine einzige Aktivität: die Kopieraktivität, die das Blobdataset als Quelle und ein anderes Blobdataset als Senke umfasst. Je nachdem, ob die Kopieraktivität erfolgreich ist oder ein Fehler auftritt, ruft sie unterschiedliche E-Mail-Aufgaben ab.

In dieser Pipeline verwenden Sie die folgenden Funktionen:

  • Parameter
  • Webaktivität
  • Aktivitätsabhängigkeit
  • Verwenden der Ausgabe aus einer Aktivität als eine Eingabe für eine andere Aktivität
  1. Fügen Sie diese Methode zu Ihrem Projekt hinzu. In den folgenden Abschnitten finden Sie weitere Informationen.

    static PipelineResource PipelineDefinition(DataFactoryManagementClient client)
            {
                Console.WriteLine("Creating pipeline " + pipelineName + "...");
                PipelineResource resource = new PipelineResource
                {
                    Parameters = new Dictionary<string, ParameterSpecification>
                    {
                        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    
                    },
                    Activities = new List<Activity>
                    {
                        new CopyActivity
                        {
                            Name = copyBlobActivity,
                            Inputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSourceDatasetName
                                }
                            },
                            Outputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSinkDatasetName
                                }
                            },
                            Source = new BlobSource { },
                            Sink = new BlobSink { }
                        },
                        new WebActivity
                        {
                            Name = sendSuccessEmailActivity,
                            Method = WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Succeeded" }
                                }
                            }
                        },
                        new WebActivity
                        {
                            Name = sendFailEmailActivity,
                            Method =WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Failed" }
                                }
                            }
                        }
                    }
                };
                Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
                return resource;
            }
    
  2. Fügen Sie der Methode Main die folgenden Codezeile hinzu, die die Pipeline erstellt:

    client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
    

Parameter

Der erste Abschnitt unseres Pipelinecodes definiert Parameter.

  • sourceBlobContainer. Das Quell-Blobdataset verwendet diesen Parameter in der Pipeline.
  • sinkBlobContainer. Das Senken-Blobdataset verwendet diesen Parameter in der Pipeline.
  • receiver. Die beiden Webaktivitäten in der Pipeline, die Erfolgs- oder Fehlermeldungen an den Empfänger senden, verwenden diesen Parameter.
Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    },

Webaktivität

Die Webaktivität ermöglicht einen Aufruf an jeden beliebigen REST-Endpunkt. Weitere Informationen zur Aktivität finden Sie unter Webaktivität in Azure Data Factory. Die Pipeline verwendet eine Webaktivität, um den E-Mail-Workflow von Logic Apps aufzurufen. Sie erstellen zwei Webaktivitäten: eine, die Aufrufe an den Workflow CopySuccessEmail sendet, und eine, die CopyFailWorkFlow aufruft.

        new WebActivity
        {
            Name = sendCopyEmailActivity,
            Method = WebActivityMethod.POST,
            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
            DependsOn = new List<ActivityDependency>
            {
                new ActivityDependency
                {
                    Activity = copyBlobActivity,
                    DependencyConditions = new List<String> { "Succeeded" }
                }
            }
        }

Fügen Sie in der Url-Eigenschaft die HTTP-POST-URL-Endpunkte aus Ihrem Logic Apps-Workflow ein. Erstellen Sie in der Eigenschaft Body eine Instanz der Klasse EmailRequest. Die E-Mail-Anforderung enthält die folgenden Eigenschaften:

  • Message (Nachricht): Übergibt den Wert von @{activity('CopyBlobtoBlob').output.dataWritten. Greift auf eine Eigenschaft der vorherigen Kopieraktivität zu, und übergibt den Wert von dataWritten. Für den Fehlerfall, übergeben Sie die Fehlerausgabe anstelle von @{activity('CopyBlobtoBlob').error.message.
  • Data Factory-Name. Übergibt den Wert von @{pipeline().DataFactory}. Diese Systemvariable ermöglicht Ihnen den Zugriff auf den Namen der entsprechenden Data Factory. Eine Liste der Systemvariablen finden Sie unter Systemvariablen.
  • Pipelinename. Übergibt den Wert von @{pipeline().Pipeline}. Diese Systemvariable ermöglicht Ihnen den Zugriff auf den entsprechenden Pipelinenamen.
  • Empfänger. Übergibt den Wert von "@pipeline().parameters.receiver". Greift auf die Pipelineparameter zu.

Dieser Code erstellt eine neue Aktivitätsabhängigkeit, die von der vorherigen Kopieraktivität abhängt.

Erstellen einer Pipelineausführung

Fügen Sie der Methode Main den folgenden Code hinzu, der eine Pipelineausführung auslöst.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Main-Klasse

Die endgültige Methode Main sollte wie folgt aussehen.

// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };

Factory df = CreateOrUpdateDataFactory(client);

client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));

Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Erstellen Sie Ihr Programm und führen Sie es aus, um die Ausführung einer Pipeline auszulösen!

Überwachen einer Pipelineausführung

  1. Fügen Sie den folgenden Code in die Methode Main ein:

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    

    Dieser Code überprüft kontinuierlich den Status der Ausführung, bis das Kopieren der Daten beendet ist.

  2. Fügen Sie der Methode Main den folgenden Code hinzu, der Ausführungsdetails zur Kopieraktivität abruft, wie z. B. die Größe der gelesenen/geschriebenen Daten:

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun(
    resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList();
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(activityRuns.First().Output);
        //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons);
    }
    else
        Console.WriteLine(activityRuns.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Ausführen des Codes

Erstellen und starten Sie die Anwendung, und überprüfen Sie dann die Pipelineausführung.

Die Anwendung zeigt den Fortschritt beim Erstellen der Data Factory, des verknüpften Diensts, der Datasets, der Pipeline und der Pipelineausführung an. Danach wird der Status der Pipelineausführung überprüft. Warten Sie, bis Sie die Ausführungsdetails der Kopieraktivität mit der Größe der gelesenen/geschriebenen Daten sehen. Verwenden Sie anschließend Tools wie z. B. Azure Storage-Explorer, um zu überprüfen, ob das Blob wie von Ihnen in den Variablen angegeben von inputBlobPath nach outputBlobPath kopiert wurde.

Die Ausgabe sollte wie im folgenden Beispiel aussehen:

Creating data factory DFTutorialTest...
{
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "type": "AzureStorage",
  "typeProperties": {
    "connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
  }
}
Creating dataset SourceStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sourceBlobContainer"
    },
    "fileName": "input.txt"
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating dataset SinkStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sinkBlobContainer"
    }
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating pipeline Adfv2TutorialBranchCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SourceStorageDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SinkStorageDataset"
          }
        ],
        "name": "CopyBlobtoBlob"
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendSuccessEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ]
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').error.message}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendFailEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Failed"
            ]
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      },
      "receiver": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 20,
  "dataWritten": 20,
  "copyDuration": 4,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}

Press any key to exit...

In diesem Tutorial haben Sie folgende Aufgaben ausgeführt:

  • Erstellen einer Data Factory
  • Erstellen eines verknüpften Azure Storage-Diensts
  • Erstellen eines Azure-Blobdatasets.
  • Erstellen einer Pipeline, die eine Kopieraktivität und eine Webaktivität enthält.
  • Senden von Aktivitätsausgaben an nachfolgende Aktivitäten.
  • Verwenden von Parameterübergabe und Systemvariablen
  • Starten einer Pipelineausführung.
  • Überwachen der Pipeline- und Aktivitätsausführungen.

Sie können jetzt mit dem Abschnitt „Konzepte“ fortfahren, um weitere Informationen zu Azure Data Factory zu erhalten.