Служебные команды для таблиц

Разностные таблицы поддерживают ряд команд служебной программы.

Удалить файлы, на которые больше не ссылается разностная таблица

Можно удалить файлы, на которые больше не ссылается разностная таблица и которые старше порогового значения хранения, выполнив vacuum команду в таблице. vacuum не активируется автоматически. Пороговое значение хранения по умолчанию для файлов — 7 дней.

Важно!

  • vacuum Удаляет только файлы данных, а не файлы журнала. Файлы журнала удаляются автоматически и асинхронно после операций контрольной точки. по умолчанию срок хранения файлов журнала составляет 30 дней, настраиваемый с помощью delta.logRetentionDuration свойства, заданного с помощью ALTER TABLE SET TBLPROPERTIES метода SQL. См. раздел Свойства таблицы.
  • Возможность перехода к версии старше, чем срок хранения, теряется после выполнения vacuum .

SQL

VACUUM eventsTable   -- vacuum files not required by versions older than the default retention period

VACUUM '/data/events' -- vacuum files in path-based table

VACUUM delta.`/data/events/`

VACUUM delta.`/data/events/` RETAIN 100 HOURS  -- vacuum files not required by versions more than 100 hours old

VACUUM eventsTable DRY RUN    -- do dry run to get the list of files to be deleted

сведения о синтаксисе Spark SQL см. в разделе

Python

Примечание

API Python доступен в Databricks Runtime 6,1 и более поздних версиях.

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, pathToTable)  # path-based tables, or
deltaTable = DeltaTable.forName(spark, tableName)    # Hive metastore-based tables

deltaTable.vacuum()        # vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100)     # vacuum files not required by versions more than 100 hours old

Scala

Примечание

API Scala доступен в Databricks Runtime 6,0 и более поздних версий.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

deltaTable.vacuum()        // vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100)     // vacuum files not required by versions more than 100 hours old

Java

Примечание

API Java доступен в Databricks Runtime 6,0 и более поздних версий.

import io.delta.tables.*;
import org.apache.spark.sql.functions;

DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);

deltaTable.vacuum();        // vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100);     // vacuum files not required by versions more than 100 hours old

Сведения о синтаксисе Scala, Java и Python см. в статье разностного API-интерфейса .

Предупреждение

Модули данных рекомендуют устанавливать интервал хранения не менее 7 дней, так как старые моментальные снимки и незафиксированные файлы по-прежнему могут использоваться параллельными модулями чтения или записи в таблице. Если VACUUM очищает активные файлы, параллельные модули чтения могут завершаться сбоем или, что еще хуже, таблицы могут быть повреждены при VACUUM удалении файлов, которые еще не были зафиксированы. Необходимо выбрать интервал, который больше, чем самая длинная параллельная транзакция, и самый длинный период, в течение которого любой поток может отставать от последнего обновления таблицы.

Дельта Lake имеет проверку безопасности, чтобы предотвратить запуск опасной VACUUM команды. Если вы уверены, что для этой таблицы не выполняются никакие операции, превышающие указанный интервал хранения, можно отключить эту проверку безопасности, задав для свойства конфигурации Spark значение spark.databricks.delta.retentionDurationCheck.enabled false .

Сведения об аудите

VACUUM Фиксация в журнале транзакций изменений содержит данные аудита. События аудита можно запрашивать с помощью DESCRIBE HISTORY .

Получить журнал разностных таблиц

Для каждой операции записи в разностную таблицу можно получить сведения об операциях, пользователе, метке времени и т. д., выполнив history команду. Операции возвращаются в обратном хронологическом порядке. По умолчанию журнал таблиц сохраняется в течение 30 дней.

SQL

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

сведения о синтаксисе Spark SQL см. в разделе

Python

Примечание

API Python доступен в Databricks Runtime 6,1 и более поздних версиях.

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, pathToTable)

fullHistoryDF = deltaTable.history()    # get the full history of the table

lastOperationDF = deltaTable.history(1) # get the last operation

Scala

Примечание

API Scala доступен в Databricks Runtime 6,0 и более поздних версий.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

val fullHistoryDF = deltaTable.history()    // get the full history of the table

val lastOperationDF = deltaTable.history(1) // get the last operation

Java

Примечание

API Java доступен в Databricks Runtime 6,0 и более поздних версий.

import io.delta.tables.*;

DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);

DataFrame fullHistoryDF = deltaTable.history();       // get the full history of the table

DataFrame lastOperationDF = deltaTable.history(1);    // fetch the last operation on the DeltaTable

Сведения о синтаксисе Scala/Java/Python см. в статье разностного API-интерфейса .

Схема журнала

Выходные данные history операции имеют следующие столбцы.

Столбец Type Описание
version long Версия таблицы, созданная операцией.
TIMESTAMP TIMESTAMP При фиксации этой версии.
userId строка Идентификатор пользователя, выполнившего операцию.
userName строка Имя пользователя, выполнившего операцию.
Операция строка Имя операции.
operationParameters карта Параметры операции (например, предикаты).
задание struct Сведения о задании, которое запустило операцию.
записная книжка struct Сведения о записной книжке, из которой выполнялась операция.
clusterId строка ИДЕНТИФИКАТОР кластера, в котором выполнялась операция.
readVersion long Версия таблицы, которая была считана для выполнения операции записи.
isolationLevel строка Уровень изоляции, используемый для этой операции.
isBlindAppend boolean Добавлена ли эта операция к данным.
оператионметрикс карта Метрики операции (например, число измененных строк и файлов).
усерметадата строка Определяемые пользователем метаданные фиксации, если они были указаны
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Примечание

  • Метрики операций доступны, только если команда журнала и операция в журнале выполнялись с использованием Databricks Runtime 6,5 или более поздней версии.
  • Некоторые из других столбцов недоступны при записи в разностную таблицу с помощью следующих методов:
  • Столбцы, добавленные в будущем, всегда будут добавляться после последнего столбца.

Ключи метрик операции

historyОперация возвращает коллекцию метрик операций в operationMetrics сопоставлении столбцов.

В следующих таблицах перечислены определения ключей карт по операциям.

Операция Имя метрики Описание
ЗАПИСЬ, CREATE TABLE КАК SELECT, ЗАМЕНИТЬ ТАБЛИЦУ КАК SELECT, КОПИРОВАТЬ В
нумфилес Число записанных файлов.
нумаутпутбитес Размер записанного содержимого в байтах.
нумаутпутровс Число записанных строк.
ПОТОКОВАЯ ПЕРЕДАЧА ОБНОВЛЕНИЯ
нумаддедфилес Число добавленных файлов.
нумремоведфилес Число удаленных файлов.
нумаутпутровс Число записанных строк.
нумаутпутбитес Размер записи в байтах.
DELETE
нумаддедфилес Число добавленных файлов. Не указывается при удалении секций таблицы.
нумремоведфилес Число удаленных файлов.
нумделетедровс Число удаленных строк. Не указывается при удалении секций таблицы.
нумкопиедровс Число строк, скопированных в процессе удаления файлов.
ексекутионтимемс Время, затраченное на выполнение всей операции.
скантимемс Время, затраченное на сканирование файлов на соответствие.
ревритетимемс Время, затраченное на перезапись сопоставленных файлов.
TRUNCATE
нумремоведфилес Число удаленных файлов.
ексекутионтимемс Время, затраченное на выполнение всей операции.
MERGE
нумсаурцеровс Количество строк в исходном блоке данных.
нумтаржетровсинсертед Число строк, вставленных в целевую таблицу.
нумтаржетровсупдатед Число строк, обновленных в целевой таблице.
нумтаржетровсделетед Число строк, удаленных в целевой таблице.
нумтаржетровскопиед Количество скопированных целевых строк.
нумаутпутровс Общее число записанных строк.
нумтаржетфилесаддед Число файлов, добавленных в приемник (цель).
нумтаржетфилесремовед Число файлов, удаленных из приемника (целевого объекта).
ексекутионтимемс Время, затраченное на выполнение всей операции.
скантимемс Время, затраченное на сканирование файлов на соответствие.
ревритетимемс Время, затраченное на перезапись сопоставленных файлов.
UPDATE
нумаддедфилес Число добавленных файлов.
нумремоведфилес Число удаленных файлов.
нумупдатедровс Число обновленных строк.
нумкопиедровс Количество строк, только что скопировано в процессе обновления файлов.
ексекутионтимемс Время, затраченное на выполнение всей операции.
скантимемс Время, затраченное на сканирование файлов на соответствие.
ревритетимемс Время, затраченное на перезапись сопоставленных файлов.
ЭКВИВАЛЕНТ нумремоведфилес Число удаленных файлов.
CONVERT нумконвертедфилес Число преобразованных файлов Parquet.
Операция Имя метрики Описание
CLONE (1)
саурцетаблесизе Размер в байтах исходной таблицы в клонированной версии.
саурценумоффилес Число файлов в исходной таблице в клонированной версии.
нумремоведфилес Число файлов, удаленных из целевой таблицы, если была заменена Предыдущая разностная таблица.
ремоведфилессизе Общий размер в байтах файлов, удаленных из целевой таблицы, если была заменена Предыдущая разностная таблица.
нумкопиедфилес Число файлов, которые были скопированы в новое расположение. 0 для поверхностных клонов.
копиедфилессизе Общий размер файлов в байтах, которые были скопированы в новое расположение. 0 для поверхностных клонов.
Восстановление (2)
таблесизеафтерресторе Размер таблицы в байтах после восстановления.
нумоффилесафтерресторе Число файлов в таблице после восстановления.
нумремоведфилес Число файлов, удаленных операцией восстановления.
нумресторедфилес Число файлов, добавленных в результате восстановления.
ремоведфилессизе Размер файлов в байтах, удаленных при восстановлении.
ресторедфилессизе Размер файлов в байтах, добавленных при восстановлении.
УВЕЛИЧИТЬ
нумаддедфилес Число добавленных файлов.
нумремоведфилес Число оптимизированных файлов.
нумаддедбитес Число байтов, добавленных после оптимизации таблицы.
нумремоведбитес Число удаленных байтов.
минфилесизе Размер наименьшего файла после оптимизации таблицы.
p25FileSize Размер 25 процентилей файла после оптимизации таблицы.
p50FileSize Медиана размера файла после оптимизации таблицы.
p75FileSize Размер файла 75 процентилей после оптимизации таблицы.
maxFileSize Размер самого крупного файла после оптимизации таблицы.
Очистка (3)
нумделетедфилес Число удаленных файлов.
нумвакуумеддиректориес Число очищенных каталогов.
нумфилестоделете Число удаляемых файлов.

(1) требуется DATABRICKS Runtime 7,3 LTS или более поздней версии.

(2) требуется Databricks Runtime 7,4 или более поздней версии.

(3) требуется Databricks Runtime 8,2 или более поздней версии.

Получение сведений о разностной таблице

Подробные сведения о разностной таблице (например, число файлов, размер данных) можно получить с помощью команды DESCRIBE DETAIL .

DESCRIBE DETAIL '/data/events/'

DESCRIBE DETAIL eventsTable

сведения о синтаксисе Spark SQL см. в разделе

Схема сведений

Выходные данные этой операции имеют только одну строку со следующей схемой.

Столбец Type Описание
format строка Формат таблицы, то есть delta .
идентификатор строка Уникальный идентификатор таблицы.
name строка Имя таблицы, определенное в хранилище метаданных.
description строка Описание таблицы.
location строка Расположение таблицы.
дата создания TIMESTAMP При создании таблицы.
lastModified TIMESTAMP Дата последнего изменения таблицы.
партитионколумнс Массив строк Имена столбцов секционирования, если таблица секционирована.
нумфилес long Число файлов в последней версии таблицы.
sizeInBytes INT Размер последнего моментального снимка таблицы в байтах.
properties Map строковая строка Все свойства, заданные для этой таблицы.
минреадерверсион INT Минимальная версия модулей чтения (в соответствии с протоколом log), которая может считывать таблицу.
минвритерверсион INT Минимальная версия модулей записи (в соответствии с протоколом журнала), который может записывать в таблицу.
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
|format|                  id|              name|description|            location|           createdAt|       lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
| delta|d31f82d2-a69f-42e...|default.deltatable|       null|file:/Users/tuor/...|2020-06-05 12:20:...|2020-06-05 12:20:20|              []|      10|      12345|        []|               1|               2|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+

Преобразование таблицы Parquet в разностную таблицу

Преобразование таблицы Parquet в разностную таблицу на месте. Эта команда выводит список всех файлов в каталоге, создает разностный журнал транзакций, который отслеживает эти файлы, и автоматически определяет схему данных, считывая нижние колонтитулы всех файлов Parquet. Если данные секционированы, необходимо указать схему столбцов секционирования в виде строки в формате DDL (то есть <column-name1> <type>, <column-name2> <type>, ... ).

Примечание

если таблица Parquet была создана структурированной потоковой передачей, то список файлов можно избежать с помощью _spark_metadata вложенного каталога в качестве источника истинности для файлов, содержащихся в таблице, задав для параметра SQL конфигурацию значение spark.databricks.delta.convert.useMetadataLog true .

SQL

-- Convert unpartitioned Parquet table at path '<path-to-table>'
CONVERT TO DELTA parquet.`<path-to-table>`

-- Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
CONVERT TO DELTA parquet.`<path-to-table>` PARTITIONED BY (part int, part2 int)

Дополнительные сведения о синтаксисе см. в разделе .

Python

Примечание

API Python доступен в Databricks Runtime 6,1 и более поздних версиях.

from delta.tables import *

# Convert unpartitioned Parquet table at path '<path-to-table>'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")

# Convert partitioned parquet table at path '<path-to-table>' and partitioned by integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int")

Scala

Примечание

API Scala доступен в Databricks Runtime 6,0 и более поздних версий.

import io.delta.tables._

// Convert unpartitioned Parquet table at path '<path-to-table>'
val deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")

// Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
val partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int")

Java

Примечание

API Scala доступен в Databricks Runtime 6,0 и более поздних версий.

import io.delta.tables.*;

// Convert unpartitioned Parquet table at path '<path-to-table>'
DeltaTable deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`");

// Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
DeltaTable deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int");

Примечание

Любой файл, не записанный в разностной файловой системой, недоступен и может быть удален при запуске vacuum . В процессе преобразования следует избегать обновления или добавления файлов данных. После преобразования таблицы убедитесь, что все операции записи проходят через разностную версию Lake.

Преобразование таблицы Delta в таблицу Parquet

Можно легко преобразовать разностную таблицу обратно в таблицу Parquet, выполнив следующие действия.

  1. Если вы выполнили операции дельты Lake, которые могут изменять файлы данных (например, delete или merge ), запустите чистильщик с временем хранения 0 часов, чтобы удалить все файлы данных, не принадлежащие к последней версии таблицы.
  2. Удалите _delta_log каталог в каталоге таблиц.

Восстановление разностной таблицы в более раннее состояние

Примечание

Доступно в Databricks Runtime 7,4 и более поздних версий.

Можно восстановить разностную таблицу в прежнее состояние с помощью RESTORE команды. Разностная таблица внутренне поддерживает исторические версии таблицы, которые позволяют восстановить предыдущее состояние. Версия, соответствующая предыдущему состоянию или отметке времени, когда было создано предыдущее состояние, поддерживается в качестве параметров RESTORE команды.

Важно!

  • Можно восстановить уже восстановленную таблицу и клонированные таблицы.
  • Восстановление таблицы до более старой версии, в которой файлы данных были удалены вручную или с vacuum ошибкой, приведет к сбою. Восстановление до этой версии по-прежнему возможно, если spark.sql.files.ignoreMissingFiles для задано значение true .
  • Формат метки времени для восстановления в более раннем состоянии — yyyy-MM-dd HH:mm:ss . yyyy-MM-ddТакже поддерживается только строка даты ().

SQL

RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, <path-to-table>)  # path-based tables, or
deltaTable = DeltaTable.forName(spark, <table-name>)    # Hive metastore-based tables

deltaTable.restoreToVersion(0) # restore table to oldest version

deltaTable.restoreToTimestamp('2019-02-14') # restore to a specific timestamp

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, <path-to-table>)
val deltaTable = DeltaTable.forName(spark, <table-name>)

deltaTable.restoreToVersion(0) // restore table to oldest version

deltaTable.restoreToTimestamp("2019-02-14") // restore to a specific timestamp

Java

import io.delta.tables.*;

DeltaTable deltaTable = DeltaTable.forPath(spark, <path-to-table>);
DeltaTable deltaTable = DeltaTable.forName(spark, <table-name>);

deltaTable.restoreToVersion(0) // restore table to oldest version

deltaTable.restoreToTimestamp("2019-02-14") // restore to a specific timestamp

Сведения о синтаксисе см. в разделе Restore (Delta Lake on Azure Databricks).

Восстановить метрики

Примечание

Доступно в Databricks Runtime 8,2 и более поздних версий.

RESTORE передает следующие метрики в виде кадров данных одной строки после завершения операции:

  • table_size_after_restore: Размер таблицы после восстановления.
  • num_of_files_after_restore: Число файлов в таблице после восстановления.
  • num_removed_files: Число удаленных (логически удаленных) файлов из таблицы.
  • num_restored_files: Число файлов, восстановленных из-за отката.
  • removed_files_size: Общий размер файлов в байтах, удаляемых из таблицы.
  • restored_files_size: Общий размер восстанавливаемых файлов в байтах.

Пример восстановления метрик

Контроль доступа к таблице

Необходимо иметь MODIFY разрешение на восстанавливаемую таблицу.

Клонировать разностную таблицу

Примечание

Доступно в Databricks Runtime 7,2 и более поздних версий.

Вы можете создать копию существующей разностной таблицы в определенной версии с помощью clone команды. Клоны могут быть либо глубокими, либо поверхностными.

В этом разделе рассматриваются следующие вопросы.

Типы клонов

  • Глубокий клон — это клон, который копирует данные исходной таблицы в целевой объект клона в дополнение к метаданным существующей таблицы. Кроме того, метаданные потока также клонируется таким, что поток, записывающий данные в разностную таблицу, может быть остановлен в исходной таблице и продолжаться на целевом объекте клона, где она была отключена.
  • Неглубокий клон — это клон, который не копирует файлы данных в цель клонирования. Метаданные таблицы эквивалентны источнику. Создание этих клонов является дешевле.

Любые изменения, вносимые в глубокие или поверхностные клоны, влияют только на сами клоны, а не исходную таблицу.

Клонированные метаданные включают в себя: схему, сведения о секционировании, инвариантность, допустимость значений NULL. Только для глубоких клонов метаданные Stream и Copy INTO (Дельта Lake on Azure Databricks) также клонированы. Метаданные не клонированы — описание таблицы и определяемые пользователем метаданные фиксации.

Важно!

  • Неполные клоны файлы ссылочных данных в исходном каталоге. Если запустить vacuum на клиентах исходной таблицы, они больше не смогут считывать файлы данных, на которые имеются ссылки, и FileNotFoundException будет выдано исключение. В этом случае выполнение клонирования с заменой на неполном клоне приведет к исправлению клона. Если это происходит часто, попробуйте использовать глубокий клон, который не зависит от исходной таблицы.
  • Глубокие клоны не зависят от источника, из которого они были клонированы, но требуют больших затрат, так как при глубоком клонировании данные и метаданные копируются.
  • При клонировании с целью, в которой replace уже есть таблица в этом пути, создается разностный журнал, если он не существует по этому пути. Можно очистить существующие данные, выполнив vacuum . Если существующая таблица является разностной, в существующей разностной таблице создается новая фиксация, включающая новые метаданные и новые данные из исходной таблицы.
  • Клонирование таблицы отличается от Create Table As Select или CTAS . В дополнение к данным, клон копирует метаданные исходной таблицы. Кроме того, клонирование имеет более простой синтаксис: вам не нужно указывать секционирование, формат, инварианты, допустимость значений NULL и т. д., так как они берутся из исходной таблицы.
  • Клонированная таблица имеет независимый журнал из своей исходной таблицы. Запросы на нерабочее время на клонированную таблицу не будут работать с теми же входными данными, что и в исходной таблице.

SQL

 CREATE TABLE delta.`/data/target/` CLONE delta.`/data/source/` -- Create a deep clone of /data/source at /data/target

 CREATE OR REPLACE TABLE db.target_table CLONE db.source_table -- Replace the target

 CREATE TABLE IF NOT EXISTS TABLE delta.`/data/target/` CLONE db.source_table -- No-op if the target table exists

 CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source`

 CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source` VERSION AS OF version

 CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source` TIMESTAMP AS OF timestamp_expression -- timestamp can be like “2019-01-01” or like date_sub(current_date(), 1)

Python

 from delta.tables import *

 deltaTable = DeltaTable.forPath(spark, pathToTable)  # path-based tables, or
 deltaTable = DeltaTable.forName(spark, tableName)    # Hive metastore-based tables

 deltaTable.clone(target, isShallow, replace) # clone the source at latest version

 deltaTable.cloneAtVersion(version, target, isShallow, replace) # clone the source at a specific version

# clone the source at a specific timestamp such as timestamp=“2019-01-01”
 deltaTable.cloneAtTimestamp(timestamp, target, isShallow, replace)

Scala

 import io.delta.tables._

 val deltaTable = DeltaTable.forPath(spark, pathToTable)
 val deltaTable = DeltaTable.forName(spark, tableName)

 deltaTable.clone(target, isShallow, replace) // clone the source at latest version

 deltaTable.cloneAtVersion(version, target, isShallow, replace) // clone the source at a specific version

 deltaTable.cloneAtTimestamp(timestamp, target, isShallow, replace) // clone the source at a specific timestamp

Java

 import io.delta.tables.*;

 DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);
 DeltaTable deltaTable = DeltaTable.forName(spark, tableName);

 deltaTable.clone(target, isShallow, replace) // clone the source at latest version

 deltaTable.cloneAtVersion(version, target, isShallow, replace) // clone the source at a specific version

 deltaTable.cloneAtTimestamp(timestamp, target, isShallow, replace) // clone the source at a specific timestamp

Сведения о синтаксисе см. в разделе Clone (Дельта Lake on Azure Databricks).

Клонировать метрики

Примечание

Доступно в Databricks Runtime 8,2 и более поздних версий.

CLONE передает следующие метрики в виде кадров данных одной строки после завершения операции:

  • source_table_size: Размер исходной таблицы, которая клонируется в байтах.
  • source_num_of_files: Число файлов в исходной таблице.
  • num_removed_files: При замене таблицы количество файлов, удаляемых из текущей таблицы.
  • num_copied_files: Количество файлов, скопированных из источника (0 для поверхностных клонов).
  • removed_files_size: Размер в байтах файлов, удаляемых из текущей таблицы.
  • copied_files_size: Размер в байтах файлов, копируемых в таблицу.

Пример метрик клонирования

Разрешения

Необходимо настроить разрешения для Azure Databricks управления доступом к таблицам и поставщика облачных служб.

Контроль доступа к таблице

Для глубоких и поверхностных клонов требуются следующие разрешения.

  • SELECT разрешение на исходную таблицу.
  • Если используется CLONE для создания новой таблицы, CREATE разрешение на базу данных, в которой создается таблица.
  • При использовании CLONE для замены таблицы необходимо иметь MODIFY разрешение на таблицу.

Разрешения поставщика облачных служб

Если вы создали глубокий клон, любой пользователь, который считывает глубокий клон, должен иметь доступ на чтение к каталогу клона. Чтобы внести изменения в клон, пользователи должны иметь доступ на запись к каталогу клона.

Если вы создали поверхностный клон, любой пользователь, который считывает неполную копию, должен иметь разрешение на чтение файлов в исходной таблице, так как файлы данных остаются в исходной таблице с незаполненными клонами, а также с каталогом клона. Чтобы внести изменения в клон, пользователям потребуется доступ для записи к каталогу клона.

Клонирование вариантов использования

В этом разделе рассматриваются следующие вопросы.

Архивация данных

Может потребоваться хранить данные дольше, чем возможно, с помощью поездок по времени или для аварийного восстановления. В таких случаях можно создать глубокий клон, чтобы сохранить состояние таблицы в определенный момент времени для архивации. Добавочное архивирование также позволяет постоянно обновлять состояние исходной таблицы для аварийного восстановления.

-- Every month run
CREATE OR REPLACE TABLE delta.`/some/archive/path` CLONE my_prod_table

Воспроизведение потока машинного обучения

при выполнении машинного обучения может потребоваться архивация определенной версии таблицы, в которой была обучена модель ML. Будущие модели можно тестировать с помощью этого архивного набора данных.

-- Trained model on version 15 of Delta table
CREATE TABLE delta.`/model/dataset` CLONE entire_dataset VERSION AS OF 15

Краткосрочные эксперименты в рабочей таблице

Чтобы протестировать рабочий процесс в рабочей таблице без повреждения таблицы, можно легко создать неглубокий клон. Это позволяет запускать произвольные рабочие процессы в клонированной таблице, которая содержит все рабочие данные, но не влияет на рабочие нагрузки.

-- Perform shallow clone
CREATE OR REPLACE TABLE my_test SHALLOW CLONE my_prod_table;

UPDATE my_test WHERE user_id is null SET invalid=true;
-- Run a bunch of validations. Once happy:

-- This should leverage the update information in the clone to prune to only
-- changed files in the clone if possible
MERGE INTO my_prod_table
USING my_test
ON my_test.user_id <=> my_prod_table.user_id
WHEN MATCHED AND my_test.user_id is null THEN UPDATE *;

DROP TABLE my_test;

Общий доступ к данным

Другим подразделениям в пределах одной организации может потребоваться получить доступ к тем же данным, но при этом могут не требоваться последние обновления. Вместо предоставления доступа к исходной таблице напрямую можно предоставить клоны с разными разрешениями для разных подразделений. Производительность клона может превыситься для простого представления.

-- Perform deep clone
CREATE OR REPLACE TABLE shared_table CLONE my_prod_table;

-- Grant other users access to the shared table
GRANT SELECT ON shared_table TO `<user-name>@<user-domain>.com`;

Переопределения свойств таблицы

Переопределения свойств таблицы особенно полезны для:

  • Добавление заметок к таблицам с данными о владельце или пользователе при совместном использовании данных с разными подразделениями.

  • Требуется архивировать разностные таблицы и время поездок. Срок хранения журнала можно указать независимо для архивной таблицы. Например:

    SQL
    CREATE OR REPLACE TABLE archive.my_table CLONE prod.my_table
    TBLPROPERTIES (
      delta.logRetentionDuration = '3650 days',
      delta.deletedFileRetentionDuration = '3650 days'
    )
    LOCATION 'xx://archive/my_table'
    
    Python
    dt = DeltaTable.forName(spark, "prod.my_table")
    tblProps = {
      "delta.logRetentionDuration": "3650 days",
      "delta.deletedFileRetentionDuration": "3650 days"
    }
    dt.clone('xx://archive/my_table', isShallow=False, replace=True, tblProps)
    
    Scala
    val dt = DeltaTable.forName(spark, "prod.my_table")
    val tblProps = Map(
      "delta.logRetentionDuration" -> "3650 days",
      "delta.deletedFileRetentionDuration" -> "3650 days"
    )
    dt.clone("xx://archive/my_table", isShallow = false, replace = true, properties = tblProps)