Delta Lake-Schnellstart

Die Schnellstartanleitung zu Delta Lake enthält eine Übersicht über die Grundlagen zur Verwendung von Delta Lake. Hier erfahren Sie, wie Sie Daten in eine Delta-Tabelle laden, die Tabelle ändern, die Tabelle lesen, den Tabellenverlauf anzeigen und die Tabelle optimieren.

Eine Demonstration einiger Features, die in diesem Artikel (und vielen weiteren) beschrieben werden, finden Sie in diesem YouTube-Video (9 Minuten).

Sie können den Python-, R-, Scala- und SQL-Beispielcode in diesem Artikel in einem Notebook ausführen, das an einen Azure Databricks-Cluster angefügt ist. Sie können den SQL-Code in diesem Artikel auch in einer Abfrage ausführen, die einem SQL-Endpunkt in Databricks SQL zugeordnet ist.

Informationen zu vorhandenen Azure Databricks-Notebooks zur Veranschaulichung dieser Features finden Sie unter Einführungsnotebooks.

Erstellen einer Tabelle

Zum Erstellen einer Delta-Tabelle können Sie vorhandenen Apache Spark-SQL-Code verwenden und das Schreibformat von parquet, csv, json usw. in delta ändern.

Bei allen Dateitypen werden die Dateien in einen Datenrahmen mit dem entsprechenden Eingabeformat (z. B. parquet, csv, json usw.) gelesen und anschließend im Delta-Format geschrieben. Im folgenden Codebeispiel befinden sich die Eingabedateien bereits im Delta-Format in Azure Databricks-Datasets. Mit diesem Code werden die Daten im Delta-Format im Databricks File System (DBFS) an dem durch save_path angegebenen Speicherort gespeichert.

Python

# Define the input and output formats and paths and the table name.
read_format = 'delta'
write_format = 'delta'
load_path = '/databricks-datasets/learning-spark-v2/people/people-10m.delta'
save_path = '/tmp/delta/people-10m'
table_name = 'default.people10m'

# Load the data from its source.
people = spark \
  .read \
  .format(read_format) \
  .load(load_path)

# Write the data to its target.
people.write \
  .format(write_format) \
  .save(save_path)

# Create the table.
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")

R

library(SparkR)
sparkR.session()

# Define the input and output formats and paths and the table name.
read_format = "delta"
write_format = "delta"
load_path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"
save_path = "/tmp/delta/people-10m/"
table_name = "default.people10m"

# Load the data from its source.
people = read.df(load_path, source = read_format)

# Write the data to its target.
write.df(people, source = write_format, path = save_path)

# Create the table.
sql(paste("CREATE TABLE ", table_name, " USING DELTA LOCATION '", save_path, "'", sep = ""))

Scala

// Define the input and output formats and paths and the table name.
val read_format = "delta"
val write_format = "delta"
val load_path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"
val save_path = "/tmp/delta/people-10m"
val table_name = "default.people10m"

// Load the data from its source.
val people = spark
  .read
  .format(read_format)
  .load(load_path)

// Write the data to its target.
people.write
  .format(write_format)
  .save(save_path)

// Create the table.
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")

SQL

-- The path for LOCATION must already exist
-- and must be in Delta format.

CREATE TABLE default.people10m
USING DELTA
LOCATION '/tmp/delta/people-10m'

Bei den vorherigen Vorgängen wird eine neue nicht verwaltete Tabelle mithilfe des Schemas erstellt, das aus den Daten abgeleitet wurde. Bei nicht verwalteten Tabellen legen Sie den Speicherort der Daten fest. In Azure Databricks werden Name und Speicherort der Tabelle nachverfolgt. Informationen zu den verfügbaren Optionen beim Erstellen einer Delta-Tabelle finden Sie unter Erstellen einer Tabelle und unter Schreiben in eine Tabelle.

Wenn Ihre Quelldateien im Parquet-Format vorliegen, können Sie die CONVERT TO DELTA-Anweisung zum Konvertieren vorhandener Dateien verwenden. Wenn die entsprechende Tabelle nicht verwaltet wird, bleibt die Tabelle nach der Konvertierung nicht verwaltet:

CONVERT TO DELTA parquet.`/tmp/delta/people-10m`

Wenn Sie eine neue verwaltete Tabelle erstellen möchten, können Sie die CREATE TABLE-Anweisung verwenden, um den Tabellennamen anzugeben. Danach können Sie Daten in die Tabelle laden. Alternativ können Sie die saveAsTable-Methode in Python, R oder Scala verwenden. Beispiel:

Python

tableName = 'people10m'
sourceType = 'delta'
loadPath = '/databricks-datasets/learning-spark-v2/people/people-10m.delta'

people = spark \
  .read \
  .format(sourceType) \
  .load(loadPath)

people.write \
  .format(sourceType) \
  .saveAsTable(tableName)

display(spark.sql("SELECT * FROM " + tableName))

R

library(SparkR)
sparkR.session()

tableName = "people10m"
sourceType = "delta"
loadPath = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"

people = read.df(
  path = loadPath,
  source = sourceType
)

saveAsTable(
  df = people,
  source = sourceType,
  tableName = tableName
)

display(sql(paste("SELECT * FROM ", tableName, sep="")))

Scala

val tableName = "people10m"
val sourceType = "delta"
val loadPath = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"

val people = spark
  .read
  .format(sourceType)
  .load(loadPath)

people.write
  .format(sourceType)
  .saveAsTable(tableName)

display(spark.sql("SELECT * FROM " + tableName))

SQL

CREATE TABLE people10m USING DELTA AS
SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

SELECT * FROM people10m;

Bei verwalteten Tabellen bestimmt Azure Databricks den Speicherort für die Daten. Zum Abrufen des Speicherorts können Sie die DESCRIBE DETAIL-Anweisung verwenden. Beispiel:

Python

display(spark.sql('DESCRIBE DETAIL people10m'))

R

display(sql("DESCRIBE DETAIL people10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people10m"))

SQL

DESCRIBE DETAIL people10m;

Weitere Informationen hierzu finden Sie auch unter Erstellen einer Tabelle und unter Steuern des Datenspeicherorts.

Partitionieren von Daten

Abfragen mit Prädikaten, die die Partitionsspalten betreffen, können Sie durch Partitionieren von der Daten beschleunigen. Das folgende Codebeispiel ähnelt dem in Erstellen einer Tabelle, aber in diesem Beispiel werden die Daten partitioniert.

Python

# Define the input and output formats and paths and the table name.
read_format = 'delta'
write_format = 'delta'
load_path = '/databricks-datasets/learning-spark-v2/people/people-10m.delta'
partition_by = 'gender'
save_path = '/tmp/delta/people-10m'
table_name = 'default.people10m'

# Load the data from its source.
people = spark \
  .read \
  .format(read_format) \
  .load(load_path)

# Write the data to its target.
people.write \
  .partitionBy(partition_by) \
  .format(write_format) \
  .save(save_path)

# Create the table.
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")

Wenn Sie bereits das Python-Codebeispiel unter Erstellen einer Tabelle ausgeführt haben, müssen Sie zuerst die vorhandene Tabelle und die gespeicherten Daten löschen:

# Define the table name and the output path.
table_name = 'default.people10m'
save_path = '/tmp/delta/people-10m'

# Delete the table.
spark.sql("DROP TABLE " + table_name)

# Delete the saved data.
dbutils.fs.rm(save_path, True)

R

library(SparkR)
sparkR.session()

# Define the input and output formats and paths and the table name.
read_format = "delta"
write_format = "delta"
load_path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"
partition_by = "gender"
save_path = "/tmp/delta/people-10m/"
table_name = "default.people10m"

# Load the data from its source.
people = read.df(load_path, source = read_format)

# Write the data to its target.
write.df(people, source = write_format, partitionBy = partition_by, path = save_path)

# Create the table.
sql(paste("CREATE TABLE ", table_name, " USING DELTA LOCATION '", save_path, "'", sep = ""))

Wenn Sie bereits das R-Codebeispiel unter Erstellen einer Tabelle ausgeführt haben, müssen Sie zuerst die vorhandene Tabelle und die gespeicherten Daten löschen:

library(SparkR)
sparkR.session()

# Define the table name and the output path.
table_name = "default.people10m"
save_path = "/tmp/delta/people-10m"

# Delete the table.
sql(paste("DROP TABLE ", table_name, sep = ""))

# Delete the saved data.
dbutils.fs.rm(save_path, TRUE)

Scala

// Define the input and output formats and paths and the table name.
val read_format = "delta"
val write_format = "delta"
val load_path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"
val partition_by = "gender"
val save_path = "/tmp/delta/people-10m"
val table_name = "default.people10m"

// Load the data from its source.
val people = spark
  .read
  .format(read_format)
  .load(load_path)

// Write the data to its target.
people.write
  .partitionBy(partition_by)
  .format(write_format)
  .save(save_path)

// Create the table.
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")

Wenn Sie bereits das Scala-Codebeispiel unter Erstellen einer Tabelle ausgeführt haben, müssen Sie zuerst die vorhandene Tabelle und die gespeicherten Daten löschen:

// Define the table name and the output path.
val table_name = "default.people10m"
val save_path = "/tmp/delta/people-10m"

// Delete the table.
spark.sql("DROP TABLE " + table_name)

// Delete the saved data.
dbutils.fs.rm(save_path, true)

SQL

Wenn Sie beim Erstellen einer Delta-Tabelle mit SQL Daten partitionieren möchten, geben Sie die PARTITIONED BY-Spalten an.

-- The path in LOCATION must already exist
-- and must be in Delta format.

CREATE TABLE default.people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)
USING DELTA
PARTITIONED BY (gender)
LOCATION '/tmp/delta/people-10m'

Wenn Sie bereits das SQL-Codebeispiel unter Erstellen einer Tabelle ausgeführt haben, müssen Sie zuerst die vorhandene Tabelle löschen:

DROP TABLE default.people10m

Ändern einer Tabelle

Delta Lake unterstützt zahlreiche Vorgänge zum Ändern von Tabellen.

Streamschreibvorgänge in einer Tabelle

Daten können mithilfe von strukturiertem Streaming in eine Delta-Tabelle geschrieben werden. Durch das Delta Lake-Transaktionsprotokoll wird eine Exactly Once-Verarbeitung garantiert, auch wenn parallel andere Datenströme oder Batchabfragen für die Tabelle aktiv sind. Datenströme werden standardmäßig im Anfügemodus ausgeführt, wodurch der Tabelle neue Datensätze hinzugefügt werden.

Im folgenden Codebeispiel wird das strukturierte Streaming gestartet. Dabei wird der in json_read_path angegebene DBFS-Speicherort überwacht und nach JSON-Dateien gesucht, die in diesen Speicherort hochgeladen werden. Da beim strukturierter Streaming ein Dateiupload erkannt wird, werden die Daten mithilfe des in read_schema angegebenen Schemas in den in save_path angegebenen DBFS-Speicherort geschrieben. Beim strukturierten Streaming wird die Überwachung hochgeladener Dateien fortgesetzt, bis der Code beendet wird. Beim strukturierten Streaming wird der in checkpoint_path angegebene DBFS-Speicherort verwendet, um sicherzustellen, dass hochgeladene Dateien nur einmal ausgewertet werden.

Python

# Define the schema and the input, checkpoint, and output paths.
read_schema = ("id int, " +
               "firstName string, " +
               "middleName string, " +
               "lastName string, " +
               "gender string, " +
               "birthDate timestamp, " +
               "ssn string, " +
               "salary int")
json_read_path = '/FileStore/streaming-uploads/people-10m'
checkpoint_path = '/tmp/delta/people-10m/checkpoints'
save_path = '/tmp/delta/people-10m'

people_stream = (spark
  .readStream
  .schema(read_schema)
  .option('maxFilesPerTrigger', 1)
  .option('multiline', True)
  .format("json")
  .load(json_read_path)
)

(people_stream.writeStream
  .format('delta')
  .outputMode('append')
  .option('checkpointLocation', checkpoint_path)
  .start(save_path)
)

R

library(SparkR)
sparkR.session()

# Define the schema and the input, checkpoint, and output paths.
read_schema = "id int, firstName string, middleName string, lastName string, gender string, birthDate timestamp, ssn string, salary int"
json_read_path = "/FileStore/streaming-uploads/people-10m"
checkpoint_path = "/tmp/delta/people-10m/checkpoints"
save_path = "/tmp/delta/people-10m"

people_stream = read.stream(
  "json",
  path = json_read_path,
  schema = read_schema,
  multiline = TRUE,
  maxFilesPerTrigger = 1
)

write.stream(
  people_stream,
  path = save_path,
  mode = "append",
  checkpointLocation = checkpoint_path
)

Scala

// Define the schema and the input, checkpoint, and output paths.
val read_schema = ("id int, " +
                  "firstName string, " +
                  "middleName string, " +
                  "lastName string, " +
                  "gender string, " +
                  "birthDate timestamp, " +
                  "ssn string, " +
                  "salary int")
val json_read_path = "/FileStore/streaming-uploads/people-10m"
val checkpoint_path = "/tmp/delta/people-10m/checkpoints"
val save_path = "/tmp/delta/people-10m"

val people_stream = (spark
  .readStream
  .schema(read_schema)
  .option("maxFilesPerTrigger", 1)
  .option("multiline", true)
  .format("json")
  .load(json_read_path))

people_stream.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", checkpoint_path)
  .start(save_path)

Zum Testen dieses Verhaltens können Sie die folgende JSON-Datei in den in json_read_path angegebenen Speicherort hochladen und den Speicherort anschließend in save_path abfragen, um die beim strukturierten Streaming geschriebenen Daten anzuzeigen.

[
  {
    "id": 10000021,
    "firstName": "Joe",
    "middleName": "Alexander",
    "lastName": "Smith",
    "gender": "M",
    "birthDate": 188712000,
    "ssn": "123-45-6789",
    "salary": 50000
  },
  {
    "id": 10000022,
    "firstName": "Mary",
    "middleName": "Jane",
    "lastName": "Doe",
    "gender": "F",
    "birthDate": "1968-10-27T04:00:00.000+000",
    "ssn": "234-56-7890",
    "salary": 75500
  }
]

Weitere Informationen zur Delta Lake-Integration für strukturiertes Streaming finden Sie unter Tabelle: Streaming für Lese- und Schreibvorgänge und Strukturiertes Streaming in der Produktion. Weitere Informationen finden Sie auch im Structured Streaming Programming Guide (Programmierleitfaden für strukturiertes Streaming) auf der Apache Spark-Website.

Batchupsertvorgänge

Zum Zusammenführen mehrerer Updates und Einfügungen in eine vorhandene Delta-Tabelle verwenden Sie die MERGE INTO-Anweisung. Mit der folgenden Anweisung werden beispielsweise Daten aus der Quelltabelle übernommen und mit der Delta-Zieltabelle zusammengeführt. Wenn in beiden Tabellen eine übereinstimmende Zeile vorhanden ist, wird die Datenspalte in Delta Lake mithilfe des angegebenen Ausdrucks aktualisiert. Wenn keine übereinstimmende Zeile vorhanden ist, wird in Delta Lake eine neue Zeile hinzugefügt. Dieser Vorgang wird als Upsert bezeichnet.

MERGE INTO default.people10m
USING default.people10m_upload
ON default.people10m.id = default.people10m_upload.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

Wenn Sie * angeben, werden alle Spalten in der Zieltabelle aktualisiert oder eingefügt. Dabei wird davon ausgegangen, dass die Quelltabelle dieselben Spalten wie die Zieltabelle enthält. Andernfalls wird durch die Abfrage einen Analysefehler ausgelöst.

Beim Ausführen eines INSERT-Vorgangs muss für jede Spalte in der Tabelle ein Wert angegeben werden (z. B. wenn im vorhandenen Dataset keine übereinstimmende Zeile vorhanden ist). Es müssen jedoch nicht alle Werte aktualisiert werden.

Erstellen Sie zum Ausprobieren des vorstehenden Beispiels die Quelltabelle wie folgt:

CREATE TABLE default.people10m_upload (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
) USING DELTA

Füllen Sie zum Testen der WHEN MATCHED-Klausel die Quelltabelle mit den folgenden Zeilen aus, und führen Sie dann die vorhergehende MERGE INTO-Anweisung aus. Da beide Tabellen Zeilen enthalten, die mit der ON-Klausel übereinstimmen, werden die übereinstimmenden Zeilen der Zieltabelle aktualisiert.

INSERT INTO default.people10m_upload VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000)

Fragen Sie die Tabelle ab, um die Ergebnisse anzuzeigen.

SELECT * FROM default.people10m WHERE id BETWEEN 9999998 AND 10000000 SORT BY id ASC

Füllen Sie zum Testen der WHEN NOT MATCHED-Klausel die Quelltabelle mit den folgenden Zeilen aus, und führen Sie dann die vorhergehende MERGE INTO-Anweisung aus. Da die folgenden Zeilen in der Zieltabelle nicht vorhanden sind, werden sie der Zieltabelle hinzugefügt.

INSERT INTO default.people10m_upload VALUES
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900)

Fragen Sie die Tabelle ab, um die Ergebnisse anzuzeigen.

SELECT * FROM default.people10m WHERE id BETWEEN 20000001 AND 20000003 SORT BY id ASC

Übergeben Sie die Anweisung zum Ausführen einer der vorhergehenden SQL-Anweisungen in Python, R oder Scala als Zeichenfolgenargument an die spark.sql-Funktion in Python oder Scala oder die sql-Funktion in R.

Lesen einer Tabelle

In diesem Abschnitt

Auf Daten in Delta-Tabellen können Sie zugreifen, indem Sie den Pfad in DBFS ("/tmp/delta/people-10m") oder den Tabellennamen ("default.people10m") angeben:

Python

people = spark.read.format('delta').load('/tmp/delta/people-10m')

display(people)

oder

people = spark.table('default.people10m')

display(people)

R

library(SparkR)
sparkR.session()

people = read.df(path = "/tmp/delta/people-10m", source = "delta")

display(people)

oder

library(SparkR)
sparkR.session()

people = tableToDF("default.people10m")

display(people)

Scala

val people = spark.read.format("delta").load("/tmp/delta/people-10m")

display(people)

oder

val people = spark.table("default.people10m")

display(people)

SQL

SELECT * FROM delta.`/tmp/delta/people-10m`

oder

SELECT * FROM default.people10m

Anzeigen des Tabellenverlaufs

Verwenden Sie zum Anzeigen des Verlaufs einer Tabelle die DESCRIBE HISTORY-Anweisung, mit für jeden Schreibvorgang in einer Tabelle Informationen zur Herkunft wie etwa zu Tabellenversion, Vorgang, Benutzer usw. bereitgestellt werden.

Abfragen einer früheren Version der Tabelle (Zeitreise)

Mit einer Delta Lake-Zeitreise können Sie eine ältere Momentaufnahme einer Delta-Tabelle abfragen.

Zum Abfragen einer älteren Version einer Tabelle geben Sie in einer SELECT-Anweisung eine Version oder einen Zeitstempel an. Verwenden Sie beispielsweise Folgendes, um Version 0 aus dem obigen Verlauf abzufragen:

Python

spark.sql('SELECT * FROM default.people10m VERSION AS OF 0')

oder

spark.sql("SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58'")

R

library(SparkR)
sparkR.session()

sql("SELECT * FROM default.people10m VERSION AS OF 0")

oder

library(SparkR)
sparkR.session()

sql("SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58'")

Scala

spark.sql("SELECT * FROM default.people10m VERSION AS OF 0")

oder

spark.sql("SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58'")

SQL

SELECT * FROM default.people10m VERSION AS OF 0

oder

SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Als Zeitstempel werden nur Datums- oder Zeitstempelzeichenfolgen akzeptiert, z. B. "2019-01-01" und "2019-01-01'T'00:00:00.000Z".

Hinweis

Da Version 1 den Zeitstempel '2019-01-29 00:38:10' aufweist, können Sie zum Abfragen von Version 0 einen beliebigen Zeitstempel zwischen '2019-01-29 00:37:58' und einen '2019-01-29 00:38:09' verwenden.

Mit den DataFrameReader-Optionen können Sie beispielsweise in Python einen Datenrahmen aus einer Delta-Tabelle erstellen, die auf eine bestimmte Version der Tabelle festgelegt ist:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').load('/tmp/delta/people-10m')

display(df1)

oder

df2 = spark.read.format('delta').option('versionAsOf', 2).load('/tmp/delta/people-10m')

display(df2)

Einzelheiten finden Sie unter Abfragen einer älteren Momentaufnahme einer Tabelle (Zeitreise).

Optimieren einer Tabelle

Nachdem Sie mehrere Änderungen an einer Tabelle vorgenommen haben, verfügen Sie möglicherweise über viele kleine Dateien. Zur Beschleunigung von Leseabfragen können Sie OPTIMIZE verwenden, um kleine Dateien zu größeren zusammenzufassen:

Python

spark.sql("OPTIMIZE delta.`/tmp/delta/people-10m`")

oder

spark.sql('OPTIMIZE default.people10m')

R

library(SparkR)
sparkR.session()

sql("OPTIMIZE delta.`/tmp/delta/people-10m`")

oder

library(SparkR)
sparkR.session()

sql("OPTIMIZE default.people10m")

Scala

spark.sql("OPTIMIZE delta.`/tmp/delta/people-10m`")

oder

spark.sql("OPTIMIZE default.people10m")

SQL

OPTIMIZE delta.`/tmp/delta/people-10m`

oder

OPTIMIZE default.people10m

Z-Reihenfolge nach Spalten

Zur Verbesserung der Leseleistung können Sie verwandte Informationen in einer Dateiengruppe entsprechend der Z-Reihenfolge zusammenstellen. Diese Zusammenstellung wird von Delta Lake-Datensprungalgorithmen automatisch verwendet, um die Menge der zu lesenden Daten erheblich zu reduzieren. Wenn Daten in der Z-Reihenfolge sortiert werden sollen, geben Sie die Spalten, nach denen sortiert werden soll, in der ZORDER BY-Klausel an. Führen Sie beispielsweise zum Sortieren nach gender den folgenden Code aus:

Python

spark.sql("OPTIMIZE delta.`/tmp/delta/people-10m` ZORDER BY (gender)")

oder

spark.sql('OPTIMIZE default.people10m ZORDER BY (gender)')

R

library(SparkR)
sparkR.session()

sql("OPTIMIZE delta.`/tmp/delta/people-10m` ZORDER BY (gender)")

oder

library(SparkR)
sparkR.session()

sql("OPTIMIZE default.people10m ZORDER BY (gender)")

Scala

spark.sql("OPTIMIZE delta.`/tmp/delta/people-10m` ZORDER BY (gender)")

oder

spark.sql("OPTIMIZE default.people10m ZORDER BY (gender)")

SQL

OPTIMIZE delta.`/tmp/delta/people-10m`
  ZORDER BY (gender)

oder

OPTIMIZE default.people10m
  ZORDER BY (gender)

Die Optionen, die beim Ausführen von OPTIMIZE verfügbar sind, finden Sie unter Komprimierung (Behälter).

Bereinigen von Momentaufnahmen

Delta Lake ermöglicht die Momentaufnahmenisolation für Lesevorgänge, was bedeutet, dass OPTIMIZE auch dann sicher ausgeführt werden kann, wenn die Tabelle von anderen Benutzern oder Aufträgen abgefragt wird. Irgendwann sollten Sie alte Momentaufnahmen jedoch bereinigen. Verwenden Sie dazu den Befehl VACUUM:

Python

spark.sql('VACUUM default.people10m')

R

library(SparkR)
sparkR.session()

sql("VACUUM default.people10m")

Scala

spark.sql("VACUUM default.people10m")

SQL

VACUUM default.people10m

Das Alter der letzten beibehaltenen Momentaufnahme können Sie mithilfe der Option RETAIN <N> HOURS steuern:

Python

spark.sql('VACUUM default.people10m RETAIN 24 HOURS')

R

library(SparkR)
sparkR.session()

sql("VACUUM default.people10m RETAIN 24 HOURS")

Scala

spark.sql("VACUUM default.people10m RETAIN 24 HOURS")

SQL

VACUUM default.people10m RETAIN 24 HOURS

Ausführliche Informationen zur effektiven Verwendung von VACUUM finden Sie unter Entfernen von Dateien, auf die nicht mehr von einer Delta-Tabelle verwiesen wird.