Работа с журналом таблиц Delta Lake

Каждая операция, которая изменяет таблицу Delta Lake, создает новую версию таблицы. Сведения журнала можно использовать для аудита операций, отката таблицы или запроса таблицы в определенный момент времени с помощью перемещения по времени.

Примечание.

Databricks не рекомендует использовать журнал таблиц Delta Lake в качестве долгосрочного решения резервного копирования для архивации данных. Databricks рекомендует использовать только последние 7 дней для операций перехода по времени, если только вы не установили большее значение для конфигураций хранения данных и журналов.

Получение журнала таблицы Delta

Вы можете получить сведения, включая операции, пользователя и метку времени для каждой записи в таблицу Delta, выполнив history команду. Операции возвращаются в обратном хронологическом порядке.

Хранение журнала таблиц определяется параметром delta.logRetentionDurationтаблицы, который составляет 30 дней по умолчанию.

Примечание.

Журнал перехода по времени и таблиц управляется различными пороговыми значения для хранения. См. статью Что такое переход по времени Delta Lake?.

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

Сведения о синтаксисе Spark SQL см. в разделе "ОПИСАНИЕ ЖУРНАЛА".

Дополнительные сведения о синтаксисе Scala/Java/Python см. в документации по API Delta Lake.

Обозреватель каталога предоставляет визуальное представление этой подробной информации о таблице и журнале для таблиц Delta. Помимо схемы таблицы и выборки данных можно также щелкнуть вкладку Журнал, чтобы просмотреть журнал таблицы, который отображается с DESCRIBE HISTORY.

Схема журнала

Выходные данные операции history имеют указанніе ниже столбцы.

Column Type Описание
версия длинный Версия таблицы, созданная операцией.
TIMESTAMP Метка времени Когда версия была зафиксирована.
userId строка Идентификатор пользователя, запустившего операцию.
userName строка Имя пользователя, запустившего операцию.
Операция строка Имя операции.
operationParameters map Параметры операции (например, предикаты).
задание struct Сведения о задании, которое запустило операцию.
записная книжка struct Сведения о записной книжке, из которой выполнялась операция.
clusterId строка Идентификатор кластера, в котором выполнялась операция.
readVersion длинный Версия таблицы, которая была считана для выполнения операции записи.
isolationLevel строка Уровень изоляции, использованный для этой операции.
isBlindAppend boolean Выплнялось ли добавление данных в ходе этой операции.
operationMetrics map Метрики операции (например, число измененных строк и файлов).
userMetadata строка Определяемые пользователем метаданные фиксации, если они были указаны.
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Примечание.

Ключевые метрики операции

Операция history возвращает коллекцию метрик операции в сопоставлении столбцов operationMetrics.

В следующих таблицах перечислены ключевые определения по операции.

Операция Имя метрики Description
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO
numFiles Число записанных файлов.
numOutputBytes Размер записанного содержимого в байтах.
numOutputRows Число записанных строк.
ПОТОКОВАЯ ПЕРЕДАЧА ОБНОВЛЕНИЯ
numAddedFiles Число добавленных файлов
numRemovedFiles Число удаленных файлов.
numOutputRows Число записанных строк.
numOutputBytes Размер записанных данных в байтах.
DELETE
numAddedFiles Число добавленных файлов Не указывается при удалении секций таблицы.
numRemovedFiles Число удаленных файлов.
numDeletedRows Число удаленных строк. Не указывается при удалении секций таблицы.
numCopiedRows Число строк, скопированных в процессе удаления файлов.
executionTimeMs Время, затраченное на выполнение всей операции.
scanTimeMs Время, затраченное на сканирование файлов на соответствие.
rewriteTimeMs Время, затраченное на перезапись сопоставленных файлов.
TRUNCATE
numRemovedFiles Число удаленных файлов.
executionTimeMs Время, затраченное на выполнение всей операции.
MERGE
numSourceRows Число строк в исходном DataFrame.
numTargetRowsInserted Число строк, вставленных в целевую таблицу.
numTargetRowsUpdated Число строк, обновленных в целевой таблице.
numTargetRowsDeleted Число строк, удаленных в целевой таблице.
numTargetRowsCopied Число скопированных целевых строк.
numOutputRows Число выведенных строк.
numTargetFilesAdded Число файлов, добавленных в приемник (целевой объект).
numTargetFilesRemoved Число файлов, удаленных из приемника (целевого объекта).
executionTimeMs Время, затраченное на выполнение всей операции.
scanTimeMs Время, затраченное на сканирование файлов на соответствие.
rewriteTimeMs Время, затраченное на перезапись сопоставленных файлов.
UPDATE
numAddedFiles Число добавленных файлов
numRemovedFiles Число удаленных файлов.
numUpdatedRows Число обновленных строк.
numCopiedRows Число строк, только что скопированных в процессе обновления файлов.
executionTimeMs Время, затраченное на выполнение всей операции.
scanTimeMs Время, затраченное на сканирование файлов на соответствие.
rewriteTimeMs Время, затраченное на перезапись сопоставленных файлов.
FSCK numRemovedFiles Число удаленных файлов.
CONVERT numConvertedFiles Число преобразованных файлов Parquet.
OPTIMIZE
numAddedFiles Число добавленных файлов
numRemovedFiles Число оптимизированных файлов.
numAddedBytes Число байтов, добавленных после оптимизации таблицы.
numRemovedBytes Число удаленніх байтов.
minFileSize Размер наименьшего файла после оптимизации таблицы.
p25FileSize Размер файла 25-го процентиля после оптимизации таблицы.
p50FileSize Медианный размер файла после оптимизации таблицы.
p75FileSize Размер файла 75-го процентиля после оптимизации таблицы.
maxFileSize Размер наибольшего файла после оптимизации таблицы.
CLONE;
sourceTableSize Размер в байтах исходной таблицы в клонированной версии.
sourceNumOfFiles Число файлов в исходной таблице в клонированной версии.
numRemovedFiles Число файлов, удаленных из целевой таблицы, если была заменена предыдущая таблица Delta.
removedFilesSize Общий размер в байтах файлов, удаленных из целевой таблицы, если была заменена предыдущая таблица Delta.
numCopiedFiles Число файлов, которые были скопированы в новое расположение. 0 для поверхностных клонов.
copiedFilesSize Общий размер файлов в байтах, которые были скопированы в новое расположение. 0 для поверхностных клонов.
RESTORE
tableSizeAfterRestore Размер таблицы в байтах после восстановления.
numOfFilesAfterRestore Число файлов в таблице после восстановления.
numRemovedFiles Число файлов, удаленных операцией восстановления.
numRestoredFiles Число файлов, добавленных в результате восстановления.
removedFilesSize Размер в байтах файлов, удаленных при восстановлении.
restoredFilesSize Размер в байтах файлов, добавленных при восстановлении.
Команда VACUUM
numDeletedFiles Число удаленных файлов.
numVacuumedDirectories Число каталогов, очищенных с помощью операции vacuum.
numFilesToDelete Число удаляемых файлов.

Что такое путешествие по времени Delta Lake?

Время Delta Lake поддерживает запросы предыдущих версий таблицы на основе метки времени или таблицы (как записано в журнале транзакций). Для таких приложений можно использовать поездку по времени:

  • Воссоздание анализа, отчетов или выходных данных (например, выходных данных модели машинного обучения). Это может быть полезно для отладки или аудита, особенно в регулируемых отраслях.
  • Написание сложных темпоральных запросов.
  • Устранение ошибок в данных.
  • Изоляция моментальных снимков для набора запросов для быстро меняющихся таблиц.

Внимание

Версии таблиц, доступные с помощью перемещения по времени, определяются сочетанием порогового значения хранения для файлов журнала транзакций и частоты и указанного хранения для VACUUM операций. Если вы работаете VACUUM ежедневно со значениями по умолчанию, для перемещения по времени доступно 7 дней данных.

Синтаксис разностных версий перехода по времени

Запросите таблицу Delta со временем, добавив предложение после спецификации имени таблицы.

  • timestamp_expression может быть одним из следующих вариантов:
    • '2018-10-18T22:15:12.013Z', то есть строкой, которая может приводиться к метке времени;
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', то есть строкой даты.
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Любое другое выражение, которое является меткой времени или может быть приведено к ней
  • version — это длинное значение, которое можно получить из выходных данных DESCRIBE HISTORY table_spec.

Ни timestamp_expression, ни version не может быть подзапросом.

Принимаются только строки метки даты или времени. Например, "2019-01-01" и "2019-01-01T00:00:00.000Z". См. следующий код, например синтаксис:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).load("/tmp/delta/people10m")

Можно также использовать @ синтаксис, чтобы указать метку времени или версию в составе имени таблицы. Метка времени должна быть указана в формате yyyyMMddHHmmssSSS. Вы можете указать версию после @, добавив к версии v. См. следующий код, например синтаксис:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

spark.read.load("/tmp/delta/people10m@20190101000000000")
spark.read.load("/tmp/delta/people10m@v123")

Что такое проверка точки журнала транзакций?

Delta Lake записывает версии таблиц в виде JSON-файлов в _delta_log каталоге, которые хранятся вместе с данными таблицы. Чтобы оптимизировать запросы проверка point, Delta Lake объединяет версии таблиц в файлы Parquet проверка point, предотвращая необходимость чтения всех версий журнала таблиц JSON. Azure Databricks оптимизирует частоту проверка назначения для размера данных и рабочей нагрузки. Пользователям не нужно взаимодействовать с проверка точками напрямую. Частота проверка точки может изменяться без уведомления.

Настройка хранения данных для запросов на поездки по времени

Чтобы запросить предыдущую версию таблицы, необходимо сохранить журнал и файлы данных для этой версии.

Файлы данных удаляются при VACUUM выполнении таблицы. Delta Lake автоматически удаляет файл журнала после проверка назначения версий таблицы.

Так как большинство разностных таблиц VACUUM регулярно выполняются против них, запросы на определенный момент времени должны учитывать пороговое значение VACUUMхранения, которое составляет 7 дней по умолчанию.

Чтобы увеличить порог хранения данных для таблиц Delta, необходимо настроить следующие свойства таблицы:

  • delta.logRetentionDuration = "interval <interval>": управляет продолжительностью хранения журнала для таблицы. Значение по умолчанию — interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": определяет пороговое значение VACUUM , используемое для удаления файлов данных, на которые больше не ссылается текущая версия таблицы. Значение по умолчанию — interval 7 days.

Свойства Delta можно указать во время создания таблицы или задать их с помощью инструкции ALTER TABLE . См. справочник по свойствам таблицы Delta.

Примечание.

Эти свойства необходимо задать, чтобы журнал таблиц сохранялся в течение длительной длительности для таблиц с частыми VACUUM операциями. Например, чтобы получить доступ к 30 дням исторических данных, задайте (delta.deletedFileRetentionDuration = "interval 30 days"который соответствует параметру по умолчанию).delta.logRetentionDuration

Увеличение порогового значения хранения данных может привести к увеличению затрат на хранение, так как сохраняются дополнительные файлы данных.

Восстановление таблицы Delta до предыдущего состояния

Вы можете восстановить прежнее состояние таблицы Delta с помощью команды RESTORE. Разностная Delta внутренне поддерживает предыдущие версии таблицы, что позволяет восстановить ее предыдущее состояние. В качестве параметров команды RESTORE поддерживаются версия, соответствующая предыдущему состоянию, или метка времени, когда было создано предыдущее состояние.

Внимание

  • Вы можете восстановить уже восстановленную таблицу.
  • Вы можете восстановить клонированную таблицу.
  • Требуется разрешение MODIFY для восстанавливаемой таблицы.
  • Нельзя восстановить таблицу до более старой версии, в которой файлы данных были удалены вручную или вручную vacuum. Восстановление до этой версии по-прежнему возможно, если для spark.sql.files.ignoreMissingFiles задано значение true.
  • Формат метки времени для восстановления в более раннем состоянии — yyyy-MM-dd HH:mm:ss. Также поддерживается предоставление только строки даты (yyyy-MM-dd).
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

Сведения о синтаксисе см. в разделе RESTORE.

Внимание

Восстановление считается операцией изменения данных. Записи журнала Delta Lake, добавленные командой RESTORE, содержат dataChange со значением true. Если у вас есть подчиненное приложение, например задание структурированного потока, которое обрабатывает обновления таблицы Delta Lake, записи журнала изменений данных, добавленные операцией восстановления, рассматриваются как новые обновления данных, и их обработка может привести к дублированию данных.

Например:

Версия таблицы Операция Обновления журналов изменений Записи в обновлениях журнала изменений данных
0 ВСТАВИТЬ AddFile(/path/to/file-1, dataChange = true) (name = Viktor, age = 29, (name = George, age = 55)
1 ВСТАВИТЬ AddFile(/path/to/file-2, dataChange = true) (name = George, age = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (Нет записей, так как оптимизация сжатия не изменяет данные в таблице.)
3 RESTORE(version=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

В предыдущем примере выполнение команды RESTORE приводит к обновлениям, которые уже были видны при чтении таблицы Delta версий 0 и 1. Если запрос потоковой передачи выполнял чтение этой таблицы, эти файлы будут считаться новыми добавленными данными и будут обработаны снова.

Метрики восстановления

RESTORE передает следующие метрики в виде DataFrame из одной строки после завершения операции:

  • table_size_after_restore — размер таблицы после восстановления;

  • num_of_files_after_restore — число файлов в таблице после восстановления;

  • num_removed_files — число файлов, удаленных (логически удаленных) из таблицы;

  • num_restored_files — число файлов, восстановленных из-за отката;

  • removed_files_size — общий размер в байтах файлов, удаленных из таблицы;

  • restored_files_size — общий размер восстановленных файлов в байтах.

    Пример восстановления метрик

Примеры использования перехода по времени Delta Lake

  • Исправление случайных удалений в таблице для пользователя 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Исправление случайного неправильного обновления таблицы:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Запросите количество новых клиентов, добавленных за последнюю неделю.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Разделы справки найти версию последней фиксации в сеансе Spark?

Чтобы получить номер версии последней фиксации, записанной текущим SparkSession для всех потоков и всех таблиц, запросите конфигурацию SQL spark.databricks.delta.lastCommitVersionInSession.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Если фиксация объектом SparkSession не выполнена, запрос ключа возвращает пустое значение.

Примечание.

Если одни и те же SparkSession используются совместно в нескольких потоках, это аналогично совместному использованию переменной в нескольких потоках, и могут возникнуть состояния гонки при одновременном обновлении значения конфигурации.