Потребность в оптимизации записи в Apache Spark

Аналитические рабочие нагрузки в модулях обработки больших данных, таких как Apache Spark, выполняются эффективнее всего при использовании больших файлов стандартного размера. Важную роль для производительности играет соотношение между размером файла, количеством файлов, числом рабочих ролей и конфигурациями Spark. Рабочие нагрузки приема в таблицах озера данных могут наследовать особенность, связанную с постоянной записью данных в большое число небольших файлов (этот сценарий обычно называется проблемой с небольшими файлами).

Оптимизация записи — это функция Delta Lake в Synapse, которая сокращает число создаваемых файлов и увеличивает размер отдельных файлов, в которые ведется запись данных. Она динамически оптимизирует секции при создании файлов с размером по умолчанию 128 МБ. Размер целевого файла можно изменить для каждой рабочей нагрузки с помощью параметров.

Эта функция обеспечивает нужный размер файла с помощью дополнительного этапа перетасовки данных по секциям, что ведет к повышению затрат на обработку при записи данных. Небольшой штраф при записи должен компенсироваться эффективностью чтения таблиц.

Примечание.

  • Он доступен в пулах Synapse для версий Apache Spark выше 3.1.

Преимущества оптимизации записи

  • Эта функция доступна в таблицах Delta Lake для записи как в пакетном, так и в потоковом режиме.
  • Менять шаблон работы команды spark.write не требуется. Эта функция активируется параметром конфигурации или свойством таблицы.
  • Она уменьшает число транзакций записи по сравнению с командой OPTIMIZE.
  • Операции OPTIMIZE выполняются быстрее, так как работают с меньшим количеством файлов.
  • Команда VACUUM для удаления старых файлов без ссылок также работает быстрее.
  • Запросы сканируют меньше файлов оптимизированного размера, что повышает или производительность чтения, или использование ресурсов.

Оптимизация сценариев использования записи

Сценарии использования

  • Секционированные таблицы Delta Lake, которые создают файлы неоптимального (менее 128 МБ) или нестандартного размера (файлы, размеры которых различаются).
  • Повторно секционированные кадры данных, которые записываются на диск с неоптимальным размером файлов.
  • Секционированные таблицы Delta Lake, к которым обращаются небольшие пакетные команды SQL, такие как UPDATE, DELETE, MERGE, CREATE TABLE AS SELECT, INSERT INTO и т. д.
  • Сценарии потокового приема с шаблонами добавления данных в секционированные таблицы Delta Lake, где допустима дополнительная задержка при записи.

Сценарии, которых требуется избегать

  • Несекционированные таблицы.
  • Варианты использования, при которых дополнительная задержка при записи неприемлема.
  • Большие таблицы с четко определенной структурой оптимизации и шаблонами чтения.

Включение и отключение функции оптимизации записи

Функция оптимизации записи по умолчанию отключена. В пуле Spark 3.3 он включен по умолчанию для секционированных таблиц.

После настройки параметров для пула или сеанса эта функция применяется для всех шаблонов записи Spark.

Для использования функции оптимизации записи включите ее с помощью следующего параметра:

  1. Scala и PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
  1. SQL Spark
SET `spark.microsoft.delta.optimizeWrite.enabled` = true

Чтобы проверить текущее значение параметра, используйте следующую команду:

  1. Scala и PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.enabled")
  1. SQL Spark
SET `spark.microsoft.delta.optimizeWrite.enabled`

Чтобы отключить функцию оптимизации записи, измените следующий параметр, как показано ниже:

  1. Scala и PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "false")
  1. SQL Spark
SET `spark.microsoft.delta.optimizeWrite.enabled` = false

Управление оптимизацией записи с помощью свойств таблицы

Новые таблицы

  1. SQL
CREATE TABLE <table_name> TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
  1. Scala

Использование API DeltaTableBuilder

val table = DeltaTable.create()
  .tableName("<table_name>")
  .addColumnn("<colName>", <dataType>)
  .location("<table_location>")
  .property("delta.autoOptimize.optimizeWrite", "true") 
  .execute()

Существующие таблицы

  1. SQL
ALTER TABLE <table_name> SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
  1. Scala

Использование API DeltaTableBuilder

val table = DeltaTable.replace()
  .tableName("<table_name>")
  .location("<table_location>")
  .property("delta.autoOptimize.optimizeWrite", "true") 
  .execute()

Как получить и изменить текущую конфигурацию максимального размера файла для оптимизации записи

Чтобы получить текущее значение параметра, используйте команды ниже. Значение по умолчанию — 128 МБ.

  1. Scala и PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.binSize")
  1. SQL
SET `spark.microsoft.delta.optimizeWrite.binSize`
  • Изменение значения параметра
  1. Scala и PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "134217728")
  1. SQL
SET `spark.microsoft.delta.optimizeWrite.binSize` = 134217728

Следующие шаги