Tutorial: Lago Delta

Este tutorial apresenta operações comuns do Delta Lake no Azure Databricks, incluindo as seguintes:

Você pode executar o exemplo de código Python, R, Scala e SQL neste artigo de dentro de um bloco de anotações anexado a um cluster do Azure Databricks. Você também pode executar o código SQL neste artigo de dentro de uma consulta associada a um armazém SQL no Databricks SQL.

Nota

Alguns dos exemplos de código a seguir usam uma notação de namespace de dois níveis que consiste em um esquema (também chamado de banco de dados) e uma tabela ou exibição (por exemplo, default.people10m). Para usar esses exemplos com o Unity Catalog, substitua o namespace de dois níveis pela notação de namespace de três níveis do Unity Catalog que consiste em um catálogo, esquema e tabela ou exibição (por exemplo, main.default.people10m).

Criar uma tabela

Todas as tabelas criadas no Azure Databricks usam o Delta Lake por padrão.

Nota

Delta Lake é o padrão para todos os comandos de leitura, gravação e criação de tabela do Azure Databricks.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

As operações anteriores criam uma nova tabela gerenciada usando o esquema que foi inferido dos dados. Para obter informações sobre as opções disponíveis ao criar uma tabela Delta, consulte CREATE TABLE.

Para tabelas gerenciadas, o Azure Databricks determina o local dos dados. Para obter a localização, você pode usar a instrução DESCRIBE DETAIL , por exemplo:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

Às vezes, você pode querer criar uma tabela especificando o esquema antes de inserir dados. Você pode concluir isso com os seguintes comandos SQL:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

No Databricks Runtime 13.3 LTS e superior, você pode usar CREATE TABLE LIKE para criar uma nova tabela Delta vazia que duplica as propriedades do esquema e da tabela para uma tabela Delta de origem. Isso pode ser especialmente útil ao promover tabelas de um ambiente de desenvolvimento para a produção, como no exemplo de código a seguir:

CREATE TABLE prod.people10m LIKE dev.people10m

Você também pode usar a DeltaTableBuilder API no Delta Lake para criar tabelas. Em comparação com as APIs DataFrameWriter, essa API facilita a especificação de informações adicionais, como comentários de coluna, propriedades de tabela e colunas geradas.

Importante

Esta funcionalidade está em Pré-visualização Pública.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Upsert para uma mesa

Para mesclar um conjunto de atualizações e inserções em uma tabela Delta existente, use a instrução MERGE INTO . Por exemplo, a instrução a seguir pega dados da tabela de origem e os mescla na tabela Delta de destino. Quando há uma linha correspondente em ambas as tabelas, o Delta Lake atualiza a coluna de dados usando a expressão fornecida. Quando não há uma linha correspondente, o Delta Lake adiciona uma nova linha. Esta operação é conhecida como upsert.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Se você especificar *, isso atualiza ou insere todas as colunas na tabela de destino. Isso pressupõe que a tabela de origem tenha as mesmas colunas que as da tabela de destino, caso contrário, a consulta lançará um erro de análise.

Você deve especificar um valor para cada coluna na tabela ao executar uma INSERT operação (por exemplo, quando não há nenhuma linha correspondente no conjunto de dados existente). No entanto, não é necessário atualizar todos os valores.

Para ver os resultados, consulte a tabela.

SELECT * FROM people_10m WHERE id >= 9999998

Ler uma tabela

Você acessa dados em tabelas Delta pelo nome da tabela ou pelo caminho da tabela, conforme mostrado nos exemplos a seguir:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

Escrever numa tabela

O Delta Lake usa sintaxe padrão para gravar dados em tabelas.

Para adicionar atomicamente novos dados a uma tabela Delta existente, use append o modo como nos exemplos a seguir:

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

Para substituir atomicamente todos os dados em uma tabela, use overwrite o modo como nos exemplos a seguir:

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

Atualizar uma tabela

Você pode atualizar dados que correspondam a um predicado em uma tabela Delta. Por exemplo, em uma tabela chamada people10m ou um caminho em /tmp/delta/people-10m, para alterar uma abreviatura na gender coluna de M ou F para ou Female, Male você pode executar o seguinte:

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

Excluir de uma tabela

Você pode remover dados que correspondam a um predicado de uma tabela Delta. Por exemplo, em uma tabela com nome people10m ou um caminho em /tmp/delta/people-10m, para excluir todas as linhas correspondentes a pessoas com um valor na birthDate coluna de antes 1955, você pode executar o seguinte:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Importante

delete remove os dados da versão mais recente da tabela Delta, mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente aspiradas. Consulte vácuo para obter detalhes.

Exibir histórico da tabela

Para exibir o histórico de uma tabela, use a instrução DESCRIBE HISTORY , que fornece informações de proveniência, incluindo a versão da tabela, operação, usuário e assim por diante, para cada gravação em uma tabela.

DESCRIBE HISTORY people_10m

Consultar uma versão anterior da tabela (viagem no tempo)

A viagem no tempo do Lago Delta permite que você consulte um instantâneo mais antigo de uma tabela Delta.

Para consultar uma versão mais antiga de uma tabela, especifique uma versão ou carimbo de data/hora em uma SELECT instrução. Por exemplo, para consultar a versão 0 do histórico acima, use:

SELECT * FROM people_10m VERSION AS OF 0

ou

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Para carimbos de data/hora, apenas cadeias de caracteres de carimbo de data ou hora são aceitas, por exemplo, "2019-01-01" e "2019-01-01'T'00:00:00.000Z".

As opções de DataFrameReader permitem que você crie um DataFrame a partir de uma tabela Delta que é corrigida para uma versão específica da tabela, por exemplo, em Python:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

ou, alternativamente:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

Para obter detalhes, consulte Trabalhar com o histórico da tabela Delta Lake.

Otimizar uma tabela

Depois de executar várias alterações em uma tabela, você pode ter muitos arquivos pequenos. Para melhorar a velocidade das consultas de leitura, você pode usar OPTIMIZE para recolher arquivos pequenos em arquivos maiores:

OPTIMIZE people_10m

Ordem Z por colunas

Para melhorar ainda mais o desempenho de leitura, você pode colocalizar informações relacionadas no mesmo conjunto de arquivos pelo Z-Ordering. Essa co-localidade é usada automaticamente pelos algoritmos de pulo de dados do Delta Lake para reduzir drasticamente a quantidade de dados que precisam ser lidos. Para os dados Z-Order, especifique as colunas a serem ordenadas na ZORDER BY cláusula. Por exemplo, para colocalizar por gender, execute:

OPTIMIZE people_10m
ZORDER BY (gender)

Para obter o conjunto completo de opções disponíveis durante a execução OPTIMIZE, consulte Compactar arquivos de dados com otimizar no Delta Lake.

Limpe instantâneos com VACUUM

O Delta Lake fornece isolamento de instantâneo para leituras, o que significa que é seguro executar OPTIMIZE mesmo enquanto outros usuários ou trabalhos estão consultando a tabela. Eventualmente, no entanto, você deve limpar instantâneos antigos. Você pode fazer isso executando o VACUUM comando:

VACUUM people_10m

Para obter detalhes sobre como usar VACUUM de forma eficaz, consulte Remover arquivos de dados não utilizados com vácuo.