API APPLY CHANGES: Simplifique a captura de dados de alterações nas Tabelas Dinâmicas Delta
As Tabelas Dinâmicas Delta simplificam a CDC (captura de dados de alteração) com a API APPLY CHANGES
. Anteriormente, a instrução MERGE INTO
era comumente usada para processar registros CDC no Azure Databricks. No entanto, MERGE INTO
pode produzir resultados incorretos devido a registros fora de sequência ou exigir lógica complexa para re-ordenar registros.
Ao lidar automaticamente com registros fora de sequência, a API APPLY CHANGES
nas Tabelas Dinâmicas Delta garante o processamento correto dos registros CDC e remove a necessidade de desenvolver uma lógica complexa para lidar com registros fora de sequência.
A API APPLY CHANGES
tem suporte nas interfaces SQL e Python das Tabelas Dinâmicas Delta, incluindo suporte para atualização de tabelas com SCD tipo 1 e tipo 2:
- Use o SCD tipo 1 para atualizar registros diretamente. O histórico não é retido para registros atualizados.
- Use o SCD tipo 2 para reter um histórico de registros, seja em todas as atualizações ou nas atualizações de um conjunto específico de colunas.
Para sintaxe e outras referências, consulte:
- Alterar a captura de dados com Python em Delta Live Tables
- Captura de dados de alterações com SQL em Delta Live Tables
- Controlar o gerenciamento de marca de exclusão para consultas SCD tipo 1
Observação
Este artigo descreve como atualizar tabelas em seu pipeline do Delta Live Tables com base em alterações nos dados de origem. Para saber como registrar e consultar informações de alteração em nível de linha em tabelas Delta, confira Usar o feed de dados de alterações do Delta Lake no Azure Databricks.
Como o CDC é implementado com o Delta Live Tables?
Você deve especificar uma coluna nos dados de origem na qual sequenciar os registros, que o Delta Live Tables interpreta como uma representação monotonicamente crescente da ordem adequada dos dados de origem. O Delta Live Tables manipula automaticamente os dados que chegam fora de ordem. Para alterações do TIPO 2 do SCD, Delta Live Tables propagam os valores de sequenciamento apropriados para as colunas __START_AT
e __END_AT
da tabela de destino. Deve haver uma atualização distinta por chave em cada valor de sequenciamento, e não há suporte para valores de sequenciamento NULL.
Para executar o processamento CDC com Delta Live Tables, primeiro crie uma tabela de streaming e, em seguida, use uma instrução APPLY CHANGES INTO
para especificar a origem, as chaves e o sequenciamento para o feed de alterações. Para criar a tabela de streaming de destino, use a instrução CREATE OR REFRESH STREAMING TABLE
no SQL ou a função create_streaming_table()
no Python. Para criar a instrução que define o processamento CDC, use a instrução APPLY CHANGES
no SQL ou a função apply_changes()
no Python. Para obter detalhes de sintaxe, consulte Alterar a captura de dados com SQL em de Delta Live Tables ou Alterar captura de dados com Python em Delta Live Tables.
Quais objetos de dados são usados para processamento CDFC de Delta Live Tables?
Quando você declara a tabela de destino no metastore do Hive, duas estruturas de dados são criadas:
- Uma exibição usando o nome atribuído à tabela de destino.
- Uma tabela de backup interna usada por Delta Live Tables para gerenciar o processamento CDC. Esta tabela é nomeada anexando
__apply_changes_storage_
ao nome da tabela de destino.
Por exemplo, se você declarar uma tabela de destino chamada dlt_cdc_target
, verá uma exibição chamada dlt_cdc_target
e uma tabela chamada __apply_changes_storage_dlt_cdc_target
no metastore. A criação de uma exibição permite que o Delta Live Tables filtre as informações extras (por exemplo, marcas de exclusão e versões) necessárias para lidar com dados fora de ordem. Para exibir os dados processados, consulte a exibição de destino. Como o esquema da tabela __apply_changes_storage_
pode ser alterado para dar suporte a recursos ou aprimoramentos futuros, você não deve consultar a tabela para uso em produção. Se você adicionar dados manualmente à tabela, os registros deverão vir antes de outras alterações porque as colunas de versão estão ausentes.
Se um pipeline for publicado no Catálogo do Unity, as tabelas de backup internas não estarão acessíveis aos usuários.
Obter dados sobre registros processados por uma consulta CDC do Delta Live Tables
As seguintes métricas são capturadas por consultas apply changes
:
num_upserted_rows
: o número de linhas de saída para as quais foi executado upsert no conjunto de dados durante uma atualização.num_deleted_rows
: o número de linhas de saída existentes excluídas do conjunto de dados durante uma atualização.
A métrica num_output_rows
, que é a saída para fluxos não CDC, não é capturada para consultas apply changes
.
Limitações
O destino da consulta APPLY CHANGES INTO
ou da função apply_changes
não pode ser usado como uma origem para uma tabela de streaming. Uma tabela que lê do destino de uma consulta APPLY CHANGES INTO
ou função apply_changes
deve ser uma exibição materializada.
SCD tipo 1 e SCD tipo 2 no Azure Databricks
As seções a seguir fornecem exemplos que demonstram as consultas do SCD tipo 1 e tipo 2 do Delta Live Tables que atualizam as tabelas de destino com base em eventos de origem que:
- Cria novos registros de usuário.
- Exclui um registro de usuário.
- Atualizam registros de usuário. No exemplo do SCD tipo 1, as últimas operações
UPDATE
chegam tarde e são removidas da tabela de destino, demonstrando a manipulação de eventos fora de ordem.
Os exemplos a seguir assumem familiaridade com a configuração e atualização de pipelines do Delta Live Tables. Consulte Tutorial: como executar seu primeiro pipeline do Delta Live Tables.
Para executar esses exemplos, você deve começar criando um conjunto de dados de exemplo. Consulte Gerar dados de teste.
Veja a seguir os registros de entrada para esses exemplos:
userId | name | city | operação | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | nulo | null | Delete (excluir) | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Se você descompactar a linha final nos dados de exemplo, ele inserirá o seguinte registro que especifica onde os registros devem ser truncados:
userId | name | city | operação | sequenceNum |
---|---|---|---|---|
null | nulo | nulo | TRUNCATE | 3 |
Observação
Todos os exemplos a seguir incluem opções para especificar operações de DELETE
e TRUNCATE
, mas cada uma delas é opcional.
Processar atualizações do SCD tipo 1
O exemplo de código a seguir demonstra o processamento de atualizações do tipo SCD 1:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Depois de executar o exemplo do SCD tipo 1, a tabela de destino contém os seguintes registros:
userId | name | city |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
Depois de executar o exemplo SCD tipo 1 com o registro adicional TRUNCATE
, os registros 124
e 126
são truncados devido à operação TRUNCATE
em sequenceNum=3
, e a tabela de destino contém o seguinte registro:
userId | name | city |
---|---|---|
125 | Mercedes | Guadalajara |
Processar atualizações do SCD tipo 2
O exemplo de código a seguir demonstra o processamento de atualizações do tipo SCD 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Depois de executar o exemplo do SCD tipo 2, a tabela de destino contém os seguintes registros:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | nulo |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | nulo |
126 | Lily | Cancun | 2 | nulo |
Uma consulta SCD tipo 2 também pode especificar um subconjunto de colunas de saída a serem rastreadas para o histórico na tabela de destino. As alterações em outras colunas são atualizadas em vez de gerar novos registros de histórico. O exemplo a seguir demonstra a exclusão da coluna city
do acompanhamento:
O exemplo a seguir demonstra o uso do histórico de faixas com o SCD tipo 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Depois de executar este exemplo sem o registro adicional TRUNCATE
, a tabela de destino contém os seguintes registros:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | nulo |
125 | Mercedes | Guadalajara | 2 | nulo |
126 | Lily | Cancun | 2 | nulo |
Gerar dados de teste
O código a seguir é fornecido para gerar um conjunto de dados de exemplo para uso nas consultas de exemplo presentes neste tutorial. Supondo que você tenha as credenciais adequadas para criar um novo esquema e criar uma nova tabela, você pode executar essas instruções com um notebook ou o SQL do Databricks. O código a seguir não pretende ser executado como parte de um pipeline do Delta Live Tables:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Adicionar, alterar ou excluir dados em uma tabela de streaming de destino
Se o pipeline publicar tabelas no Catálogo do Unity, você poderá usar instruções DML linguagem de manipulação de dados, incluindo instruções de inserção, atualização, exclusão e mesclagem, para modificar as tabelas de streaming de destino criadas por instruções APPLY CHANGES INTO
.
Observação
- Não há suporte para instruções DML que modificam o esquema de tabela de uma tabela de streaming. Verifique se as instruções DML não tentam desenvolver o esquema da tabela.
- Instruções DML que atualizam uma tabela de streaming só podem ser executadas em um cluster compartilhado do Catálogo do Unity ou em um SQL warehouse usando o Databricks Runtime 13.3 LTS e versões superiores.
- Como o streaming exige fontes de dados somente acréscimo, se o processamento exigir streaming de uma tabela de streaming de origem com alterações (por exemplo, por instruções DML), defina o sinalizador skipChangeCommits ao ler a tabela de streaming de origem. Quando
skipChangeCommits
é definido, as transações que excluem ou modificam registros na tabela de origem são ignoradas. Se o processamento não exigir uma tabela de streaming, você poderá usar uma exibição materializada (que não tem a restrição somente acréscimo) como a tabela de destino.
Como o Delta Live Tables usa uma coluna SEQUENCE BY
especificada e propaga valores de sequenciamento apropriados para as colunas __START_AT
e __END_AT
da tabela de destino (para SCD tipo 2), você deve garantir que as instruções DML usem valores válidos para essas colunas para manter a ordenação adequada de registros. Consulte Como o CDC é implementado com o Delta Live Tables?.
Para obter mais informações sobre como usar instruções DML com tabelas de streaming, consulte Adicionar, alterar ou excluir dados em uma tabela de streaming.
O exemplo a seguir insere um registro ativo com uma sequência inicial de 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
Comentários
https://aka.ms/ContentUserFeedback.
Em breve: Ao longo de 2024, eliminaremos os problemas do GitHub como o mecanismo de comentários para conteúdo e o substituiremos por um novo sistema de comentários. Para obter mais informações, consulteEnviar e exibir comentários de