Tutorial: Lago Delta
Este tutorial apresenta operações comuns do Delta Lake no Azure Databricks, incluindo as seguintes:
- Criar uma tabela.
- Upsert numa tabela.
- Ler a partir de uma tabela.
- Apresentar o histórico de tabelas.
- Consultar uma versão anterior de uma tabela.
- Otimizar uma tabela.
- Adicionar um índice de ordenação Z.
- Limpar ficheiros não referenciados.
Você pode executar o exemplo de código Python, R, Scala e SQL neste artigo de dentro de um bloco de anotações anexado a um cluster do Azure Databricks. Você também pode executar o código SQL neste artigo de dentro de uma consulta associada a um armazém SQL no Databricks SQL.
Nota
Alguns dos exemplos de código a seguir usam uma notação de namespace de dois níveis que consiste em um esquema (também chamado de banco de dados) e uma tabela ou exibição (por exemplo, default.people10m
). Para usar esses exemplos com o Unity Catalog, substitua o namespace de dois níveis pela notação de namespace de três níveis do Unity Catalog que consiste em um catálogo, esquema e tabela ou exibição (por exemplo, main.default.people10m
).
Criar uma tabela
Todas as tabelas criadas no Azure Databricks usam o Delta Lake por padrão.
Nota
Delta Lake é o padrão para todos os comandos de leitura, gravação e criação de tabela do Azure Databricks.
Python
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
R
library(SparkR)
sparkR.session()
# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
saveAsTable(
df = df,
tableName = table_name
)
Scala
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
// Write the data to a table.
val table_name = "people_10m"
people.write.saveAsTable("people_10m")
SQL
DROP TABLE IF EXISTS people_10m;
CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
As operações anteriores criam uma nova tabela gerenciada usando o esquema que foi inferido dos dados. Para obter informações sobre as opções disponíveis ao criar uma tabela Delta, consulte CREATE TABLE.
Para tabelas gerenciadas, o Azure Databricks determina o local dos dados. Para obter a localização, você pode usar a instrução DESCRIBE DETAIL , por exemplo:
Python
display(spark.sql('DESCRIBE DETAIL people_10m'))
R
display(sql("DESCRIBE DETAIL people_10m"))
Scala
display(spark.sql("DESCRIBE DETAIL people_10m"))
SQL
DESCRIBE DETAIL people_10m;
Às vezes, você pode querer criar uma tabela especificando o esquema antes de inserir dados. Você pode concluir isso com os seguintes comandos SQL:
CREATE TABLE IF NOT EXISTS people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
CREATE OR REPLACE TABLE people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
No Databricks Runtime 13.3 LTS e superior, você pode usar CREATE TABLE LIKE
para criar uma nova tabela Delta vazia que duplica as propriedades do esquema e da tabela para uma tabela Delta de origem. Isso pode ser especialmente útil ao promover tabelas de um ambiente de desenvolvimento para a produção, como no exemplo de código a seguir:
CREATE TABLE prod.people10m LIKE dev.people10m
Você também pode usar a DeltaTableBuilder
API no Delta Lake para criar tabelas. Em comparação com as APIs DataFrameWriter, essa API facilita a especificação de informações adicionais, como comentários de coluna, propriedades de tabela e colunas geradas.
Importante
Esta funcionalidade está em Pré-visualização Pública.
Python
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
Scala
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
Upsert para uma mesa
Para mesclar um conjunto de atualizações e inserções em uma tabela Delta existente, use a instrução MERGE INTO . Por exemplo, a instrução a seguir pega dados da tabela de origem e os mescla na tabela Delta de destino. Quando há uma linha correspondente em ambas as tabelas, o Delta Lake atualiza a coluna de dados usando a expressão fornecida. Quando não há uma linha correspondente, o Delta Lake adiciona uma nova linha. Esta operação é conhecida como upsert.
CREATE OR REPLACE TEMP VIEW people_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Se você especificar *
, isso atualiza ou insere todas as colunas na tabela de destino. Isso pressupõe que a tabela de origem tenha as mesmas colunas que as da tabela de destino, caso contrário, a consulta lançará um erro de análise.
Você deve especificar um valor para cada coluna na tabela ao executar uma INSERT
operação (por exemplo, quando não há nenhuma linha correspondente no conjunto de dados existente). No entanto, não é necessário atualizar todos os valores.
Para ver os resultados, consulte a tabela.
SELECT * FROM people_10m WHERE id >= 9999998
Ler uma tabela
Você acessa dados em tabelas Delta pelo nome da tabela ou pelo caminho da tabela, conforme mostrado nos exemplos a seguir:
Python
people_df = spark.read.table(table_name)
display(people_df)
## or
people_df = spark.read.load(table_path)
display(people_df)
R
people_df = tableToDF(table_name)
display(people_df)
Scala
val people_df = spark.read.table(table_name)
display(people_df)
\\ or
val people_df = spark.read.load(table_path)
display(people_df)
SQL
SELECT * FROM people_10m;
SELECT * FROM delta.`<path-to-table`;
Escrever numa tabela
O Delta Lake usa sintaxe padrão para gravar dados em tabelas.
Para adicionar atomicamente novos dados a uma tabela Delta existente, use append
o modo como nos exemplos a seguir:
SQL
INSERT INTO people10m SELECT * FROM more_people
Python
df.write.mode("append").saveAsTable("people10m")
Scala
df.write.mode("append").saveAsTable("people10m")
Para substituir atomicamente todos os dados em uma tabela, use overwrite
o modo como nos exemplos a seguir:
SQL
INSERT OVERWRITE TABLE people10m SELECT * FROM more_people
Python
df.write.mode("overwrite").saveAsTable("people10m")
Scala
df.write.mode("overwrite").saveAsTable("people10m")
Atualizar uma tabela
Você pode atualizar dados que correspondam a um predicado em uma tabela Delta. Por exemplo, em uma tabela chamada people10m
ou um caminho em /tmp/delta/people-10m
, para alterar uma abreviatura na gender
coluna de M
ou F
para ou Female
, Male
você pode executar o seguinte:
SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
Excluir de uma tabela
Você pode remover dados que correspondam a um predicado de uma tabela Delta. Por exemplo, em uma tabela com nome people10m
ou um caminho em /tmp/delta/people-10m
, para excluir todas as linhas correspondentes a pessoas com um valor na birthDate
coluna de antes 1955
, você pode executar o seguinte:
SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
Importante
delete
remove os dados da versão mais recente da tabela Delta, mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente aspiradas. Consulte vácuo para obter detalhes.
Exibir histórico da tabela
Para exibir o histórico de uma tabela, use a instrução DESCRIBE HISTORY , que fornece informações de proveniência, incluindo a versão da tabela, operação, usuário e assim por diante, para cada gravação em uma tabela.
DESCRIBE HISTORY people_10m
Consultar uma versão anterior da tabela (viagem no tempo)
A viagem no tempo do Lago Delta permite que você consulte um instantâneo mais antigo de uma tabela Delta.
Para consultar uma versão mais antiga de uma tabela, especifique uma versão ou carimbo de data/hora em uma SELECT
instrução. Por exemplo, para consultar a versão 0 do histórico acima, use:
SELECT * FROM people_10m VERSION AS OF 0
ou
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Para carimbos de data/hora, apenas cadeias de caracteres de carimbo de data ou hora são aceitas, por exemplo, "2019-01-01"
e "2019-01-01'T'00:00:00.000Z"
.
As opções de DataFrameReader permitem que você crie um DataFrame a partir de uma tabela Delta que é corrigida para uma versão específica da tabela, por exemplo, em Python:
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)
ou, alternativamente:
df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")
display(df2)
Para obter detalhes, consulte Trabalhar com o histórico da tabela Delta Lake.
Otimizar uma tabela
Depois de executar várias alterações em uma tabela, você pode ter muitos arquivos pequenos. Para melhorar a velocidade das consultas de leitura, você pode usar OPTIMIZE
para recolher arquivos pequenos em arquivos maiores:
OPTIMIZE people_10m
Ordem Z por colunas
Para melhorar ainda mais o desempenho de leitura, você pode colocalizar informações relacionadas no mesmo conjunto de arquivos pelo Z-Ordering. Essa co-localidade é usada automaticamente pelos algoritmos de pulo de dados do Delta Lake para reduzir drasticamente a quantidade de dados que precisam ser lidos. Para os dados Z-Order, especifique as colunas a serem ordenadas na ZORDER BY
cláusula. Por exemplo, para colocalizar por gender
, execute:
OPTIMIZE people_10m
ZORDER BY (gender)
Para obter o conjunto completo de opções disponíveis durante a execução OPTIMIZE
, consulte Compactar arquivos de dados com otimizar no Delta Lake.
Limpe instantâneos com VACUUM
O Delta Lake fornece isolamento de instantâneo para leituras, o que significa que é seguro executar OPTIMIZE
mesmo enquanto outros usuários ou trabalhos estão consultando a tabela. Eventualmente, no entanto, você deve limpar instantâneos antigos. Você pode fazer isso executando o VACUUM
comando:
VACUUM people_10m
Para obter detalhes sobre como usar VACUUM
de forma eficaz, consulte Remover arquivos de dados não utilizados com vácuo.