Deltaformat in Azure Data Factory

GILT FÜR: Azure Data Factory Azure Synapse Analytics

Tipp

Testen Sie Data Factory in Microsoft Fabric, eine All-in-One-Analyselösung für Unternehmen. Microsoft Fabric deckt alle Aufgaben ab, von der Datenverschiebung bis hin zu Data Science, Echtzeitanalysen, Business Intelligence und Berichterstellung. Erfahren Sie, wie Sie kostenlos eine neue Testversion starten!

In diesem Artikel erfahren Sie, wie Sie Daten unter Verwendung des Deltaformats in eine bzw. aus einer Delta Lake-Instanz kopieren, die in Azure Data Lake Store Gen2 oder in Azure Blob Storage gespeichert ist. Dieser Connector ist als Inlinedataset in Zuordnungsdatenflüssen sowohl als Quelle als auch als Senke verfügbar.

Eigenschaften von Mapping Data Flow

Dieser Connector ist als Inlinedataset in Zuordnungsdatenflüssen sowohl als Quelle als auch als Senke verfügbar.

Quelleigenschaften

Die folgende Tabelle enthält die von einer Deltaquelle unterstützten Eigenschaften. Sie können diese Eigenschaften auf der Registerkarte Quelloptionen bearbeiten.

Name BESCHREIBUNG Erforderlich Zulässige Werte Datenflussskript-Eigenschaft
Format Das Format muss delta sein ja delta format
Dateisystem Der Container bzw. das Dateisystem der Delta Lake-Instanz. ja String fileSystem
Ordnerpfad Das Verzeichnis des Deltasees ja String folderPath
Komprimierungstyp Der Komprimierungstyp der Deltatabelle. nein bzip2
gzip
deflate
ZipDeflate
snappy
lz4
compressionType
Komprimierungsgrad Wählen Sie aus, ob die Komprimierung schnellstmöglich abgeschlossen oder die resultierende Datei optimal komprimiert werden soll. Erforderlich, wenn compressedType angegeben wird. Optimal oder Fastest compressionLevel
Zeitreise Wählen Sie aus, ob eine ältere Momentaufnahme einer Deltatabelle abgefragt werden soll. Nein Abfrage nach Zeitstempel: Zeitstempel
Abfragen nach Version: Integer
timestampAsOf
versionAsOf
Finden keiner Dateien zulässig Falls TRUE, wird kein Fehler ausgelöst, wenn keine Dateien gefunden werden. nein true oder false ignoreNoFilesFound

Schema importieren

Delta ist nur als Inlinedataset verfügbar und verfügt standardmäßig über kein zugeordnetes Schema. Zum Abrufen von Spaltenmetadaten klicken Sie auf der Registerkarte Projektion auf die Schaltfläche Schema importieren. Dadurch können Sie auf die Spaltennamen und Datentypen verweisen, die vom Korpus angegeben sind. Zum Importieren des Schemas muss eine Debugsitzung für den Datenfluss aktiv sein, und es muss eine CDM-Entitätsdefinitionsdatei vorhanden sein, auf die verwiesen werden kann.

Skriptbeispiel für Deltaquelle

source(output(movieId as integer,
            title as string,
            releaseDate as date,
            rated as boolean,
            screenedOn as timestamp,
            ticketPrice as decimal(10,2)
            ),
    store: 'local',
    format: 'delta',
    versionAsOf: 0,
    allowSchemaDrift: false,
    folderPath: $tempPath + '/delta'
  ) ~> movies

Senkeneigenschaften

Die folgende Tabelle enthält die von einer Deltasenke unterstützten Eigenschaften. Sie können diese Eigenschaften auf der Registerkarte Einstellungen bearbeiten.

Name BESCHREIBUNG Erforderlich Zulässige Werte Datenflussskript-Eigenschaft
Format Das Format muss delta sein ja delta format
Dateisystem Der Container bzw. das Dateisystem der Delta Lake-Instanz. ja String fileSystem
Ordnerpfad Das Verzeichnis des Deltasees ja String folderPath
Komprimierungstyp Der Komprimierungstyp der Deltatabelle. nein bzip2
gzip
deflate
ZipDeflate
snappy
lz4
TarGZip
tar
compressionType
Komprimierungsgrad Wählen Sie aus, ob die Komprimierung schnellstmöglich abgeschlossen oder die resultierende Datei optimal komprimiert werden soll. Erforderlich, wenn compressedType angegeben wird. Optimal oder Fastest compressionLevel
Vakuum Löscht Dateien, die älter als die angegebene Dauer und für die aktuelle Tabellenversion nicht mehr relevant sind. Bei Angabe eines Werts kleiner oder gleich 0 erfolgt der Bereinigungsvorgang nicht. ja Integer vacuum
Aktion table Informiert ADF, was mit der Ziel-Deltatabelle in Ihrer Senke zu tun ist. Sie können die Tabelle unverändert lassen und neue Zeilen anfügen, die vorhandene Tabellendefinition und -daten mit neuen Metadaten und Daten überschreiben oder die vorhandene Tabellenstruktur beibehalten, aber zuerst alle Zeilen abschneiden und dann die neuen Zeilen einfügen. Nein Keine, Abschneiden, Überschreiben Differenz abschneiden, Überschreiben
Updatemethode Wenn Sie allein „Einfügen zulassen“ auswählen oder in eine neue Deltatabelle schreiben, empfängt das Ziel alle eingehenden Zeilen, und zwar unabhängig von den festgelegten Zeilenrichtlinien. Wenn Ihre Daten Zeilen mit anderen Zeilenrichtlinien enthalten, müssen diese mit einer vorhergehenden Filtertransformation ausgeschlossen werden.

Wenn alle Update-Methoden ausgewählt sind, erfolgt eine Zusammenführung, bei der Zeilen gemäß den festgelegten Richtlinien für Zeilen eingefügt/gelöscht/aktualisiert und eingefügt/aktualisiert werden, und zwar mithilfe einer vorhergehenden Alter Row-Transformation.
ja true oder false insertable
deletable
upsertable
updateable
Optimierter Schreibvorgang Erzielen Sie einen höheren Durchsatz für den Schreibvorgang, indem Sie das interne Mischen in Spark-Executors optimieren. Dies führt ggf. zu weniger Partitionen und größeren Dateien. nein true oder false optimizedWrite: true
Automatisches Komprimieren Nach dem Abschließen der einzelnen Schreibvorgänge führt Spark automatisch den Befehl OPTIMIZE aus, um die Daten neu zu organisieren. Dies führt bei Bedarf zu weiteren Partitionen, um künftig eine bessere Leseleistung zu erzielen. nein true oder false autoCompact: true

Skriptbeispiel für Deltasenke

Das zugehörige Datenflussskript ist:

moviesAltered sink(
          input(movieId as integer,
                title as string
            ),
           mapColumn(
                movieId,
                title
            ),
           insertable: true,
           updateable: true,
           deletable: true,
           upsertable: false,
           keys: ['movieId'],
            store: 'local',
           format: 'delta',
           vacuum: 180,
           folderPath: $tempPath + '/delta'
           ) ~> movieDB

Deltasenke mit Partitionsbereinigung

Mit dieser Option unter der Updatemethode oben (also update/upsert/delete) können Sie die Anzahl der Partitionen einschränken, die überprüft werden. Nur Partitionen, die diese Bedingung erfüllen, werden vom Zielspeicher abgerufen. Sie können feste Werte angeben, die eine Partitionsspalte annehmen kann.

Screenshot of partition pruning options are available to limit the inspection.

Beispielskript für eine Deltasenke mit Partitionsbereinigung

Ein Beispielskript finden Sie unten.

DerivedColumn1 sink( 
      input(movieId as integer,
            title as string
           ), 
      allowSchemaDrift: true,
      validateSchema: false,
      format: 'delta',
      container: 'deltaContainer',
      folderPath: 'deltaPath',
      mergeSchema: false,
      autoCompact: false,
      optimizedWrite: false,
      vacuum: 0,
      deletable:false,
      insertable:true,
      updateable:true,
      upsertable:false,
      keys:['movieId'],
      pruneCondition:['part_col' -> ([5, 8])],
      skipDuplicateMapInputs: true,
      skipDuplicateMapOutputs: true) ~> sink2
 

Delta liest nur zwei Partitionen aus dem Zieldeltaspeicher, für die part_col == 5 und 8 gilt, und nicht alle Partitionen. part_col ist eine Spalte, nach der die Zieldeltadaten partitioniert werden. Sie muss in den Quelldaten nicht enthalten sein.

Optionen zur Deltasenkenoptimierung

Auf der Registerkarte „Einstellungen“ finden Sie drei weitere Optionen zum Optimieren der Deltasenkentransformation.

  • Wenn die Option Schema zusammenführen aktiviert ist, ermöglicht sie eine Schemaweiterentwicklung, d. h. alle Spalten, die im aktuellen eingehenden Datenstrom, aber nicht in der Deltazieltabelle vorhanden sind, werden automatisch deren Schema hinzugefügt. Diese Option wird für alle Updatemethoden unterstützt.

  • Wenn Automatisch komprimieren aktiviert ist, wird bei der Transformation nach einem einzelnen Schreibvorgang überprüft, ob Dateien weiter komprimiert werden können. Zudem wird ein schneller OPTIMIZE-Auftrag (mit Dateigrößen von 128 MB anstelle von 1 GB) ausgeführt, um Dateien für Partitionen mit der höchsten Anzahl kleiner Dateien weiter zu komprimieren. Die automatische Komprimierung hilft bei der Zusammenführung einer großen Anzahl kleiner Dateien in eine kleinere Anzahl großer Dateien. Die automatische Komprimierung erfolgt nur, wenn mindestens 50 Dateien vorhanden sind. Nachdem ein Komprimierungsvorgang ausgeführt wurde, wird eine neue Version der Tabelle erstellt und eine neue Datei geschrieben, die die Daten mehrerer früherer Dateien in einem kompakten, komprimierten Formular enthält.

  • Wenn Schreiben optimieren aktiviert ist, optimiert die Senkentransformation die Partitionsgrößen dynamisch basierend auf den tatsächlichen Daten, indem versucht wird, Dateien mit einer Größe von 128 MB für jede Tabellenpartition zu schreiben. Dies ist eine ungefähre Größe und kann je nach Datasetmerkmalen variieren. Mit der Option „Schreiben optimieren“ wird die Gesamteffizienz der Schreibvorgänge und nachfolgenden Lesevorgänge verbessert. Dabei werden Partitionen so organisiert, dass die Leistung nachfolgender Lesevorgänge verbessert wird.

Tipp

Der optimierte Schreibvorgang wird ihren gesamten ETL-Auftrag verlangsamen, da der Senke den Spark Delta Lake Optimize Befehl ausstellen wird, nachdem Ihre Daten verarbeitet wurden. Es ist empfohlen “Optimierter Schreibzugriff“ sparsam zu benutzen. Wenn Sie beispielsweise über eine stundenweise Datenpipeline verfügen, führen Sie täglich einen Datenfluss mit "Optimierter Schreibzugriff" aus.

Bekannte Einschränkungen

Beim Schreiben in eine Deltasenke wird aufgrund einer bekannten Einschränkung die Anzahl geschriebener Zeilen nicht in der Überwachungsausgabe gezeigt.