Usar o feed de dados de alteração do Delta Lake no Azure Databricks

Nota

  • Este artigo descreve como registrar e consultar informações de alteração em nível de linha para tabelas Delta usando o recurso de feed de dados de alteração. Para saber como atualizar tabelas em um pipeline Delta Live Tables com base em alterações nos dados de origem, consulte APPLY CHANGES API: Simplify change data capture in Delta Live Tables.

O feed de dados de alteração permite que o Azure Databricks rastreie alterações no nível da linha entre versões de uma tabela Delta. Quando habilitado em uma tabela Delta, os registros de tempo de execução alteram eventos para todos os dados gravados na tabela. Isso inclui os dados da linha juntamente com metadados que indicam se a linha especificada foi inserida, excluída ou atualizada.

Você pode ler os eventos de alteração em consultas em lote usando Spark SQL, Apache Spark DataFrames e Structured Streaming.

Importante

O feed de dados de alteração funciona em conjunto com o histórico de tabelas para fornecer informações de alteração. Como a clonagem de uma tabela Delta cria um histórico separado, o feed de dados de alteração em tabelas clonadas não corresponde ao da tabela original.

Casos de utilização

O feed de dados de alteração não está habilitado por padrão. Os seguintes casos de uso devem ser direcionados quando você habilita o feed de dados de alteração.

  • Tabelas Silver e Gold: melhore o desempenho do Delta Lake processando apenas alterações no nível da linha após as operações iniciais MERGE, UPDATEou DELETE para acelerar e simplificar as operações ETL e ELT.
  • Visualizações materializadas: crie visualizações atualizadas e agregadas de informações para uso em BI e análises sem ter que reprocessar as tabelas subjacentes completas, atualizando apenas onde as alterações surgiram.
  • Transmitir alterações: envie um feed de dados de alteração para sistemas downstream, como Kafka ou RDBMS, que podem usá-lo para processar incrementalmente em estágios posteriores de pipelines de dados.
  • Tabela de trilha de auditoria: capture o feed de dados de alteração como uma tabela Delta fornece armazenamento perpétuo e capacidade de consulta eficiente para ver todas as alterações ao longo do tempo, incluindo quando as exclusões ocorrem e quais atualizações foram feitas.

Ativar feed de dados de alteração

Você deve habilitar explicitamente a opção de feed de dados de alteração usando um dos seguintes métodos:

  • Nova tabela: defina a propriedade delta.enableChangeDataFeed = true table no CREATE TABLE comando.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Tabela existente: defina a propriedade delta.enableChangeDataFeed = true table no ALTER TABLE comando.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Todas as novas tabelas:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Importante

Somente as alterações feitas depois que você habilita o feed de dados de alteração são registradas, as alterações anteriores em uma tabela não são capturadas.

Alterar o armazenamento de dados

O Azure Databricks registra dados de alteração para UPDATE, DELETEe MERGE operações na _change_data pasta sob o diretório da tabela. Algumas operações, como operações somente inserção e exclusões de partição completas, não geram dados no diretório porque o _change_data Azure Databricks pode calcular com eficiência o feed de dados de alteração diretamente do log de transações.

Os arquivos na pasta seguem a _change_data política de retenção da tabela. Portanto, se você executar o comando VACUUM , os dados do feed de dados de alteração também serão excluídos.

Ler alterações em consultas em lote

Você pode fornecer a versão ou o carimbo de data/hora para o início e o fim. As versões inicial e final e os carimbos de data/hora estão incluídos nas consultas. Para ler as alterações de uma versão inicial específica para a versão mais recente da tabela, especifique apenas a versão inicial ou o carimbo de data/hora.

Você especifica uma versão como um inteiro e um carimbo de data/hora como uma cadeia de caracteres no formato yyyy-MM-dd[ HH:mm:ss[.SSS]].

Se você fornecer uma versão inferior ou um carimbo de data/hora mais antigo do que um que registrou eventos de alteração, ou seja, quando o feed de dados de alteração foi habilitado, um erro será lançado indicando que o feed de dados de alteração não estava habilitado.

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

Ler alterações em consultas de streaming

Python

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")

Scala

// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Para obter os dados de alteração durante a leitura da tabela, defina a opção readChangeFeed como true. O startingVersion ou startingTimestamp são opcionais e, se não fornecido, o fluxo retorna o instantâneo mais recente da tabela no momento do streaming como um INSERT e alterações futuras como dados de alteração. Opções como limites de taxa (maxFilesPerTrigger, maxBytesPerTrigger) e excludeRegex também são suportadas ao ler dados de alteração.

Nota

O limite de taxa pode ser atômico para versões diferentes da versão inicial do snapshot. Ou seja, toda a versão de confirmação será limitada ou toda a confirmação será retornada.

Por padrão, se um usuário passar em uma versão ou carimbo de data/hora excedendo a última confirmação em uma tabela, o erro timestampGreaterThanLatestCommit será lançado. No Databricks Runtime 11.3 LTS e superior, o feed de dados de alteração pode lidar com o caso da versão fora do intervalo se o usuário definir a seguinte configuração como true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Se você fornecer uma versão inicial maior que a última confirmação em uma tabela ou um carimbo de data/hora de início mais recente que a última confirmação em uma tabela, quando a configuração anterior estiver habilitada, um resultado de leitura vazio será retornado.

Se você fornecer uma versão final maior que a última confirmação em uma tabela ou um carimbo de data/hora final mais recente que a última confirmação em uma tabela, quando a configuração anterior estiver habilitada no modo de leitura em lote, todas as alterações entre a versão inicial e a última confirmação serão retornadas.

Qual é o esquema para o feed de dados de alteração?

Quando você lê a partir do feed de dados de alteração de uma tabela, o esquema para a versão mais recente da tabela é usado.

Nota

A maioria das operações de mudança e evolução de esquema são totalmente suportadas. A tabela com mapeamento de colunas ativado não suporta todos os casos de uso e demonstra um comportamento diferente. Consulte Alterar limitações do feed de dados para tabelas com mapeamento de colunas habilitado.

Além das colunas de dados do esquema da tabela Delta, o feed de dados de alteração contém colunas de metadados que identificam o tipo de evento de alteração:

Nome da coluna Type Valores
_change_type String insert, update_preimage , update_postimage, ( delete1)
_commit_version Longo A versão de log ou tabela Delta que contém a alteração.
_commit_timestamp Carimbo de Data/Hora O carimbo de data/hora associado quando a confirmação foi criada.

(1)preimage é o valor antes da atualização, postimage é o valor após a atualização.

Nota

Não é possível habilitar a alteração do feed de dados em uma tabela se o esquema contiver colunas com os mesmos nomes dessas colunas adicionadas. Renomeie colunas na tabela para resolver esse conflito antes de tentar habilitar o feed de dados de alteração.

Alterar limitações de feed de dados para tabelas com mapeamento de colunas habilitado

Com o mapeamento de colunas habilitado em uma tabela Delta, você pode soltar ou renomear colunas na tabela sem reescrever arquivos de dados para dados existentes. Com o mapeamento de colunas habilitado, o feed de dados de alteração tem limitações depois de executar alterações de esquema não aditivas, como renomear ou soltar uma coluna, alterar o tipo de dados ou alterações de anulabilidade.

Importante

  • Não é possível ler o feed de dados de alteração para uma transação ou intervalo no qual ocorre uma alteração de esquema não aditiva usando semântica em lote.
  • No Databricks Runtime 12.2 LTS e inferior, as tabelas com mapeamento de colunas habilitado que sofreram alterações de esquema não aditivas não suportam leituras de streaming no feed de dados de alteração. Consulte Transmissão em fluxo com mapeamento de colunas e alterações de esquema.
  • No Databricks Runtime 11.3 LTS e abaixo, não é possível ler o feed de dados de alteração para tabelas com mapeamento de colunas habilitado que sofreram renomeação ou queda de coluna.

No Databricks Runtime 12.2 LTS e superior, você pode executar leituras em lote no feed de dados de alteração para tabelas com mapeamento de colunas habilitado que sofreram alterações de esquema não aditivas. Em vez de usar o esquema da versão mais recente da tabela, as operações de leitura usam o esquema da versão final da tabela especificada na consulta. As consultas ainda falham se o intervalo de versão especificado abranger uma alteração de esquema não aditiva.

Perguntas mais frequentes (FAQ)

Qual é a sobrecarga de habilitar o feed de dados de alteração?

Não há impacto significativo. Os registros de dados de alteração são gerados em linha durante o processo de execução da consulta e geralmente são muito menores do que o tamanho total dos arquivos regravados.

Qual é a política de retenção para registros de alteração?

Os registros de alteração seguem a mesma política de retenção das versões de tabela desatualizadas e serão limpos por meio do VACUUM se estiverem fora do período de retenção especificado.

Quando é que os novos registos ficam disponíveis no feed de dados de alteração?

Os dados de alteração são confirmados juntamente com a transação Delta Lake e ficarão disponíveis ao mesmo tempo em que os novos dados estiverem disponíveis na tabela.

Exemplo de bloco de anotações: Propagar alterações com o feed de dados de alteração Delta

Este caderno mostra como propagar as alterações feitas em uma tabela prateada de número absoluto de vacinas para uma tabela ouro de taxas de vacinação.

Alterar o bloco de notas do feed de dados

Obter o bloco de notas