Tutorial: Delta Lake

Este tutorial apresenta operações comuns do Delta Lake no Azure Databricks, incluindo o seguinte:

Execute o código Python, R, Scala e SQL de exemplo deste artigo em um notebook anexado a um cluster do Azure Databricks. Também é possível executar o código SQL deste artigo em uma consulta associada a um SQL warehouse no SQL do Databricks.

Observação

Alguns dos exemplos de código a seguir usam uma notação de namespace de dois níveis composta por um esquema (também chamado de banco de dados) e por uma tabela ou exibição (por exemplo, default.people10m). Para usar esses exemplos com o Catálogo do Unity, substitua o namespace de dois níveis pela notação de namespace de três níveis do Catálogo do Unity, composta por um catálogo, um esquema e uma tabela ou exibição (por exemplo, main.default.people10m).

Criar uma tabela

Todas as tabelas criadas no Azure Databricks usam o Delta Lake por padrão.

Observação

O Delta Lake é o padrão para todos os comandos de leitura/gravação e criação de tabelas do Azure Databricks.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

As operações anteriores criam uma tabela gerenciada usando o esquema que foi inferido dos dados. Para obter informações sobre as opções disponíveis ao criar uma tabela Delta, confira CREATE TABLE.

Para tabelas gerenciadas, o Azure Databricks determina o local dos dados. Para obter o local, você pode usar a instrução DESCRIBE DETAIL, por exemplo:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

Às vezes, talvez você queira criar uma tabela especificando o esquema antes de inserir dados. Você pode concluir isso com os seguintes comandos SQL:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

No Databricks Runtime 13.3 LTS e superior, você pode usar CREATE TABLE LIKE para criar uma nova tabela Delta vazia que duplica o esquema e as propriedades da tabela para uma tabela Delta de origem. Isso pode ser especialmente útil ao promover tabelas de um ambiente de desenvolvimento para produção, como no exemplo de código a seguir:

CREATE TABLE prod.people10m LIKE dev.people10m

você também pode usar a API DeltaTableBuilder no Delta Lake para criar tabelas. Em comparação com as APIs DataFrameWriter, essa API facilita a especificação de informações adicionais, como comentários de coluna, propriedades de tabela e colunas geradas.

Importante

Esse recurso está em uma versão prévia.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Executar upsert para uma tabela

Para mesclar um conjunto de atualizações e inserções em uma tabela Delta existente, use a instrução MERGE INTO. Por exemplo, a instrução a seguir usa os dados da tabela de origem e os mescla na tabela Delta de destino. Quando há uma linha correspondente nas duas tabelas, o Delta Lake atualiza a coluna de dados usando a expressão especificada. Quando não há nenhuma linha correspondente, o Delta Lake adiciona uma nova linha. Essa operação é conhecida como upsert.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Se você especificar *, isso vai atualizar ou inserir todas as colunas na tabela de destino. Isso pressupõe que a tabela de origem tenha as mesmas colunas que as da tabela de destino, caso contrário, a consulta vai gerar um erro de análise.

Você precisa especificar um valor para cada coluna na tabela ao executar uma operação INSERT (por exemplo, quando não há nenhuma linha correspondente no conjunto de dados existente). No entanto, você não precisa atualizar todos os valores.

Para ver os resultados, consulte a tabela.

SELECT * FROM people_10m WHERE id >= 9999998

Ler uma tabela

Você acessa dados em tabelas Delta pelo nome da tabela ou pelo caminho da tabela, conforme mostrado nos seguintes exemplos:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

Gravar em uma tabela

O Delta Lake usa a sintaxe padrão para gravar dados em tabelas.

Para adicionar atomicamente novos dados a uma tabela Delta existente, use o modo append, como nos seguintes exemplos:

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

Para substituir atomicamente todos os dados em uma tabela, use o modo overwrite, como nos seguintes exemplos:

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

Atualizar uma tabela

Você pode atualizar dados que corresponde a um predicado em uma tabela Delta. Por exemplo, em uma tabela chamada people10m ou um caminho em /tmp/delta/people-10m, para alterar uma abreviação na coluna gender de M ou F para Male ou Female, você pode executar o seguinte:

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

Excluir de uma tabela

É possível remover dados que corresponde a um predicado de uma tabela Delta. Por exemplo, em uma tabela chamada people10m ou um caminho em /tmp/delta/people-10m, para excluir todas as linhas correspondentes a pessoas com um valor na coluna birthDate de antes de 1955, você pode executar o seguinte:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Importante

delete remove os dados da versão mais recente da tabela Delta, mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente a vácuo. Consulte vácuo para obter detalhes.

Exibir o histórico de tabelas

Para ver o histórico de uma tabela, use a instrução DESCRIBE HISTORY, que fornece informações de proveniência, incluindo a versão da tabela, a operação, o usuário etc., para cada gravação em uma tabela.

DESCRIBE HISTORY people_10m

Consultar uma versão anterior da tabela (viagem no tempo)

A viagem no tempo do Delta Lake permite consultar um instantâneo mais antigo de uma tabela Delta.

Para consultar uma versão mais antiga de uma tabela, especifique uma versão ou um carimbo de data/hora em uma instrução SELECT. Por exemplo, para consultar a versão 0 do histórico acima, use:

SELECT * FROM people_10m VERSION AS OF 0

ou

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Para os carimbos de data/hora, somente cadeias de caracteres de data ou de carimbos de data/hora são aceitas, por exemplo, "2019-01-01" e "2019-01-01'T'00:00:00.000Z".

As opções do DataFrameReader permitem que você crie um DataFrame com base em uma tabela Delta que seja corrigido para uma versão específica da tabela, por exemplo, em Python:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

ou, alternativamente:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

Para ver detalhes, confira Trabalhar com o histórico de tabelas do Delta Lake.

Otimizar uma tabela

Depois de realizar várias alterações em uma tabela, você poderá ter muitos arquivos pequenos. A fim de aprimorar a velocidade das consultas de leitura, use OPTIMIZE para recolher arquivos pequenos em arquivos maiores:

OPTIMIZE people_10m

Ordem Z por colunas

Para aprimorar ainda mais o desempenho de leitura, coloque as informações relacionadas no mesmo conjunto de arquivos pela ordenação Z. Essa colocalidade é usada automaticamente por algoritmos que ignoram dados do Delta Lake a fim de reduzir significativamente o volume de dados que precisam ser lidos. Para dados de Ordem Z, especifique as colunas para ordenar na cláusula ZORDER BY. Por exemplo, para colocalizar por gender, execute:

OPTIMIZE people_10m
ZORDER BY (gender)

Para obter o conjunto completo de opções disponíveis ao executar OPTIMIZE, consulte Compactar arquivos de dados com otimização no Delta Lake.

Limpar instantâneos com VACUUM

O Delta Lake fornece isolamento de instantâneo para leituras, o que significa que é seguro executar OPTIMIZE mesmo enquanto outros usuários ou trabalhos estão consultando a tabela. No entanto, por fim, você deve limpar os instantâneos antigos. Faça isso executando o comando VACUUM:

VACUUM people_10m

Para obter detalhes sobre como usar VACUUM com eficiência, consulte Remover arquivos de dados não utilizados com vacuum.