Eliminações, atualizações e uniões de tabelas

A Delta Lake apoia várias declarações para facilitar a eliminação de dados e a atualização de dados nas tabelas Delta.

Excluir de uma mesa

Pode remover dados que correspondam a um predicado de uma tabela Delta. Por exemplo, para eliminar todos os eventos de 2017 antes, pode executar o seguinte:

SQL

DELETE FROM events WHERE date < '2017-01-01'

DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'

Python

Nota

A API Python está disponível em Databricks Runtime 6.1 e superior.

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")        # predicate using SQL formatted string

deltaTable.delete(col("date") < "2017-01-01")   # predicate using Spark SQL functions

Scala

Nota

A API Scala está disponível em Databricks Runtime 6.0 ou superior.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")        // predicate using SQL formatted string

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.delete(col("date") < "2017-01-01")       // predicate using Spark SQL functions and implicits

Java

Nota

A API java está disponível em Databricks Runtime 6.0 ou superior.

import io.delta.tables.*;
import org.apache.spark.sql.functions;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");

deltaTable.delete("date < '2017-01-01'");            // predicate using SQL formatted string

deltaTable.delete(functions.col("date").lt(functions.lit("2017-01-01")));   // predicate using Spark SQL functions

Consulte a referência API para mais detalhes.

Importante

delete remove os dados da versão mais recente da tabela Delta, mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente aspiradas. Consulte o vácuo para mais detalhes.

Dica

Quando possível, forneça predicados nas colunas de partição para uma tabela Delta dividida, uma vez que tais predicados podem acelerar significativamente a operação.

Atualizar uma tabela

Pode atualizar dados que correspondam a um predicado numa tabela Delta. Por exemplo, para corrigir um erro ortográfico no eventType , pode executar o seguinte:

SQL

UPDATE events SET eventType = 'click' WHERE eventType = 'clck'

UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'

Python

Nota

A API Python está disponível em Databricks Runtime 6.1 e superior.

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.update("eventType = 'clck'", { "eventType": "'click'" } )   # predicate using SQL formatted string

deltaTable.update(col("eventType") == "clck", { "eventType": lit("click") } )   # predicate using Spark SQL functions

Scala

Nota

A API Scala está disponível em Databricks Runtime 6.0 ou superior.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string
  "eventType = 'clck'",
  Map("eventType" -> "'click'")

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.update(                // predicate using Spark SQL functions and implicits
  col("eventType") === "clck",
  Map("eventType" -> lit("click")));

Java

Nota

A API Scala está disponível em Databricks Runtime 6.0 ou superior.

import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");

deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string
  "eventType = 'clck'",
  new HashMap<String, String>() {{
    put("eventType", "'click'");
  }}
);

deltaTable.update(                // predicate using Spark SQL functions
  functions.col(eventType).eq("clck"),
  new HashMap<String, Column>() {{
    put("eventType", functions.lit("click"));
  }}
);

Consulte a referência API para mais detalhes.

Dica

Similares a eliminar, as operações de atualização podem obter uma aceleração significativa com predicados em divisórias.

Upsert em uma mesa usando a fusão

Pode aumentar os dados de uma tabela de origem, visualização ou DataFrame numa tabela Delta alvo utilizando a merge operação. Esta operação é semelhante ao MERGE INTO comando SQL, mas tem suporte adicional para eliminações e condições extra em atualizações, inserções e eliminações.

Suponha que tenha um DataFrame spark que contenha novos dados para eventos com eventId . Alguns destes eventos podem já estar presentes na events mesa. Para fundir os novos dados na events tabela, pretende atualizar as linhas correspondentes (ou seja, eventId já presentes) e inserir as novas linhas (ou seja, eventId não presentes). Pode executar o seguinte:

SQL

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

Para detalhes da sintaxe, consulte

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.alias("events").merge(
    updatesDF.alias("updates"),
    "events.eventId = updates.eventId") \
  .whenMatchedUpdate(set = { "data" : "updates.data" } ) \
  .whenNotMatchedInsert(values =
    {
      "date": "updates.date",
      "eventId": "updates.eventId",
      "data": "updates.data"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()

Java

import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;

Dataset<Row> updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched()
  .updateExpr(
    new HashMap<String, String>() {{
      put("data", "events.data");
    }})
  .whenNotMatched()
  .insertExpr(
    new HashMap<String, String>() {{
      put("date", "updates.date");
      put("eventId", "updates.eventId");
      put("data", "updates.data");
    }})
  .execute();

Consulte a referência API para detalhes da sintaxe de Scala, Java e Python.

Semântica de operação

Aqui está uma descrição detalhada da merge operação programática.

  • Pode haver qualquer número whenMatched de whenNotMatched cláusulas.

    Nota

    No Databricks Runtime 7.2 e abaixo, merge pode ter no máximo 2 whenMatched cláusulas e no máximo 1 whenNotMatched cláusula.

  • whenMatched as cláusulas são executadas quando uma linha de origem corresponde a uma linha de tabela alvo com base na condição de jogo. Estas cláusulas têm a seguinte semântica.

    • whenMatched cláusulas podem ter no máximo update e uma delete ação. A update ação só atualiza as merge colunas especificadas (semelhantes à update operação) da linha-alvo correspondida. A delete ação elimina a linha partida.

    • Cada whenMatched cláusula pode ter uma condição opcional. Se esta condição de cláusula existir, a update ou delete ação é executada para qualquer linha de par de linha de origem-alvo correspondente apenas quando a condição da cláusula é verdadeira.

    • Se houver whenMatched várias cláusulas, então são avaliadas por ordem de serem especificadas (isto é, a ordem das cláusulas importa). Todas as whenMatched cláusulas, exceto a última, devem ter condições.

    • Se ambas as whenMatched cláusulas tiverem condições e nenhuma das condições for verdadeira para um par de linhas de alvo de origem correspondente, então a linha de alvo correspondente fica inalterada.

    • Para atualizar todas as colunas da tabela Delta alvo com as colunas correspondentes do conjunto de dados de origem, utilize whenMatched(...).updateAll() . Isto equivale a:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      para todas as colunas da tabela Delta alvo. Por conseguinte, esta ação pressupõe que a tabela de origem tem as mesmas colunas que as da tabela-alvo, caso contrário a consulta lança um erro de análise.

      Nota

      Este comportamento muda quando a migração automática de esquemas está ativada. Consulte a evolução automática do esquema para obter mais detalhes.

  • whenNotMatched as cláusulas são executadas quando uma linha de origem não corresponde a qualquer linha alvo com base na condição de jogo. Estas cláusulas têm a seguinte semântica.

    • whenNotMatched cláusulas podem ter apenas a insert ação. A nova linha é gerada com base na coluna especificada e expressões correspondentes. Não é necessário especificar todas as colunas na tabela alvo. Para colunas-alvo não NULL especificadas, é inserida.

      Nota

      No Databricks Runtime 6.5 e abaixo, deve fornecer todas as colunas na tabela alvo para a INSERT ação.

    • Cada whenNotMatched cláusula pode ter uma condição opcional. Se a condição da cláusula estiver presente, uma linha de origem só é inserida se essa condição for verdadeira para essa linha. Caso contrário, a coluna de origem é ignorada.

    • Se houver whenNotMatched várias cláusulas, então são avaliadas por ordem de serem especificadas (isto é, a ordem das cláusulas importa). Todas as whenNotMatched cláusulas, exceto a última, devem ter condições.

    • Para inserir todas as colunas da tabela Delta alvo com as colunas correspondentes do conjunto de dados de origem, utilize whenNotMatched(...).insertAll() . Isto equivale a:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      para todas as colunas da tabela Delta alvo. Por conseguinte, esta ação pressupõe que a tabela de origem tem as mesmas colunas que as da tabela-alvo, caso contrário a consulta lança um erro de análise.

      Nota

      Este comportamento muda quando a migração automática de esquemas está ativada. Consulte a evolução automática do esquema para obter mais detalhes.

Importante

Uma merge operação pode falhar se várias linhas do conjunto de dados de origem corresponderem e tentar atualizar as mesmas linhas da tabela Delta alvo. De acordo com a semântica SQL de fusão, tal operação de atualização é ambígua, uma vez que não é claro qual a linha de origem que deve ser usada para atualizar a linha-alvo compatível. Pode pré-processar a tabela de origem para eliminar a possibilidade de vários jogos. Consulte o exemplo de captura de dados de alteração— pré-processa o conjunto de dados de alteração (isto é, o conjunto de dados de origem) para reter apenas a última alteração para cada chave antes de aplicar essa alteração na tabela Delta alvo.

Nota

Em Databricks Runtime 7.3 LTS ou superior, são permitidos vários jogos quando os jogos são eliminados incondicionalmente (uma vez que a eliminação incondicional não é ambígua mesmo que existam vários fósforos).

Validação do esquema

merge valida automaticamente que o esquema dos dados gerados por inserção e atualização de expressões são compatíveis com o esquema da tabela. Utiliza as seguintes regras para determinar se a merge operação é compatível:

  • Para update e insert ações, as colunas-alvo especificadas devem existir na tabela Delta alvo.
  • Para updateAll e insertAll ações, o conjunto de dados de origem deve ter todas as colunas da tabela Delta alvo. O conjunto de dados de origem pode ter colunas extras e são ignorados.
  • Para todas as ações, se o tipo de dados gerado pelas expressões que produzem as colunas-alvo for diferente das colunas correspondentes na tabela Delta alvo, merge tenta lanhá-los para os tipos da tabela.

Evolução automática do esquema

Nota

A evolução do Schema merge está disponível em Databricks Runtime 6.6 ou superior.

Por predefinição, updateAll e insertAll atribua todas as colunas na tabela Delta alvo com colunas do mesmo nome do conjunto de dados de origem. As colunas no conjunto de dados de origem que não correspondam às colunas na tabela-alvo são ignoradas. No entanto, em alguns casos de utilização, é desejável adicionar automaticamente colunas de origem à tabela Delta alvo. Para atualizar automaticamente o esquema de tabela durante uma merge operação com updateAll e insertAll (pelo menos um deles), pode definir a configuração da sessão Spark spark.databricks.delta.schema.autoMerge.enabled para antes de executar a true merge operação.

Nota

  • A evolução do esquema ocorre apenas quando há uma updateAll ação ou uma insertAll ação, ou ambos.
  • update e insert as ações não podem referir-se explicitamente a colunas-alvo que já não existem na tabela-alvo (mesmo que existam updateAll ou como uma das insertAll cláusulas). Veja os exemplos abaixo.

Nota

Em Databricks Runtime 7.4 e abaixo, merge suporta a evolução do esquema apenas de colunas de nível superior, e não de colunas aninhadas.

Aqui estão alguns exemplos dos efeitos da merge operação com e sem evolução de esquemas.

Colunas Consulta (em Scala) Comportamento sem evolução de esquema (padrão) Comportamento com evolução do esquema
Colunas-alvo: key, value

Colunas de origem: key, value, newValue
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
O esquema da tabela permanece inalterado; apenas colunas key , value são atualizadas/inseridas. O esquema da mesa é alterado para (key, value, newValue) . updateAll atualiza colunas value e , e newValue insertAll insere linhas (key, value, newValue) .
Colunas-alvo: key, oldValue

Colunas de origem: key, newValue
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
updateAll e insertAll as ações lançam um erro porque a oldValue coluna-alvo não está na fonte. O esquema da mesa é alterado para (key, oldValue, newValue) . updateAll atualiza colunas key e newValue deixa oldValue inalteradas, e insertAll insere linhas (key, NULL, newValue) (isto é, oldValue é inserida como NULL ).
Colunas-alvo: key, oldValue

Colunas de origem: key, newValue
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().update(Map(
"newValue" -> col("s.newValue")))
.whenNotMatched().insertAll()
.execute()
update lança um erro porque a coluna newValue não existe na tabela alvo. update ainda lança um erro porque a coluna newValue não existe na tabela alvo.
Colunas-alvo: key, oldValue

Colunas de origem: key, newValue
targetDeltaTable.alias("t")
.merge(
sourceDataFrame.alias("s"),
"t.key = s.key")
.whenMatched().updateAll()
.whenNotMatched().insert(Map(
"key" -> col("s.key"),
"newValue" -> col("s.newValue")))
.execute()
insert lança um erro porque a coluna newValue não existe na tabela alvo. insert ainda lança um erro, uma vez que a coluna newValue não existe na tabela alvo.

Otimização do desempenho

Pode reduzir o tempo desaguisso através da fusão utilizando as seguintes abordagens:

  • Reduza o espaço de pesquisa para fósforos: Por predefinição, a merge operação procura toda a tabela Delta para encontrar correspondências na tabela de origem. Uma maneira de acelerar merge é reduzir o espaço de pesquisa adicionando constrangimentos conhecidos na condição de jogo. Por exemplo, suponha que tem uma tabela que é dividida por country e que pretende usar para atualizar date merge informações para o último dia e um país específico. Adicionar a condição

    events.date = current_date() AND events.country = 'USA'
    

    tornará a consulta mais rápida, uma vez que procura fósforos apenas nas divisórias relevantes. Além disso, reduzirá igualmente as possibilidades de conflitos com outras operações simultâneas. Consulte o controlo da Concurrency para mais detalhes.

  • Ficheiros compactos: Se os dados forem armazenados em muitos ficheiros pequenos, a leitura dos dados para procurar correspondência pode tornar-se lenta. Pode compactar pequenos ficheiros em ficheiros maiores para melhorar a produção de leitura. Consulte os ficheiros Compactos para obter mais detalhes.

  • Controle as divisórias baralhar para escrever: A merge operação baralha dados várias vezes para calcular e escrever os dados atualizados. O número de tarefas utilizadas para baralhar é controlado pela configuração da sessão Spark spark.sql.shuffle.partitions . A definição deste parâmetro não só controla o paralelismo como também determina o número de ficheiros de saída. Aumentar o valor aumenta o paralelismo, mas também gera um maior número de ficheiros de dados menores.

  • Ativar escritas otimizadas: Para mesas divididas, merge pode produzir um número muito maior de pequenos ficheiros do que o número de divisórias baralhadas. Isto porque cada tarefa de baralhar pode escrever vários ficheiros em várias divisórias, e pode tornar-se um estrangulamento de desempenho. Pode otimizá-lo ativando as escritas otimizadas.

Exemplos de uniões

Aqui estão alguns exemplos sobre como usar merge em diferentes cenários.

Nesta secção:

Deduplicação de dados ao escrever em tabelas Delta

Um caso comum de utilização da ETL é recolher registos na tabela Delta, colocando-os numa mesa. No entanto, muitas vezes as fontes podem gerar registos duplicados e são necessárias medidas de deduplica a jusante para cuidar deles. Com merge , pode evitar inserir os registos duplicados.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Nota

O conjunto de dados que contém os novos registos tem de ser desduplicado dentro de si. Pela semântica SQL de fusão, corresponde e desafina os novos dados com os dados existentes na tabela, mas se houver dados duplicados dentro do novo conjunto de dados, é inserido. Por isso, desduplicar os novos dados antes de se fundir na tabela.

Se souber que pode obter registos duplicados apenas por alguns dias, pode otimizar ainda mais a sua consulta dividindo a tabela por data e, em seguida, especificando o intervalo de data da tabela-alvo para combinar.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Isto é mais eficiente do que o comando anterior, pois procura duplicados apenas nos últimos 7 dias de registos, e não em toda a tabela. Além disso, pode utilizar esta fusão apenas com o Streaming Estruturado para efetuar uma desduplica contínua dos troncos.

  • Numa consulta de streaming, pode utilizar a operação de fusão foreachBatch para escrever continuamente quaisquer dados de streaming para uma tabela Delta com deduplicação. Consulte o exemplo de streaming a seguir para obter mais informações sobre foreachBatch .
  • Em outra consulta de streaming, você pode ler continuamente dados deduplicados a partir desta tabela Delta. Isto é possível porque uma fusão só de inserção anexa novos dados à tabela Delta.

Nota

A fusão só de inserção é otimizada apenas para anexar dados em Databricks Runtime 6.2 e superior. Em Databricks Runtime 6.1 e abaixo, as gravações de operações de fusão apenas inseridas não podem ser lidas como um fluxo.

Alteração lenta da operação de dados (SCD) Tipo 2 em tabelas Delta

Outra operação comum é o SCD Type 2, que mantém a história de todas as alterações feitas a cada chave numa tabela dimensional. Tais operações requerem a atualização das linhas existentes para marcar valores anteriores das teclas tão antigas, e a inserção das novas linhas como os valores mais recentes. Dada uma tabela de origem com atualizações e a tabela-alvo com os dados dimensionais, o SCD Type 2 pode ser expresso com merge .

Aqui está um exemplo concreto de manter o histórico de endereços para um cliente, juntamente com a gama de datas ativas de cada endereço. Quando o endereço de um cliente precisa de ser atualizado, tem de marcar o endereço anterior como não o atual, atualizar o seu intervalo de datas ativas e adicionar o novo endereço como o atual.

SCD Tipo 2 usando caderno de fusão

Obter o bloco de notas

Escreva dados de alteração numa tabela Delta

Semelhante ao SCD, outro caso de uso comum, muitas vezes chamado de captura de dados de alteração (CDC), é aplicar todas as alterações de dados geradas a partir de uma base de dados externa numa tabela Delta. Por outras palavras, um conjunto de atualizações, eliminações e inserções aplicadas a uma tabela externa tem de ser aplicada a uma tabela Delta. Pode fazê-lo da merge seguinte forma.

Escreva dados de alteração usando o caderno MERGE

Obter o bloco de notas

Upsert de consultas de streaming usandoforeachBatch

Você pode usar uma combinação de merge e foreachBatch (ver foreachbatch para mais informações) para escrever upserts complexos de uma consulta de streaming em uma tabela Delta. Por exemplo:

  • Escreva agregados de streaming no modo de atualização: Isto é muito mais eficiente do que o Modo Completo.
  • Escreva um fluxo de alterações na base de dados numa tabela Delta: A consulta de fusão para escrever dados de alteração pode ser usada para aplicar foreachBatch continuamente um fluxo de alterações a uma tabela Delta.
  • Escreva um fluxo de dados na tabela Delta com deduplicação: A consulta de fusão apenas de inserção para a desduplicação pode ser utilizada para escrever foreachBatch continuamente dados (com duplicados) para uma tabela Delta com deduplica automática.

Nota

  • Certifique-se de que a sua merge declaração no interior é foreachBatch idempotente, uma vez que o reinício da consulta de streaming pode aplicar a operação no mesmo lote de dados várias vezes.
  • Quando merge foreachBatch utilizado, a taxa de dados de entrada da consulta de streaming (reportada através StreamingQueryProgress e visível no gráfico da taxa de portátil) pode ser reportada como um múltiplo da taxa real a que os dados são gerados na fonte. Isto porque merge lê os dados de entrada várias vezes fazendo com que as métricas de entrada sejam multiplicadas. Se isto for um estrangulamento, pode cache o DataFrame do lote antes merge e, em seguida, desacompi-lo depois merge .

Escreva agregados de streaming em modo de atualização usando o portátil merge e foreachBatch

Obter o bloco de notas