Tutorial: Delta Lake

In diesem Tutorial werden allgemeine Delta Lake-Vorgänge in Azure Databricks vorgestellt, einschließlich der folgenden:

Sie können den Python-, R-, Scala- und SQL-Beispielcode in diesem Artikel in einem Notebook ausführen, das an einen Azure Databricks-Cluster angefügt ist. Sie können den SQL-Code in diesem Artikel auch in einer Abfrage ausführen, die einem SQL-Warehouse in Databricks SQL zugeordnet ist.

Hinweis

Einige der folgenden Codebeispiele verwenden eine zweistufige Namespacenotation, die aus einem Schema (auch Datenbank genannt) und einer Tabelle oder Sicht (z. B. default.people10m) besteht. Ersetzen Sie zur Verwendung dieser Beispiele mit Unity Catalog den zweistufigen Namespace durch die dreistufige Namespacenotation von Unity Catalog, die aus einem Katalog, einem Schema und einer Tabelle oder Sicht besteht (z. B. main.default.people10m).

Erstellen einer Tabelle

In Azure Databricks erstellte Tabellen verwenden standardmäßig das Delta Lake-Protokoll.

Hinweis

Delta Lake ist die Standardeinstellung für alle Lese-, Schreib- und Tabellenerstellungsbefehle für Azure Databricks.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

Bei den vorherigen Vorgängen wird eine neue nicht verwaltete Tabelle mithilfe des Schemas erstellt, das aus den Daten abgeleitet wurde. Informationen zu den verfügbaren Optionen beim Erstellen einer Delta-Tabelle finden Sie unter CREATE TABLE.

Bei verwalteten Tabellen bestimmt Azure Databricks den Speicherort für die Daten. Zum Abrufen des Speicherorts können Sie die DESCRIBE DETAIL-Anweisung verwenden. Beispiel:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

Mitunter möchten Sie möglicherweise eine Tabelle erstellen, indem Sie das Schema angeben, ehe Sie Daten einfügen. Sie können dies mit den folgenden SQL-Befehlen tun:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

In Databricks Runtime 13.0 und höher können Sie eine neue leere Delta-Tabelle mittels CREATE TABLE LIKE erstellen, die die Schema- und Tabelleneigenschaften für eine Delta-Quelltabelle dupliziert. Dies kann besonders nützlich sein, wenn Tabellen aus einer Entwicklungsumgebung in die Produktion hochgestuft werden, z. B. im folgenden Codebeispiel:

CREATE TABLE prod.people10m LIKE dev.people10m

Sie können zum Erstellen von Tabellen auch die DeltaTableBuilder-API in Delta Lake verwenden. Im Vergleich zu den DataFrameWriter-APIs erleichtert diese API die Angabe zusätzlicher Informationen, darunter Spaltenkommentare, Tabelleneigenschaften und generierte Spalten.

Wichtig

Dieses Feature befindet sich in der Public Preview.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Upsert in eine Tabelle

Zum Zusammenführen mehrerer Updates und Einfügungen in eine vorhandene Delta-Tabelle verwenden Sie die MERGE INTO-Anweisung. Mit der folgenden Anweisung werden beispielsweise Daten aus der Quelltabelle übernommen und mit der Delta-Zieltabelle zusammengeführt. Wenn in beiden Tabellen eine übereinstimmende Zeile vorhanden ist, wird die Datenspalte in Delta Lake mithilfe des angegebenen Ausdrucks aktualisiert. Wenn keine übereinstimmende Zeile vorhanden ist, wird in Delta Lake eine neue Zeile hinzugefügt. Dieser Vorgang wird als Upsert bezeichnet.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Wenn Sie * angeben, werden alle Spalten in der Zieltabelle aktualisiert oder eingefügt. Dabei wird davon ausgegangen, dass die Quelltabelle dieselben Spalten wie die Zieltabelle enthält. Andernfalls wird durch die Abfrage einen Analysefehler ausgelöst.

Beim Ausführen eines INSERT-Vorgangs muss für jede Spalte in der Tabelle ein Wert angegeben werden (z. B. wenn im vorhandenen Dataset keine übereinstimmende Zeile vorhanden ist). Es müssen jedoch nicht alle Werte aktualisiert werden.

Fragen Sie die Tabelle ab, um die Ergebnisse anzuzeigen.

SELECT * FROM people_10m WHERE id >= 9999998

Lesen einer Tabelle

Sie greifen auf Daten in Delta-Tabellen über den Tabellennamen oder -pfad zu, wie in den folgenden Beispielen gezeigt:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

Schreiben in eine Tabelle

Delta Lake verwendet Standardsyntax zum Schreiben von Daten in Tabellen.

Um einer vorhandenen Delta-Tabelle neue Daten atomisch hinzuzufügen, verwenden Sie wie in den folgenden Beispielen den Modus append:

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

Um alle Daten in einer Tabelle atomisch zu ersetzen, verwenden Sie wie in den folgenden Beispielen den Modus overwrite:

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

Aktualisieren einer Tabelle

Sie können Daten aktualisieren, die einem Prädikat in einer Delta-Tabelle entsprechen. In einer Tabelle mit dem Namen people10m oder einem Pfad unter /tmp/delta/people-10m können Sie beispielsweise folgendes ausführen, um eine Abkürzung in der Spalte gender von M oder F in Male oder Female zu ändern:

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

Löschen aus einer Tabelle

Sie können Daten, die einem Prädikat entsprechen, aus einer Delta-Tabelle entfernen. In einer Tabelle mit dem Namen people10m oder einem Pfad unter /tmp/delta/people-10m können Sie beispielsweise Folgendes ausführen, um alle Zeilen zu löschen, die Personen mit einem Wert in der Spalte birthDate vor 1955 entsprechen:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Wichtig

delete entfernt die Daten aus der neuesten Version der Delta-Tabelle, entfernt sie aber erst dann aus dem physischen Speicher, wenn die alten Versionen mit Vacuum explizit bereinigt wurden. Details finden Sie unter Vacuum.

Anzeigen des Tabellenverlaufs

Verwenden Sie zum Anzeigen des Verlaufs einer Tabelle die DESCRIBE HISTORY-Anweisung, mit für jeden Schreibvorgang in einer Tabelle Informationen zur Herkunft wie etwa zu Tabellenversion, Vorgang, Benutzer usw. bereitgestellt werden.

DESCRIBE HISTORY people_10m

Abfragen einer früheren Version der Tabelle (Zeitreise)

Mit einer Delta Lake-Zeitreise können Sie eine ältere Momentaufnahme einer Delta-Tabelle abfragen.

Zum Abfragen einer älteren Version einer Tabelle geben Sie in einer SELECT-Anweisung eine Version oder einen Zeitstempel an. Verwenden Sie beispielsweise Folgendes, um Version 0 aus dem obigen Verlauf abzufragen:

SELECT * FROM people_10m VERSION AS OF 0

oder

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Als Zeitstempel werden nur Datums- oder Zeitstempelzeichenfolgen akzeptiert, z. B. "2019-01-01" und "2019-01-01'T'00:00:00.000Z".

Mit den DataFrameReader-Optionen können Sie beispielsweise in Python einen Datenrahmen aus einer Delta-Tabelle erstellen, die auf eine bestimmte Version der Tabelle festgelegt ist:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

oder alternativ:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

Weitere Informationen finden Sie unter Arbeiten mit dem Delta Lake-Tabellenverlauf.

Optimieren einer Tabelle

Nachdem Sie mehrere Änderungen an einer Tabelle vorgenommen haben, verfügen Sie möglicherweise über viele kleine Dateien. Zur Beschleunigung von Leseabfragen können Sie OPTIMIZE verwenden, um kleine Dateien zu größeren zusammenzufassen:

OPTIMIZE people_10m

Z-Reihenfolge nach Spalten

Zur Verbesserung der Leseleistung können Sie verwandte Informationen in einer Dateiengruppe entsprechend der Z-Reihenfolge zusammenstellen. Diese Zusammenstellung wird von Delta Lake-Datensprungalgorithmen automatisch verwendet, um die Menge der zu lesenden Daten erheblich zu reduzieren. Wenn Daten in der Z-Reihenfolge sortiert werden sollen, geben Sie die Spalten, nach denen sortiert werden soll, in der ZORDER BY-Klausel an. Führen Sie beispielsweise zum Sortieren nach gender den folgenden Code aus:

OPTIMIZE people_10m
ZORDER BY (gender)

Alle Optionen, die bei der Ausführung von OPTIMIZE verfügbar sind, finden Sie unter Komprimieren von Datendateien mit OPTIMIZE in Delta Lake.

Bereinigen von Momentaufnahmen VACUUM

Delta Lake ermöglicht die Momentaufnahmenisolation für Lesevorgänge, was bedeutet, dass OPTIMIZE auch dann sicher ausgeführt werden kann, wenn die Tabelle von anderen Benutzern oder Aufträgen abgefragt wird. Irgendwann sollten Sie alte Momentaufnahmen jedoch bereinigen. Verwenden Sie dazu den Befehl VACUUM:

VACUUM people_10m

Einzelheiten zur effektiven Verwendung von VACUUM finden Sie unter Löschen ungenutzter Datendateien mit VACUUM.