Share via


Inkrementelles Laden von Daten aus Azure SQL-Datenbank in Azure Blob Storage mit Informationen der Änderungsnachverfolgung und PowerShell

GILT FÜR: Azure Data Factory Azure Synapse Analytics

Tipp

Testen Sie Data Factory in Microsoft Fabric, eine All-in-One-Analyselösung für Unternehmen. Microsoft Fabric deckt alle Aufgaben ab, von der Datenverschiebung bis hin zu Data Science, Echtzeitanalysen, Business Intelligence und Berichterstellung. Erfahren Sie, wie Sie kostenlos eine neue Testversion starten!

In diesem Tutorial erstellen Sie eine Azure Data Factory mit einer Pipeline, die Deltadaten basierend auf Informationen der Änderungsnachverfolgung in der Quelldatenbank in Azure SQL-Datenbank als Quelle in Azure Blob Storage lädt.

In diesem Tutorial führen Sie die folgenden Schritte aus:

  • Vorbereiten des Quelldatenspeichers
  • Erstellen einer Data Factory.
  • Erstellen Sie verknüpfte Dienste.
  • Erstellen von Datasets für Quelle, Senke und Änderungsnachverfolgung
  • Erstellen, Ausführen und Überwachen der vollständigen Kopierpipeline
  • Hinzufügen oder Aktualisieren von Daten in der Quelltabelle
  • Erstellen, Ausführen und Überwachen der inkrementellen Kopierpipeline

Hinweis

Es wird empfohlen, das Azure Az PowerShell-Modul für die Interaktion mit Azure zu verwenden. Informationen zu den ersten Schritten finden Sie unter Installieren des Azure Az PowerShell-Moduls. Informationen zum Migrieren zum Az PowerShell-Modul finden Sie unter Migrieren von Azure PowerShell von AzureRM zum Az-Modul.

Übersicht

In einer Datenintegrationslösung ist das inkrementelle Laden von Daten nach anfänglichen vollständigen Ladevorgängen ein häufig verwendetes Szenario. In einigen Fällen können die geänderten Daten für einen Zeitraum Ihres Quelldatenspeichers leicht aufgeteilt werden (z.B. LastModifyTime, CreationTime). Manchmal gibt es keine explizite Möglichkeit, die Deltadaten seit der letzten Verarbeitung der Daten zu identifizieren. Die Technologie für die Änderungsnachverfolgung, die von Datenspeichern unterstützt wird, z.B. Azure SQL-Datenbank und SQL Server, kann zum Identifizieren der Deltadaten verwendet werden. In diesem Tutorial wird beschrieben, wie Sie Azure Data Factory zusammen mit der SQL-Technologie für die Änderungsnachverfolgung nutzen, um Deltadaten inkrementell aus Azure SQL-Datenbank in Azure Blob Storage zu laden. Konkretere Informationen zur SQL-Technologie für die Änderungsnachverfolgung finden Sie unter Informationen zur Änderungsnachverfolgung (SQL Server).

Kompletter Workflow

Hier sind die Schritte des typischen End-to-End-Workflows zum inkrementellen Laden von Daten per Technologie für die Änderungsnachverfolgung angegeben.

Hinweis

Diese Technologie wird sowohl von Azure SQL-Datenbank als auch von SQL Server unterstützt. In diesem Tutorial wird Azure SQL-Datenbank als Quelldatenspeicher verwendet. Sie können auch eine SQL Server-Instanz verwenden.

  1. Anfängliches Laden von Verlaufsdaten (einmal ausführen):
    1. Aktivieren Sie die Technologie für die Änderungsnachverfolgung in der Quelldatenbank in Azure SQL-Datenbank.
    2. Rufen Sie den Anfangswert von SYS_CHANGE_VERSION in der Datenbank als Baseline zum Erfassen von geänderten Daten ab.
    3. Laden Sie die vollständigen Daten aus der Quelldatenbank in Azure Blob Storage.
  2. Inkrementelles Laden von Delta-Daten in einem Zeitplan (wird regelmäßig nach dem anfänglichen Laden von Daten ausgeführt):
    1. Rufen Sie die alten und neuen SYS_CHANGE_VERSION-Werte ab.
    2. Laden Sie die Deltadaten, indem Sie die Primärschlüssel von geänderten Zeilen (zwischen zwei SYS_CHANGE_VERSION-Werten) aus sys.change_tracking_tables mit Daten in der Quelltabelle verknüpfen, und verschieben Sie die Deltadaten dann auf das Ziel.
    3. Aktualisieren Sie SYS_CHANGE_VERSION für das nächste Deltaladen.

Allgemeine Lösung

In diesem Tutorial erstellen Sie zwei Pipelines, mit denen die folgenden beiden Vorgänge durchgeführt werden:

  1. Erstes Laden: : Sie erstellen eine Pipeline mit einer Kopieraktivität, bei der die gesamten Daten aus dem Quelldatenspeicher (Azure SQL-Datenbank) in den Zieldatenspeicher (Azure Blob Storage) kopiert werden.

    Full loading of data

  2. Inkrementell laden: Sie erstellen eine Pipeline mit den folgenden Aktivitäten und führen sie regelmäßig aus.

    1. Erstellen Sie zwei Lookup-Aktivitäten, um die alte und neue SYS_CHANGE_VERSION aus Azure SQL-Datenbank abzurufen und an die Kopieraktivität zu übergeben.
    2. Erstellen Sie eine Kopieraktivität, um die eingefügten/aktualisierten/gelöschten Daten zwischen den beiden SYS_CHANGE_VERSION-Werten aus Azure SQL-Datenbank nach Azure Blob Storage zu kopieren.
    3. Erstellen Sie eine Aktivität „Gespeicherte Prozedur“ , um den Wert von SYS_CHANGE_VERSION für die nächste Pipelineausführung zu aktualisieren.

    Increment load flow diagram

Wenn Sie kein Azure-Abonnement besitzen, können Sie ein kostenloses Konto erstellen, bevor Sie beginnen.

Voraussetzungen

  • Azure PowerShell. Installieren Sie die aktuellen Azure PowerShell-Module, indem Sie die Anweisungen unter Installieren und Konfigurieren von Azure PowerShell befolgen.
  • Azure SQL-Datenbank. Sie verwenden die Datenbank als den Quell-Datenspeicher. Wenn Sie in Azure SQL-Datenbank noch keine Datenbank haben, lesen Sie den Artikel Erstellen einer Datenbank in Azure SQL-Datenbank. Dort finden Sie die erforderlichen Schritte zum Erstellen einer solchen Datenbank.
  • Azure Storage-Konto. Sie verwenden den Blob Storage als den Senken-Datenspeicher. Wenn Sie kein Azure Storage-Konto besitzen, finden Sie im Artikel Erstellen eines Speicherkontos Schritte zum Erstellen eines solchen Kontos. Erstellen Sie einen Container mit dem Namen Adftutorial.

Erstellen einer Datenquellentabelle in Ihrer Datenbank

  1. Starten Sie SQL Server Management Studio, und stellen Sie eine Verbindung mit SQL-Datenbank her.

  2. Klicken Sie im Server-Explorer mit der rechten Maustaste auf Ihre Datenbank, und wählen Sie Neue Abfrage.

  3. Führen Sie den folgenden SQL-Befehl für Ihre Datenbank aus, um eine Tabelle mit dem Namen data_source_table als Datenquellenspeicher zu erstellen.

    create table data_source_table
    (
        PersonID int NOT NULL,
        Name varchar(255),
        Age int
        PRIMARY KEY (PersonID)
    );
    
    INSERT INTO data_source_table
        (PersonID, Name, Age)
    VALUES
        (1, 'aaaa', 21),
        (2, 'bbbb', 24),
        (3, 'cccc', 20),
        (4, 'dddd', 26),
        (5, 'eeee', 22);
    
    
  4. Aktivieren Sie den Mechanismus für die Änderungsnachverfolgung in Ihrer Datenbank und der Quelltabelle (data_source_table), indem Sie die folgende SQL-Abfrage ausführen:

    Hinweis

    • Ersetzen Sie <your database name> durch den Namen Ihrer Datenbank, in der „data_source_table“ enthalten ist.
    • Im aktuellen Beispiel werden die geänderten Daten zwei Tage lang aufbewahrt. Wenn Sie die geänderten Daten für drei oder mehr Tage laden, sind einige geänderte Daten nicht enthalten. Eine Möglichkeit besteht darin, den Wert von CHANGE_RETENTION zu erhöhen. Alternativ dazu können Sie sicherstellen, dass Ihr Zeitraum für das Laden der geänderten Daten nicht mehr als zwei Tage beträgt. Weitere Informationen finden Sie unter Aktivieren der Änderungsnachverfolgung für eine Datenbank.
    ALTER DATABASE <your database name>
    SET CHANGE_TRACKING = ON  
    (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)  
    
    ALTER TABLE data_source_table
    ENABLE CHANGE_TRACKING  
    WITH (TRACK_COLUMNS_UPDATED = ON)
    
  5. Erstellen Sie eine neue Tabelle, und speichern Sie „ChangeTracking_version“ mit einem Standardwert, indem Sie die folgende Abfrage ausführen:

    create table table_store_ChangeTracking_version
    (
        TableName varchar(255),
        SYS_CHANGE_VERSION BIGINT,
    );
    
    DECLARE @ChangeTracking_version BIGINT
    SET @ChangeTracking_version = CHANGE_TRACKING_CURRENT_VERSION();  
    
    INSERT INTO table_store_ChangeTracking_version
    VALUES ('data_source_table', @ChangeTracking_version)
    

    Hinweis

    Wenn sich die Daten nach dem Aktivieren der Änderungsnachverfolgung für SQL-Datenbank nicht geändert haben, lautet der Wert für die Version der Änderungsnachverfolgung „0“.

  6. Führen Sie die folgende Abfrage zum Erstellen einer gespeicherten Prozedur in Ihrer Datenbank aus. Die Pipeline ruft diese gespeicherte Prozedur auf, um die Version der Änderungsnachverfolgung in der Tabelle zu aktualisieren, die Sie im vorherigen Schritt erstellt haben.

    CREATE PROCEDURE Update_ChangeTracking_Version @CurrentTrackingVersion BIGINT, @TableName varchar(50)
    AS
    
    BEGIN
    
    UPDATE table_store_ChangeTracking_version
    SET [SYS_CHANGE_VERSION] = @CurrentTrackingVersion
    WHERE [TableName] = @TableName
    
    END    
    

Azure PowerShell

Installieren Sie die aktuellen Azure PowerShell-Module, indem Sie die Anweisungen unter Installieren und Konfigurieren von Azure PowerShell befolgen.

Erstellen einer Data Factory

  1. Definieren Sie eine Variable für den Ressourcengruppennamen zur späteren Verwendung in PowerShell-Befehlen. Kopieren Sie den folgenden Befehlstext nach PowerShell, geben Sie einen Namen für die Azure-Ressourcengruppe in doppelten Anführungszeichen an, und führen Sie dann den Befehl aus. Beispiel: "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Beachten Sie, dass die Ressourcengruppe ggf. nicht überschrieben werden soll, falls sie bereits vorhanden ist. Weisen Sie der Variablen $resourceGroupName einen anderen Wert zu, und führen Sie den Befehl erneut aus.

  2. Definieren Sie eine Variable für den Speicherort der Data Factory:

    $location = "East US"
    
  3. Führen Sie den folgenden Befehl aus, um die Azure-Ressourcengruppe zu erstellen:

    New-AzResourceGroup $resourceGroupName $location
    

    Beachten Sie, dass die Ressourcengruppe ggf. nicht überschrieben werden soll, falls sie bereits vorhanden ist. Weisen Sie der Variablen $resourceGroupName einen anderen Wert zu, und führen Sie den Befehl erneut aus.

  4. Definieren Sie eine Variable für den Namen der Data Factory.

    Wichtig

    Aktualisieren Sie den Data Factory-Namen, damit er global eindeutig ist.

    $dataFactoryName = "IncCopyChgTrackingDF";
    
  5. Führen Sie zum Erstellen der Data Factory das Cmdlet Set-AzDataFactoryV2 wie folgt aus:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Beachten Sie folgende Punkte:

  • Der Name der Azure Data Factory muss global eindeutig sein. Wenn die folgende Fehlermeldung angezeigt wird, ändern Sie den Namen, und wiederholen Sie den Vorgang.

    The specified Data Factory name 'ADFIncCopyChangeTrackingTestFactory' is already in use. Data Factory names must be globally unique.
    
  • Damit Sie Data Factory-Instanzen erstellen können, muss das Benutzerkonto, mit dem Sie sich bei Azure anmelden, ein Mitglied der Rolle Mitwirkender oder Besitzer oder ein Administrator des Azure-Abonnements sein.

  • Eine Liste der Azure-Regionen, in denen Data Factory derzeit verfügbar ist, finden Sie, indem Sie die für Sie interessanten Regionen auf der folgenden Seite auswählen und dann Analysen erweitern, um Data Factory zu finden: Verfügbare Produkte nach Region. Die von der Data Factory verwendeten Datenspeicher (Azure Storage, Azure SQL-Datenbank usw.) und Computedienste (HDInsight usw.) können sich in anderen Regionen befinden.

Erstellen von verknüpften Diensten

Um Ihre Datenspeicher und Compute Services mit der Data Factory zu verknüpfen, können Sie verknüpfte Dienste in einer Data Factory erstellen. In diesem Abschnitt erstellen Sie verknüpfte Dienste mit Ihrem Azure Storage-Konto und mit Ihrer Datenbank in Azure SQL-Datenbank.

Erstellen des verknüpften Azure Storage-Diensts.

In diesem Schritt verknüpfen Sie Ihr Azure Storage-Konto mit der Data Factory.

  1. Erstellen Sie eine JSON-Datei mit dem Namen AzureStorageLinkedService.json im Ordner C:\ADFTutorials\IncCopyChangeTrackingTutorial und dem folgenden Inhalt: (Erstellen Sie den Ordner, falls er noch nicht vorhanden ist.) Ersetzen Sie <accountName> und <accountKey> durch den Namen bzw. Schlüssel Ihres Azure Storage-Kontos, bevor Sie die Datei speichern.

    {
        "name": "AzureStorageLinkedService",
        "properties": {
            "type": "AzureStorage",
            "typeProperties": {
                "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
            }
        }
    }
    
  2. Wechseln Sie in Azure PowerShell zum Ordner C:\ADFTutorials\IncCopyChangeTrackingTutorial.

  3. Führen Sie das Cmdlet Set-AzDataFactoryV2LinkedService aus, um den verknüpften Dienst zu erstellen: AzureStorageLinkedService. Im folgenden Beispiel, übergeben Sie Werte für die ResourceGroupName- und DataFactoryName-Parameter.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
    

    Hier ist die Beispielausgabe:

    LinkedServiceName : AzureStorageLinkedService
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
    

Erstellen Sie einen Azure SQL-Datenbank -verknüpften Dienst.

In diesem Schritt verknüpfen Sie Ihre Datenbank mit der Data Factory.

  1. Erstellen Sie eine JSON-Datei mit dem Namen AzureSQLDatabaseLinkedService.json im Ordner C:\ADFTutorials\IncCopyChangeTrackingTutorial mit folgendem Inhalt: Ersetzen Sie <server><database name>, <user id> und <password> durch den Namen Ihrer Azure SQL Server-Instanz, den Namen Ihrer Datenbank, die Benutzer-ID und das Kennwort, bevor Sie die Datei speichern.

    {
        "name": "AzureSQLDatabaseLinkedService",
        "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                "connectionString": "Server = tcp:<server>.database.windows.net,1433;Initial Catalog=<database name>; Persist Security Info=False; User ID=<user name>; Password=<password>; MultipleActiveResultSets = False; Encrypt = True; TrustServerCertificate = False; Connection Timeout = 30;"
            }
        }
    }
    
  2. Führen Sie in Azure PowerShell das Cmdlet Set-AzDataFactoryV2LinkedService aus, um den verknüpften Dienst zu erstellen: AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Hier ist die Beispielausgabe:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Erstellen von Datasets

In diesem Schritt erstellen Sie Datasets, die für die Datenquelle, das Datenziel und den Ort zum Speichern der SYS_CHANGE_VERSION stehen.

Erstellen eines Quelldatasets

In diesem Schritt erstellen Sie ein Dataset, das für die Quelldaten steht.

  1. Erstellen Sie eine JSON-Datei mit dem Namen „SourceDataset.json“ im selben Ordner und dem folgenden Inhalt:

    {
        "name": "SourceDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "data_source_table"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }   
    
  2. Führen Sie das Cmdlet „Set-AzDataFactoryV2Dataset“ aus, um das Dataset zu erstellen: SourceDataset

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Hier ist die Beispielausgabe des Cmdlets:

    DatasetName       : SourceDataset
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Erstellen Sie ein Senkendataset

In diesem Schritt erstellen Sie ein Dataset, das für die Daten steht, die aus dem Quelldatenspeicher kopiert werden.

  1. Erstellen Sie eine JSON-Datei mit dem Namen „SinkDataset.json“ im selben Ordner und dem folgenden Inhalt:

    {
        "name": "SinkDataset",
        "properties": {
            "type": "AzureBlob",
            "typeProperties": {
                "folderPath": "adftutorial/incchgtracking",
                "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
                "format": {
                    "type": "TextFormat"
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureStorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    

    Sie erstellen den Container „adftutorial“ in Ihrer Azure Blob Storage-Instanz im Rahmen der Erfüllung der Voraussetzungen. Erstellen Sie den Container, wenn er noch nicht vorhanden ist (oder) geben Sie den Namen eines bereits vorhandenen ein. In diesem Tutorial wird der Name der Ausgabedatei dynamisch generiert, indem der folgende Ausdruck verwendet wird: @CONCAT(('Incremental-', pipeline().RunId, '.txt').

  2. Führen Sie das Cmdlet „Set-AzDataFactoryV2Dataset“ aus, um das Dataset zu erstellen: SinkDataset

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Hier ist die Beispielausgabe des Cmdlets:

    DatasetName       : SinkDataset
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset
    

Erstellen eines Datasets für die Änderungsnachverfolgung

In diesem Schritt erstellen Sie ein Dataset zum Speichern der Version für die Änderungsnachverfolgung.

  1. Erstellen Sie in demselben Ordner eine JSON-Datei mit dem Namen „ChangeTrackingDataset.json“ und folgendem Inhalt:

    {
        "name": " ChangeTrackingDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "table_store_ChangeTracking_version"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    

    Sie erstellen die Tabelle „table_store_ChangeTracking_version“ im Rahmen der Erfüllung der Voraussetzungen.

  2. Führen Sie das Cmdlet „Set-AzDataFactoryV2Dataset“ aus, um das Dataset zu erstellen: ChangeTrackingDataset

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "ChangeTrackingDataset" -File ".\ChangeTrackingDataset.json"
    

    Hier ist die Beispielausgabe des Cmdlets:

    DatasetName       : ChangeTrackingDataset
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Erstellen einer Pipeline für den vollständigen Kopiervorgang

In diesem Schritt erstellen Sie eine Pipeline mit einer Kopieraktivität, bei der die gesamten Daten aus dem Quelldatenspeicher (Azure SQL-Datenbank) in den Zieldatenspeicher (Azure Blob Storage) kopiert werden.

  1. Erstellen Sie die JSON-Datei „FullCopyPipeline.json“ im gleichen Ordner, aber mit folgendem Inhalt:

    {
        "name": "FullCopyPipeline",
        "properties": {
            "activities": [{
                "name": "FullCopyActivity",
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "SqlSource"
                    },
                    "sink": {
                        "type": "BlobSink"
                    }
                },
    
                "inputs": [{
                    "referenceName": "SourceDataset",
                    "type": "DatasetReference"
                }],
                "outputs": [{
                    "referenceName": "SinkDataset",
                    "type": "DatasetReference"
                }]
            }]
        }
    }
    
  2. Führen Sie das Cmdlet „Set-AzDataFactoryV2Pipeline“ aus, um die Pipeline zu erstellen: FullCopyPipeline.

     Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "FullCopyPipeline" -File ".\FullCopyPipeline.json"
    

    Hier ist die Beispielausgabe:

     PipelineName      : FullCopyPipeline
     ResourceGroupName : ADFTutorialResourceGroup
     DataFactoryName   : IncCopyChgTrackingDF
     Activities        : {FullCopyActivity}
     Parameters        :
    

Ausführen der vollständigen Kopierpipeline

Führen Sie die Pipeline FullCopyPipeline mithilfe des Cmdlets Invoke-AzDataFactoryV2Pipeline aus.

Invoke-AzDataFactoryV2Pipeline -PipelineName "FullCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName        

Überwachen der vollständigen Kopierpipeline

  1. Melden Sie sich beim Azure-Portal an.

  2. Klicken Sie auf Alle Dienste, führen Sie eine Suche mit dem Schlüsselwort data factories durch, und wählen Sie Data Factorys aus.

    Data factories menu

  3. Suchen Sie in der Liste mit den Data Factorys nach Ihrer Data Factory, und wählen Sie sie aus, um die Seite „Data Factory“ anzuzeigen.

    Search for your data factory

  4. Klicken Sie auf der Seite für die Data Factory auf die Kachel Überwachung und Verwaltung.

    Monitor & Manage tile

  5. Die Anwendung für die Datenintegration wird in einer separaten Registerkarte gestartet. Alle Pipelineausführungen mit dem dazugehörigen Status werden angezeigt. Beachten Sie, dass der Status der Pipelineausführung im folgenden Beispiel Erfolgreich lautet. Sie können die an die Pipeline übergebenen Parameter überprüfen, indem Sie in der Spalte Parameter auf den Link klicken. Wenn ein Fehler auftritt, wird in der Spalte Fehler ein Link angezeigt. Klicken Sie in der Spalte Aktionen auf den Link.

    Screenshot shows pipeline runs for a data factory.

  6. Wenn Sie in der Spalte Aktionen auf den Link klicken, wird die folgende Seite mit allen Aktivitätsausführungen der Pipeline angezeigt.

    Screenshot shows activity runs for a data factory with the Pipelines link called out.

  7. Klicken Sie wie in der Abbildung dargestellt auf Pipelines, um zurück zur Ansicht Pipelineausführungen zu wechseln.

Überprüfen der Ergebnisse

Im Ordner incchgtracking des Containers adftutorial wird eine Datei mit dem Namen incremental-<GUID>.txt angezeigt.

Output file from full copy

Die Datei sollte die Daten aus Ihrer Datenbank enthalten:

1,aaaa,21
2,bbbb,24
3,cccc,20
4,dddd,26
5,eeee,22

Hinzufügen von weiteren Daten zur Quelltabelle

Führen Sie die folgende Abfrage für Ihre Datenbank aus, um eine Zeile hinzuzufügen und eine Zeile zu aktualisieren.

INSERT INTO data_source_table
(PersonID, Name, Age)
VALUES
(6, 'new','50');


UPDATE data_source_table
SET [Age] = '10', [name]='update' where [PersonID] = 1

Erstellen einer Pipeline für die Deltakopie

In diesem Schritt erstellen Sie eine Pipeline mit den folgenden Aktivitäten und führen sie regelmäßig aus. Mit den Lookup-Aktivitäten wird die alte und neue SYS_CHANGE_VERSION aus Azure SQL-Datenbank abgerufen und an die Kopieraktivität übergeben. Die Kopieraktivität kopiert die eingefügten/aktualisierten/gelöschten Daten zwischen den beiden SYS_CHANGE_VERSION-Werten aus Azure SQL-Datenbank nach Azure Blob Storage. Die Aktivität „Gespeicherte Prozedur“ aktualisiert den Wert von SYS_CHANGE_VERSION für die nächste Pipelineausführung.

  1. Erstellen Sie die JSON-Datei „IncrementalCopyPipeline.json“ im gleichen Ordner, aber mit folgendem Inhalt:

    {
        "name": "IncrementalCopyPipeline",
        "properties": {
            "activities": [
                {
                    "name": "LookupLastChangeTrackingVersionActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select * from table_store_ChangeTracking_version"
                        },
                        "dataset": {
                            "referenceName": "ChangeTrackingDataset",
                            "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "LookupCurrentChangeTrackingVersionActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "SELECT CHANGE_TRACKING_CURRENT_VERSION() as CurrentChangeTrackingVersion"
                        },
                        "dataset": {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "IncrementalCopyActivity",
                    "type": "Copy",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select data_source_table.PersonID,data_source_table.Name,data_source_table.Age, CT.SYS_CHANGE_VERSION, SYS_CHANGE_OPERATION from data_source_table RIGHT OUTER JOIN CHANGETABLE(CHANGES data_source_table, @{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.SYS_CHANGE_VERSION}) as CT on data_source_table.PersonID = CT.PersonID where CT.SYS_CHANGE_VERSION <= @{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}"
                        },
                        "sink": {
                            "type": "BlobSink"
                        }
                    },
                    "dependsOn": [
                        {
                            "activity": "LookupLastChangeTrackingVersionActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        },
                        {
                            "activity": "LookupCurrentChangeTrackingVersionActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "inputs": [
                        {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "SinkDataset",
                            "type": "DatasetReference"
                        }
                    ]
                },
                {
                    "name": "StoredProceduretoUpdateChangeTrackingActivity",
                    "type": "SqlServerStoredProcedure",
                    "typeProperties": {
                        "storedProcedureName": "Update_ChangeTracking_Version",
                        "storedProcedureParameters": {
                            "CurrentTrackingVersion": {
                                "value": "@{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}",
                                "type": "INT64"
                            },
                            "TableName": {
                                "value": "@{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.TableName}",
                                "type": "String"
                            }
                        }
                    },
                    "linkedServiceName": {
                        "referenceName": "AzureSQLDatabaseLinkedService",
                        "type": "LinkedServiceReference"
                    },
                    "dependsOn": [
                        {
                            "activity": "IncrementalCopyActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ]
                }
            ]
        }
    }
    
  2. Führen Sie das Cmdlet „Set-AzDataFactoryV2Pipeline“ aus, um die Pipeline zu erstellen: FullCopyPipeline.

     Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Hier ist die Beispielausgabe:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : ADFTutorialResourceGroup
     DataFactoryName   : IncCopyChgTrackingDF
     Activities        : {LookupLastChangeTrackingVersionActivity, LookupCurrentChangeTrackingVersionActivity, IncrementalCopyActivity, StoredProceduretoUpdateChangeTrackingActivity}
     Parameters        :
    

Ausführen der inkrementellen Kopierpipeline

Führen Sie die Pipeline IncrementalCopyPipeline mithilfe des Cmdlets Invoke-AzDataFactoryV2Pipeline aus.

Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName     

Überwachen der inkrementellen Kopierpipeline

  1. Aktualisieren Sie in der Anwendung für die Datenintegration die Ansicht Pipelineausführungen. Vergewissern Sie sich, dass „IncrementalCopyPipeline“ in der Liste enthalten ist. Klicken Sie in der Spalte Aktionen auf den Link.

    Screenshot shows pipeline runs for a data factory including your pipeline.

  2. Wenn Sie in der Spalte Aktionen auf den Link klicken, wird die folgende Seite mit allen Aktivitätsausführungen der Pipeline angezeigt.

    Screenshot shows pipeline runs for a data factory with several marked as succeeded.

  3. Klicken Sie wie in der Abbildung dargestellt auf Pipelines, um zurück zur Ansicht Pipelineausführungen zu wechseln.

Überprüfen der Ergebnisse

Die zweite Datei ist im Ordner incchgtracking des Containers adftutorial enthalten.

Output file from incremental copy

Die Datei sollte nur die Deltadaten aus Ihrer Datenbank enthalten. Der Datensatz mit der Kennzeichnung U ist die aktualisierte Zeile in der Datenbank, und mit I wird die hinzugefügte Zeile angegeben.

1,update,10,2,U
6,new,50,1,I

Die ersten drei Spalten enthalten geänderte Daten aus „data_source_table“. Die letzten beiden Spalten enthalten die Metadaten aus der Systemtabelle für die Änderungsnachverfolgung. Die vierte Spalte enthält die SYS_CHANGE_VERSION für die einzelnen geänderten Zeilen. Die fünfte Spalte enthält den Vorgang: U = update (aktualisieren), I = insert (einfügen). Weitere Informationen zu den Informationen zur Änderungsnachverfolgung finden Sie unter CHANGETABLE.

==================================================================
PersonID Name    Age    SYS_CHANGE_VERSION    SYS_CHANGE_OPERATION
==================================================================
1        update  10            2                                 U
6        new     50            1                                 I

Im folgenden Tutorial erfahren Sie mehr über das Kopieren von neuen und geänderten Dateien nur auf Grundlage ihres LastModifiedDate-Werts: