Kurz: Delta Lake
Tento kurz představuje běžné operace Delta Lake v Azure Databricks, včetně následujících:
- Vytvoření tabulky
- Upsertování do tabulky
- Čtení z tabulky
- Zobrazení historie tabulky
- Dotazování starší verze tabulky
- Optimalizace tabulky
- Přidání indexu Z-pořadí
- Úklid neodkazovaných souborů
Příklad kódu Python, R, Scala a SQL v tomto článku můžete spustit z poznámkového bloku připojeného ke clusteru Azure Databricks. Kód SQL v tomto článku můžete spustit také z dotazu přidruženého k SQL Warehouse v Databricks SQL.
Poznámka:
Některé z následujících příkladů kódu používají dvouúrovňovou notaci oboru názvů skládající se ze schématu (označovaného také jako databáze) a tabulky nebo zobrazení (například default.people10m
). Pokud chcete tyto příklady použít s katalogem Unity, nahraďte dvouúrovňový obor názvů zápisem tříúrovňového oboru názvů Katalogu Unity, který se skládá z katalogu, schématu a tabulky nebo zobrazení (napříkladmain.default.people10m
).
Vytvoření tabulky
Všechny tabulky vytvořené v Azure Databricks ve výchozím nastavení používají Delta Lake.
Poznámka:
Delta Lake je výchozí nastavení pro všechny příkazy pro čtení, zápisy a vytváření tabulek v Azure Databricks.
Python
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
R
library(SparkR)
sparkR.session()
# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
saveAsTable(
df = df,
tableName = table_name
)
Scala
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
// Write the data to a table.
val table_name = "people_10m"
people.write.saveAsTable("people_10m")
SQL
DROP TABLE IF EXISTS people_10m;
CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
Předchozí operace vytvoří novou spravovanou tabulku pomocí schématu odvozeného z dat. Informace o dostupných možnostech při vytváření tabulky Delta najdete v tématu CREATE TABLE.
U spravovaných tabulek Azure Databricks určuje umístění dat. K získání umístění můžete použít příkaz DESCRIBE DETAIL , například:
Python
display(spark.sql('DESCRIBE DETAIL people_10m'))
R
display(sql("DESCRIBE DETAIL people_10m"))
Scala
display(spark.sql("DESCRIBE DETAIL people_10m"))
SQL
DESCRIBE DETAIL people_10m;
Někdy můžete chtít vytvořit tabulku zadáním schématu před vložením dat. Můžete to provést pomocí následujících příkazů SQL:
CREATE TABLE IF NOT EXISTS people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
CREATE OR REPLACE TABLE people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
Ve službě Databricks Runtime 13.3 LTS a vyšší můžete vytvořit CREATE TABLE LIKE
novou prázdnou tabulku Delta, která duplikuje vlastnosti schématu a tabulky pro zdrojovou tabulku Delta. To může být užitečné zejména při propagaci tabulek z vývojového prostředí do produkčního prostředí, například v následujícím příkladu kódu:
CREATE TABLE prod.people10m LIKE dev.people10m
K vytváření tabulek můžete také použít DeltaTableBuilder
rozhraní API v Delta Lake. V porovnání s rozhraními DATAFrameWriter API toto rozhraní API usnadňuje zadávání dalších informací, jako jsou komentáře ke sloupcům, vlastnosti tabulky a generované sloupce.
Důležité
Tato funkce je ve verzi Public Preview.
Python
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
Scala
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
Přenesení do tabulky
Pokud chcete sloučit sadu aktualizací a vložení do existující tabulky Delta, použijte příkaz MERGE INTO . Následující příkaz například přebírá data ze zdrojové tabulky a slučuje je do cílové tabulky Delta. Pokud v obou tabulkách existuje odpovídající řádek, Delta Lake aktualizuje sloupec dat pomocí daného výrazu. Pokud neexistuje žádný odpovídající řádek, Delta Lake přidá nový řádek. Tato operace se označuje jako upsert.
CREATE OR REPLACE TEMP VIEW people_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Pokud zadáte *
, aktualizuje se nebo vloží všechny sloupce v cílové tabulce. Předpokládá se, že zdrojová tabulka má stejné sloupce jako sloupce v cílové tabulce, jinak dotaz vyvolá chybu analýzy.
Při provádění INSERT
operace musíte zadat hodnotu pro každý sloupec v tabulce (například pokud v existující datové sadě není žádný odpovídající řádek). Nemusíte ale aktualizovat všechny hodnoty.
Pokud chcete zobrazit výsledky, zadejte dotaz na tabulku.
SELECT * FROM people_10m WHERE id >= 9999998
Čtení tabulky
K datům v tabulkách Delta přistupujete podle názvu tabulky nebo cesty k tabulce, jak je znázorněno v následujících příkladech:
Python
people_df = spark.read.table(table_name)
display(people_df)
## or
people_df = spark.read.load(table_path)
display(people_df)
R
people_df = tableToDF(table_name)
display(people_df)
Scala
val people_df = spark.read.table(table_name)
display(people_df)
\\ or
val people_df = spark.read.load(table_path)
display(people_df)
SQL
SELECT * FROM people_10m;
SELECT * FROM delta.`<path-to-table`;
Zápis do tabulky
Delta Lake používá pro zápis dat do tabulek standardní syntaxi.
Pokud chcete do existující tabulky Delta atomicky přidat nová data, použijte append
režim jako v následujících příkladech:
SQL
INSERT INTO people10m SELECT * FROM more_people
Python
df.write.mode("append").saveAsTable("people10m")
Scala
df.write.mode("append").saveAsTable("people10m")
Pokud chcete atomicky nahradit všechna data v tabulce, použijte overwrite
režim jako v následujících příkladech:
SQL
INSERT OVERWRITE TABLE people10m SELECT * FROM more_people
Python
df.write.mode("overwrite").saveAsTable("people10m")
Scala
df.write.mode("overwrite").saveAsTable("people10m")
Aktualizace tabulky
Data, která odpovídají predikátu v tabulce Delta, můžete aktualizovat. Pokud chcete například změnit zkratku ve sloupci z M
sloupce nebo F
na Male
nebo Female
v tabulce s názvem people10m
nebo cestou/tmp/delta/people-10m
, můžete spustit následující příkaz:gender
SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
Odstranění z tabulky
Data, která odpovídají predikátu, můžete odebrat z tabulky Delta. Pokud například chcete odstranit všechny řádky odpovídající lidem, kteří mají hodnotu ve birthDate
sloupci dříve1955
, v tabulce s názvem people10m
nebo cestou at /tmp/delta/people-10m
, můžete spustit následující příkaz:
SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
Důležité
delete
odebere data z nejnovější verze tabulky Delta, ale neodebere je z fyzického úložiště, dokud se staré verze explicitně nevysadí. Podrobnosti najdete v vakuu .
Zobrazení historie tabulek
Pokud chcete zobrazit historii tabulky, použijte příkaz DESCRIBE HISTORY , který poskytuje informace o původu, včetně verze tabulky, operace, uživatele atd., pro každý zápis do tabulky.
DESCRIBE HISTORY people_10m
Dotazování na starší verzi tabulky (časová cesta)
Časová cesta Delta Lake umožňuje dotazovat starší snímek tabulky Delta.
Pokud chcete dotazovat starší verzi tabulky, zadejte v SELECT
příkazu verzi nebo časové razítko. Pokud například chcete dotazovat verzi 0 z výše uvedené historie, použijte:
SELECT * FROM people_10m VERSION AS OF 0
nebo
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
U časových razítek se přijímají pouze řetězce data nebo časového razítka, "2019-01-01"
například a "2019-01-01'T'00:00:00.000Z"
.
Možnosti třídy DataFrameReader umožňují vytvořit datový rámec z tabulky Delta, která je pevná pro konkrétní verzi tabulky, například v Pythonu:
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)
nebo alternativně:
df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")
display(df2)
Podrobnosti najdete v tématu Práce s historií tabulek Delta Lake.
Optimalizace tabulky
Po provedení několika změn v tabulce můžete mít velké množství malých souborů. Ke zlepšení rychlosti čtení dotazů můžete použít OPTIMIZE
ke sbalení malých souborů do větších:
OPTIMIZE people_10m
Pořadí vykreslování podle sloupců
Pokud chcete zlepšit výkon čtení, můžete spolu vyhledat související informace ve stejné sadě souborů řazením Z. Tato společné umístění se automaticky používá algoritmy pro přeskakování dat Delta Lake k výraznému snížení množství dat, která je potřeba číst. Do dat objednávky Z zadáte sloupce, které se mají v klauzuli ZORDER BY
objednat. Pokud například chcete co-najít gender
, spusťte:
OPTIMIZE people_10m
ZORDER BY (gender)
Úplnou sadu možností dostupných při spuštění OPTIMIZE
najdete v tématu Komprimace datových souborů s optimalizací v Delta Lake.
Vyčištění snímků pomocí VACUUM
Delta Lake poskytuje izolaci snímků pro čtení, což znamená, že je bezpečné spustit OPTIMIZE
i v případě, že se na tabulku dotazují jiní uživatelé nebo úlohy. Nakonec byste ale měli staré snímky vyčistit. Můžete to provést spuštěním VACUUM
příkazu:
VACUUM people_10m
Podrobnosti o efektivním používání VACUUM
najdete v tématu Odebrání nepoužívaných datových souborů pomocí vakua.