Рекомендации по использованию Delta Lake

В этой статье приведены рекомендации по использованию Delta Lake.

Databricks рекомендует использовать прогнозную оптимизацию. См . статью "Прогнозная оптимизация" для Delta Lake.

При удалении и повторном создании таблицы в одном расположении всегда следует использовать инструкцию CREATE OR REPLACE TABLE . См. раздел " Удалить или заменить разностную таблицу".

Использование ликвидной кластеризация для оптимизации пропуска данных

Databricks рекомендует использовать ликвидную кластеризация вместо секционирования, Z-порядка или других стратегий организации данных для оптимизации макета данных для пропуска данных. См. статью Использование "жидкой" кластеризации для таблиц Delta.

Сжатие файлов

Прогнозная оптимизация автоматически выполняется OPTIMIZE и VACUUM команды в управляемых таблицах каталога Unity. См . статью "Прогнозная оптимизация" для Delta Lake.

Databricks рекомендует часто выполнять команду OPTIMIZE для сжатия небольших файлов.

Примечание.

Эта операция не удаляет старые файлы. Чтобы удалить их, выполните команду VACUUM.

Замена содержимого или схемы таблицы

Иногда разностную таблицу может потребоваться заменить. Например:

  • Вы обнаружили в таблице неправильные данные и хотите заменить содержимое.
  • Необходимо переписать всю таблицу и внести несовместимые изменения в схему (например, изменить типы столбцов).

Хотя можно удалить весь каталог разностной таблицы и создать по тому же пути новую таблицу, делать это не рекомендуется, поскольку:

  • Удаление каталога не является эффективной операцией. Для удаления каталога, содержащего очень большие файлы, может потребоваться несколько часов или даже дней.
  • Вы теряете все содержимое в удаленных файлах; если удалить неправильную таблицу, трудно восстановить.
  • Удаление каталога не является атомарной операцией. При удалении таблицы параллельный запрос на ее считывание может завершиться ошибкой или вернуть лишь часть таблицы.

Если изменять схему таблицы не требуется, можно удалить данные из разностной таблицы и вставить новое содержимое или обновить таблицу, исправив неверные значения.

Если вы хотите изменить схему таблицы, всю таблицу можно заменить в рамках атомарной операции. Например:

Python

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("<your-table>") # Managed table

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("path", "<your-table-path>") \
  .saveAsTable("<your-table>") # External table

SQL

REPLACE TABLE <your-table> USING DELTA AS SELECT ... -- Managed table
REPLACE TABLE <your-table> USING DELTA LOCATION "<your-table-path>" AS SELECT ... -- External table

Scala

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("<your-table>") // Managed table

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .option("path", "<your-table-path>")
  .saveAsTable("<your-table>") // External table

Такой подход имеет несколько преимуществ.

  • Перезапись таблицы выполняется гораздо быстрее, поскольку при этом не нужно рекурсивно получать содержимое каталога или удалять какие-либо файлы.
  • Старая версия таблицы продолжает существовать. Если удалить неправильную таблицу, можно легко получить старые данные с помощью перемещения по времени. См. Работа с таблицей журнала Delta Lake.
  • Это атомарная операция. В процессе удаления таблицы параллельные запросы по-прежнему могут считывать из нее данные.
  • Если перезапись таблицы завершается ошибкой, таблица благодаря гарантиям выполнения транзакций Delta Lake ACID остается в предыдущем состоянии.

Кроме того, если вы хотите удалить старые файлы, чтобы сэкономить затраты на хранение после перезаписи таблицы, их можно удалить с помощью VACUUM . Он оптимизирован для удаления файлов и обычно быстрее, чем удаление всего каталога.

Кэширование Spark

Databricks не рекомендует использовать кэширование Spark по следующим причинам:

  • Вы теряете любые результаты пропуска данных за счет применения каких-либо фильтров в дополнение к кэшированному DataFrame.
  • Данные, которые кэшируются, могут не обновляться, если таблица обращается с помощью другого идентификатора.

Различия между Delta Lake и Parquet в Apache Spark

Delta Lake автоматически обрабатывает приведенные ниже операции. Эти операции не нужно выполнять вручную:

  • REFRESH TABLE: таблицы Delta всегда возвращают самые актуальные сведения, поэтому нет необходимости вручную вызывать REFRESH TABLE после изменений.
  • Добавление и удаление секций: Delta Lake автоматически отслеживает набор имеющихся в таблице секций и обновляет список по мере добавления или удаления данных. В результате нет необходимости выполнять ALTER TABLE [ADD|DROP] PARTITION или MSCK.
  • Загрузка одной секции: не требуется выполнять чтение секций напрямую. Например, не нужно запускать spark.read.format("parquet").load("/data/date=2017-01-01"). Вместо этого используйте предложение WHERE для пропуска данных, например spark.read.table("<table-name>").where("date = '2017-01-01'").
  • Не нужно изменять данные вручную: Delta Lake использует журнал транзакций для атомарной фиксации изменений в таблице. Не изменяйте, не добавляйте и не удаляйте файлы данных Parquet напрямую в таблице Delta, так как это может привести к потере данных или повреждению таблицы.

Повышение производительности слияния Delta Lake

Вы можете сократить время слияния с помощью следующих подходов:

  • Уменьшить область поиска для совпадений. по умолчанию операция merge выполняет поиск по всей таблице Delta, чтобы найти совпадения в исходной таблице. Один из способов ускорения merge — уменьшить область поиска, добавив известные ограничения в условие соответствия. Например, предположим, что имеется таблица, секционированная по country и date, и вы хотите использовать merge для обновления сведений за последний день и определенную страну. При добавлении следующего условия запрос ускоряется, так как он ищет совпадения только в соответствующих разделах:

    events.date = current_date() AND events.country = 'USA'
    

    Кроме того, этот запрос также снижает вероятность конфликтов с другими параллельными операциями. Дополнительные сведения см. в статье об уровнях изоляции и конфликтах записи в Azure Databricks .

  • Сжать файлы. Если данные хранятся во многих маленьких файлах, чтение данных для поиска совпадений может сильно замедлиться. Небольшие файлы можно сжать в файлы большего размера, чтобы повысить пропускную способность чтения. Дополнительные сведения см. в разделе "Компактные файлы данных" с оптимизацией в Delta Lake .

  • Управлять секциями перемешивания для записи. Операция mergeнесколько раз перемешивает данные для вычислений и записывает обновленные данные. Количество задач, используемых для случайного перемещения, управляется конфигурацией сеанса Spark spark.sql.shuffle.partitions. Установка этого параметра не только управляет параллелизмом, но и определяет количество выходных файлов. Увеличение значения увеличивает параллелизм, но также создает большее количество файлов данных меньшего размера.

  • Включить оптимизированные операции записи. Для секционированных таблиц merge может выдавать гораздо большее количество небольших файлов, чем количество секций перемешивания. Это связано с тем, что каждая задача перемешивания может записывать несколько файлов в несколько секций и может стать узким местом производительности. Вы можете уменьшить количество файлов, включив оптимизированные записи. См . оптимизированные записи для Delta Lake в Azure Databricks.

  • Настройка размеров файлов в таблице: Azure Databricks может автоматически определить, имеет ли таблица Delta частые merge операции, которые перезаписывают файлы и могут уменьшить размер перезаписанных файлов в ожидании дальнейших перезаписи файлов в будущем. Дополнительные сведения см. в разделе о настройке размеров файлов.

  • Слияние с низким уровнем перетасовки. Слияние с низким уровнем перетасовки обеспечивает оптимизированную реализацию MERGE , которая обеспечивает более высокую производительность для наиболее распространенных рабочих нагрузок. Кроме того, в ней сохраняются существующие оптимизация макета данные, например Z-упорядочение по неизмененным данным.

Управление актуальностью данных

В начале обработки каждого запроса таблицы Delta автоматически обновляются до последней версии. Этот процесс можно наблюдать в записных книжках, когда команда выводит следующее сообщение о состоянии: Updating the Delta table's state. Однако при выполнении ретроспективного анализа для таблицы самые новые данные могут и не требоваться, особенно для таблиц, в которых регулярно принимаются потоковые данные. В таких случаях запросы могут выполняться с использованием устаревших моментальных снимков таблицы Delta. Это может снизить задержку при получении результатов из запросов.

Вы можете настроить отказоустойчивость для устаревших данных, задав конфигурацию spark.databricks.delta.stalenessLimit сеанса Spark со значением строки времени, например 1h или 15m (в течение 1 часа или 15 минут соответственно). Эта конфигурация зависит от сеанса и не влияет на другие клиенты, обращающиеся к таблице. Если состояние таблицы было обновлено в пределах ограничения устаревшей активности, запрос к таблице возвращает результаты, не ожидая последнего обновления таблицы. Этот параметр никогда не предотвращает обновление таблицы и при возврате устаревших данных процессы обновления в фоновом режиме. Если последнее обновление таблицы устарело, запрос не возвращает результаты до завершения обновления состояния таблицы.

Расширенные контрольные точки для запросов с низкой задержкой

Delta Lake записывает контрольные точки в виде статистического состояния таблицы Delta с оптимизированной частотой. Эти контрольные точки служат отправной точкой для вычисления последнего состояния таблицы. Без контрольных точек Delta Lake пришлось бы считывать большую коллекцию файлов JSON (файлов delta), представляющих фиксации в журнале транзакций для расчета состояния таблицы. Кроме того, статистика на уровне столбцов, которую Delta Lake использует для выполнения пропуска данных, хранится в контрольной точке.

Внимание

Контрольные точки Delta Lake отличаются от контрольных точек структурированной потоковой передачи.

Статистика на уровне столбцов хранится в виде структуры и JSON (для обратной совместимости). Формат структуры значительно ускоряет операции чтения Delta Lake, поскольку:

  • Дельта Lake не выполняет дорогостоящий анализ JSON для получения статистики на уровне столбцов.
  • Возможности усечения столбцов Parquet значительно сокращают число операций ввода-вывода, необходимых для чтения статистики для столбца.

Формат структуры включает сбор оптимизаций, которые снижают затраты на операции разностного считывания от нескольких секунд до нескольких десятков миллисекунд, что значительно уменьшает задержку для коротких запросов.

Управление статистикой на уровне столбцов в проверка точках

Вы можете управлять способом записи статистики в контрольные точки с помощью свойств таблицы delta.checkpoint.writeStatsAsJson и delta.checkpoint.writeStatsAsStruct. Если оба свойства таблицы имеют значение false, то Дельта Lake не может выполнить пропуск данных.

  • Пакетные операции записи записывают статистические данные в формате JSON и в формате структуры. delta.checkpoint.writeStatsAsJson имеет значение true.
  • delta.checkpoint.writeStatsAsStruct по умолчанию не определен.
  • Модули чтения используют столбец структуры, если он доступен; в противном случае будет снова использоваться столбец JSON.

Внимание

Расширенные контрольные точки не нарушают совместимость с модулями чтения Delta Lake с открытым исходным кодом. Тем не менее, если установить для параметра delta.checkpoint.writeStatsAsJson значение false, это может повлиять на собственные модули чтения Delta Lake. Обратитесь к поставщикам, чтобы узнать больше о влиянии на производительность.

Включение расширенных проверка точек для структурированных запросов потоковой передачи

Если рабочие нагрузки структурированной потоковой передачи не требуют низкой задержки (менее минуты), можно включить расширенные контрольные точки, выполнив следующую команду SQL:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

Вы также можете улучшить задержку записи проверка point, задав следующие свойства таблицы:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

Если пропуск данных не полезен в приложении, можно задать для обоих свойств значение false. Затем статистические данные не собираются или записываются. Databricks не рекомендует эту конфигурацию.