Delta Lake-Schnellstart
Die Schnellstartanleitung zu Delta Lake enthält eine Übersicht über die Grundlagen zur Verwendung von Delta Lake. Hier erfahren Sie, wie Sie Daten in eine Delta-Tabelle laden, die Tabelle ändern, die Tabelle lesen, den Tabellenverlauf anzeigen und die Tabelle optimieren.
Eine Demonstration einiger Features, die in diesem Artikel (und vielen weiteren) beschrieben werden, finden Sie in diesem YouTube-Video (9 Minuten).
Sie können den Python-, R-, Scala- und SQL-Beispielcode in diesem Artikel in einem Notebook ausführen, das an einen Azure Databricks-Cluster angefügt ist. Sie können den SQL-Code in diesem Artikel auch in einer Abfrage ausführen, die einem SQL-Endpunkt in Databricks SQL zugeordnet ist.
Informationen zu vorhandenen Azure Databricks-Notebooks zur Veranschaulichung dieser Features finden Sie unter Einführungsnotebooks.
Erstellen einer Tabelle
Zum Erstellen einer Delta-Tabelle können Sie vorhandenen Apache Spark-SQL-Code verwenden und das Schreibformat von parquet
, csv
, json
usw. in delta
ändern.
Bei allen Dateitypen werden die Dateien in einen Datenrahmen mit dem entsprechenden Eingabeformat (z. B. parquet
, csv
, json
usw.) gelesen und anschließend im Delta-Format geschrieben. Im folgenden Codebeispiel befinden sich die Eingabedateien bereits im Delta-Format in Azure Databricks-Datasets. Mit diesem Code werden die Daten im Delta-Format im Databricks File System (DBFS) an dem durch save_path
angegebenen Speicherort gespeichert.
Python
# Define the input and output formats and paths and the table name.
read_format = 'delta'
write_format = 'delta'
load_path = '/databricks-datasets/learning-spark-v2/people/people-10m.delta'
save_path = '/tmp/delta/people-10m'
table_name = 'default.people10m'
# Load the data from its source.
people = spark \
.read \
.format(read_format) \
.load(load_path)
# Write the data to its target.
people.write \
.format(write_format) \
.save(save_path)
# Create the table.
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")
R
library(SparkR)
sparkR.session()
# Define the input and output formats and paths and the table name.
read_format = "delta"
write_format = "delta"
load_path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"
save_path = "/tmp/delta/people-10m/"
table_name = "default.people10m"
# Load the data from its source.
people = read.df(load_path, source = read_format)
# Write the data to its target.
write.df(people, source = write_format, path = save_path)
# Create the table.
sql(paste("CREATE TABLE ", table_name, " USING DELTA LOCATION '", save_path, "'", sep = ""))
Scala
// Define the input and output formats and paths and the table name.
val read_format = "delta"
val write_format = "delta"
val load_path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"
val save_path = "/tmp/delta/people-10m"
val table_name = "default.people10m"
// Load the data from its source.
val people = spark
.read
.format(read_format)
.load(load_path)
// Write the data to its target.
people.write
.format(write_format)
.save(save_path)
// Create the table.
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")
SQL
-- The path for LOCATION must already exist
-- and must be in Delta format.
CREATE TABLE default.people10m
USING DELTA
LOCATION '/tmp/delta/people-10m'
Bei den vorherigen Vorgängen wird eine neue nicht verwaltete Tabelle mithilfe des Schemas erstellt, das aus den Daten abgeleitet wurde. Bei nicht verwalteten Tabellen legen Sie den Speicherort der Daten fest. In Azure Databricks werden Name und Speicherort der Tabelle nachverfolgt. Informationen zu den verfügbaren Optionen beim Erstellen einer Delta-Tabelle finden Sie unter Erstellen einer Tabelle und unter Schreiben in eine Tabelle.
Wenn Ihre Quelldateien im Parquet-Format vorliegen, können Sie die CONVERT TO DELTA-Anweisung zum Konvertieren vorhandener Dateien verwenden. Wenn die entsprechende Tabelle nicht verwaltet wird, bleibt die Tabelle nach der Konvertierung nicht verwaltet:
CONVERT TO DELTA parquet.`/tmp/delta/people-10m`
Wenn Sie eine neue verwaltete Tabelle erstellen möchten, können Sie die CREATE TABLE-Anweisung verwenden, um den Tabellennamen anzugeben. Danach können Sie Daten in die Tabelle laden. Alternativ können Sie die saveAsTable
-Methode in Python, R oder Scala verwenden. Beispiel:
Python
tableName = 'people10m'
sourceType = 'delta'
loadPath = '/databricks-datasets/learning-spark-v2/people/people-10m.delta'
people = spark \
.read \
.format(sourceType) \
.load(loadPath)
people.write \
.format(sourceType) \
.saveAsTable(tableName)
display(spark.sql("SELECT * FROM " + tableName))
R
library(SparkR)
sparkR.session()
tableName = "people10m"
sourceType = "delta"
loadPath = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"
people = read.df(
path = loadPath,
source = sourceType
)
saveAsTable(
df = people,
source = sourceType,
tableName = tableName
)
display(sql(paste("SELECT * FROM ", tableName, sep="")))
Scala
val tableName = "people10m"
val sourceType = "delta"
val loadPath = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"
val people = spark
.read
.format(sourceType)
.load(loadPath)
people.write
.format(sourceType)
.saveAsTable(tableName)
display(spark.sql("SELECT * FROM " + tableName))
SQL
CREATE TABLE people10m USING DELTA AS
SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
SELECT * FROM people10m;
Bei verwalteten Tabellen bestimmt Azure Databricks den Speicherort für die Daten. Zum Abrufen des Speicherorts können Sie die DESCRIBE DETAIL-Anweisung verwenden. Beispiel:
Python
display(spark.sql('DESCRIBE DETAIL people10m'))
R
display(sql("DESCRIBE DETAIL people10m"))
Scala
display(spark.sql("DESCRIBE DETAIL people10m"))
SQL
DESCRIBE DETAIL people10m;
Weitere Informationen hierzu finden Sie auch unter Erstellen einer Tabelle und unter Steuern des Datenspeicherorts.
Partitionieren von Daten
Abfragen mit Prädikaten, die die Partitionsspalten betreffen, können Sie durch Partitionieren von der Daten beschleunigen. Das folgende Codebeispiel ähnelt dem in Erstellen einer Tabelle, aber in diesem Beispiel werden die Daten partitioniert.
Python
# Define the input and output formats and paths and the table name.
read_format = 'delta'
write_format = 'delta'
load_path = '/databricks-datasets/learning-spark-v2/people/people-10m.delta'
partition_by = 'gender'
save_path = '/tmp/delta/people-10m'
table_name = 'default.people10m'
# Load the data from its source.
people = spark \
.read \
.format(read_format) \
.load(load_path)
# Write the data to its target.
people.write \
.partitionBy(partition_by) \
.format(write_format) \
.save(save_path)
# Create the table.
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")
Wenn Sie bereits das Python-Codebeispiel unter Erstellen einer Tabelle ausgeführt haben, müssen Sie zuerst die vorhandene Tabelle und die gespeicherten Daten löschen:
# Define the table name and the output path.
table_name = 'default.people10m'
save_path = '/tmp/delta/people-10m'
# Delete the table.
spark.sql("DROP TABLE " + table_name)
# Delete the saved data.
dbutils.fs.rm(save_path, True)
R
library(SparkR)
sparkR.session()
# Define the input and output formats and paths and the table name.
read_format = "delta"
write_format = "delta"
load_path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"
partition_by = "gender"
save_path = "/tmp/delta/people-10m/"
table_name = "default.people10m"
# Load the data from its source.
people = read.df(load_path, source = read_format)
# Write the data to its target.
write.df(people, source = write_format, partitionBy = partition_by, path = save_path)
# Create the table.
sql(paste("CREATE TABLE ", table_name, " USING DELTA LOCATION '", save_path, "'", sep = ""))
Wenn Sie bereits das R-Codebeispiel unter Erstellen einer Tabelle ausgeführt haben, müssen Sie zuerst die vorhandene Tabelle und die gespeicherten Daten löschen:
library(SparkR)
sparkR.session()
# Define the table name and the output path.
table_name = "default.people10m"
save_path = "/tmp/delta/people-10m"
# Delete the table.
sql(paste("DROP TABLE ", table_name, sep = ""))
# Delete the saved data.
dbutils.fs.rm(save_path, TRUE)
Scala
// Define the input and output formats and paths and the table name.
val read_format = "delta"
val write_format = "delta"
val load_path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"
val partition_by = "gender"
val save_path = "/tmp/delta/people-10m"
val table_name = "default.people10m"
// Load the data from its source.
val people = spark
.read
.format(read_format)
.load(load_path)
// Write the data to its target.
people.write
.partitionBy(partition_by)
.format(write_format)
.save(save_path)
// Create the table.
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")
Wenn Sie bereits das Scala-Codebeispiel unter Erstellen einer Tabelle ausgeführt haben, müssen Sie zuerst die vorhandene Tabelle und die gespeicherten Daten löschen:
// Define the table name and the output path.
val table_name = "default.people10m"
val save_path = "/tmp/delta/people-10m"
// Delete the table.
spark.sql("DROP TABLE " + table_name)
// Delete the saved data.
dbutils.fs.rm(save_path, true)
SQL
Wenn Sie beim Erstellen einer Delta-Tabelle mit SQL Daten partitionieren möchten, geben Sie die PARTITIONED BY
-Spalten an.
-- The path in LOCATION must already exist
-- and must be in Delta format.
CREATE TABLE default.people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
USING DELTA
PARTITIONED BY (gender)
LOCATION '/tmp/delta/people-10m'
Wenn Sie bereits das SQL-Codebeispiel unter Erstellen einer Tabelle ausgeführt haben, müssen Sie zuerst die vorhandene Tabelle löschen:
DROP TABLE default.people10m
Ändern einer Tabelle
Delta Lake unterstützt zahlreiche Vorgänge zum Ändern von Tabellen.
Streamschreibvorgänge in einer Tabelle
Daten können mithilfe von strukturiertem Streaming in eine Delta-Tabelle geschrieben werden. Durch das Delta Lake-Transaktionsprotokoll wird eine Exactly Once-Verarbeitung garantiert, auch wenn parallel andere Datenströme oder Batchabfragen für die Tabelle aktiv sind. Datenströme werden standardmäßig im Anfügemodus ausgeführt, wodurch der Tabelle neue Datensätze hinzugefügt werden.
Im folgenden Codebeispiel wird das strukturierte Streaming gestartet. Dabei wird der in json_read_path
angegebene DBFS-Speicherort überwacht und nach JSON-Dateien gesucht, die in diesen Speicherort hochgeladen werden. Da beim strukturierter Streaming ein Dateiupload erkannt wird, werden die Daten mithilfe des in read_schema
angegebenen Schemas in den in save_path
angegebenen DBFS-Speicherort geschrieben. Beim strukturierten Streaming wird die Überwachung hochgeladener Dateien fortgesetzt, bis der Code beendet wird. Beim strukturierten Streaming wird der in checkpoint_path
angegebene DBFS-Speicherort verwendet, um sicherzustellen, dass hochgeladene Dateien nur einmal ausgewertet werden.
Python
# Define the schema and the input, checkpoint, and output paths.
read_schema = ("id int, " +
"firstName string, " +
"middleName string, " +
"lastName string, " +
"gender string, " +
"birthDate timestamp, " +
"ssn string, " +
"salary int")
json_read_path = '/FileStore/streaming-uploads/people-10m'
checkpoint_path = '/tmp/delta/people-10m/checkpoints'
save_path = '/tmp/delta/people-10m'
people_stream = (spark
.readStream
.schema(read_schema)
.option('maxFilesPerTrigger', 1)
.option('multiline', True)
.format("json")
.load(json_read_path)
)
(people_stream.writeStream
.format('delta')
.outputMode('append')
.option('checkpointLocation', checkpoint_path)
.start(save_path)
)
R
library(SparkR)
sparkR.session()
# Define the schema and the input, checkpoint, and output paths.
read_schema = "id int, firstName string, middleName string, lastName string, gender string, birthDate timestamp, ssn string, salary int"
json_read_path = "/FileStore/streaming-uploads/people-10m"
checkpoint_path = "/tmp/delta/people-10m/checkpoints"
save_path = "/tmp/delta/people-10m"
people_stream = read.stream(
"json",
path = json_read_path,
schema = read_schema,
multiline = TRUE,
maxFilesPerTrigger = 1
)
write.stream(
people_stream,
path = save_path,
mode = "append",
checkpointLocation = checkpoint_path
)
Scala
// Define the schema and the input, checkpoint, and output paths.
val read_schema = ("id int, " +
"firstName string, " +
"middleName string, " +
"lastName string, " +
"gender string, " +
"birthDate timestamp, " +
"ssn string, " +
"salary int")
val json_read_path = "/FileStore/streaming-uploads/people-10m"
val checkpoint_path = "/tmp/delta/people-10m/checkpoints"
val save_path = "/tmp/delta/people-10m"
val people_stream = (spark
.readStream
.schema(read_schema)
.option("maxFilesPerTrigger", 1)
.option("multiline", true)
.format("json")
.load(json_read_path))
people_stream.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", checkpoint_path)
.start(save_path)
Zum Testen dieses Verhaltens können Sie die folgende JSON-Datei in den in json_read_path
angegebenen Speicherort hochladen und den Speicherort anschließend in save_path
abfragen, um die beim strukturierten Streaming geschriebenen Daten anzuzeigen.
[
{
"id": 10000021,
"firstName": "Joe",
"middleName": "Alexander",
"lastName": "Smith",
"gender": "M",
"birthDate": 188712000,
"ssn": "123-45-6789",
"salary": 50000
},
{
"id": 10000022,
"firstName": "Mary",
"middleName": "Jane",
"lastName": "Doe",
"gender": "F",
"birthDate": "1968-10-27T04:00:00.000+000",
"ssn": "234-56-7890",
"salary": 75500
}
]
Weitere Informationen zur Delta Lake-Integration für strukturiertes Streaming finden Sie unter Tabelle: Streaming für Lese- und Schreibvorgänge und Strukturiertes Streaming in der Produktion. Weitere Informationen finden Sie auch im Structured Streaming Programming Guide (Programmierleitfaden für strukturiertes Streaming) auf der Apache Spark-Website.
Batchupsertvorgänge
Zum Zusammenführen mehrerer Updates und Einfügungen in eine vorhandene Delta-Tabelle verwenden Sie die MERGE INTO-Anweisung. Mit der folgenden Anweisung werden beispielsweise Daten aus der Quelltabelle übernommen und mit der Delta-Zieltabelle zusammengeführt. Wenn in beiden Tabellen eine übereinstimmende Zeile vorhanden ist, wird die Datenspalte in Delta Lake mithilfe des angegebenen Ausdrucks aktualisiert. Wenn keine übereinstimmende Zeile vorhanden ist, wird in Delta Lake eine neue Zeile hinzugefügt. Dieser Vorgang wird als Upsert bezeichnet.
MERGE INTO default.people10m
USING default.people10m_upload
ON default.people10m.id = default.people10m_upload.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
Wenn Sie *
angeben, werden alle Spalten in der Zieltabelle aktualisiert oder eingefügt. Dabei wird davon ausgegangen, dass die Quelltabelle dieselben Spalten wie die Zieltabelle enthält. Andernfalls wird durch die Abfrage einen Analysefehler ausgelöst.
Beim Ausführen eines INSERT
-Vorgangs muss für jede Spalte in der Tabelle ein Wert angegeben werden (z. B. wenn im vorhandenen Dataset keine übereinstimmende Zeile vorhanden ist). Es müssen jedoch nicht alle Werte aktualisiert werden.
Erstellen Sie zum Ausprobieren des vorstehenden Beispiels die Quelltabelle wie folgt:
CREATE TABLE default.people10m_upload (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
) USING DELTA
Füllen Sie zum Testen der WHEN MATCHED
-Klausel die Quelltabelle mit den folgenden Zeilen aus, und führen Sie dann die vorhergehende MERGE INTO
-Anweisung aus. Da beide Tabellen Zeilen enthalten, die mit der ON
-Klausel übereinstimmen, werden die übereinstimmenden Zeilen der Zieltabelle aktualisiert.
INSERT INTO default.people10m_upload VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000)
Fragen Sie die Tabelle ab, um die Ergebnisse anzuzeigen.
SELECT * FROM default.people10m WHERE id BETWEEN 9999998 AND 10000000 SORT BY id ASC
Füllen Sie zum Testen der WHEN NOT MATCHED
-Klausel die Quelltabelle mit den folgenden Zeilen aus, und führen Sie dann die vorhergehende MERGE INTO
-Anweisung aus. Da die folgenden Zeilen in der Zieltabelle nicht vorhanden sind, werden sie der Zieltabelle hinzugefügt.
INSERT INTO default.people10m_upload VALUES
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900)
Fragen Sie die Tabelle ab, um die Ergebnisse anzuzeigen.
SELECT * FROM default.people10m WHERE id BETWEEN 20000001 AND 20000003 SORT BY id ASC
Übergeben Sie die Anweisung zum Ausführen einer der vorhergehenden SQL-Anweisungen in Python, R oder Scala als Zeichenfolgenargument an die spark.sql
-Funktion in Python oder Scala oder die sql
-Funktion in R.
Lesen einer Tabelle
In diesem Abschnitt
Auf Daten in Delta-Tabellen können Sie zugreifen, indem Sie den Pfad in DBFS ("/tmp/delta/people-10m"
) oder den Tabellennamen ("default.people10m"
) angeben:
Python
people = spark.read.format('delta').load('/tmp/delta/people-10m')
display(people)
oder
people = spark.table('default.people10m')
display(people)
R
library(SparkR)
sparkR.session()
people = read.df(path = "/tmp/delta/people-10m", source = "delta")
display(people)
oder
library(SparkR)
sparkR.session()
people = tableToDF("default.people10m")
display(people)
Scala
val people = spark.read.format("delta").load("/tmp/delta/people-10m")
display(people)
oder
val people = spark.table("default.people10m")
display(people)
SQL
SELECT * FROM delta.`/tmp/delta/people-10m`
oder
SELECT * FROM default.people10m
Anzeigen des Tabellenverlaufs
Verwenden Sie zum Anzeigen des Verlaufs einer Tabelle die DESCRIBE HISTORY-Anweisung, mit für jeden Schreibvorgang in einer Tabelle Informationen zur Herkunft wie etwa zu Tabellenversion, Vorgang, Benutzer usw. bereitgestellt werden.
Abfragen einer früheren Version der Tabelle (Zeitreise)
Mit einer Delta Lake-Zeitreise können Sie eine ältere Momentaufnahme einer Delta-Tabelle abfragen.
Zum Abfragen einer älteren Version einer Tabelle geben Sie in einer SELECT
-Anweisung eine Version oder einen Zeitstempel an. Verwenden Sie beispielsweise Folgendes, um Version 0 aus dem obigen Verlauf abzufragen:
Python
spark.sql('SELECT * FROM default.people10m VERSION AS OF 0')
oder
spark.sql("SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58'")
R
library(SparkR)
sparkR.session()
sql("SELECT * FROM default.people10m VERSION AS OF 0")
oder
library(SparkR)
sparkR.session()
sql("SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58'")
Scala
spark.sql("SELECT * FROM default.people10m VERSION AS OF 0")
oder
spark.sql("SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58'")
SQL
SELECT * FROM default.people10m VERSION AS OF 0
oder
SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Als Zeitstempel werden nur Datums- oder Zeitstempelzeichenfolgen akzeptiert, z. B. "2019-01-01"
und "2019-01-01'T'00:00:00.000Z"
.
Hinweis
Da Version 1 den Zeitstempel '2019-01-29 00:38:10'
aufweist, können Sie zum Abfragen von Version 0 einen beliebigen Zeitstempel zwischen '2019-01-29 00:37:58'
und einen '2019-01-29 00:38:09'
verwenden.
Mit den DataFrameReader-Optionen können Sie beispielsweise in Python einen Datenrahmen aus einer Delta-Tabelle erstellen, die auf eine bestimmte Version der Tabelle festgelegt ist:
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').load('/tmp/delta/people-10m')
display(df1)
oder
df2 = spark.read.format('delta').option('versionAsOf', 2).load('/tmp/delta/people-10m')
display(df2)
Einzelheiten finden Sie unter Abfragen einer älteren Momentaufnahme einer Tabelle (Zeitreise).
Optimieren einer Tabelle
Nachdem Sie mehrere Änderungen an einer Tabelle vorgenommen haben, verfügen Sie möglicherweise über viele kleine Dateien. Zur Beschleunigung von Leseabfragen können Sie OPTIMIZE
verwenden, um kleine Dateien zu größeren zusammenzufassen:
Python
spark.sql("OPTIMIZE delta.`/tmp/delta/people-10m`")
oder
spark.sql('OPTIMIZE default.people10m')
R
library(SparkR)
sparkR.session()
sql("OPTIMIZE delta.`/tmp/delta/people-10m`")
oder
library(SparkR)
sparkR.session()
sql("OPTIMIZE default.people10m")
Scala
spark.sql("OPTIMIZE delta.`/tmp/delta/people-10m`")
oder
spark.sql("OPTIMIZE default.people10m")
SQL
OPTIMIZE delta.`/tmp/delta/people-10m`
oder
OPTIMIZE default.people10m
Z-Reihenfolge nach Spalten
Zur Verbesserung der Leseleistung können Sie verwandte Informationen in einer Dateiengruppe entsprechend der Z-Reihenfolge zusammenstellen. Diese Zusammenstellung wird von Delta Lake-Datensprungalgorithmen automatisch verwendet, um die Menge der zu lesenden Daten erheblich zu reduzieren. Wenn Daten in der Z-Reihenfolge sortiert werden sollen, geben Sie die Spalten, nach denen sortiert werden soll, in der ZORDER BY
-Klausel an. Führen Sie beispielsweise zum Sortieren nach gender
den folgenden Code aus:
Python
spark.sql("OPTIMIZE delta.`/tmp/delta/people-10m` ZORDER BY (gender)")
oder
spark.sql('OPTIMIZE default.people10m ZORDER BY (gender)')
R
library(SparkR)
sparkR.session()
sql("OPTIMIZE delta.`/tmp/delta/people-10m` ZORDER BY (gender)")
oder
library(SparkR)
sparkR.session()
sql("OPTIMIZE default.people10m ZORDER BY (gender)")
Scala
spark.sql("OPTIMIZE delta.`/tmp/delta/people-10m` ZORDER BY (gender)")
oder
spark.sql("OPTIMIZE default.people10m ZORDER BY (gender)")
SQL
OPTIMIZE delta.`/tmp/delta/people-10m`
ZORDER BY (gender)
oder
OPTIMIZE default.people10m
ZORDER BY (gender)
Die Optionen, die beim Ausführen von OPTIMIZE
verfügbar sind, finden Sie unter Komprimierung (Behälter).
Bereinigen von Momentaufnahmen
Delta Lake ermöglicht die Momentaufnahmenisolation für Lesevorgänge, was bedeutet, dass OPTIMIZE
auch dann sicher ausgeführt werden kann, wenn die Tabelle von anderen Benutzern oder Aufträgen abgefragt wird. Irgendwann sollten Sie alte Momentaufnahmen jedoch bereinigen. Verwenden Sie dazu den Befehl VACUUM
:
Python
spark.sql('VACUUM default.people10m')
R
library(SparkR)
sparkR.session()
sql("VACUUM default.people10m")
Scala
spark.sql("VACUUM default.people10m")
SQL
VACUUM default.people10m
Das Alter der letzten beibehaltenen Momentaufnahme können Sie mithilfe der Option RETAIN <N> HOURS
steuern:
Python
spark.sql('VACUUM default.people10m RETAIN 24 HOURS')
R
library(SparkR)
sparkR.session()
sql("VACUUM default.people10m RETAIN 24 HOURS")
Scala
spark.sql("VACUUM default.people10m RETAIN 24 HOURS")
SQL
VACUUM default.people10m RETAIN 24 HOURS
Ausführliche Informationen zur effektiven Verwendung von VACUUM
finden Sie unter Entfernen von Dateien, auf die nicht mehr von einer Delta-Tabelle verwiesen wird.