Eliminações, atualizações e uniões de tabelas
A Delta Lake apoia várias declarações para facilitar a eliminação de dados e a atualização de dados nas tabelas Delta.
Excluir de uma mesa
Pode remover dados que correspondam a um predicado de uma tabela Delta. Por exemplo, para eliminar todos os eventos de 2017
antes, pode executar o seguinte:
SQL
DELETE FROM events WHERE date < '2017-01-01'
DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'
Python
Nota
A API Python está disponível em Databricks Runtime 6.1 e superior.
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete("date < '2017-01-01'") # predicate using SQL formatted string
deltaTable.delete(col("date") < "2017-01-01") # predicate using Spark SQL functions
Scala
Nota
A API Scala está disponível em Databricks Runtime 6.0 ou superior.
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete("date < '2017-01-01'") // predicate using SQL formatted string
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.delete(col("date") < "2017-01-01") // predicate using Spark SQL functions and implicits
Java
Nota
A API java está disponível em Databricks Runtime 6.0 ou superior.
import io.delta.tables.*;
import org.apache.spark.sql.functions;
DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");
deltaTable.delete("date < '2017-01-01'"); // predicate using SQL formatted string
deltaTable.delete(functions.col("date").lt(functions.lit("2017-01-01"))); // predicate using Spark SQL functions
Consulte a referência API para mais detalhes.
Importante
delete
remove os dados da versão mais recente da tabela Delta, mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente aspiradas. Consulte o vácuo para mais detalhes.
Dica
Quando possível, forneça predicados nas colunas de partição para uma tabela Delta dividida, uma vez que tais predicados podem acelerar significativamente a operação.
Atualizar uma tabela
Pode atualizar dados que correspondam a um predicado numa tabela Delta. Por exemplo, para corrigir um erro ortográfico no eventType
, pode executar o seguinte:
SQL
UPDATE events SET eventType = 'click' WHERE eventType = 'clck'
UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'
Python
Nota
A API Python está disponível em Databricks Runtime 6.1 e superior.
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.update("eventType = 'clck'", { "eventType": "'click'" } ) # predicate using SQL formatted string
deltaTable.update(col("eventType") == "clck", { "eventType": lit("click") } ) # predicate using Spark SQL functions
Scala
Nota
A API Scala está disponível em Databricks Runtime 6.0 ou superior.
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.updateExpr( // predicate and update expressions using SQL formatted string
"eventType = 'clck'",
Map("eventType" -> "'click'")
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.update( // predicate using Spark SQL functions and implicits
col("eventType") === "clck",
Map("eventType" -> lit("click")));
Java
Nota
A API Scala está disponível em Databricks Runtime 6.0 ou superior.
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");
deltaTable.updateExpr( // predicate and update expressions using SQL formatted string
"eventType = 'clck'",
new HashMap<String, String>() {{
put("eventType", "'click'");
}}
);
deltaTable.update( // predicate using Spark SQL functions
functions.col(eventType).eq("clck"),
new HashMap<String, Column>() {{
put("eventType", functions.lit("click"));
}}
);
Consulte a referência API para mais detalhes.
Dica
Similares a eliminar, as operações de atualização podem obter uma aceleração significativa com predicados em divisórias.
Upsert em uma mesa usando a fusão
Pode aumentar os dados de uma tabela de origem, visualização ou DataFrame numa tabela Delta alvo utilizando a merge
operação. Esta operação é semelhante ao MERGE INTO
comando SQL, mas tem suporte adicional para eliminações e condições extra em atualizações, inserções e eliminações.
Suponha que tenha um DataFrame spark que contenha novos dados para eventos com eventId
. Alguns destes eventos podem já estar presentes na events
mesa. Para fundir os novos dados na events
tabela, pretende atualizar as linhas correspondentes (ou seja, eventId
já presentes) e inserir as novas linhas (ou seja, eventId
não presentes). Pode executar o seguinte:
SQL
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
UPDATE SET events.data = updates.data
WHEN NOT MATCHED
THEN INSERT (date, eventId, data) VALUES (date, eventId, data)
Para detalhes da sintaxe, consulte
- Databricks Runtime 7.x: MERGE INTO (Delta Lake on Azure Databricks)
- Databricks Runtime 5.5 LTS e 6.x: Merge Into (Delta Lake on Azure Databricks)
Python
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.alias("events").merge(
updatesDF.alias("updates"),
"events.eventId = updates.eventId") \
.whenMatchedUpdate(set = { "data" : "updates.data" } ) \
.whenNotMatchedInsert(values =
{
"date": "updates.date",
"eventId": "updates.eventId",
"data": "updates.data"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
Java
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;
Dataset<Row> updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched()
.updateExpr(
new HashMap<String, String>() {{
put("data", "events.data");
}})
.whenNotMatched()
.insertExpr(
new HashMap<String, String>() {{
put("date", "updates.date");
put("eventId", "updates.eventId");
put("data", "updates.data");
}})
.execute();
Consulte a referência API para detalhes da sintaxe de Scala, Java e Python.
Semântica de operação
Aqui está uma descrição detalhada da merge
operação programática.
Pode haver qualquer número
whenMatched
dewhenNotMatched
cláusulas.Nota
No Databricks Runtime 7.2 e abaixo,
merge
pode ter no máximo 2whenMatched
cláusulas e no máximo 1whenNotMatched
cláusula.whenMatched
as cláusulas são executadas quando uma linha de origem corresponde a uma linha de tabela alvo com base na condição de jogo. Estas cláusulas têm a seguinte semântica.whenMatched
cláusulas podem ter no máximoupdate
e umadelete
ação. Aupdate
ação só atualiza asmerge
colunas especificadas (semelhantes àupdate
operação) da linha-alvo correspondida. Adelete
ação elimina a linha partida.Cada
whenMatched
cláusula pode ter uma condição opcional. Se esta condição de cláusula existir, aupdate
oudelete
ação é executada para qualquer linha de par de linha de origem-alvo correspondente apenas quando a condição da cláusula é verdadeira.Se houver
whenMatched
várias cláusulas, então são avaliadas por ordem de serem especificadas (isto é, a ordem das cláusulas importa). Todas aswhenMatched
cláusulas, exceto a última, devem ter condições.Se ambas as
whenMatched
cláusulas tiverem condições e nenhuma das condições for verdadeira para um par de linhas de alvo de origem correspondente, então a linha de alvo correspondente fica inalterada.Para atualizar todas as colunas da tabela Delta alvo com as colunas correspondentes do conjunto de dados de origem, utilize
whenMatched(...).updateAll()
. Isto equivale a:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
para todas as colunas da tabela Delta alvo. Por conseguinte, esta ação pressupõe que a tabela de origem tem as mesmas colunas que as da tabela-alvo, caso contrário a consulta lança um erro de análise.
Nota
Este comportamento muda quando a migração automática de esquemas está ativada. Consulte a evolução automática do esquema para obter mais detalhes.
whenNotMatched
as cláusulas são executadas quando uma linha de origem não corresponde a qualquer linha alvo com base na condição de jogo. Estas cláusulas têm a seguinte semântica.whenNotMatched
cláusulas podem ter apenas ainsert
ação. A nova linha é gerada com base na coluna especificada e expressões correspondentes. Não é necessário especificar todas as colunas na tabela alvo. Para colunas-alvo nãoNULL
especificadas, é inserida.Nota
No Databricks Runtime 6.5 e abaixo, deve fornecer todas as colunas na tabela alvo para a
INSERT
ação.Cada
whenNotMatched
cláusula pode ter uma condição opcional. Se a condição da cláusula estiver presente, uma linha de origem só é inserida se essa condição for verdadeira para essa linha. Caso contrário, a coluna de origem é ignorada.Se houver
whenNotMatched
várias cláusulas, então são avaliadas por ordem de serem especificadas (isto é, a ordem das cláusulas importa). Todas aswhenNotMatched
cláusulas, exceto a última, devem ter condições.Para inserir todas as colunas da tabela Delta alvo com as colunas correspondentes do conjunto de dados de origem, utilize
whenNotMatched(...).insertAll()
. Isto equivale a:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
para todas as colunas da tabela Delta alvo. Por conseguinte, esta ação pressupõe que a tabela de origem tem as mesmas colunas que as da tabela-alvo, caso contrário a consulta lança um erro de análise.
Nota
Este comportamento muda quando a migração automática de esquemas está ativada. Consulte a evolução automática do esquema para obter mais detalhes.
Importante
Uma merge
operação pode falhar se várias linhas do conjunto de dados de origem corresponderem e tentar atualizar as mesmas linhas da tabela Delta alvo. De acordo com a semântica SQL de fusão, tal operação de atualização é ambígua, uma vez que não é claro qual a linha de origem que deve ser usada para atualizar a linha-alvo compatível. Pode pré-processar a tabela de origem para eliminar a possibilidade de vários jogos. Consulte o exemplo de captura de dados de alteração— pré-processa o conjunto de dados de alteração (isto é, o conjunto de dados de origem) para reter apenas a última alteração para cada chave antes de aplicar essa alteração na tabela Delta alvo.
Nota
Em Databricks Runtime 7.3 LTS ou superior, são permitidos vários jogos quando os jogos são eliminados incondicionalmente (uma vez que a eliminação incondicional não é ambígua mesmo que existam vários fósforos).
Validação do esquema
merge
valida automaticamente que o esquema dos dados gerados por inserção e atualização de expressões são compatíveis com o esquema da tabela. Utiliza as seguintes regras para determinar se a merge
operação é compatível:
- Para
update
einsert
ações, as colunas-alvo especificadas devem existir na tabela Delta alvo. - Para
updateAll
einsertAll
ações, o conjunto de dados de origem deve ter todas as colunas da tabela Delta alvo. O conjunto de dados de origem pode ter colunas extras e são ignorados. - Para todas as ações, se o tipo de dados gerado pelas expressões que produzem as colunas-alvo for diferente das colunas correspondentes na tabela Delta alvo,
merge
tenta lanhá-los para os tipos da tabela.
Evolução automática do esquema
Nota
A evolução do Schema merge
está disponível em Databricks Runtime 6.6 ou superior.
Por predefinição, updateAll
e insertAll
atribua todas as colunas na tabela Delta alvo com colunas do mesmo nome do conjunto de dados de origem. As colunas no conjunto de dados de origem que não correspondam às colunas na tabela-alvo são ignoradas. No entanto, em alguns casos de utilização, é desejável adicionar automaticamente colunas de origem à tabela Delta alvo. Para atualizar automaticamente o esquema de tabela durante uma merge
operação com updateAll
e insertAll
(pelo menos um deles), pode definir a configuração da sessão Spark spark.databricks.delta.schema.autoMerge.enabled
para antes de executar a true
merge
operação.
Nota
- A evolução do esquema ocorre apenas quando há uma
updateAll
ação ou umainsertAll
ação, ou ambos. update
einsert
as ações não podem referir-se explicitamente a colunas-alvo que já não existem na tabela-alvo (mesmo que existamupdateAll
ou como uma dasinsertAll
cláusulas). Veja os exemplos abaixo.
Nota
Em Databricks Runtime 7.4 e abaixo, merge
suporta a evolução do esquema apenas de colunas de nível superior, e não de colunas aninhadas.
Aqui estão alguns exemplos dos efeitos da merge
operação com e sem evolução de esquemas.
Colunas | Consulta (em Scala) | Comportamento sem evolução de esquema (padrão) | Comportamento com evolução do esquema |
---|---|---|---|
Colunas-alvo: key, value Colunas de origem: key, value, newValue |
targetDeltaTable.alias("t") .merge( sourceDataFrame.alias("s"), "t.key = s.key") .whenMatched().updateAll() .whenNotMatched().insertAll() .execute() |
O esquema da tabela permanece inalterado; apenas colunas key , value são atualizadas/inseridas. |
O esquema da mesa é alterado para (key, value, newValue) . updateAll atualiza colunas value e , e newValue insertAll insere linhas (key, value, newValue) . |
Colunas-alvo: key, oldValue Colunas de origem: key, newValue |
targetDeltaTable.alias("t") .merge( sourceDataFrame.alias("s"), "t.key = s.key") .whenMatched().updateAll() .whenNotMatched().insertAll() .execute() |
updateAll e insertAll as ações lançam um erro porque a oldValue coluna-alvo não está na fonte. |
O esquema da mesa é alterado para (key, oldValue, newValue) . updateAll atualiza colunas key e newValue deixa oldValue inalteradas, e insertAll insere linhas (key, NULL, newValue) (isto é, oldValue é inserida como NULL ). |
Colunas-alvo: key, oldValue Colunas de origem: key, newValue |
targetDeltaTable.alias("t") .merge( sourceDataFrame.alias("s"), "t.key = s.key") .whenMatched().update(Map( "newValue" -> col("s.newValue"))) .whenNotMatched().insertAll() .execute() |
update lança um erro porque a coluna newValue não existe na tabela alvo. |
update ainda lança um erro porque a coluna newValue não existe na tabela alvo. |
Colunas-alvo: key, oldValue Colunas de origem: key, newValue |
targetDeltaTable.alias("t") .merge( sourceDataFrame.alias("s"), "t.key = s.key") .whenMatched().updateAll() .whenNotMatched().insert(Map( "key" -> col("s.key"), "newValue" -> col("s.newValue"))) .execute() |
insert lança um erro porque a coluna newValue não existe na tabela alvo. |
insert ainda lança um erro, uma vez que a coluna newValue não existe na tabela alvo. |
Otimização do desempenho
Pode reduzir o tempo desaguisso através da fusão utilizando as seguintes abordagens:
Reduza o espaço de pesquisa para fósforos: Por predefinição, a
merge
operação procura toda a tabela Delta para encontrar correspondências na tabela de origem. Uma maneira de acelerarmerge
é reduzir o espaço de pesquisa adicionando constrangimentos conhecidos na condição de jogo. Por exemplo, suponha que tem uma tabela que é dividida porcountry
e que pretende usar para atualizardate
merge
informações para o último dia e um país específico. Adicionar a condiçãoevents.date = current_date() AND events.country = 'USA'
tornará a consulta mais rápida, uma vez que procura fósforos apenas nas divisórias relevantes. Além disso, reduzirá igualmente as possibilidades de conflitos com outras operações simultâneas. Consulte o controlo da Concurrency para mais detalhes.
Ficheiros compactos: Se os dados forem armazenados em muitos ficheiros pequenos, a leitura dos dados para procurar correspondência pode tornar-se lenta. Pode compactar pequenos ficheiros em ficheiros maiores para melhorar a produção de leitura. Consulte os ficheiros Compactos para obter mais detalhes.
Controle as divisórias baralhar para escrever: A
merge
operação baralha dados várias vezes para calcular e escrever os dados atualizados. O número de tarefas utilizadas para baralhar é controlado pela configuração da sessão Sparkspark.sql.shuffle.partitions
. A definição deste parâmetro não só controla o paralelismo como também determina o número de ficheiros de saída. Aumentar o valor aumenta o paralelismo, mas também gera um maior número de ficheiros de dados menores.Ativar escritas otimizadas: Para mesas divididas,
merge
pode produzir um número muito maior de pequenos ficheiros do que o número de divisórias baralhadas. Isto porque cada tarefa de baralhar pode escrever vários ficheiros em várias divisórias, e pode tornar-se um estrangulamento de desempenho. Pode otimizá-lo ativando as escritas otimizadas.
Exemplos de uniões
Aqui estão alguns exemplos sobre como usar merge
em diferentes cenários.
Nesta secção:
- Deduplicação de dados ao escrever em tabelas Delta
- Alteração lenta da operação de dados (SCD) Tipo 2 em tabelas Delta
- Escreva dados de alteração numa tabela Delta
- Upsert de consultas de streaming usando
foreachBatch
Deduplicação de dados ao escrever em tabelas Delta
Um caso comum de utilização da ETL é recolher registos na tabela Delta, colocando-os numa mesa. No entanto, muitas vezes as fontes podem gerar registos duplicados e são necessárias medidas de deduplica a jusante para cuidar deles. Com merge
, pode evitar inserir os registos duplicados.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Nota
O conjunto de dados que contém os novos registos tem de ser desduplicado dentro de si. Pela semântica SQL de fusão, corresponde e desafina os novos dados com os dados existentes na tabela, mas se houver dados duplicados dentro do novo conjunto de dados, é inserido. Por isso, desduplicar os novos dados antes de se fundir na tabela.
Se souber que pode obter registos duplicados apenas por alguns dias, pode otimizar ainda mais a sua consulta dividindo a tabela por data e, em seguida, especificando o intervalo de data da tabela-alvo para combinar.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Isto é mais eficiente do que o comando anterior, pois procura duplicados apenas nos últimos 7 dias de registos, e não em toda a tabela. Além disso, pode utilizar esta fusão apenas com o Streaming Estruturado para efetuar uma desduplica contínua dos troncos.
- Numa consulta de streaming, pode utilizar a operação de fusão
foreachBatch
para escrever continuamente quaisquer dados de streaming para uma tabela Delta com deduplicação. Consulte o exemplo de streaming a seguir para obter mais informações sobreforeachBatch
. - Em outra consulta de streaming, você pode ler continuamente dados deduplicados a partir desta tabela Delta. Isto é possível porque uma fusão só de inserção anexa novos dados à tabela Delta.
Nota
A fusão só de inserção é otimizada apenas para anexar dados em Databricks Runtime 6.2 e superior. Em Databricks Runtime 6.1 e abaixo, as gravações de operações de fusão apenas inseridas não podem ser lidas como um fluxo.
Alteração lenta da operação de dados (SCD) Tipo 2 em tabelas Delta
Outra operação comum é o SCD Type 2, que mantém a história de todas as alterações feitas a cada chave numa tabela dimensional. Tais operações requerem a atualização das linhas existentes para marcar valores anteriores das teclas tão antigas, e a inserção das novas linhas como os valores mais recentes. Dada uma tabela de origem com atualizações e a tabela-alvo com os dados dimensionais, o SCD Type 2 pode ser expresso com merge
.
Aqui está um exemplo concreto de manter o histórico de endereços para um cliente, juntamente com a gama de datas ativas de cada endereço. Quando o endereço de um cliente precisa de ser atualizado, tem de marcar o endereço anterior como não o atual, atualizar o seu intervalo de datas ativas e adicionar o novo endereço como o atual.
SCD Tipo 2 usando caderno de fusão
Escreva dados de alteração numa tabela Delta
Semelhante ao SCD, outro caso de uso comum, muitas vezes chamado de captura de dados de alteração (CDC), é aplicar todas as alterações de dados geradas a partir de uma base de dados externa numa tabela Delta. Por outras palavras, um conjunto de atualizações, eliminações e inserções aplicadas a uma tabela externa tem de ser aplicada a uma tabela Delta.
Pode fazê-lo da merge
seguinte forma.
Escreva dados de alteração usando o caderno MERGE
Upsert de consultas de streaming usandoforeachBatch
Você pode usar uma combinação de merge
e foreachBatch
(ver foreachbatch para mais informações) para escrever upserts complexos de uma consulta de streaming em uma tabela Delta. Por exemplo:
- Escreva agregados de streaming no modo de atualização: Isto é muito mais eficiente do que o Modo Completo.
- Escreva um fluxo de alterações na base de dados numa tabela Delta: A consulta de fusão para escrever dados de alteração pode ser usada para aplicar
foreachBatch
continuamente um fluxo de alterações a uma tabela Delta. - Escreva um fluxo de dados na tabela Delta com deduplicação: A consulta de fusão apenas de inserção para a desduplicação pode ser utilizada para escrever
foreachBatch
continuamente dados (com duplicados) para uma tabela Delta com deduplica automática.
Nota
- Certifique-se de que a sua
merge
declaração no interior éforeachBatch
idempotente, uma vez que o reinício da consulta de streaming pode aplicar a operação no mesmo lote de dados várias vezes. - Quando
merge
foreachBatch
utilizado, a taxa de dados de entrada da consulta de streaming (reportada atravésStreamingQueryProgress
e visível no gráfico da taxa de portátil) pode ser reportada como um múltiplo da taxa real a que os dados são gerados na fonte. Isto porquemerge
lê os dados de entrada várias vezes fazendo com que as métricas de entrada sejam multiplicadas. Se isto for um estrangulamento, pode cache o DataFrame do lote antesmerge
e, em seguida, desacompi-lo depoismerge
.