Deltakopiervorgänge aus einer Datenbank mit einer Steuertabelle

GILT FÜR: Azure Data Factory Azure Synapse Analytics

Tipp

Testen Sie Data Factory in Microsoft Fabric, eine All-in-One-Analyselösung für Unternehmen. Microsoft Fabric deckt alle Aufgaben ab, von der Datenverschiebung bis hin zu Data Science, Echtzeitanalysen, Business Intelligence und Berichterstellung. Erfahren Sie, wie Sie kostenlos eine neue Testversion starten!

In diesem Artikel wird eine Vorlage zum inkrementellen Laden neuer oder aktualisierter Zeilen aus einer Datenbanktabelle in Azure erläutert, bei der eine externe Steuertabelle verwendet wird, die einen hohen Grenzwert speichert.

Diese Vorlage setzt voraus, dass das Schema der Quelldatenbank eine Zeitstempelspalte oder einen Inkrementierungsschlüssel enthält, um die neuen oder aktualisierten Zeilen identifizieren zu können.

Hinweis

Wenn eine Zeitstempelspalte in der Quelldatenbank vorhanden ist, mit der neue oder aktualisierte Zeilen ermittelt werden sollen, Sie aber keine externe Steuertabelle für den Deltakopiervorgang erstellen möchten, können Sie stattdessen das Azure Data Factory-Tool zum Kopieren von Daten verwenden, um eine Pipeline zu erstellen. Das Tool verwendet einen von einem Trigger geplanten Zeitpunkt als Variable, um neue Zeilen aus der Quelldatenbank zu lesen.

Informationen zu dieser Lösungsvorlage

Diese Vorlage ruft zuerst den alten Grenzwert ab und vergleicht ihn mit dem aktuellen Grenzwert. Auf der Grundlage eines Vergleichs der beiden Grenzwerte werden dann nur die Änderungen aus der Quelldatenbank kopiert. Zuletzt wird der neue hohe Grenzwert in einer externen Steuertabelle gespeichert, damit er beim nächsten Laden von Deltadaten verfügbar ist.

Die Vorlage enthält vier Aktivitäten:

  • Eine Suchaktivität zum Abrufen des alten hohen Grenzwerts, der in einer externen Steuertabelle gespeichert ist.
  • Eine weitere Suchaktivität zum Abrufen des aktuellen hohen Grenzwerts aus der Quelldatenbank.
  • Die Kopieraktivität kopiert nur Änderungen aus der Quelldatenbank in den Zielspeicher. Die Abfrage für das Ermitteln der Änderungen in der Quelldatenbank ähnelt SELECT * FROM Data_Source_Table WHERE TIMESTAMP_Column > “last high-watermark” und TIMESTAMP_Column <= “current high-watermark”.
  • Eine SqlServerStoredProcedure-Aktivität, um den aktuellen hohen Grenzwert in eine externe Steuertabelle zu schreiben, damit er beim nächsten Deltakopiervorgang zur Verfügung steht.

Die Vorlage definiert die folgenden Parameter:

  • Data_Source_Table_Name entspricht der Tabelle aus der Quelldatenbank, aus der Sie Daten laden möchten.
  • Data_Source_WaterMarkColumn entspricht dem Spaltennamen in der Quelltabelle, der zum Identifizieren der neuen oder aktualisierten Zeilen verwendet werden kann. Der Typ dieser Spalte ist üblicherweise datetime, int oder ein ähnlicher Typ.
  • Data_Destination_Container entspricht dem Stammpfad des Orts, an den die Daten in Ihrem Zielspeicher kopiert werden.
  • Data_Destination_Directory entspricht dem Verzeichnispfad unter dem Stammelement des Orts, an den die Daten in Ihrem Zielspeicher kopiert werden.
  • Data_Destination_Table_Name entspricht dem Ort, an den die Daten in Ihren Zielspeicher kopiert werden (zutreffend, wenn „Azure Synapse Analytics“ als Datenziel ausgewählt ist).
  • Data_Destination_Folder_Path entspricht dem Ort, an den die Daten in Ihren Zielspeicher kopiert werden (zutreffend, wenn "Dateisystem" oder "Azure Data Lake Storage Gen1" als Datenziel ausgewählt ist).
  • Control_Table_Table_Name entspricht der externen Steuertabelle, in der der hohe Grenzwert gespeichert werden soll.
  • Control_Table_Column_Name entspricht der Spalte in der externen Steuertabelle, in der der hohe Grenzwert gespeichert werden soll.

So verwenden Sie diese Lösungsvorlage

  1. Untersuchen Sie die Quelltabelle, die Sie laden möchten, und definieren Sie die Spalte für den hohen Grenzwert, auf deren Grundlage die neuen oder aktualisierten Zeilen identifiziert werden können. Der Typ dieser Spalte kann datetime, int oder ein ähnlicher Typ sein. Der Wert dieser Spalte wird erhöht, wenn neue Zeilen hinzugefügt werden. In der folgenden Beispielquelltabelle (data_source_table) können Sie die Spalte LastModifytime als Spalte für den hohen Grenzwert verwenden.

    PersonID	Name            LastModifytime
    1           aaaa            2017-09-01 00:56:00.000
    2           bbbb            2017-09-02 05:23:00.000
    3           cccc            2017-09-03 02:36:00.000
    4           dddd            2017-09-04 03:21:00.000
    5           eeee            2017-09-05 08:06:00.000
    6           fffffff         2017-09-06 02:23:00.000
    7           gggg            2017-09-07 09:01:00.000
    8           hhhh            2017-09-08 09:01:00.000
    9           iiiiiiiii       2017-09-09 09:01:00.000
    
  2. Erstellen Sie in einer SQL Server- oder Azure SQL-Datenbank-Instanz eine Steuertabelle, um den hohen Grenzwert für das Laden von Deltadaten zu speichern. Im folgenden Beispiel ist der Name der Steuertabelle watermarktable. In der Spalte WatermarkValue vom Typ datetime dieser Tabelle wird der hohe Grenzwert gespeichert.

    create table watermarktable
    (
    WatermarkValue datetime,
    );
    INSERT INTO watermarktable
    VALUES ('1/1/2010 12:00:00 AM')
    
  3. Erstellen Sie eine gespeicherte Prozedur in der gleichen SQL Server- oder Azure SQL-Datenbank-Instanz, mit der Sie auch die Steuertabelle erstellt haben. Die gespeicherte Prozedur dient dazu, den neuen hohen Grenzwert in der externen Steuertabelle zu speichern, damit er beim nächsten Laden von Deltadaten zur Verfügung steht.

    CREATE PROCEDURE update_watermark @LastModifiedtime datetime
    AS
    
    BEGIN
    
        UPDATE watermarktable
        SET [WatermarkValue] = @LastModifiedtime 
    
    END
    
  4. Wechseln Sie zur Vorlage Delta copy from Database (Deltakopie aus einer Datenbank). Stellen Sie eine neue Verbindung mit der Quelldatenbank her, aus der Sie Daten kopieren möchten.

    Screenshot showing the creation of a new connection to the source table.

  5. Stellen Sie eine neue Verbindung mit dem Zieldatenspeicher her, in den Sie die Daten kopieren möchten.

    Screenshot showing the creation of a new connection to the destination table.

  6. Stellen Sie eine neue Verbindung mit der externen Steuertabelle und der gespeicherten Prozedur her, die Sie in Schritt 2 und Schritt 3 erstellt haben.

    Screenshot showing the creation of a new connection to the control table data store.

  7. Klicken Sie auf Diese Vorlage verwenden.

  8. Daraufhin wird die verfügbare Pipeline wie im folgenden Beispiel angezeigt:

    Screenshot showing the pipeline.

  9. Klicken Sie auf Gespeicherte Prozedur. Wählen Sie [dbo].[update_watermark] für Name der gespeicherten Prozedur aus. Klicken Sie auf Import parameter (Importparameter), und wählen Sie Dynamischen Inhalt hinzufügen aus.

    Screenshot showing where to set the stored procedure activity.

  10. Schreiben Sie den Inhalt {activity('LookupCurrentWaterMark').output.firstRow.NewWatermarkValue}, und klicken Sie dann auf Fertig stellen.

    Screenshot showing where to write the content for the parameters of the stored procedure.

  11. Klicken Sie auf Debuggen, geben Sie die Parameter ein, und klicken Sie dann auf Fertig stellen.

    Screenshot showing the Debug button.

  12. Die angezeigten Ergebnisse entsprechen etwa folgendem Beispiel:

    Sreenshot showing the result of the pipeline run.

  13. Sie können neue Zeilen in Ihrer Quelltabelle erstellen. Hier sehen Sie ein SQL-Beispiel für die Erstellung neuer Zeilen:

    INSERT INTO data_source_table
    VALUES (10, 'newdata','9/10/2017 2:23:00 AM')
    
    INSERT INTO data_source_table
    VALUES (11, 'newdata','9/11/2017 9:01:00 AM')
    
  14. Klicken Sie auf Debuggen, geben Sie die Parameter ein, und klicken Sie dann auf Fertig stellen, um die Pipeline erneut auszuführen.

    Sie sehen, dass nur neue Zeilen in das Ziel kopiert wurden.

  15. (Optional:) Wenn Sie Azure Synapse Analytics als Zielspeicher für die Daten ausgewählt haben, müssen Sie eine Verbindung mit Azure Blob Storage für den Stagingprozess angeben. Dabei handelt es sich um eine Anforderung von Azure Synapse Analytics PolyBase. Die Vorlage generiert einen Containerpfad für Sie. Überprüfen Sie nach Ausführung der Pipeline, ob der Container im Blobspeicher erstellt wurde.

    Screenshot showing where to configure Polybase.