Zelfstudie: Delta Lake

In deze zelfstudie worden algemene Delta Lake-bewerkingen in Azure Databricks geïntroduceerd, waaronder de volgende:

U kunt de voorbeeldcode Python, R, Scala en SQL in dit artikel uitvoeren vanuit een notebook dat is gekoppeld aan een Azure Databricks-cluster. U kunt de SQL-code in dit artikel ook uitvoeren vanuit een query die is gekoppeld aan een SQL-warehouse in Databricks SQL.

Notitie

Sommige van de volgende codevoorbeelden gebruiken een naamruimte-notatie op twee niveaus die bestaat uit een schema (ook wel een database genoemd) en een tabel of weergave (bijvoorbeeld default.people10m). Als u deze voorbeelden wilt gebruiken met Unity Catalog, vervangt u de naamruimte van twee niveaus door de naamruimte van Unity Catalog met drie niveaus die bestaat uit een catalogus, schema en tabel of weergave (bijvoorbeeld main.default.people10m).

Een tabel maken

Alle tabellen die in Azure Databricks zijn gemaakt, maken standaard gebruik van Delta Lake.

Notitie

Delta Lake is de standaardinstelling voor alle lees-, schrijf- en tabelopmaakopdrachten van Azure Databricks.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

Met de voorgaande bewerkingen maakt u een nieuwe beheerde tabel met behulp van het schema dat is afgeleid van de gegevens. Zie CREATE TABLE voor informatie over beschikbare opties wanneer u een Delta-tabel maakt.

Voor beheerde tabellen bepaalt Azure Databricks de locatie voor de gegevens. Als u de locatie wilt ophalen, kunt u de instructie DESCRIBE DETAIL gebruiken, bijvoorbeeld:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

Soms wilt u een tabel maken door het schema op te geven voordat u gegevens invoegt. U kunt dit voltooien met de volgende SQL-opdrachten:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

In Databricks Runtime 13.3 LTS en hoger kunt CREATE TABLE LIKE u een nieuwe lege Delta-tabel maken waarmee de schema- en tabeleigenschappen voor een delta-brontabel worden gedupliceerd. Dit kan met name handig zijn bij het promoveren van tabellen uit een ontwikkelomgeving naar productie, zoals in het volgende codevoorbeeld:

CREATE TABLE prod.people10m LIKE dev.people10m

U kunt ook de DeltaTableBuilder API in Delta Lake gebruiken om tabellen te maken. Vergeleken met de DataFrameWriter-API's kunt u met deze API eenvoudiger aanvullende informatie opgeven, zoals kolomopmerkingen, tabeleigenschappen en gegenereerde kolommen.

Belangrijk

Deze functie is beschikbaar als openbare preview.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Upsert naar een tabel

Als u een set updates en invoegingen wilt samenvoegen in een bestaande Delta-tabel, gebruikt u de instructie MERGE INTO . Met de volgende instructie worden bijvoorbeeld gegevens uit de brontabel opgehaald en samengevoegd in de delta-doeltabel. Wanneer er een overeenkomende rij in beide tabellen is, werkt Delta Lake de gegevenskolom bij met behulp van de opgegeven expressie. Wanneer er geen overeenkomende rij is, voegt Delta Lake een nieuwe rij toe. Deze bewerking wordt een upsert genoemd.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Als u opgeeft *, worden hiermee alle kolommen in de doeltabel bijgewerkt of ingevoegd. Hierbij wordt ervan uitgegaan dat de brontabel dezelfde kolommen heeft als die in de doeltabel, anders genereert de query een analysefout.

U moet een waarde opgeven voor elke kolom in uw tabel wanneer u een INSERT bewerking uitvoert (bijvoorbeeld wanneer er geen overeenkomende rij in de bestaande gegevensset is). U hoeft echter niet alle waarden bij te werken.

Als u de resultaten wilt bekijken, voert u een query uit op de tabel.

SELECT * FROM people_10m WHERE id >= 9999998

Een tabel lezen

U opent gegevens in Delta-tabellen op basis van de tabelnaam of het tabelpad, zoals wordt weergegeven in de volgende voorbeelden:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

Schrijven naar een tabel

Delta Lake gebruikt standaardsyntaxis voor het schrijven van gegevens naar tabellen.

Als u nieuwe gegevens atomisch wilt toevoegen aan een bestaande Delta-tabel, gebruikt append u de modus zoals in de volgende voorbeelden:

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

Als u alle gegevens in een tabel atomisch wilt vervangen, gebruikt overwrite u de modus zoals in de volgende voorbeelden:

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

Een tabel bijwerken

U kunt gegevens bijwerken die overeenkomen met een predicaat in een Delta-tabel. Als u bijvoorbeeld in een tabel met de naam people10m of het pad bij /tmp/delta/people-10m, een afkorting in de gender kolom wilt wijzigen van M of F naar Male , Femalekunt u het volgende uitvoeren:

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

Verwijderen uit een tabel

U kunt gegevens verwijderen die overeenkomen met een predicaat uit een Delta-tabel. Als u bijvoorbeeld in een tabel met de naam people10m of het pad op /tmp/delta/people-10malle rijen wilt verwijderen die overeenkomen met personen met een waarde in de birthDate kolom van vóór 1955, kunt u het volgende uitvoeren:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Belangrijk

delete verwijdert de gegevens uit de nieuwste versie van de Delta-tabel, maar verwijdert deze niet uit de fysieke opslag totdat de oude versies expliciet zijn leeggezogen. Zie vacuüm voor meer informatie.

Tabelgeschiedenis weergeven

Als u de geschiedenis van een tabel wilt weergeven, gebruikt u de instructie DESCRIBE HISTORY , die informatie over de herkomst bevat, waaronder de tabelversie, bewerking, gebruiker, enzovoort, voor elke schrijfbewerking naar een tabel.

DESCRIBE HISTORY people_10m

Een query uitvoeren op een eerdere versie van de tabel (tijdreizen)

Met Delta Lake-tijdreizen kunt u een query uitvoeren op een oudere momentopname van een Delta-tabel.

Als u een query wilt uitvoeren op een oudere versie van een tabel, geeft u een versie of tijdstempel op in een SELECT instructie. Als u bijvoorbeeld een query wilt uitvoeren op versie 0 uit de bovenstaande geschiedenis, gebruikt u:

SELECT * FROM people_10m VERSION AS OF 0

or

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Voor tijdstempels worden alleen datum- of tijdstempeltekenreeksen geaccepteerd, bijvoorbeeld "2019-01-01" en "2019-01-01'T'00:00:00.000Z".

Met DataFrameReader-opties kunt u een DataFrame maken op basis van een Delta-tabel die is vastgezet aan een specifieke versie van de tabel, bijvoorbeeld in Python:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

of, als alternatief:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

Zie Werken met Delta Lake-tabelgeschiedenis voor meer informatie.

Een tabel optimaliseren

Zodra u meerdere wijzigingen in een tabel hebt uitgevoerd, hebt u mogelijk veel kleine bestanden. Als u de snelheid van leesquery's wilt verbeteren, kunt u kleine OPTIMIZE bestanden samenvouwen in grotere bestanden:

OPTIMIZE people_10m

Z-volgorde op kolommen

Als u de leesprestaties verder wilt verbeteren, kunt u gerelateerde informatie in dezelfde set bestanden zoeken door Z-Ordering. Deze co-locatie wordt automatisch gebruikt door Delta Lake-algoritmen voor het overslaan van gegevens om de hoeveelheid gegevens die moet worden gelezen aanzienlijk te verminderen. Voor Z-Order-gegevens geeft u de kolommen op waarop u in de component wilt ZORDER BY orden. Voer bijvoorbeeld het volgende uit om samen te zoeken op gender:

OPTIMIZE people_10m
ZORDER BY (gender)

Voor de volledige set opties die beschikbaar zijn bij het uitvoeren OPTIMIZE, raadpleegt u Compacte gegevensbestanden met optimalisatie op Delta Lake.

Momentopnamen opschonen met VACUUM

Delta Lake biedt isolatie van momentopnamen voor leesbewerkingen, wat betekent dat het veilig is om te worden uitgevoerd OPTIMIZE , zelfs terwijl andere gebruikers of taken query's uitvoeren op de tabel. Uiteindelijk moet u echter oude momentopnamen opschonen. U kunt dit doen door de VACUUM opdracht uit te voeren:

VACUUM people_10m

Zie Ongebruikte gegevensbestanden verwijderen met vacuüm voor meer informatie over effectief gebruik.VACUUM