Tutoriel : Delta Lake

Ce tutoriel présente les opérations Delta Lake courantes sur Azure Databricks, dont les suivantes :

Vous pouvez exécuter l’exemple de code Python, R, Scala et SQL dans cet article à partir d’un notebook attaché à un cluster Azure Databricks. Vous pouvez également exécuter le code SQL indiqué dans cet article à partir d’une requête associée à un entrepôt SQL dans Databricks SQL.

Notes

Certains exemples de code suivants utilisent une notation d’espace de noms à deux niveaux composée d’un schéma (également appelé base de données) et d’une table ou d’une vue (par exemple default.people10m). Pour utiliser ces exemples avec Unity Catalog, remplacez l’espace de noms à deux niveaux par la notation d’espace de noms de trois niveaux du catalogue Unity composée d’un catalogue, d’un schéma et d’une table ou d’une vue (par exemple main.default.people10m).

Créez une table

Toutes les tables créées sur Azure Databricks utilisent Delta Lake par défaut.

Remarque

Delta Lake est la valeur par défaut pour toutes les commandes de lecture, d'écriture et de création de table Azure Databricks.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

Les opérations précédentes créent une nouvelle table managée en utilisant le schéma inféré à partir des données. Pour plus d’informations sur les options disponibles lors de la création d’une table Delta, consultez CREATE TABLE.

Pour les tables managées, Azure Databricks détermine l’emplacement des données. Pour obtenir l’emplacement, vous pouvez utiliser l’instruction DESCRIBE DETAIL, par exemple :

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

Vous pouvez avoir besoin de créer une table en spécifiant le schéma avant d’insérer des données. Vous pouvez effectuer cette opération avec les commandes SQL suivantes :

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

Dans Databricks Runtime 13.3 LTS et versions ultérieures, vous pouvez utiliser CREATE TABLE LIKE pour créer une table Delta vide qui duplique les propriétés de schéma et de table d’une table Delta source. Cela peut être particulièrement utile lors de la promotion de tables d’un environnement de développement en production, comme dans l’exemple de code suivant :

CREATE TABLE prod.people10m LIKE dev.people10m

Vous pouvez également utiliser l’API DeltaTableBuilder dans Delta Lake pour créer des tables. Par rapport aux API DataFrameWriter, cette API permet de spécifier plus facilement des informations supplémentaires telles que des commentaires de colonne, des propriétés de table et des colonnes générées.

Important

Cette fonctionnalité est disponible en préversion publique.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Faire un upsert vers une table

Pour fusionner un ensemble de mises à jour et d’insertions dans une table Delta existante, vous utilisez l’instruction MERGE INTO. Par exemple, l’instruction suivante extrait des données de la table source et les fusionne dans la table Delta cible. Lorsqu’il existe une ligne correspondante dans les deux tables, Delta Lake met à jour la colonne de données à l’aide de l’expression donnée. En l’absence de ligne correspondante, Delta Lake ajoute une nouvelle ligne. Cette opération porte le nom d’upsert.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Si vous spécifiez *, cette instruction met à jour ou insère toutes les colonnes dans la table cible. Cela part du principe que la table source a les mêmes colonnes que la table cible, sinon la requête lève une erreur d’analyse.

Vous devez spécifier une valeur pour chaque colonne de votre table lorsque vous effectuez une opération INSERT (par exemple lorsqu’il n’y a aucune ligne correspondante dans le jeu de données existant). Toutefois, vous n’avez pas besoin de mettre à jour toutes les valeurs.

Pour afficher les résultats, interrogez la table.

SELECT * FROM people_10m WHERE id >= 9999998

Lire une table

Vous accédez aux données d’une table Delta en indiquant le nom de la table ou le chemin de la table, comme cela est montré dans les exemples suivants :

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

Écrire dans une table

Delta Lake utilise la syntaxe standard pour écrire des données dans des tables.

Pour ajouter atomiquement de nouvelles données à une table Delta existante, utilisez le mode append comme dans les exemples suivants :

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

Pour remplacer atomiquement toutes les données dans une table, utilisez le mode overwrite comme dans les exemples suivants :

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

Mettre à jour une table

Vous pouvez mettre à jour des données correspondant à un prédicat dans une table Delta. Par exemple, dans une table nommée people10m ou un chemin d’accès /tmp/delta/people-10m, pour modifier une abréviation dans la colonne gender de M ou F en Male ou Female, vous pouvez exécuter la commande suivante :

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

Supprimer d’une table

Vous pouvez supprimer d’une table Delta des données correspondant à un prédicat. Par exemple, dans une table nommée people10m ou un chemin d’accès /tmp/delta/people-10m, pour supprimer toutes les lignes correspondant aux personnes ayant une valeur antérieure à 1955 dans la colonne birthDate, vous pouvez exécuter la commande suivante :

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Important

La commande delete supprime les données de la dernière version de la table Delta, mais ne les supprime pas du stockage physique tant que les anciennes versions n’ont pas été explicitement vidées de la mémoire. Pour plus de détails, consultez vacuum.

Afficher l’historique d’une table

Pour afficher l’historique d’une table, utilisez l’instruction DESCRIBE HISTORY , qui fournit des informations de provenance, notamment la version de table, l’opération, l’utilisateur, etc., pour chaque écriture dans une table.

DESCRIBE HISTORY people_10m

Interroger une version antérieure d’une table (voyage dans le temps)

Le voyage dans le temps Delta Lake vous permet d’interroger un ancien instantané d’une table Delta.

Pour interroger une version antérieure d’une table, spécifiez une version ou un horodatage dans une instruction SELECT. Par exemple, pour interroger la version 0 à partir de l’historique ci-dessus, utilisez :

SELECT * FROM people_10m VERSION AS OF 0

or

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Pour les horodatages, seules les chaînes de date ou d’horodatage sont acceptées, par exemple "2019-01-01" et "2019-01-01'T'00:00:00.000Z".

Les options DataFrameReader vous permettent de créer un DataFrame à partir d’une table Delta qui est défini sur une version spécifique de la table, par exemple en Python :

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

ou encore :

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

Pour plus d’informations, consultez Utiliser l’historique des tables Delta Lake.

Optimiser une table

Une fois que vous avez apporté de nombreuses modifications à une table, vous risquez de vous retrouver avec un grand nombre de petits fichiers. Pour améliorer la rapidité des requêtes de lecture, vous pouvez utiliser OPTIMIZE afin de combiner les fichiers de petite taille en fichiers plus gros :

OPTIMIZE people_10m

Organiser les colonnes selon l’ordre de plan

Pour améliorer encore davantage les performances de lecture, vous pouvez colocaliser les informations associées dans le même jeu de fichiers en procédant à un ordonnancement Z. Cette colocalisation est automatiquement utilisée par les algorithmes de Data-Skipping Delta Lake pour réduire considérablement la quantité de données qui doit être lue. Pour trier les données dans l’ordre Z, vous spécifiez les colonnes à trier dans la clause ZORDER BY. Par exemple, pour colocaliser par gender, exécutez :

OPTIMIZE people_10m
ZORDER BY (gender)

Pour voir l’ensemble complet des options disponibles lors de l’exécution de OPTIMIZE, consultez Compacter des fichiers de données avec l’optimisation sur Delta Lake.

Nettoyer les instantanés avec VACUUM

Delta Lake fournit une isolation des instantanés pour les lectures, ce qui signifie qu’il est sans danger d’exécuter OPTIMIZE même pendant que d’autres utilisateurs ou travaux interrogent la table. Toutefois, il est préférable de nettoyer les anciens instantanés. Vous pouvez pour cela exécuter la commande VACUUM :

VACUUM people_10m

Pour plus d’informations sur l’utilisation efficace de VACUUM, consultez Supprimer les fichiers de données inutilisés avec le nettoyage.