Tabelle: Lese-und Schreibvorgänge als Batchvorgang

Delta Lake unterstützt die meisten optionen, die von Apache Spark DataFrame-Lese- und -Schreib-APIs zum Ausführen von Batchlese- und -schreibvorgängen für Tabellen bereitgestellt werden.

Informationen zu Delta Lake SQL-Befehlen finden Sie unter

Erstellen einer Tabelle

Delta Lake unterstützt das Erstellen von zwei Tabellentypen: Tabellen, die im Metastore definiert sind, und Tabellen, die durch path definiert sind.

Tabellen können auf folgende Weise erstellt werden.

  • SQL DDL-Befehle:Sie können standard SQL DDL-Befehle verwenden, die in Apache Spark unterstützt werden (z. B. und ), um REPLACE TABLE Delta-Tabellen zu erstellen.

    CREATE TABLE IF NOT EXISTS default.people10m (
      id INT,
      firstName STRING,
      middleName STRING,
      lastName STRING,
      gender STRING,
      birthDate TIMESTAMP,
      ssn STRING,
      salary INT
    ) USING DELTA
    
    CREATE OR REPLACE TABLE default.people10m (
      id INT,
      firstName STRING,
      middleName STRING,
      lastName STRING,
      gender STRING,
      birthDate TIMESTAMP,
      ssn STRING,
      salary INT
    ) USING DELTA
    

    Hinweis

    In Databricks Runtime 8.0 und höher ist Delta Lake das Standardformat, und Sie benötigen nicht USING DELTA .

    In Databricks Runtime 7.0 und höher unterstützt SQL auch das Erstellen einer Tabelle unter einem Pfad, ohne einen Eintrag im Hive-Metastore zu erstellen.

    -- Create or replace table with path
    CREATE OR REPLACE TABLE delta.`/tmp/delta/people10m` (
      id INT,
      firstName STRING,
      middleName STRING,
      lastName STRING,
      gender STRING,
      birthDate TIMESTAMP,
      ssn STRING,
      salary INT
    ) USING DELTA
    
  • API:Wenn Sie gleichzeitig eine Tabelle erstellen und Daten aus Spark-Datenrahmen oder Datasets einfügen möchten, können Sie Spark ( Scala oder Java und Python ) DataFrameWriterverwenden.DataFrameWriter

    Python

    # Create table in the metastore using DataFrame's schema and write data to it
    df.write.format("delta").saveAsTable("default.people10m")
    
    # Create or replace partitioned table with path using DataFrame's schema and write/overwrite data to it
    df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
    

    Scala

    // Create table in the metastore using DataFrame's schema and write data to it
    df.write.format("delta").saveAsTable("default.people10m")
    
    // Create table with path using DataFrame's schema and write data to it
    df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
    
    • In Databricks Runtime 8.0 und höher ist Delta Lake das Standardformat, und Sie müssen USING DELTAformat("delta") , oder nicht using("delta") angeben.
    • In Databricks Runtime 7.0 und höher können Sie delta-Tabellen auch mithilfe der DataFrameWriterV2 Spark-API erstellen.
  • API:Sie können auch die DeltaTableBuilder API in Delta Lake zum Erstellen von Tabellen. Im Vergleich zu den DataFrameWriter-APIs erleichtert diese API die Angabe zusätzlicher Informationen wie Spaltenkommentare, Tabelleneigenschaften und generierte Spalten.

    Wichtig

    Dieses Feature befindet sich in der Public Preview.

    Hinweis

    Dieses Feature ist ab Databricks Runtime 8.3 verfügbar.

    Python

    # Create table in the metastore
    DeltaTable.createIfNotExists(spark) \
      .tableName("default.people10m") \
      .addColumn("id", "INT") \
      .addColumn("firstName", "STRING") \
      .addColumn("middleName", "STRING") \
      .addColumn("lastName", "STRING", comment = "surname") \
      .addColumn("gender", "STRING") \
      .addColumn("birthDate", "TIMESTAMP") \
      .addColumn("ssn", "STRING") \
      .addColumn("salary", "INT") \
      .execute()
    
    # Create or replace table with path and add properties
    DeltaTable.createOrReplace(spark) \
      .addColumn("id", "INT") \
      .addColumn("firstName", "STRING") \
      .addColumn("middleName", "STRING") \
      .addColumn("lastName", "STRING", comment = "surname") \
      .addColumn("gender", "STRING") \
      .addColumn("birthDate", "TIMESTAMP") \
      .addColumn("ssn", "STRING") \
      .addColumn("salary", "INT") \
      .property("description", "table with people data") \
      .location("/tmp/delta/people10m") \
      .execute()
    

    Scala

    // Create table in the metastore
    DeltaTable.createOrReplace(spark)
      .tableName("default.people10m")
      .addColumn("id", "INT")
      .addColumn("firstName", "STRING")
      .addColumn("middleName", "STRING")
      .addColumn(
        DeltaTable.columnBuilder("lastName")
          .dataType("STRING")
          .comment("surname")
          .build())
      .addColumn("lastName", "STRING", comment = "surname")
      .addColumn("gender", "STRING")
      .addColumn("birthDate", "TIMESTAMP")
      .addColumn("ssn", "STRING")
      .addColumn("salary", "INT")
      .execute()
    
    // Create or replace table with path and add properties
    DeltaTable.createOrReplace(spark)
      .addColumn("id", "INT")
      .addColumn("firstName", "STRING")
      .addColumn("middleName", "STRING")
      .addColumn(
        DeltaTable.columnBuilder("lastName")
          .dataType("STRING")
          .comment("surname")
          .build())
      .addColumn("lastName", "STRING", comment = "surname")
      .addColumn("gender", "STRING")
      .addColumn("birthDate", "TIMESTAMP")
      .addColumn("ssn", "STRING")
      .addColumn("salary", "INT")
      .property("description", "table with people data")
      .location("/tmp/delta/people10m")
      .execute()
    

Weitere Informationen finden Sie in der API-Dokumentation.

Siehe auch Erstellen einer Tabelle.

Partitionieren von Daten

Sie können Daten partitionieren, um Abfragen oder DML zu beschleunigen, die Prädikate mit den Partitionsspalten enthalten. Um Daten beim Erstellen einer Delta-Tabelle zu partitionieren, geben Sie eine Partition nach Spalten an. Im folgenden Beispiel wird nach Geschlecht partitioniert.

SQL

-- Create table in the metastore
CREATE TABLE default.people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)
USING DELTA
PARTITIONED BY (gender)

Python

df.write.format("delta").partitionBy("gender").saveAsTable("default.people10m")

DeltaTable.create(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .partitionedBy("gender") \
  .execute()

Scala

df.write.format("delta").partitionBy("gender").saveAsTable("default.people10m")

DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .partitionedBy("gender")
  .execute()

Steuern des Datenspeicherorts

Für Tabellen, die im Metastore definiert sind, können Sie optional als LOCATION Pfad angeben. Tabellen, die mit einem angegebenen erstellt werden, werden vom LOCATION Metastore als nicht verwaltet betrachtet. Im Gegensatz zu einer verwalteten Tabelle, in der kein Pfad angegeben ist, werden die Dateien einer nicht verwalteten Tabelle nicht gelöscht, wenn Sie die DROP Tabelle verwenden.

Wenn Sie mit CREATE TABLE einem LOCATION ausführen, CREATE TABLE mit Delta Lake gespeicherte Daten enthält, führt Delta Lake folgende Schritte aus:

  • Wenn Sie nur den Tabellennamen und speicherort angeben,z. B.:

    CREATE TABLE default.people10m
    USING DELTA
    LOCATION '/tmp/delta/people10m'
    

    Die Tabelle im Metastore erbt automatisch die Schema-, Partitionierungs- und Tabelleneigenschaften der vorhandenen Daten. Diese Funktion kann verwendet werden, um Daten in den Metastore zu "importieren".

  • Wenn Sie eine Konfiguration angeben (Schema-, Partitionierungs- oder Tabelleneigenschaften), überprüft Delta Lake, ob die Spezifikation genau der Konfiguration der vorhandenen Daten entspricht.

    Wichtig

    Wenn die angegebene Konfiguration nicht genau mit der Konfiguration der Daten überein passt, löst Delta Lake eine Ausnahme aus, die die Abweichung beschreibt.

Hinweis

Der Metastore ist nicht die Quelle der Wahrheit über die neuesten Informationen einer Delta-Tabelle. Tatsächlich enthält die Tabellendefinition im Metastore möglicherweise nicht alle Metadaten wie Schema und Eigenschaften. Sie enthält den Speicherort der Tabelle, und das Transaktionsprotokoll der Tabelle am Speicherort ist die Quelle der Wahrheit. Wenn Sie den Metastore von einem System abfragen, das diese Delta-spezifische Anpassung nicht kennt, werden möglicherweise unvollständige oder veraltete Tabelleninformationen angezeigt.

Verwenden generierter Spalten

Wichtig

Dieses Feature befindet sich in der Public Preview.

Hinweis

Dieses Feature ist ab Databricks Runtime 8.3 verfügbar.

Delta Lake unterstützt generierte Spalten, bei denen es sich um eine spezielle Art von Spalten handelt, deren Werte automatisch basierend auf einer benutzerspezifischen Funktion für andere Spalten in der Delta-Tabelle generiert werden. Wenn Sie in eine Tabelle mit generierten Spalten schreiben und keine expliziten Werte dafür bereitstellen, berechnet Delta Lake die Werte automatisch. Beispielsweise können Sie automatisch eine Datumsspalte (für die Partitionierung der Tabelle nach Datum) aus der timestamp -Spalte generieren. Alle Schreibvorgänge in die Tabelle müssen nur die Daten für die timestamp-Spalte angeben. Wenn Sie jedoch explizit Werte für sie bereitstellen, müssen die Werte die Einschränkung erfüllen, da der Schreibfehler auftritt.

Das folgende Beispiel zeigt, wie Sie eine Tabelle mit generierten Spalten erstellen:

SQL

CREATE TABLE default.people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  dateOfBirth DATE GENERATED ALWAYS AS (CAST(birthDate AS DATE)),
  ssn STRING,
  salary INT
)
USING DELTA
PARTITIONED BY (gender)

Python

DeltaTable.create(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("dateOfBirth", DateType(), generatedAlwaysAs="CAST(birthDate AS DATE)") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .partitionedBy("gender") \
  .execute()

Scala

DeltaTable.create(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn(
    DeltaTable.columnBuilder("dateOfBirth")
     .dataType(DateType)
     .generatedAlwaysAs("CAST(dateOfBirth AS DATE)")
     .build())
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .partitionedBy("gender")
  .execute()

Generierte Spalten werden so gespeichert, als ob es sich um normale Spalten handelte. Das heißt, sie belegen Speicher.

Die folgenden Einschränkungen gelten für generierte Spalten:

  • Ein Generierungsausdruck kann alle SQL-Funktionen in Spark verwenden, die immer das gleiche Ergebnis zurückgeben, wenn dieselben Argumentwerte angegeben werden, mit Ausnahme der folgenden Funktionstypen:

    • Benutzerdefinierte Funktionen.
    • Aggregatfunktionen.
    • Fensterfunktionen.
    • Funktionen, die mehrere Zeilen zurückgeben.
  • Für Databricks Runtime 9.1 und höher unterstützen Vorgänge generierte Spalten, MERGE wenn Sie auf TRUE spark.databricks.delta.schema.autoMerge.enabled festlegen.

Lesen einer Tabelle

Sie können eine Delta-Tabelle als Datenrahmen laden, indem Sie einen Tabellennamen oder einen Pfad angeben:

SQL

SELECT * FROM default.people10m   -- query table in the metastore

SELECT * FROM delta.`/tmp/delta/people10m`  -- query table by path

Python

spark.table("default.people10m")    # query table in the metastore

spark.read.format("delta").load("/tmp/delta/people10m")  # query table by path

Scala

spark.table("default.people10m")      // query table in the metastore

spark.read.format("delta").load("/tmp/delta/people10m")  // create table by path

import io.delta.implicits._
spark.read.delta("/tmp/delta/people10m")

Der zurückgegebene DataFrame liest automatisch die neueste Momentaufnahme der Tabelle für jede Abfrage. Sie müssen nie REFRESH TABLE ausführen. Delta Lake verwendet automatisch Partitionierung und Statistiken, um die minimale Datenmenge zu lesen, wenn in der Abfrage anwendbare Prädikate vorhanden sind.

Abfragen einer älteren Momentaufnahme einer Tabelle (Zeitreise)

In diesem Abschnitt

Mit Delta Lake Time Travel können Sie eine ältere Momentaufnahme einer Delta-Tabelle abfragen. Für Zeitreise gibt es viele Anwendungsfälle, z. B.:

  • Erneutes Erstellen von Analysen, Berichten oder Ausgaben (z. B. die Ausgabe eines Machine Learning-Modells). Dies kann insbesondere in regulierten Branchen für das Debuggen oder die Überwachung nützlich sein.
  • Schreiben komplexer temporaler Abfragen.
  • Korrigieren von Fehlern in Ihren Daten.
  • Bereitstellen von Momentaufnahmeisolation für eine Reihe von Abfragen für schnell veränderliche Tabellen.

In diesem Abschnitt werden die unterstützten Methoden zum Abfragen älterer Tabellenversionen, Probleme bei der Datenaufbewahrung und Beispiele beschrieben.

Syntax

In diesem Abschnitt wird gezeigt, wie Sie eine ältere Version einer Delta-Tabelle abfragen.

In diesem Abschnitt

SQL AS OF Syntax

SELECT * FROM table_name TIMESTAMP AS OF timestamp_expression
SELECT * FROM table_name VERSION AS OF version

where

  • timestamp_expression kann eine der beiden möglichen Sein:
    • '2018-10-18T22:15:12.013Z', d. b. eine Zeichenfolge, die in einen Zeitstempel um umwandlungen kann.
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', d. b. eine Datumszeichenfolge
    • In Databricks Runtime 6.6 und höher:
      • current_timestamp() - interval 12 hours
      • date_sub(current_date(), 1)
      • Jeder andere Ausdruck, der oder ist, kann in einen Zeitstempel umformiert werden.
  • version ist ein long-Wert, der aus der Ausgabe von abgerufen werden DESCRIBE HISTORY table_spec kann.

timestamp_expressionWeder noch können version Unterabfragen sein.

Beispiel

SELECT * FROM default.people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

DataFrameReader-Optionen

Mit den DataFrameReader-Optionen können Sie einen DataFrame aus einer Delta-Tabelle erstellen, die auf eine bestimmte Version der Tabelle festgelegt ist.

df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/tmp/delta/people10m")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/tmp/delta/people10m")

Für timestamp_string werden nur Datums- oder Zeitstempelzeichenfolgen akzeptiert. Beispiel: "2019-01-01" und "2019-01-01T00:00:00.000Z".

Ein gängiges Muster ist die Verwendung des aktuellen Zustands der Delta-Tabelle während der Ausführung eines Azure Databricks Auftrags, um Downstreamanwendungen zu aktualisieren.

Da Delta-Tabellen automatisch aktualisiert werden, gibt ein aus einer Delta-Tabelle geladener DataFrame möglicherweise unterschiedliche Ergebnisse über Aufrufe hinweg zurück, wenn die zugrunde liegenden Daten aktualisiert werden. Mithilfe der Zeitreise können Sie die vom DataFrame zurückgegebenen Daten über Aufrufe hinweg korrigieren:

latest_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta.`/tmp/delta/people10m`)").collect()
df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/tmp/delta/people10m")

Beispiele

  • Korrektur von versehentlichen Löschungen in einer Tabelle für den 111 Benutzer:
INSERT INTO my_table
  SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
  WHERE userId = 111
  • Korrektur versehentlich falscher Updates für eine Tabelle:
MERGE INTO my_table target
  USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
  ON source.userId = target.userId
  WHEN MATCHED THEN UPDATE SET *
  • Fragen Sie die Anzahl der neuen Kunden ab, die in der letzten Woche hinzugefügt wurden.
SELECT count(distinct userId) - (
  SELECT count(distinct userId)
  FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))

Beibehaltung von Daten

Für die Zeitreise zu einer früheren Version müssen Sie sowohl die Protokoll- als auch die Datendateien für diese Version beibehalten.

Die Datendateien, die eine Delta-Tabelle sichern, werden nie automatisch gelöscht. Datendateien werden nur gelöscht, wenn Sie VACUUMausführen. VACUUMVACUUM Deltaprotokolldateien. Protokolldateien werden automatisch bereinigt, nachdem Prüfpunkte geschrieben wurden.

Standardmäßig können Sie eine Zeitreise zu einer Delta-Tabelle bis zu 30 Tage alt machen, es sei denn, Sie verfügen über Folgendes:

  • Führen Sie VACUUM für Ihre Delta-Tabelle aus.
  • Die Aufbewahrungsdauer für Daten oder Protokolldateien wurde mithilfe der folgenden Tabelleneigenschaftengeändert:
    • delta.logRetentionDuration = "interval <interval>": Steuert, wie lange der Verlauf einer Tabelle beibehalten wird. Der Standardwert lautet interval 30 days.

      Jedes Mal, wenn ein Prüfpunkt geschrieben wird, bereinigt Azure Databricks automatisch Protokolleinträge, die älter als das Aufbewahrungsintervall sind. Wenn Sie diese Konfiguration auf einen ausreichend großen Wert festlegen, werden viele Protokolleinträge beibehalten. Dies sollte sich nicht auf die Leistung auswirken, da Vorgänge für das Protokoll eine konstante Zeit sind. Vorgänge im Verlauf sind parallel, werden jedoch mit zunehmender Protokollgröße teurer.

    • delta.deletedFileRetentionDuration = "interval <interval>": steuert, wie lange eine Datei gelöscht worden sein muss, delta.deletedFileRetentionDuration = "interval <interval>"VACUUM gilt. Der Standardwert lautet interval 7 days.

      Legen Sie fest, um auf Verlaufsdaten von 30 Tagen zuzugreifen, auch wenn Sie VACUUM für die Delta-Tabelle delta.deletedFileRetentionDuration = "interval 30 days" ausführen. Diese Einstellung kann dazu führen, dass Ihre Speicherkosten steigen.

Schreiben in eine Tabelle

Anfügen

Verwenden Sie den folgenden Modus, um einer vorhandenen Delta-Tabelle atomisch neue Daten append hinzuzufügen:

SQL

INSERT INTO default.people10m SELECT * FROM morePeople

Python

df.write.format("delta").mode("append").save("/tmp/delta/people10m")
df.write.format("delta").mode("append").saveAsTable("default.people10m")

Scala

df.write.format("delta").mode("append").save("/tmp/delta/people10m")
df.write.format("delta").mode("append").saveAsTable("default.people10m")

import io.delta.implicits._
df.write.mode("append").delta("/tmp/delta/people10m")

Overwrite

Verwenden Sie den folgenden Modus, um alle Daten in einer Tabelle atomisch zu overwrite ersetzen:

SQL

INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople

Python

df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")

Scala

df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")

import io.delta.implicits._
df.write.mode("overwrite").delta("/tmp/delta/people10m")

Mit DataFrames können Sie auch selektiv nur die Daten überschreiben, die einem beliebigen Ausdruck entsprechen. Dieses Feature ist in Databricks Runtime 9.1 LTS und höher verfügbar. Mit dem folgenden Befehl werden Ereignisse im Januar in der Zieltabelle, die durch partitioniert ist, atomisch start_date durch die Daten in df ersetzt:

Python

df.write \
  .format("delta") \
  .mode("overwrite") \
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'") \
  .save("/tmp/delta/events")

Scala

df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")

Dieser Beispielcode schreibt die Daten in df , überprüft, ob alles mit dem Prädikat übereinstimmt, und führt eine atomare Ersetzung durch. Wenn Sie Daten ausschreiben möchten, die nicht alle mit dem Prädikat übereinstimmen, können Sie die Einschränkungsprüfung deaktivieren, indem Sie auf FALSE festlegen, um die übereinstimmenden Zeilen in der Zieltabelle spark.databricks.delta.replaceWhere.constraintCheck.enabled zu ersetzen:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

In Databricks Runtime 9.0 und darunter werden Daten überschrieben, die replaceWhere mit einem Prädikat nur über Partitionsspalten übereinstimmen. Der folgende Befehl ersetzt atomisch den Monat januar in der Zieltabelle, die durch partitioniert date ist, durch die Daten in df :

Python

df.write \
  .format("delta") \
  .mode("overwrite") \
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'") \
  .save("/tmp/delta/people10m")

Scala

df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

Wenn Sie in Databricks Runtime Version 9.1 und höher auf das alte Verhalten zurückgreifen möchten, können Sie das spark.databricks.delta.replaceWhere.dataColumns.enabled Flag deaktivieren:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

Hinweis

Im Gegensatz zu den Datei-APIs in Apache Spark speichert Delta Lake das Schema einer Tabelle und erzwingt es. Dies bedeutet, dass Überschreibungen standardmäßig nicht das Schema einer vorhandenen Tabelle ersetzen.

Informationen zur Delta Lake-Unterstützung für das Aktualisieren von Tabellen finden Sie unter Löschen, Aktualisieren und Zusammenführen vonTabellen.

Festlegen benutzerdefinierter Commitmetadaten

Sie können benutzerdefinierte Zeichenfolgen als Metadaten in Commits angeben, die von diesen Vorgängen ausgeführt werden, indem Sie entweder die DataFrameWriter-Option userMetadata oder die SparkSession-Konfiguration spark.databricks.delta.commitInfo.userMetadata verwenden. Wenn beide angegeben wurden, wird die Option bevorzugt. Diese benutzerdefinierten Metadaten sind im Verlaufsvorgang lesbar.

SQL


SET spark.databricks.delta.commitInfo.userMetadata=overwritten-for-fixing-incorrect-data
INSERT OVERWRITE default.people10m SELECT * FROM morePeople

Python

df.write.format("delta") \
  .mode("overwrite") \
  .option("userMetadata", "overwritten-for-fixing-incorrect-data") \
  .save("/tmp/delta/people10m")

Scala

df.write.format("delta")
  .mode("overwrite")
  .option("userMetadata", "overwritten-for-fixing-incorrect-data")
  .save("/tmp/delta/people10m")

Schemavalidierung

Delta Lake überprüft automatisch, ob das Schema des geschriebenen DataFrames mit dem Schema der Tabelle kompatibel ist. Delta Lake verwendet die folgenden Regeln, um zu bestimmen, ob ein Schreibvorgang aus einem DataFrame in eine Tabelle kompatibel ist:

  • Alle DataFrame-Spalten müssen in der Zieltabelle vorhanden sein. Wenn spalten im DataFrame nicht in der Tabelle vorhanden sind, wird eine Ausnahme ausgelöst. Spalten, die in der Tabelle, aber nicht im DataFrame vorhanden sind, werden auf NULL festgelegt.
  • DataFrame-Spaltendatentypen müssen mit den Spaltendatentypen in der Zieltabelle übereinstimmen. Wenn sie nicht übereinstimmen, wird eine Ausnahme ausgelöst.
  • Datenrahmenspaltennamen können sich nicht nur nach Fall unterscheiden. Dies bedeutet, dass Spalten wie "Foo" und "foo" nicht in derselben Tabelle definiert sind. Sie können Spark zwar im Standardmodus verwenden, aber beim Speichern und Zurückgeben von Spalteninformationen wird bei Parquet die Groß-/Kleinschreibung beachtet. Delta Lake behält beim Speichern des Schemas die Groß-/Kleinschreibung bei, berücksichtigt jedoch die Groß-/Kleinschreibung und weist diese Einschränkung auf, um potenzielle Fehler, Datenbeschädigungen oder Verlustprobleme zu vermeiden.

Delta Lake unterstützt DDL, um neue Spalten explizit hinzuzufügen und das Schema automatisch zu aktualisieren.

Wenn Sie andere Optionen angeben, z. B. partitionBy , in Kombination mit dem Anfügemodus, überprüft Delta Lake, ob sie übereinstimmen, und löst bei einem Konflikt einen Fehler aus. Wenn partitionBy nicht vorhanden ist, folgen Anfügungen automatisch der Partitionierung der vorhandenen Daten.

Hinweis

In Databricks Runtime 7.0 und höher INSERT ermöglicht die Syntax die Erzwingung von Schemas und unterstützt die Schemaentwicklung. Wenn der Datentyp einer Spalte nicht sicher in den Datentyp Ihrer Delta Lake-Tabelle umgeformt werden kann, wird eine Laufzeitausnahme ausgelöst. Wenn die Schemaentwicklung aktiviert ist, können neue Spalten als letzte Spalten Ihres Schemas (oder geschachtelte Spalten) vorhanden sein, damit das Schema weiterentwickelt werden kann.

Aktualisieren des Tabellenschemas

Mit Delta Lake können Sie das Schema einer Tabelle aktualisieren. Die folgenden Arten von Änderungen werden unterstützt:

  • Hinzufügen neuer Spalten (an beliebigen Positionen)
  • Neuanordnen vorhandener Spalten
  • Umbenennen vorhandener Spalten

Sie können diese Änderungen explizit mithilfe von DDL oder implizit mit DML vornehmen.

Wichtig

Wenn Sie ein Delta-Tabellenschema aktualisieren, werden Datenströme, die aus dieser Tabelle gelesen werden, beendet. Wenn Sie möchten, dass der Stream fortgesetzt wird, müssen Sie ihn neu starten.

Empfohlene Methoden finden Sie unter Strukturiertes Streaming in der Produktion.

Schema explizit aktualisieren

Sie können die folgende DDL verwenden, um das Schema einer Tabelle explizit zu ändern.

Hinzufügen von Spalten

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Standardmäßig ist nullability true .

Um einem geschachtelten Feld eine Spalte hinzuzufügen, verwenden Sie:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Beispiel

Wenn das Schema vor der Ausführung ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) wie im Folgenden angegeben ist:

- root
| - colA
| - colB
| +-field1
| +-field2

das Schema nach ist:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Hinweis

Das Hinzufügen geschachtelter Spalten wird nur für Strukturen unterstützt. Arrays und Zuordnungen werden nicht unterstützt.

Ändern des Spaltenkommentars oder der Reihenfolge

ALTER TABLE table_name CHANGE [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]

Um eine Spalte in einem geschachtelten Feld zu ändern, verwenden Sie:

ALTER TABLE table_name CHANGE [COLUMN] col_name.nested_col_name nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
Beispiel

Wenn das Schema vor der Ausführung ALTER TABLE boxes CHANGE COLUMN colB.field2 field2 STRING FIRST wie im Folgenden angegeben ist:

- root
| - colA
| - colB
| +-field1
| +-field2

das Schema nach ist:

- root
| - colA
| - colB
| +-field2
| +-field1

Spalten ersetzen

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Beispiel

Beim Ausführen der folgenden DSL:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

, wenn das Schema vor ist:

- root
| - colA
| - colB
| +-field1
| +-field2

das Schema nach ist:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

Umbenennen von Spalten

Wichtig

Dieses Feature befindet sich in der Public Preview.

Anforderungen
  • Databricks Runtime 10.2 oder höher.
  • Um Spalten umzubenennen, ohne die vorhandenen Daten der Spalten umzuschreiben, müssen Sie die Spaltenzuordnung für die Tabelle aktivieren. Weitere Informationen finden Sie unter Deltaspaltenzuordnung.

So benennen Sie eine Spalte um:

ALTER TABLE <table_name> RENAME COLUMN old_col_name TO new_col_name

So benennen Sie ein geschachtelte Feld um:

ALTER TABLE <table_name> RENAME COLUMN col_name.old_nested_field TO new_nested_field
Beispiel

Wenn Sie den folgenden Befehl ausführen:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

Wenn das schema before ist:

- root
| - colA
| - colB
| +-field1
| +-field2

Anschließend ist das Schema nach wie vor:

- root
| - colA
| - colB
| +-field001
| +-field2

Weitere Informationen finden Sie unter Deltaspaltenzuordnung.

Ändern des Spaltentyps oder -namens

Sie können den Typ oder Namen einer Spalte ändern oder eine Spalte durch Umschreiben der Tabelle ablegen. Verwenden Sie dazu die overwriteSchema Option :

Ändern eines Spaltentyps
spark.read.table(...) \
  .withColumn("birthDate", col("birthDate").cast("date")) \
  .write \
  .format("delta") \
  .mode("overwrite")
  .option("overwriteSchema", "true") \
  .saveAsTable(...)
Ändern eines Spaltennamens
spark.read.table(...) \
  .withColumnRenamed("dateOfBirth", "birthDate") \
  .write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable(...)

Automatische Schemaaktualisierung

Delta Lake kann das Schema einer Tabelle automatisch als Teil einer DML-Transaktion aktualisieren (entweder anfügen oder überschreiben) und das Schema mit den geschriebenen Daten kompatibel machen.

Hinzufügen von Spalten

Spalten, die im DataFrame vorhanden sind, aber in der Tabelle fehlen, werden automatisch als Teil einer Schreibtransaktion hinzugefügt, wenn:

  • writeoder writeStream.option("mergeSchema", "true")
  • spark.databricks.delta.schema.autoMerge.enabled ist gleich true.

Wenn beide Optionen angegeben sind, hat die Option DataFrameWriter aus Vorrang. Die hinzugefügten Spalten werden an das Ende der Struktur angefügt, in der sie vorhanden sind. Beim Anfügen einer neuen Spalte bleibt die Case-Fügespalte erhalten.

Hinweis

  • mergeSchema wird nicht unterstützt, mergeSchema aktiviert ist (da sie eine Anforderung, die erfordert, auf eine Anforderung erhöht, MODIFY die ALL PRIVILEGES erfordert).
  • mergeSchema kann nicht mit oder verwendet INSERT INTO.write.insertInto() werden.

NullType-Spalten

Da Parquet nicht unterstützt, werden Spalten beim Schreiben in Delta-Tabellen aus dem DataFrame gelöscht, aber weiterhin NullTypeNullType im Schema gespeichert. Wenn ein anderer Datentyp für diese Spalte empfangen wird, führt Delta Lake das Schema mit dem neuen Datentyp zusammen. Wenn Delta Lake eine für eine vorhandene Spalte empfängt, wird das alte Schema beibehalten und die neue Spalte während des NullType Schreibzugriffs gelöscht.

NullType im Streaming wird nicht unterstützt. Da Sie bei verwendung des Streamings Schemas festlegen müssen, sollte dies sehr selten sein. NullType wird auch für komplexe Typen wie und ArrayType nicht MapType akzeptiert.

Ersetzen von Tabellenschemas

Standardmäßig überschreibt das Überschreiben der Daten in einer Tabelle das Schema nicht. Wenn Sie eine Tabelle mit ohne überschreiben, möchten Sie möglicherweise dennoch das Schema mode("overwrite")replaceWhere der zu schreibenden Daten überschreiben. Sie ersetzen das Schema und die Partitionierung der Tabelle, indem Sie die overwriteSchema Option auf true festlegen:

df.write.option("overwriteSchema", "true")

Sichten in Tabellen

Delta Lake unterstützt die Erstellung von Sichten über Delta-Tabellen genauso wie bei einer Datenquellentabelle.

Diese Sichten sind in die Tabellenzugriffssteuerung integriert, um die Sicherheit auf Spalten- und Zeilenebene zu ermöglichen.

Die größte Herausforderung beim Arbeiten mit Sichten ist das Auflösen der Schemas. Wenn Sie ein Delta-Tabellenschema ändern, müssen Sie abgeleitete Sichten neu erstellen, um alle Ergänzungen des Schemas zu berücksichtigen. Wenn Sie beispielsweise einer Delta-Tabelle eine neue Spalte hinzufügen, müssen Sie sicherstellen, dass diese Spalte in den entsprechenden Sichten verfügbar ist, die auf dieser Basistabelle aufgebaut sind.

Tabelleneigenschaften

Sie können Ihre eigenen Metadaten als Tabelleneigenschaft speichern, indem Sie TBLPROPERTIES in CREATE und ALTER verwenden.

TBLPROPERTIES werden als Teil der Delta-Tabellenmetadaten gespeichert. Sie können "new" TBLPROPERTIES nicht in einer CREATE -Anweisung definieren, wenn bereits eine Delta-Tabelle an einem bestimmten Speicherort vorhanden ist. Weitere Informationen finden Sie unter Tabellenerstellung.

Darüber hinaus unterstützt Delta Lake bestimmte Delta-Tabelleneigenschaften, um Verhalten und Leistung zu optimieren:

  • Lösch- und Aktualisierungssperren in einer Delta-Tabelle: delta.appendOnly=true .

  • Konfigurieren Sie die Eigenschaften für die Aufbewahrung von Zeitreises: und delta.deletedFileRetentionDuration=<interval-string> . Weitere Informationen finden Sie unter Datenaufbewahrung.

  • Konfigurieren Sie die Anzahl der Spalten, für die Statistiken gesammelt werden: delta.dataSkippingNumIndexedCols=<number-of-columns> . Diese Eigenschaft wirkt sich nur auf neue Daten aus, die geschrieben werden.

Hinweis

  • Das Ändern einer Delta-Tabelleneigenschaft ist ein Schreibvorgang, der mit anderen gleichzeitigen Schreibvorgängen in Konflikt steht undzu einem Fehler führt. Es wird empfohlen, eine Tabelleneigenschaft nur zu ändern, wenn keine gleichzeitigen Schreibvorgänge für die Tabelle vorhanden sind.

Sie können eigenschaften mit Präfix auch während des ersten Commits für eine delta. Delta-Tabelle mithilfe von Spark-Konfigurationen festlegen. Um beispielsweise eine Delta-Tabelle mit der -Eigenschaft zu delta.appendOnly=true initialisieren, legen Sie die Spark-Konfiguration auf spark.databricks.delta.properties.defaults.appendOnlytrue fest. Beispiele:

SQL

spark.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")

Python

spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")

Scala

spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")

Tabellenmetadaten

Delta Lake verfügt über umfassende Features zum Untersuchen von Tabellenmetadaten.

Sie unterstützt SHOW [PARTITIONS | COLUMNS] und DESCRIBE TABLE . Finden Sie unter

Außerdem werden die folgenden eindeutigen Befehle angezeigt:

DESCRIBE DETAIL

Stellt Informationen zu Schema, Partitionierung, Tabellengröße und so weiter zur Verfügung. Weitere Informationen finden Sie unter Abrufen von Delta-Tabellendetails.

DESCRIBE HISTORY

Stellt Informationen zur Herkunft, einschließlich Vorgang, Benutzer und so weiter, sowie Vorgangsmetriken für jeden Schreibvorgang in eine Tabelle zur Verfügung. Der Tabellenverlauf wird 30 Tage lang aufbewahrt. Weitere Informationen finden Sie unter Abrufen des Delta-Tabellenverlaufs.

Die Randleiste Daten bietet eine visuelle Ansicht dieser detaillierten Tabelleninformationen und des Verlaufs für Delta-Tabellen. Zusätzlich zum Tabellenschema und den Beispieldaten können Sie auf die Registerkarte Verlauf klicken, um den Tabellenverlauf zu sehen, der mit angezeigt wird.

Konfigurieren von Speicheranmeldeinformationen

Delta Lake verwendet Hadoop FileSystem-APIs für den Zugriff auf die Speichersysteme. Die Anmeldeinformationen für Speichersysteme können in der Regel über Hadoop-Konfigurationen festgelegt werden. Delta Lake bietet mehrere Möglichkeiten zum Festlegen von Hadoop-Konfigurationen ähnlich Apache Spark.

Spark-Konfigurationen

Wenn Sie eine Spark-Anwendung in einem Cluster starten, können Sie die Spark-Konfigurationen in Form von festlegen, spark.hadoop.* um Ihre benutzerdefinierten Hadoop-Konfigurationen zu übergeben. Wenn Sie beispielsweise einen Wert für festlegen, wird der Wert als Hadoop-Konfiguration übergeben, und Delta Lake verwendet ihn für den Zugriff spark.hadoop.a.b.ca.b.c auf Hadoop-Dateisystem-APIs.

Weitere Informationen finden Sie unter __ .

SQL sitzungskonfigurationen

Spark SQL über gibt alle aktuellen SQL-Sitzungskonfigurationen an Delta Lake weiter, und Delta Lake verwendet sie für den Zugriff auf Hadoop-Dateisystem-APIs. beispielsweise wird Delta Lake an die Anweisungen übergeben, den Wert als Hadoop-Konfiguration zu übergeben, und Delta Lake verwendet ihn für den Zugriff SET a.b.c=x.y.zx.y.z auf a.b.c Hadoop-Dateisystem-APIs.

DataFrame-Optionen

Neben dem Festlegen von Hadoop-Dateisystemkonfigurationen über die Spark-Konfigurationen (Cluster) oder SQL-Sitzungskonfigurationen unterstützt Delta das Lesen von Hadoop-Dateisystemkonfigurationen aus und -Optionen (d. h. Optionsschlüssel, die mit dem Präfix beginnen), wenn die Tabelle gelesen oder geschrieben wird, indem oder verwendet DataFrameReaderDataFrameWriterfs.DataFrameReader.load(path)DataFrameWriter.save(path) wird.

Hinweis

Dieses Feature ist ab Databricks Runtime 10.1 verfügbar.

Beispielsweise können Sie Ihre Speicherbeschriftungen über DataFrame-Optionen übergeben:

Python

df1 = spark.read.format("delta") \
  .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-1>") \
  .read("...")
df2 = spark.read.format("delta") \
  .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-2>") \
  .read("...")
df1.union(df2).write.format("delta") \
  .mode("overwrite") \
  .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-3>") \
  .save("...")

Scala

val df1 = spark.read.format("delta")
  .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-1>")
  .read("...")
val df2 = spark.read.format("delta")
  .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-2>")
  .read("...")
df1.union(df2).write.format("delta")
  .mode("overwrite")
  .option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-3>")
  .save("...")

Die Details der Hadoop-Dateisystemkonfigurationen für Ihren Speicher finden Sie unter Datenquellen.

Notebook

Ein Beispiel für die verschiedenen Delta-Tabellenmetadatenbefehle finden Sie am Ende des folgenden Notebooks:

Notebook für Delta Lake-Batchbefehle

Notebook abrufen