Best Practices: Delta Lake

In diesem Artikel werden best Practices für die Verwendung von Delta Lake beschrieben.

Databricks empfiehlt die Verwendung der prädiktiven Optimierung. Weitere Informationen finden Sie unter Prädiktive Optimierung für Delta Lake.

Wenn Sie eine Tabelle an derselben Stelle löschen und neu erstellen, sollten Sie immer eine CREATE OR REPLACE TABLE-Anweisung verwenden. Siehe Löschen oder Ersetzen einer Delta-Tabelle.

Verwenden von Liquid Clustering für optimierte Datensprünge

Databricks empfiehlt die Verwendung von Liquid Clustering anstelle von Partitionierung, Z-Reihenfolge oder anderen Datenorganisationsstrategien, um das Datenlayout für das Überspringen von Daten zu optimieren. Weitere Informationen finden Sie unter Verwenden von Liquid Clustering für Delta-Tabellen.

Komprimieren von Dateien

Die Predictive Optimization führt automatisch OPTIMIZE- und VACUUM-Befehle in verwalteten Tabellen im Unity-Katalog aus. Weitere Informationen finden Sie unter Prädiktive Optimierung für Delta Lake.

Databricks empfiehlt, häufig den Befehl OPTIMIEREN auszuführen, um kleine Dateien zu komprimieren.

Hinweis

Dieser Vorgang entfernt nicht die alten Dateien. Um sie zu entfernen, führen Sie den Befehl VACUUMaus.

Ersetzen des Inhalts oder Schemas einer Tabelle

Manchmal ist es erforderlich, eine Delta-Tabelle zu ersetzen. Beispiele:

  • Sie stellen fest, dass die Daten in der Tabelle falsch sind und möchten den Inhalt ersetzen.
  • Sie möchten die gesamte Tabelle neu schreiben, um inkompatible Schemaänderungen vorzunehmen, z. B. die Spaltentypen ändern.

Sie können zwar das gesamte Verzeichnis einer Delta-Tabelle löschen und eine neue Tabelle unter demselben Pfad erstellen, aber das wird aus den folgenden Gründen nicht empfohlen:

  • Das Löschen eines Verzeichnisses ist nicht effizient. Das Löschen eines Verzeichnisses mit sehr großen Dateien kann Stunden oder sogar Tage dauern.
  • Sie verlieren sämtliche Inhalte in den gelöschten Dateien. Eine Wiederherstellung ist schwierig, wenn Sie die falsche Tabelle löschen.
  • Das Löschen des Verzeichnisses ist nicht atomisch. Während Sie die Tabelle löschen, kann eine gleichzeitige Abfrage, die die Tabelle liest, fehlschlagen oder eine unvollständige Tabelle anzeigen.

Wenn Sie das Tabellenschema nicht ändern müssen, können Sie Daten aus einer Delta-Tabelle löschen und die neuen Daten einfügen oder die Tabelle aktualisieren, um die falschen Werte zu korrigieren.

Wenn Sie das Tabellenschema ändern möchten, können Sie die gesamte Tabelle atomisch ersetzen. Beispiele:

Python

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("<your-table>") # Managed table

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("path", "<your-table-path>") \
  .saveAsTable("<your-table>") # External table

SQL

REPLACE TABLE <your-table> USING DELTA AS SELECT ... -- Managed table
REPLACE TABLE <your-table> USING DELTA LOCATION "<your-table-path>" AS SELECT ... -- External table

Scala

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("<your-table>") // Managed table

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .option("path", "<your-table-path>")
  .saveAsTable("<your-table>") // External table

Diese Vorgehensweise bietet mehrere Vorteile:

  • Das Überschreiben einer Tabelle ist wesentlich schneller, da sie das Verzeichnis nicht rekursiv auflisten oder Dateien löschen muss.
  • Die alte Version der Tabelle ist weiterhin vorhanden. Wenn Sie die falsche Tabelle löschen, können Sie die alten Daten problemlos unter Verwendung von Zeitreise abrufen. Weitere Informationen finden Sie unter Arbeiten mit dem Delta Lake-Tabellenverlauf.
  • Der Vorgang ist atomisch. Während Sie eine Tabelle löschen, können gleichzeitige Abfragen die Tabelle weiterhin lesen.
  • Wenn das Überschreiben der Tabelle fehlschlägt, bleibt die Tabelle aufgrund der Delta Lake ACID-Transaktionsgarantien in ihrem vorherigen Zustand.

Wenn Sie darüber hinaus alte Dateien löschen möchten, um Speicherkosten nach dem Überschreiben der Tabelle zu sparen, können Sie sie mit VACUUM löschen. Dieser Befehl ist für das Löschen von Dateien optimiert und in der Regel schneller als das Löschen des gesamten Verzeichnisses.

Spark-Zwischenspeicherung

Databricks empfiehlt die Verwendung von Spark-Zwischenspeicherung aus folgenden Gründen nicht:

  • Sie verlieren alle Daten, die von zusätzlichen Filtern, die zusätzlich zu den zwischengespeicherten DataFrame hinzugefügt werden, übersprungen werden können.
  • Die Daten, die zwischengespeichert werden, werden möglicherweise nicht aktualisiert, wenn auf die Tabelle mit einem anderen Bezeichner zugegriffen wird.

Unterschiede zwischen Delta Lake und Parquet auf Apache Spark

Delta Lake verarbeitet die folgenden Vorgänge automatisch. Sie sollten diese Vorgänge niemals manuell ausführen:

  • REFRESH TABLE: Delta-Tabellen geben immer die neuesten Informationen zurück. Ein manueller Aufruf von REFRESH TABLE nach einer Änderung ist daher nicht erforderlich.
  • Hinzufügen und Entfernen von Partitionen: Delta Lake verfolgt die in einer Tabelle vorhandenen Partitionen automatisch und aktualisiert die Liste, sobald Daten hinzugefügt oder entfernt werden. Daher ist die Ausführung von ALTER TABLE [ADD|DROP] PARTITION oder MSCK nicht erforderlich.
  • Laden einer einzelnen Partition: Das direkte Lesen von Partitionen ist nicht erforderlich. Sie müssen beispielsweise spark.read.format("parquet").load("/data/date=2017-01-01") nicht ausführen. Verwenden Sie stattdessen eine WHERE-Klausel zum Überspringen von Daten, etwa spark.read.table("<table-name>").where("date = '2017-01-01'").
  • Ändern Sie Datendateien nicht manuell: Mithilfe des Transaktionsprotokolls führt Delta Lake einen automatischen Commit von Änderungen an der Tabelle aus. Ändern Sie in einer Delta-Tabelle keine Parquet-Datendateien direkt, fügen Sie diese nicht direkt zu dieser Tabelle hinzu oder entfernen Sie sie aus dieser Tabelle, da dies zu Datenverlusten oder einer Beschädigung der Tabelle führen kann.

Verbessern der Leistung für Delta Lake-Zusammenführungen

Sie können die Zeit für die Zusammenführung mit den folgenden Ansätzen reduzieren:

  • Reduzieren des Suchbereichs für Übereinstimmungen: Standardmäßig durchsucht der merge-Vorgang die gesamte Delta-Tabelle nach Übereinstimmungen in der Quelltabelle. Eine Möglichkeit zur Beschleunigung von merge besteht darin, den Suchbereich zu reduzieren, indem bekannte Einschränkungen in der Übereinstimmungsbedingung hinzugefügt werden. Angenommen, Sie haben eine Tabelle, die nach country und date partitioniert ist, und Sie möchten mithilfe von merge Informationen für den letzten Tag und ein bestimmtes Land aktualisieren. Durch Hinzufügen der folgenden Bedingung wird die Abfrage beschleunigt, da sie nur in den relevanten Partitionen nach Übereinstimmungen sucht:

    events.date = current_date() AND events.country = 'USA'
    

    Darüber hinaus verringert diese Abfrage auch die Wahrscheinlichkeit von Konflikten mit anderen gleichzeitigen Vorgängen. Weitere Informationen finden Sie unter Isolationsstufen und Schreibkonflikte in Azure Databricks.

  • Komprimieren von Dateien: Wenn die Daten in vielen kleinen Dateien gespeichert sind, kann das Lesen der Daten für die Suche nach Übereinstimmungen langsam werden. Sie können kleine Dateien in größere Dateien komprimieren, um den Lesedurchsatz zu verbessern. Ausführliche Informationen finden Sie unter Komprimieren von Datendateien mit OPTIMIZE in Delta Lake.

  • Steuern der Shufflepartitionen für Schreibvorgänge: Der merge-Vorgang mischt Daten mehrmals, um die aktualisierten Daten zu berechnen und zu schreiben. Die Anzahl der für Shuffle verwendeten Aufgaben wird durch die Spark-Sitzungskonfiguration spark.sql.shuffle.partitions gesteuert. Durch das Festlegen dieses Parameters wird nicht nur die Parallelität gesteuert, sondern auch die Anzahl der Ausgabedateien bestimmt. Durch das Erhöhen des Werts wird die Parallelität erhöht, aber auch eine größere Anzahl kleinerer Datendateien generiert.

  • Aktivieren von optimierten Schreibvorgängen: Bei partitionierten Tabellen kann merge eine viel größere Anzahl von kleinen Dateien erzeugen als die Anzahl von Shufflepartitionen. Dies liegt daran, dass jede Shuffleaufgabe mehrere Dateien in mehreren Partitionen schreiben und so zu einem Leistungsengpass werden kann. Sie können die Anzahl der Dateien verringern, indem Sie „Optimized Write“ (Optimierter Schreibvorgang) aktivieren. Weitere Informationen finden Sie unter Optimierte Schreibvorgänge für Delta Lake in Azure Databricks.

  • Optimieren der Dateigröße in Tabellen: Azure Databricks kann automatisch erkennen, ob eine Delta-Tabelle häufige merge-Vorgänge zum erneuten Schreiben von Dateien enthält. Möglicherweise reduziert Databricks die Größe neu geschriebener Dateien in Erwartung weiterer Dateischreibvorgänge in der Zukunft. Details finden Sie im Abschnitt zum Optimieren von Dateigrößen.

  • Low Shuffle Merge: Low Shuffle Merge bietet eine optimierte Implementierung von MERGE-Vorgängen, die eine bessere Leistung für die meisten gängigen Workloads bietet. Darüber hinaus behält es vorhandene Optimierungen des Datenlayouts wie z. B. Z-Sortierung für unveränderte Daten bei.

Verwalten der Datenaktualität

Zu Beginn jeder Abfrage werden Delta-Tabellen automatisch auf die neueste Version der Tabelle aktualisiert. Dieser Vorgang ist in einem Notebook zu beobachten, wenn der folgende Befehlsstatus gemeldet wird: Updating the Delta table's state. Wenn Sie jedoch eine historische Analyse für eine Tabelle durchführen, benötigen Sie nicht unbedingt Daten bis zur letzten Minute, insbesondere bei Tabellen, in die häufig Streamingdaten eingespeist werden. In diesen Fällen können die Abfragen mit veralteten Momentaufnahmen Ihrer Delta-Tabelle ausgeführt werden. Dieser Ansatz kann die Latenz beim Abrufen von Abfrageergebnissen senken.

Sie können die Toleranz für veraltete Daten konfigurieren, indem Sie die Spark-Sitzungskonfiguration spark.databricks.delta.stalenessLimit auf einen Uhrzeit-Zeichenfolgenwert wie 1h oder 15m (für 1 Stunde bzw. 15 Minuten) festlegen. Diese Konfiguration ist sitzungsspezifisch und wirkt sich nicht auf andere Clients aus, die auf die Tabelle zugreifen. Wenn der Tabellenstatus innerhalb des Grenzwerts für die Veraltung aktualisiert wurde, gibt eine Abfrage für die Tabelle Ergebnisse zurück, ohne auf die neueste Tabellenaktualisierung zu warten. Diese Einstellung verhindert niemals, dass die Tabelle aktualisiert wird. Falls veraltete Daten zurückgegeben werden, läuft die Aktualisierung noch im Hintergrund. Wenn die letzte Tabellenaktualisierung älter als der Grenzwert für die Veraltung ist, gibt die Abfrage erst Ergebnisse zurück, wenn die Tabellenstatusaktualisierung abgeschlossen ist.

Verbesserte Prüfpunkte für Abfragen mit geringer Latenz

Delta Lake schreibt mit optimierter Häufigkeit Prüfpunkte als Aggregatzustand einer Delta-Tabelle. Diese Prüfpunkte dienen als Ausgangspunkt zum Berechnen des aktuellen Status der Tabelle. Ohne Prüfpunkte müsste Delta Lake eine große Sammlung von JSON-Dateien (Delta-Dateien) lesen, die Commits im Transaktionsprotokoll repräsentieren, um den Status einer Tabelle zu berechnen. Darüber hinaus werden die Statistiken auf Spaltenebene, die Delta Lake zum Überspringen von Daten verwendet, im Prüfpunkt gespeichert.

Wichtig

Delta Lake-Prüfpunkte unterscheiden sich von Prüfpunkten für das strukturierte Streaming.

Statistiken auf Spaltenebene werden als Struktur und JSON-Code (für Abwärtskompatibilität) gespeichert. Aufgrund des Strukturformats sind Delta Lake-Lesevorgänge deutlich schneller, denn:

  • Delta Lake führt keine aufwändige JSON-Analyse durch, um Statistiken auf Spaltenebene abzurufen.
  • Die Funktionen zur Parquet-Spaltenbereinigung verringern die zum Lesen der Statistiken für eine Spalte erforderlichen E/A-Vorgänge erheblich.

Das Strukturformat ermöglicht eine Reihe von Optimierungen, die den Mehraufwand für Delta Lake-Lesevorgänge von Sekunden auf einige zehn Millisekunden reduzieren, was die Latenzzeit für kurze Abfragen erheblich verringert.

Verwalten von Statistiken auf Spaltenebene in Prüfpunkten

Mit den Tabelleneigenschaften delta.checkpoint.writeStatsAsJson und delta.checkpoint.writeStatsAsStruct steuern Sie, wie die Statistiken in Prüfpunkte geschrieben werden. Wenn beide Tabelleneigenschaften false sind, kann Delta Lake keine Daten überspringen.

  • Batchschreibvorgänge schreiben Statistiken sowohl im JSON- als auch im Strukturformat. delta.checkpoint.writeStatsAsJson ist true
  • delta.checkpoint.writeStatsAsStruct ist standardmäßig nicht definiert.
  • Reader verwenden die Strukturspalte, sofern verfügbar, und greifen ansonsten auf die JSON-Spalte zurück.

Wichtig

Die verbesserten Prüfpunkte beeinträchtigen nicht die Kompatibilität mit Open-Source-Data Lake-Readern. Das Festlegen von auf delta.checkpoint.writeStatsAsJson kann false jedoch Auswirkungen auf proprietäre Delta Lake-Reader haben. Wenden Sie sich an Ihre Anbieter, um mehr über die Auswirkungen auf die Leistung zu erfahren.

Aktivieren erweiterter Prüfpunkte für Abfragen mit strukturiertem Streaming

Wenn für Ihre Workloads für strukturiertes Streaming keine niedrigen Latenzzeiten benötigt werden (Latenzzeiten im Minutenbereich), können Sie erweiterte Prüfpunkte aktivieren, indem Sie den folgenden SQL-Befehl ausführen:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

Die Schreiblatenz des Prüfpunkts kann auch durch Festlegen der folgenden Tabelleneigenschaften verbessert werden:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

Wenn das Überspringen von Daten in Ihrer Anwendung nicht sinnvoll ist, können Sie beide Eigenschaften auf „false“ festlegen. Dann werden keine Statistiken erfasst oder geschrieben. Diese Konfiguration wird nicht empfohlen.