Trabalhar com o histórico de tabelas do Delta Lake

Cada operação que modifica uma tabela do Delta Lake cria uma nova versão da tabela. Você pode usar informações de histórico para auditar operações ou consultar uma tabela em um ponto específico no tempo.

Observação

O Databricks não recomenda usar o histórico de tabelas do Delta Lake como uma solução de backup de longo prazo para arquivamento de dados. O Databricks recomenda usar apenas os últimos sete dias para operações de viagem no tempo, a menos que você tenha definido as configurações de retenção de dados e logs como um valor maior.

Recuperar o histórico da tabela Delta

Você pode recuperar informações de operações, usuário, carimbo de data/hora e assim por diante relativas a cada gravação em uma tabela do Delta por meio do comando history. As operações são retornadas em ordem cronológica inversa.

A retenção do histórico de tabelas é determinada pela configuração da tabela delta.logRetentionDuration, que é de 30 dias por padrão.

Observação

A viagem no tempo e o histórico de tabelas são controlados por diferentes limites de retenção. Consulte O que é a viagem no tempo do Delta Lake?

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

Para obter detalhes de sintaxe do Spark SQL, confira DESCRIBE HISTORY.

Confira a documentação da API do Delta Lake para obter detalhes da sintaxe de Scala/Java/Python.

O Explorador de Catálogos fornece uma exibição visual dessas informações detalhadas da tabela e do histórico de tabelas Delta. Além do esquema de tabela e dos dados de exemplo, você pode clicar na guia Histórico para ver o histórico da tabela que é exibido com DESCRIBE HISTORY.

Esquema de histórico

A saída da operação history tem as colunas a seguir.

Coluna Type Descrição
version long Versão da tabela gerada pela operação.
timestamp timestamp Quando esta versão foi confirmada.
userId string ID do usuário que executou a operação.
userName string Nome do usuário que executou a operação.
operação string Nome da operação.
operationParameters map Parâmetros da operação (por exemplo, predicados).
trabalho struct Detalhes do trabalho que executou a operação.
notebook struct Detalhes do notebook do qual a operação foi executada.
clusterId string ID do cluster em que a operação foi executada.
readVersion long Versão da tabela lida para executar a operação de gravação.
isolationLevel string Nível de isolamento usado nesta operação.
isBlindAppend booleano Se essa operação acrescenta dados.
operationMetrics map Métricas da operação (por exemplo, número de linhas e arquivos modificados).
userMetadata string Metadados de commit definidos pelo usuário se foram especificados
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Observação

Chaves de métricas da operação

A operação history retorna uma coleção de métricas de operações no mapa de colunas operationMetrics.

As tabelas a seguir listam as definições essenciais do mapa por operação.

Operação Nome da métrica Descrição
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO
numFiles Número de arquivos gravados.
numOutputBytes Tamanho em bytes do conteúdo gravado.
numOutputRows Número de linhas gravadas.
ATUALIZAÇÃO DE STREAMING
numAddedFiles Número de arquivos adicionados.
numRemovedFiles Número de arquivos removidos.
numOutputRows Número de linhas gravadas.
numOutputBytes Tamanho da gravação em bytes.
Delete (excluir)
numAddedFiles Número de arquivos adicionados. Não fornecido quando as partições da tabela são excluídas.
numRemovedFiles Número de arquivos removidos.
numDeletedRows Número de linhas removidas. Não fornecido quando as partições da tabela são excluídas.
numCopiedRows Número de linhas copiadas no processo de exclusão de arquivos.
executionTimeMs Tempo para executar toda a operação.
scanTimeMs Tempo para procurar correspondes nos arquivos.
rewriteTimeMs Tempo para reescrever os arquivos correspondentes.
TRUNCATE
numRemovedFiles Número de arquivos removidos.
executionTimeMs Tempo para executar toda a operação.
MESCLAR
numSourceRows Número de linhas do DataFrame de origem.
numTargetRowsInserted Número de linhas inseridas na tabela de destino.
numTargetRowsUpdated Número de linhas atualizadas na tabela de destino.
numTargetRowsDeleted Número de linhas excluídas na tabela de destino.
numTargetRowsCopied Número de linhas de destino copiadas.
numOutputRows Número total de linhas gravadas.
numTargetFilesAdded Número de arquivos adicionados ao sink(target).
numTargetFilesRemoved Número de arquivos removidos do sink(target).
executionTimeMs Tempo para executar toda a operação.
scanTimeMs Tempo para procurar correspondes nos arquivos.
rewriteTimeMs Tempo para reescrever os arquivos correspondentes.
UPDATE
numAddedFiles Número de arquivos adicionados.
numRemovedFiles Número de arquivos removidos.
numUpdatedRows Número de linhas atualizadas.
numCopiedRows Número de linhas copiadas no processo de atualização de arquivos.
executionTimeMs Tempo para executar toda a operação.
scanTimeMs Tempo para procurar correspondes nos arquivos.
rewriteTimeMs Tempo para reescrever os arquivos correspondentes.
FSCK numRemovedFiles Número de arquivos removidos.
CONVERT numConvertedFiles Número de arquivos Parquet convertidos.
OPTIMIZE
numAddedFiles Número de arquivos adicionados.
numRemovedFiles Número de arquivos otimizados.
numAddedBytes Número de bytes adicionados depois que a tabela foi otimizada.
numRemovedBytes Número de bytes removidos.
minFileSize Tamanho do menor arquivo depois que a tabela foi otimizada.
p25FileSize Tamanho do arquivo do 25º percentil depois que a tabela foi otimizada.
p50FileSize Tamanho do arquivo mediano depois que a tabela foi otimizada.
p75FileSize Tamanho do arquivo do 75º percentil depois que a tabela foi otimizada.
maxFileSize Tamanho do maior arquivo depois que a tabela foi otimizada.
CLONAR
sourceTableSize Tamanho em bytes da tabela de origem na versão clonada.
sourceNumOfFiles Número de arquivos da tabela de origem na versão clonada.
numRemovedFiles Número de arquivos removidos da tabela de destino se uma tabela Delta anterior foi substituída.
removedFilesSize Tamanho total em bytes dos arquivos removidos da tabela de destino se uma tabela do Delta anterior foi substituída.
numCopiedFiles Número de arquivos que foram copiados no novo local. 0 para clones superficiais.
copiedFilesSize Tamanho total em bytes dos arquivos que foram copiados no novo local. 0 para clones superficiais.
RESTORE
tableSizeAfterRestore Tamanho da tabela em bytes após a restauração.
numOfFilesAfterRestore Número de arquivos da tabela após a restauração.
numRemovedFiles Número de arquivos removidos pela operação de restauração.
numRestoredFiles Número de arquivos adicionados como resultado da restauração.
removedFilesSize Tamanho em bytes de arquivos removidos pela restauração.
restoredFilesSize Tamanho em bytes de arquivos adicionados pela restauração.
VACUUM
numDeletedFiles Número de arquivos excluídos.
numVacuumedDirectories Número de diretórios aspirados.
numFilesToDelete Número de arquivos a serem excluídos.

O que é a viagem no tempo do Delta Lake?

O Delta Lake dá suporte à consulta de versões de tabela anteriores com base no carimbo de data/hora ou na versão da tabela (conforme registrado no log de transações). Você pode usar viagens no tempo para aplicativos como o seguinte:

  • Recriando análises, relatórios ou saídas (por exemplo, a saída de um modelo de aprendizado de máquina). Isso pode ser útil para depuração ou auditoria, especialmente em setores regulamentados.
  • Escrevendo consultas temporais complexas.
  • Corrigindo erros nos seus dados.
  • Fornecimento de isolamento de instantâneos para um conjunto de consultas de tabelas que são alteradas rapidamente.

Importante

As versões de tabela acessíveis com viagem no tempo são determinadas por uma combinação do limite de retenção para arquivos de log de transações e a frequência e a retenção especificada para VACUUM operações. Se você executar VACUUM diariamente com os valores padrão, sete dias de dados estão disponíveis para viagem no tempo.

Sintaxe de viagem no tempo do Delta

Você consulta uma tabela do Delta com viagem no tempo adicionando uma cláusula após a especificação do nome da tabela.

  • timestamp_expression pode ser qualquer uma das seguintes opções:
    • '2018-10-18T22:15:12.013Z', ou seja, uma cadeia de caracteres que pode ser convertida em um carimbo de data/hora
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', ou seja, uma cadeia de caracteres de data
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Qualquer outra expressão que seja ou possa ser convertida em um carimbo de data/hora
  • version é um valor longo que pode ser obtido da saída de DESCRIBE HISTORY table_spec.

timestamp_expression e version não podem ser subconsultas.

Somente cadeias de caracteres de carimbo de data/hora ou data são aceitas. Por exemplo, "2019-01-01" e "2019-01-01T00:00:00.000Z". Confira o código a seguir para obter uma sintaxe de exemplo:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).load("/tmp/delta/people10m")

Você também pode usar a sintaxe @ para especificar o carimbo de data/hora ou a versão como parte do nome da tabela. O carimbo de data/hora precisa estar no formato yyyyMMddHHmmssSSS. Você pode especificar uma versão após @, incluindo v no início da versão. Confira o código a seguir para obter uma sintaxe de exemplo:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

spark.read.load("/tmp/delta/people10m@20190101000000000")
spark.read.load("/tmp/delta/people10m@v123")

O que são pontos de verificação de log de transações?

O Delta Lake registra versões de tabela como arquivos JSON no diretório _delta_log, que é armazenado juntamente com os dados da tabela. Para otimizar a consulta de ponto de verificação, o Delta Lake agrega versões de tabela aos arquivos de ponto de verificação Parquet, eliminando a necessidade de ler todas as versões JSON do histórico de tabelas. O Azure Databricks otimiza a frequência de ponto de verificação para o tamanho dos dados e a carga de trabalho. Os usuários não devem interagir diretamente com pontos de verificação. A frequência do ponto de verificação está sujeita a alterações sem aviso prévio.

Configurar retenção de dados para consultas de viagem no tempo

Para consultar uma versão anterior da tabela, você precisa manter tanto o log quanto os arquivos de dados para essa versão.

Os arquivos de dados são excluídos quando VACUUM é executado em uma tabela. O Delta Lake gerencia a remoção de arquivo de log automaticamente após versões de tabela de ponto de verificação.

Como a maioria das tabelas Delta têm VACUUM executadas nelas regularmente, as consultas pontuais devem respeitar o limite de retenção para VACUUM, que é de sete dias por padrão.

Para aumentar o limite de retenção de dados para tabelas Delta, você deve configurar as seguintes propriedades da tabela:

  • delta.logRetentionDuration = "interval <interval>": controla por quanto tempo o histórico de uma tabela é mantido. O padrão é interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": determina o limite VACUUM usado para remover arquivos de dados que não são mais referenciados na versão atual da tabela. O padrão é interval 7 days.

Você pode especificar as propriedades Delta durante a criação da tabela ou defini-las com uma instrução ALTER TABLE. Confira Referência de propriedades da tabela Delta.

Observação

Você deve definir ambas as propriedades para garantir que o histórico da tabela seja retido por mais tempo para tabelas com operações VACUUM frequentes. Por exemplo, para acessar 30 dias de dados históricos, defina delta.deletedFileRetentionDuration = "interval 30 days" (que corresponde à configuração padrão para delta.logRetentionDuration).

Aumentar o limite de retenção de dados pode fazer com que os custos de armazenamento aumentem à medida que mais arquivos de dados forem mantidos.

Restaurar um estado anterior de uma tabela Delta

Você pode restaurar uma tabela Delta para seu estado anterior com o RESTORE comando. As tabela do Delta mantêm internamente as versões históricas que permitem a restauração para um estado anterior. Uma versão correspondente ao estado anterior ou um carimbo de data/hora de quando o estado anterior foi criado têm suporte como opções pelo comando RESTORE.

Importante

  • Você pode restaurar uma tabela já restaurada.
  • Você pode restaurar uma tabela clonada.
  • Você precisa ter a permissão MODIFY na tabela que está sendo restaurada.
  • Você não pode restaurar uma tabela para uma versão mais antiga em que os arquivos de dados foram excluídos manualmente ou pelo vacuum. A restauração parcial para essa versão ainda é possível se spark.sql.files.ignoreMissingFiles estiver definido como true.
  • O formato de carimbo de data/hora para restaurar para um estado anterior é yyyy-MM-dd HH:mm:ss. Também há suporte para fornecer apenas uma cadeia de caracteres de data (yyyy-MM-dd).
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

Para obter detalhes da sintaxe, confira RESTORE.

Importante

A restauração é considerada uma operação de alteração de dados. As entradas de log do Delta Lake adicionadas pelo comando RESTORE contêm dataChange definido como true. Se houver um aplicativo downstream, como um trabalho de streaming estruturado que processa as atualizações para uma tabela do Delta Lake, as entradas de log de alterações de dados adicionadas pela operação de restauração serão consideradas como novas atualizações de dados e processá-las poderá resultar em dados duplicados.

Por exemplo:

Versão da tabela Operação Atualizações de log delta Registros em atualizações de log de alterações de dados
0 INSERT AddFile(/path/to/file-1, dataChange = true) (nome = Viktor, idade = 29 anos, (nome = George, idade = 55 anos)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (nome = George, idade = 39 anos)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (Nenhum registro como Otimizar compactação não altera os dados na tabela)
3 RESTORE(version=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (nome = Viktor, idade = 29 anos), (nome = George, idade = 55 anos), (nome = George, idade = 39 anos)

No exemplo anterior, o comando RESTORE resulta em atualizações que já foram vistas durante a leitura da tabela Delta versão 0 e 1. Se uma consulta de streaming fez a leitura dessa tabela, esses arquivos serão considerados como dados recém-adicionados e serão processados novamente.

Restaurar métricas

RESTORE relata as seguintes métricas como um DataFrame de linha única quando a operação é concluída:

  • table_size_after_restore: o tamanho da tabela após a restauração.

  • num_of_files_after_restore: o número de arquivos da tabela após a restauração.

  • num_removed_files: o número de arquivos removidos (logicamente excluídos) da tabela.

  • num_restored_files: o número de arquivos restaurados devido à reversão.

  • removed_files_size: o tamanho total em bytes dos arquivos removidos da tabela.

  • restored_files_size: o tamanho total em bytes dos arquivos restaurados.

    Exemplo de restauração de métricas

Exemplos de como usar a viagem no tempo do Delta Lake

  • Corrigir exclusões acidentais em uma tabela para o usuário 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Corrigir atualizações incorretas acidentais em uma tabela:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Consultar o número de novos clientes adicionados na última semana.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Como encontrar a versão do último commit na sessão do Spark?

Para ter acesso ao número da versão do último commit gravado pelo atual SparkSession em todos os threads e todas as tabelas, confira a configuração do SQL spark.databricks.delta.lastCommitVersionInSession.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Se o SparkSession não tiver feito nenhum commit, a consulta à chave irá retornar um valor vazio.

Observação

Compartilhar o mesmo SparkSession com vários threads é semelhante a compartilhar uma variável com vários threads. Você pode atingir condições de corrida, pois o valor de configuração é atualizado simultaneamente.