Kurz: Delta Lake

Tento kurz představuje běžné operace Delta Lake v Azure Databricks, včetně následujících:

Příklad kódu Python, R, Scala a SQL v tomto článku můžete spustit z poznámkového bloku připojeného ke clusteru Azure Databricks. Kód SQL v tomto článku můžete spustit také z dotazu přidruženého k SQL Warehouse v Databricks SQL.

Poznámka:

Některé z následujících příkladů kódu používají dvouúrovňovou notaci oboru názvů skládající se ze schématu (označovaného také jako databáze) a tabulky nebo zobrazení (například default.people10m). Pokud chcete tyto příklady použít s katalogem Unity, nahraďte dvouúrovňový obor názvů zápisem tříúrovňového oboru názvů Katalogu Unity, který se skládá z katalogu, schématu a tabulky nebo zobrazení (napříkladmain.default.people10m).

Vytvoření tabulky

Všechny tabulky vytvořené v Azure Databricks ve výchozím nastavení používají Delta Lake.

Poznámka:

Delta Lake je výchozí nastavení pro všechny příkazy pro čtení, zápisy a vytváření tabulek v Azure Databricks.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

Předchozí operace vytvoří novou spravovanou tabulku pomocí schématu odvozeného z dat. Informace o dostupných možnostech při vytváření tabulky Delta najdete v tématu CREATE TABLE.

U spravovaných tabulek Azure Databricks určuje umístění dat. K získání umístění můžete použít příkaz DESCRIBE DETAIL , například:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

Někdy můžete chtít vytvořit tabulku zadáním schématu před vložením dat. Můžete to provést pomocí následujících příkazů SQL:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

Ve službě Databricks Runtime 13.3 LTS a vyšší můžete vytvořit CREATE TABLE LIKE novou prázdnou tabulku Delta, která duplikuje vlastnosti schématu a tabulky pro zdrojovou tabulku Delta. To může být užitečné zejména při propagaci tabulek z vývojového prostředí do produkčního prostředí, například v následujícím příkladu kódu:

CREATE TABLE prod.people10m LIKE dev.people10m

K vytváření tabulek můžete také použít DeltaTableBuilder rozhraní API v Delta Lake. V porovnání s rozhraními DATAFrameWriter API toto rozhraní API usnadňuje zadávání dalších informací, jako jsou komentáře ke sloupcům, vlastnosti tabulky a generované sloupce.

Důležité

Tato funkce je ve verzi Public Preview.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Přenesení do tabulky

Pokud chcete sloučit sadu aktualizací a vložení do existující tabulky Delta, použijte příkaz MERGE INTO . Následující příkaz například přebírá data ze zdrojové tabulky a slučuje je do cílové tabulky Delta. Pokud v obou tabulkách existuje odpovídající řádek, Delta Lake aktualizuje sloupec dat pomocí daného výrazu. Pokud neexistuje žádný odpovídající řádek, Delta Lake přidá nový řádek. Tato operace se označuje jako upsert.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Pokud zadáte *, aktualizuje se nebo vloží všechny sloupce v cílové tabulce. Předpokládá se, že zdrojová tabulka má stejné sloupce jako sloupce v cílové tabulce, jinak dotaz vyvolá chybu analýzy.

Při provádění INSERT operace musíte zadat hodnotu pro každý sloupec v tabulce (například pokud v existující datové sadě není žádný odpovídající řádek). Nemusíte ale aktualizovat všechny hodnoty.

Pokud chcete zobrazit výsledky, zadejte dotaz na tabulku.

SELECT * FROM people_10m WHERE id >= 9999998

Čtení tabulky

K datům v tabulkách Delta přistupujete podle názvu tabulky nebo cesty k tabulce, jak je znázorněno v následujících příkladech:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

Zápis do tabulky

Delta Lake používá pro zápis dat do tabulek standardní syntaxi.

Pokud chcete do existující tabulky Delta atomicky přidat nová data, použijte append režim jako v následujících příkladech:

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

Pokud chcete atomicky nahradit všechna data v tabulce, použijte overwrite režim jako v následujících příkladech:

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

Aktualizace tabulky

Data, která odpovídají predikátu v tabulce Delta, můžete aktualizovat. Pokud chcete například změnit zkratku ve sloupci z M sloupce nebo F na Male nebo Femalev tabulce s názvem people10m nebo cestou/tmp/delta/people-10m, můžete spustit následující příkaz:gender

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

Odstranění z tabulky

Data, která odpovídají predikátu, můžete odebrat z tabulky Delta. Pokud například chcete odstranit všechny řádky odpovídající lidem, kteří mají hodnotu ve birthDate sloupci dříve1955, v tabulce s názvem people10m nebo cestou at /tmp/delta/people-10m, můžete spustit následující příkaz:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Důležité

delete odebere data z nejnovější verze tabulky Delta, ale neodebere je z fyzického úložiště, dokud se staré verze explicitně nevysadí. Podrobnosti najdete v vakuu .

Zobrazení historie tabulek

Pokud chcete zobrazit historii tabulky, použijte příkaz DESCRIBE HISTORY , který poskytuje informace o původu, včetně verze tabulky, operace, uživatele atd., pro každý zápis do tabulky.

DESCRIBE HISTORY people_10m

Dotazování na starší verzi tabulky (časová cesta)

Časová cesta Delta Lake umožňuje dotazovat starší snímek tabulky Delta.

Pokud chcete dotazovat starší verzi tabulky, zadejte v SELECT příkazu verzi nebo časové razítko. Pokud například chcete dotazovat verzi 0 z výše uvedené historie, použijte:

SELECT * FROM people_10m VERSION AS OF 0

nebo

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

U časových razítek se přijímají pouze řetězce data nebo časového razítka, "2019-01-01" například a "2019-01-01'T'00:00:00.000Z".

Možnosti třídy DataFrameReader umožňují vytvořit datový rámec z tabulky Delta, která je pevná pro konkrétní verzi tabulky, například v Pythonu:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

nebo alternativně:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

Podrobnosti najdete v tématu Práce s historií tabulek Delta Lake.

Optimalizace tabulky

Po provedení několika změn v tabulce můžete mít velké množství malých souborů. Ke zlepšení rychlosti čtení dotazů můžete použít OPTIMIZE ke sbalení malých souborů do větších:

OPTIMIZE people_10m

Pořadí vykreslování podle sloupců

Pokud chcete zlepšit výkon čtení, můžete spolu vyhledat související informace ve stejné sadě souborů řazením Z. Tato společné umístění se automaticky používá algoritmy pro přeskakování dat Delta Lake k výraznému snížení množství dat, která je potřeba číst. Do dat objednávky Z zadáte sloupce, které se mají v klauzuli ZORDER BY objednat. Pokud například chcete co-najít gender, spusťte:

OPTIMIZE people_10m
ZORDER BY (gender)

Úplnou sadu možností dostupných při spuštění OPTIMIZEnajdete v tématu Komprimace datových souborů s optimalizací v Delta Lake.

Vyčištění snímků pomocí VACUUM

Delta Lake poskytuje izolaci snímků pro čtení, což znamená, že je bezpečné spustit OPTIMIZE i v případě, že se na tabulku dotazují jiní uživatelé nebo úlohy. Nakonec byste ale měli staré snímky vyčistit. Můžete to provést spuštěním VACUUM příkazu:

VACUUM people_10m

Podrobnosti o efektivním používání VACUUM najdete v tématu Odebrání nepoužívaných datových souborů pomocí vakua.