Выборочно перезаписывать данные с помощью Delta Lake

Azure Databricks использует функции Delta Lake для поддержки двух различных вариантов выборочного перезаписи:

  • Параметр replaceWhere атомарно заменяет все записи, соответствующие заданному предикату.
  • Можно заменить каталоги данных на основе секционирования таблиц с помощью динамических перезаписей секций.

Для большинства операций Databricks рекомендует указать replaceWhere , какие данные следует перезаписать.

Внимание

Если данные были случайно перезаписаны, можно использовать восстановление для отмены изменения.

Произвольная выборочная перезапись с помощью replaceWhere

Вы можете выборочно перезаписать только данные, соответствующие произвольному выражению.

Примечание.

ДЛЯ SQL требуется Databricks Runtime 12.2 LTS или более поздней версии.

Следующая команда выполняет атомарную замену событий в январе в целевой таблице, которая секционирована по start_date с использованием данных в replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Этот пример кода записывает данные в replace_data, проверяет соответствие всех строк предикату и выполняет атомарную замену с помощью семантики overwrite . Если какие-либо значения в операции выходят за пределы ограничения, эта операция завершается ошибкой по умолчанию.

Это поведение можно изменить на overwrite значения в диапазоне предиката и insert записи, которые выходят за пределы указанного диапазона. Для этого отключите ограничение проверка, установив spark.databricks.delta.replaceWhere.constraintCheck.enabled значение false с помощью одного из следующих параметров:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Устаревшее поведение

Устаревшее поведение по умолчанию перезаписывало replaceWhere данные, соответствующие предикату только для столбцов секций. В этой устаревшей модели следующая команда будет атомарно заменять месяц в целевой таблице, которая секционирована dateпо данным в df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

Если вы хотите вернуться к старому поведению, можно отключить spark.databricks.delta.replaceWhere.dataColumns.enabled флаг:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Динамические перезаписи секций

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

Databricks Runtime 11.3 LTS и более поздних версий поддерживает режим динамической перезаписи секционирования для секционированных таблиц. Для таблиц с несколькими секциями Databricks Runtime 11.3 LTS и ниже поддерживаются только динамические перезаписи секций, если все столбцы секций имеют одинаковый тип данных.

При перезаписи динамического раздела операции перезаписывают все существующие данные в каждой логической секции, для которой запись фиксирует новые данные. Все существующие логические секции, для которых запись не содержит данных, остаются неизменными. Этот режим применяется только в том случае, если данные записываются в режиме перезаписи: INSERT OVERWRITE в SQL или запись DataFrame с df.write.mode("overwrite").

Чтобы настроить режим динамической перезаписи секций, задайте для конфигурации сеанса Spark spark.sql.sources.partitionOverwriteMode значение dynamic. Его также можно включить, задав для параметра partitionOverwriteMode в DataFrameWriter значение dynamic. Если задан параметр на уровне запроса, он переопределяет режим, указанный в конфигурации сеанса. Значение partitionOverwriteMode по умолчанию — static.

Внимание

Убедитесь в том, что данные, записываемые путем динамической перезаписи секций, относятся к соответствующим секциям. Наличие всего одной строки в неправильной секции может привести к непреднамеренной перезаписи всей секции.

В следующем примере показано использование динамических перезаписей секций:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Примечание.

  • Динамическая перезапись секций конфликтует с параметром replaceWhere секционированных таблиц.
    • Если в конфигурации сеанса Spark включена динамическая перезапись секций и в DataFrameWriter задан параметр replaceWhere, Delta Lake перезаписывает данные в соответствии с выражением replaceWhere (параметры на уровне запроса переопределяют конфигурацию сеанса).
    • При наличии динамической перезаписи и replaceWhere включения параметров возникает ошибкаDataFrameWriter.
  • Невозможно указать overwriteSchema , как true при использовании динамической перезаписи секции.