Trabalhar com o histórico da tabela Delta Lake

Cada operação que modifica uma tabela Delta Lake cria uma nova versão da tabela. Você pode usar as informações do histórico para auditar operações, reverter uma tabela ou consultar uma tabela em um ponto específico no tempo usando viagens no tempo.

Nota

O Databricks não recomenda a utilização do histórico de tabelas do Delta Lake como uma solução de cópia de segurança de longo prazo para o arquivo de dados. O Databricks recomenda a utilização apenas dos últimos 7 dias para operações de viagem no tempo, a menos que tenha definido as configurações de retenção de dados e registos para um valor maior.

Obter o histórico da tabela Delta

Você pode recuperar informações, incluindo as operações, o usuário e o carimbo de data/hora para cada gravação em uma tabela Delta executando o history comando. As operações são retornadas em ordem cronológica inversa.

A retenção do histórico da tabela é determinada pela configuração delta.logRetentionDurationda tabela, que é de 30 dias por padrão.

Nota

As viagens no tempo e o histórico de tabelas são controlados por diferentes limiares de retenção. Veja O que é a viagem no tempo do Delta Lake?

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

Para obter detalhes da sintaxe do Spark SQL, consulte DESCRIBE HISTORY.

Consulte a documentação da API Delta Lake para obter detalhes da sintaxe Scala/Java/Python.

O Catalog Explorer fornece uma exibição visual dessas informações detalhadas da tabela e do histórico das tabelas Delta. Além do esquema da tabela e dos dados de exemplo, você pode clicar na guia Histórico para ver o histórico da tabela exibido com DESCRIBE HISTORYo .

Esquema do histórico

A saída da history operação tem as seguintes colunas.

Column Tipo Description
versão long Versão da tabela gerada pela operação.
carimbo de data/hora carimbo de data/hora Quando esta versão foi confirmada.
ID de Utilizador string ID do usuário que executou a operação.
nome de utilizador string Nome do usuário que executou a operação.
operation string Nome da operação.
operationParameters map Parâmetros da operação (por exemplo, predicados.)
tarefa estruturar Detalhes do trabalho que executou a operação.
bloco de notas estruturar Detalhes do caderno a partir do qual a operação foi executada.
clusterId string ID do cluster no qual a operação foi executada.
readVersion long Versão da tabela que foi lida para executar a operação de gravação.
Nível de isolamento string Nível de isolamento usado para esta operação.
isBlindAppend boolean Se esta operação anexou dados.
operationMetrics map Métricas da operação (por exemplo, número de linhas e arquivos modificados.)
userMetadata string Metadados de confirmação definidos pelo usuário, se especificados
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Nota

Chaves de métricas de operação

A history operação retorna uma coleção de métricas de operações no mapa de operationMetrics colunas.

As tabelas a seguir listam as definições de chave do mapa por operação.

Operação Nome da métrica Description
ESCREVER, CRIAR TABELA COMO SELECIONAR, SUBSTITUIR TABELA COMO SELECIONAR, COPIAR PARA
numFiles Número de arquivos gravados.
numOutputBytes Tamanho em bytes do conteúdo escrito.
numOutputRows Número de linhas escritas.
ATUALIZAÇÃO DE STREAMING
numAddedFiles Número de ficheiros adicionados.
numRemovedFiles Número de ficheiros removidos.
numOutputRows Número de linhas escritas.
numOutputBytes Tamanho da escrita em bytes.
DELETE
numAddedFiles Número de ficheiros adicionados. Não fornecido quando as partições da tabela são excluídas.
numRemovedFiles Número de ficheiros removidos.
numDeletedRows Número de linhas removidas. Não fornecido quando as partições da tabela são excluídas.
numCopiedRows Número de linhas copiadas no processo de exclusão de arquivos.
executionTimeMs Tempo necessário para executar toda a operação.
scanTimeMs Tempo necessário para verificar os arquivos em busca de correspondências.
rewriteTimeMs Tempo necessário para reescrever os arquivos correspondentes.
TRUNCATE
numRemovedFiles Número de ficheiros removidos.
executionTimeMs Tempo necessário para executar toda a operação.
MESCLAR
numSourceRows Número de linhas no DataFrame de origem.
numTargetRowsInserted Número de linhas inseridas na tabela de destino.
numTargetRowsUpdated Número de linhas atualizadas na tabela de destino.
numTargetRowsDeleted Número de linhas excluídas na tabela de destino.
numTargetRowsCopiado Número de linhas de destino copiadas.
numOutputRows Número total de linhas escritas.
numTargetFilesAdded Número de arquivos adicionados ao coletor (destino).
numTargetFilesRemovido Número de arquivos removidos do coletor (destino).
executionTimeMs Tempo necessário para executar toda a operação.
scanTimeMs Tempo necessário para verificar os arquivos em busca de correspondências.
rewriteTimeMs Tempo necessário para reescrever os arquivos correspondentes.
ATUALIZAR
numAddedFiles Número de ficheiros adicionados.
numRemovedFiles Número de ficheiros removidos.
numUpdatedRows Número de linhas atualizadas.
numCopiedRows Número de linhas apenas copiadas no processo de atualização de arquivos.
executionTimeMs Tempo necessário para executar toda a operação.
scanTimeMs Tempo necessário para verificar os arquivos em busca de correspondências.
rewriteTimeMs Tempo necessário para reescrever os arquivos correspondentes.
FSCK numRemovedFiles Número de ficheiros removidos.
CONVERTER numConvertedFiles Número de arquivos Parquet que foram convertidos.
OPTIMIZE
numAddedFiles Número de ficheiros adicionados.
numRemovedFiles Número de arquivos otimizados.
numAddedBytes Número de bytes adicionados depois que a tabela foi otimizada.
numRemovedBytes Número de bytes removidos.
minFileSize Tamanho do menor arquivo depois que a tabela foi otimizada.
p25FileSize Tamanho do arquivo de percentil 25 depois que a tabela foi otimizada.
p50FileSize Tamanho médio do arquivo após a otimização da tabela.
p75FileSize Tamanho do arquivo de percentil 75 depois que a tabela foi otimizada.
maxFileSize Tamanho do arquivo maior depois que a tabela foi otimizada.
CLONE
sourceTableSize Tamanho em bytes da tabela de origem na versão clonada.
sourceNumOfFiles Número de arquivos na tabela de origem na versão clonada.
numRemovedFiles Número de arquivos removidos da tabela de destino se uma tabela Delta anterior foi substituída.
removedFilesSize Tamanho total, em bytes, dos arquivos removidos da tabela de destino se uma tabela Delta anterior tiver sido substituída.
numCopiedFiles Número de arquivos que foram copiados para o novo local. 0 para clones superficiais.
copiedFilesSize Tamanho total em bytes dos arquivos que foram copiados para o novo local. 0 para clones superficiais.
RESTAURAR
tableSizeAfterRestore Tamanho da tabela em bytes após a restauração.
numOfFilesAfterRestore Número de arquivos na tabela após a restauração.
numRemovedFiles Número de ficheiros removidos pela operação de restauro.
numRestoredFiles Número de arquivos que foram adicionados como resultado da restauração.
removedFilesSize Tamanho em bytes dos arquivos removidos pela restauração.
restoredFilesSize Tamanho em bytes dos arquivos adicionados pela restauração.
VACUUM
numDeletedFiles Número de ficheiros eliminados.
numVacuumedDirectories Número de diretórios aspirados.
numFilesToDelete Número de ficheiros a eliminar.

O que é Delta Lake viagem no tempo?

A viagem no tempo do Delta Lake suporta a consulta de versões anteriores da tabela com base no carimbo de data/hora ou na versão da tabela (conforme registrado no log de transações). Você pode usar a viagem no tempo para aplicativos como os seguintes:

  • Recriar análises, relatórios ou saídas (por exemplo, a saída de um modelo de aprendizado de máquina). Isso pode ser útil para depuração ou auditoria, especialmente em setores regulamentados.
  • Escrever consultas temporais complexas.
  • Corrigir erros nos seus dados.
  • Fornecendo isolamento de instantâneo para um conjunto de consultas para tabelas de mudança rápida.

Importante

As versões de tabela acessíveis com viagens no tempo são determinadas por uma combinação do limite de retenção para arquivos de log de transações e a frequência e retenção especificada para VACUUM operações. Se você executar VACUUM diariamente com os valores padrão, 7 dias de dados estarão disponíveis para viagem no tempo.

Sintaxe de viagem no tempo delta

Você consulta uma tabela Delta com viagem no tempo adicionando uma cláusula após a especificação do nome da tabela.

  • timestamp_expression pode ser qualquer um:
    • '2018-10-18T22:15:12.013Z', ou seja, uma cadeia de caracteres que pode ser convertida em um carimbo de data/hora
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', ou seja, uma cadeia de caracteres de data
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Qualquer outra expressão que seja ou possa ser convertida em um carimbo de data/hora
  • version é um valor longo que pode ser obtido a partir da saída de DESCRIBE HISTORY table_spec.

Nem timestamp_expressionversion podem ser subconsultas.

Somente cadeias de caracteres de carimbo de data ou hora são aceitas. Por exemplo, "2019-01-01" e "2019-01-01T00:00:00.000Z". Consulte o seguinte código para exemplo de sintaxe:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).load("/tmp/delta/people10m")

Você também pode usar a @ sintaxe para especificar o carimbo de data/hora ou a versão como parte do nome da tabela. O carimbo de data/hora deve estar no yyyyMMddHHmmssSSS formato. Você pode especificar uma versão depois @ antecipando a v para a versão. Consulte o seguinte código para exemplo de sintaxe:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

spark.read.load("/tmp/delta/people10m@20190101000000000")
spark.read.load("/tmp/delta/people10m@v123")

O que são pontos de verificação do log de transações?

O Delta Lake registra as versões da tabela como arquivos JSON dentro do diretório, que é armazenado junto com os dados da _delta_log tabela. Para otimizar a consulta de pontos de verificação, o Delta Lake agrega versões de tabela a arquivos de ponto de verificação Parquet, evitando a necessidade de ler todas as versões JSON do histórico de tabelas. O Azure Databricks otimiza a frequência de pontos de verificação para o tamanho dos dados e a carga de trabalho. Os usuários não devem precisar interagir diretamente com os pontos de verificação. A frequência do ponto de verificação está sujeita a alterações sem aviso prévio.

Configurar a retenção de dados para consultas de viagem no tempo

Para consultar uma versão anterior da tabela, você deve manter o log e os arquivos de dados dessa versão.

Os arquivos de dados são excluídos quando VACUUM executados em uma tabela. O Delta Lake gerencia a remoção do arquivo de log automaticamente após as versões da tabela de pontos de verificação.

Como a maioria das tabelas Delta tem VACUUM sido executada regularmente, as consultas point-in-time devem respeitar o limite de retenção do VACUUM, que é de 7 dias por padrão.

Para aumentar o limite de retenção de dados para tabelas Delta, você deve configurar as seguintes propriedades da tabela:

  • delta.logRetentionDuration = "interval <interval>": controla por quanto tempo o histórico de uma tabela é mantido. A predefinição é interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": Determina o limite VACUUM usado para remover arquivos de dados que não são mais referenciados na versão atual da tabela. A predefinição é interval 7 days.

Você pode especificar propriedades Delta durante a criação da tabela ou defini-las com uma ALTER TABLE instrução. Consulte Referência de propriedades da tabela delta.

Nota

Você deve definir ambas as propriedades para garantir que o histórico de tabelas seja mantido por mais tempo para tabelas com operações frequentes VACUUM . Por exemplo, para acessar 30 dias de dados históricos, defina delta.deletedFileRetentionDuration = "interval 30 days" (que corresponde à configuração padrão para delta.logRetentionDuration).

Aumentar o limite de retenção de dados pode fazer com que os custos de armazenamento aumentem, à medida que mais arquivos de dados são mantidos.

Restaurar uma tabela Delta para um estado anterior

Você pode restaurar uma tabela Delta para seu estado anterior usando o RESTORE comando. Uma tabela Delta mantém internamente versões históricas da tabela que lhe permitem ser restauradas para um estado anterior. Uma versão correspondente ao estado anterior ou a um carimbo de data/hora de quando o estado anterior foi criado é suportada como opções pelo comando RESTORE.

Importante

  • Você pode restaurar uma tabela já restaurada.
  • Você pode restaurar uma tabela clonada .
  • Você deve ter MODIFY permissão na tabela que está sendo restaurada.
  • Não é possível restaurar uma tabela para uma versão mais antiga em que os arquivos de dados foram excluídos manualmente ou pelo vacuum. A restauração parcial para esta versão ainda é possível se spark.sql.files.ignoreMissingFiles estiver definida como true.
  • O formato de carimbo de data/hora para restaurar para um estado anterior é yyyy-MM-dd HH:mm:ss. O fornecimento apenas de uma cadeia de caracteres date(yyyy-MM-dd) também é suportado.
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

Para obter detalhes de sintaxe, consulte RESTAURAR.

Importante

A restauração é considerada uma operação de alteração de dados. As entradas de log do Delta Lake adicionadas RESTORE pelo comando contêm dataChange definido como true. Se houver um aplicativo downstream, como um trabalho de streaming estruturado que processa as atualizações para uma tabela Delta Lake, as entradas do log de alteração de dados adicionadas pela operação de restauração serão consideradas como novas atualizações de dados e processá-las poderá resultar em dados duplicados.

Por exemplo:

Versão da tabela Operação Atualizações de log delta Registros em atualizações de log de alteração de dados
0 INSERT AddFile(/path/to/file-1, dataChange = true) (nome = Viktor, idade = 29, (nome = George, idade = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (nome = George, idade = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (Sem registros, pois a compactação Otimize não altera os dados na tabela)
3 RESTAURAR(versão=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (nome = Viktor, idade = 29), (nome = George, idade = 55), (nome = George, idade = 39)

No exemplo anterior, o RESTORE comando resulta em atualizações que já foram vistas ao ler a tabela Delta versão 0 e 1. Se uma consulta de streaming estava lendo esta tabela, esses arquivos serão considerados como dados recém-adicionados e serão processados novamente.

Restaurar métricas

RESTORE relata as seguintes métricas como um DataFrame de linha única quando a operação for concluída:

  • table_size_after_restore: O tamanho da tabela após a restauração.

  • num_of_files_after_restore: O número de arquivos na tabela após a restauração.

  • num_removed_files: Número de ficheiros removidos (logicamente eliminados) da tabela.

  • num_restored_files: Número de arquivos restaurados devido à reversão.

  • removed_files_size: Tamanho total em bytes dos arquivos que são removidos da tabela.

  • restored_files_size: Tamanho total em bytes dos arquivos que são restaurados.

    Exemplo de métricas de restauração

Exemplos de utilização de viagens no tempo do Delta Lake

  • Corrigir exclusões acidentais em uma tabela para o usuário 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Corrigir atualizações incorretas acidentais em uma tabela:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Consulte o número de novos clientes adicionados na última semana.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Como posso encontrar a última versão de confirmação na sessão do Spark?

Para obter o número da versão da última confirmação escrita pela corrente SparkSession em todos os threads e todas as tabelas, consulte a configuração spark.databricks.delta.lastCommitVersionInSessionSQL .

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Se nenhuma confirmação tiver sido feita pelo SparkSession, consultar a chave retornará um valor vazio.

Nota

Se você compartilhar o mesmo SparkSession em vários threads, é semelhante ao compartilhamento de uma variável em vários threads: você pode atingir as condições de corrida à medida que o valor de configuração é atualizado simultaneamente.