Уровни изоляции и конфликты записи в Azure Databricks

Уровень изоляции таблицы определяет степень, до которой транзакция должна быть изолирована от изменений, выполняемых параллельными операциями. Конфликты записи в Azure Databricks зависят от уровня изоляции.

Delta Lake предоставляет гарантии транзакций ACID для операций чтения и записи. Это означает следующее.

  • Несколько операций записи в нескольких кластерах могут одновременно изменять секцию таблицы. Записи видят согласованное представление моментального снимка таблицы и записи выполняются в последовательном порядке.
    • Читатели продолжают видеть согласованное представление моментального снимка таблицы, с которого началось задание Azure Databricks, даже если таблица была изменена во время выполнения задания.

См. сведения о гарантиях ACID в Azure Databricks?.

Примечание.

Azure Databricks использует Delta Lake для всех таблиц по умолчанию. В этой статье описывается поведение Delta Lake в Azure Databricks.

Внимание

Изменения метаданных вызывают сбой всех одновременных операций записи. Эти операции включают изменения в протокол таблицы, свойства таблицы или схему данных.

Потоковые операции чтения завершаются сбоем при обнаружении фиксации, которая изменяет метаданные таблицы. Чтобы сохранить поток, необходимо перезапустить его. Рекомендуемые методы см . в разделе "Рекомендации по рабочей среде" для структурированной потоковой передачи.

Ниже приведены примеры запросов, которые изменяют метаданные:

-- Set a table property.
ALTER TABLE table-name SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

-- Enable a feature using a table property and update the table protocol.
ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);

-- Drop a table feature.
ALTER TABLE table_name DROP FEATURE deletionVectors;

-- Upgrade to UniForm.
REORG TABLE table_name APPLY (UPGRADE UNIFORM(ICEBERG_COMPAT_VERSION=2));

-- Update the table schema.
ALTER TABLE table_name ADD COLUMNS (col_name STRING);

Конфликты записи с параллелизмом на уровне строк

Параллелизм на уровне строк уменьшает конфликты между параллельными операциями записи, обнаруживая изменения на уровне строки и автоматически разрешая конфликты, возникающие при одновременном обновлении или удалении разных строк в одном файле данных.

Параллелизм на уровне строк обычно доступен в Databricks Runtime 14.2 и выше. Параллелизм на уровне строк по умолчанию поддерживается для следующих условий:

  • Таблицы с включенными векторами удаления и без секционирования.
  • Таблицы с ликвидной кластеризация, если вы не отключили векторы удаления.

Таблицы с секциями не поддерживают параллелизм на уровне строк, но по-прежнему могут избежать конфликтов между OPTIMIZE всеми другими операциями записи при включении векторов удаления. См . ограничения параллелизма на уровне строк.

Сведения о других версиях среды выполнения Databricks см. в разделе "Поведение предварительной версии параллелизма на уровне строк" (устаревшая версия).

В следующей таблице описывается, какие пары операций записи могут конфликтовывать на каждом уровне изоляции с включенным параллелизмом на уровне строк.

Примечание.

Таблицы с столбцами удостоверений не поддерживают одновременные транзакции. См. статью "Использование столбцов удостоверений" в Delta Lake.

INSERT (1) UPDATE, DELETE, MERGE INTO OPTIMIZE
INSERT Не может конфликтовать
UPDATE, DELETE, MERGE INTO Не удается конфликтуть в WriteSerializable. Может конфликтуть в Сериализуемом режиме при изменении одной строки; См . ограничения параллелизма на уровне строк. MERGE INTO требуется Photon для разрешения конфликтов на уровне строк. Может возникать конфликт при изменении одной строки; См . ограничения параллелизма на уровне строк.
OPTIMIZE; Не может конфликтовать Не может конфликтовать Не может конфликтовать

Внимание

(1) Все INSERT операции в таблицах выше описывают операции добавления, которые не считывают данные из одной таблицы перед фиксацией. INSERT операции, содержащие вложенные запросы, считывающие ту же таблицу, поддерживают ту же параллельность, что MERGEи .

Запись конфликтов без параллелизма на уровне строк

В следующей таблице описывается, какие пары операций записи могут конфликтовать на каждом уровне изоляции.

Таблицы не поддерживают параллелизм на уровне строк, если в них определены секции или не включены векторы удаления. Databricks Runtime 14.2 или более поздней версии требуется для параллелизма на уровне строк.

Примечание.

Таблицы с столбцами удостоверений не поддерживают одновременные транзакции. См. статью "Использование столбцов удостоверений" в Delta Lake.

INSERT (1) UPDATE, DELETE, MERGE INTO OPTIMIZE
INSERT Не может конфликтовать
UPDATE, DELETE, MERGE INTO Не удается конфликтуть в WriteSerializable. Может конфликтуться в сериализуемом режиме; см . статью "Избегайте конфликтов с секциями". Может конфликтовывать в сериализуемой и writeSerializable; см . статью "Избегайте конфликтов с секциями".
OPTIMIZE; Не может конфликтовать Не удается конфликтуть с таблицами с включенными векторами удаления. Может конфликтуть в противном случае. Не удается конфликтуть с таблицами с включенными векторами удаления. Может конфликтуть в противном случае.

Внимание

(1) Все INSERT операции в таблицах выше описывают операции добавления, которые не считывают данные из одной таблицы перед фиксацией. INSERT операции, содержащие вложенные запросы, считывающие ту же таблицу, поддерживают ту же параллельность, что MERGEи .

Ограничения параллелизма на уровне строк

Некоторые ограничения применяются к параллелизму на уровне строк. Для следующих операций разрешение конфликтов следует обычному параллелизму для конфликтов записи в Azure Databricks. См. конфликты записи без параллелизма на уровне строк.

  • OPTIMIZE команды с ZORDER BY.
  • Команды со сложными условными предложениями, включая следующие:
    • Условия для сложных типов данных, таких как структуры, массивы или карты.
    • Условия, использующие недетерминированные выражения и вложенные запросы.
    • Условия, содержащие коррелированные вложенные запросы.
  • Для MERGE команд необходимо использовать явный предикат в целевой таблице для фильтрации строк, соответствующих исходной таблице. Для разрешения слиянием фильтр используется только для сканирования строк, которые могут конфликтовать на основе условий фильтра в параллельных операциях.

Примечание.

Обнаружение конфликтов на уровне строк может увеличить общее время выполнения. В случае многих параллельных транзакций модуль записи определяет задержку по разрешению конфликтов и конфликтам.

Все ограничения для векторов удаления также применяются. См . ограничения.

Когда Delta Lake фиксирует без чтения таблицы?

Операции Delta Lake INSERT или добавления не считывают состояние таблицы перед фиксацией, если выполнены следующие условия:

  1. Логика выражается с помощью INSERT логики SQL или режима добавления.
  2. Логика не содержит вложенных запросов или условных условий, ссылающихся на таблицу, предназначенную операцией записи.

Как и в других фиксациях, Delta Lake проверяет и разрешает версии таблиц при фиксации с помощью метаданных в журнале транзакций, но версия таблицы на самом деле не считывается.

Примечание.

Многие распространенные шаблоны используют MERGE операции для вставки данных на основе условий таблицы. Хотя эту логику можно переписать с помощью INSERT инструкций, если любое условное выражение ссылается на столбец в целевой таблице, эти инструкции имеют те же ограничения параллелизма, что MERGEи .

Запись сериализуемых и сериализуемых уровней изоляции

Уровень изоляции таблицы определяет степень, до которой транзакция должна быть изолирована от изменений, выполняемых параллельными транзакциями. Delta Lake на Azure Databricks поддерживает два уровня изоляции: Serializable и WriteSerializable.

  • Serializable: наиболее надежный уровень изоляции. Это гарантирует, что зафиксированные операции записи и все операции чтения будут иметь уровень Serializable. Операции разрешены до тех пор, пока существует серийная последовательность их поочередного выполнения, которая создает тот же результат, который отображается в таблице. Для операций записи серийная последовательность идентична показанной в журнале таблицы.

  • WriteSerializable (по умолчанию): более слабый уровень изоляции, чем Serializable. Он обеспечивает сериализуемость только операций записи (но не операций чтения). Однако он все-таки более надежен, чем изоляция уровня Snapshot. Уровень изоляции WriteSerializable используется по умолчанию, поскольку он обеспечивает оптимальный баланс согласованности и доступности данных для наиболее распространенных операций.

    В этом режиме содержимое таблицы Delta может отличаться от ожидаемой последовательности операций, наблюдаемых в журнале таблиц. Это обусловлено тем, что данный режим позволяет выполнять определенные пары одновременных операций записи (скажем, операций X и Y), чтобы результат был таким же, как если бы операция Y выполнялась до операции X (то есть, сериализовалась между ними), даже несмотря на то, что в журнале отображалось бы, что операция Y зафиксирована после операции X. Чтобы запретить такое изменение порядка, задайте для уровня изоляции таблицы значение Serializable, чтобы эти транзакции завершались ошибкой.

Операции чтения всегда используют изоляцию Snapshot. Уровень изоляции операции записи определяет, может ли читатель просматривать моментальный снимок таблицы, который «никогда не существовал» по данным журнала.

Что касается уровня Serializable, читатель всегда видит только те таблицы, которые соответствуют данным журнала. Для уровня WriteSerializable читатель видел бы таблицу, которая не существует в журнале изменений.

Например, рассмотрим txn1, длительно выполняемая операция удаления и txn2, которое вставляет данные, удаленные операцией txn1. txn2 и txn1 завершены, и они записываются в журнал в таком порядке. Согласно журналу данные, вставленные в txn2, не должны существовать в таблице. Что касается уровня Serializable, читатель никогда не будет видеть данные, вставленные операцией txn2. Однако, на уровне WriteSerializable читатель может в определенный момент просмотреть данные, вставленные операцией txn2.

Дополнительные сведения о том, какие типы операций могут конфликтовать друг с другом на каждом уровне изоляции и возможных ошибках, см. в разделе "Избегайте конфликтов" с помощью условий секционирования и разрознения команд.

Выбор уровня изоляции

Уровень изоляции задается с помощью команды ALTER TABLE.

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = <level-name>)

где <level-name> обозначает Serializable или WriteSerializable.

Например, чтобы изменить уровень изоляции со стандартного WriteSerializable на Serializable, выполните:

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

Избегайте конфликтов с помощью условий секционирования и разбиения команд

Во всех случаях, отмеченных как "может конфликтовать", возникновение конфликта двух операций зависит от того, работают ли они с одним и тем же набором файлов. Можно разделить эти два набора файлов путем секционирования таблицы на те же столбцы, которые используются в условиях операций. Например, команды UPDATE table WHERE date > '2010-01-01' ... и DELETE table WHERE date < '2010-01-01' будут конфликтовать, если таблица не секционирована по дате, так как обе команды могут попытаться изменить один и тот же набор файлов. Секционирование таблицы по date позволит избежать конфликта. Поэтому секционирование таблицы в соответствии с условиями, которые часто используются в команде, может значительно сократить конфликты. Однако секционирование таблицы по столбцу с высокой карта inality может привести к другим проблемам производительности из-за большого количества подкаталогов.

Исключения конфликтов

При возникновении конфликта транзакции отобразится одно из следующих исключений:

ConcurrentAppendException

Это исключение возникает, когда параллельная операция добавляет файлы в ту же секцию (или в любое место несекционированной таблицы), которую считывает операция. Добавление файлов может быть вызвано операциями INSERT, DELETE, UPDATE или MERGE.

При использовании уровня изоляции по умолчанию для WriteSerializable файлы, добавляемые операциями blindINSERT (т. е. операциями, которые вслепую добавляют данные без считывания данных), не конфликтуют ни с одной из операций, даже если они затрагивают одну и ту же секцию (или любое место в несекционированной таблице). Если уровень изоляции имеет значение Serializable, то при добавлении вслепую могут возникать конфликты.

Это исключение часто возникает во время выполнения параллельных операций DELETE, UPDATE или MERGE. Хотя параллельные операции могут физически обновлять разные каталоги секций, одна из них может считывать секцию, которую в этот момент обновляет другая операция, что приводит к конфликту. Этого можно избежать, явно указав разделение в условии операции. Рассмотрим следующий пример.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Предположим, приведенный выше код запущен параллельно для различных дат или стран. Поскольку каждое задание работает в отдельной секции целевой разностной таблицы, конфликтов не должно быть. Однако условие не является достаточно явным и позволяет сканировать всю таблицу, что может вызывать конфликты с параллельными операциями обновления любых других секций. Вместо этого можно изменить инструкцию, добавив в условие объединения определенную дату и страну, как показано в следующем примере.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Теперь эту операцию можно безопасно запускать в параллельном режиме в разные даты и в разных странах.

ConcurrentDeleteReadException

Это исключение возникает, когда параллельная операция удаляет файл, который читает ваша операция. Распространенные причины: операция DELETE, UPDATE или MERGE, которая повторно записывает файлы.

ConcurrentDeleteDeleteException

Это исключение возникает, когда параллельная операция удаляет файл, который также удаляет ваша операция. Это может быть вызвано двумя одновременно выполняемыми операциями сжатия, которые выполняют повторную запись одних и тех же файлов.

MetadataChangedException

Это исключение возникает, когда параллельная транзакция обновляет метаданные разностной таблицы. Распространенные причины — операции ALTER TABLE или операции записи в разностную таблицу, которые обновляют схему таблицы.

ConcurrentTransactionException

Если потоковый запрос, использующий одно и то же расположение контрольной точки, одновременно запускается несколько раз и пытается одновременно выполнить запись в разностную таблицу. Никогда не допускайте, чтобы два потоковых запроса одновременно использовали одно и то же расположение контрольной точки и выполнялись одновременно.

ProtocolChangedException

Это исключение может возникать в следующих случаях:

  • При обновлении таблицы Delta до новой версии протокола. Для выполнения будущих операций может потребоваться обновить среду выполнения Databricks.
  • Когда несколько операций записи одновременно создают или заменяют таблицу.
  • Когда несколько операций записи одновременно выполняют запись по пути в свободном месте накопителя.

Дополнительные сведения см. в статье о том, как Azure Databricks управляет совместимостью функций Delta Lake?

Поведение предварительной версии параллелизма на уровне строк (устаревшая версия)

В этом разделе описано поведение предварительной версии для параллелизма на уровне строк в Databricks Runtime 14.1 и ниже. Параллелизм на уровне строк всегда требует векторов удаления.

В Databricks Runtime 13.3 LTS и более поздних версиях таблицы с поддержкой liquid кластеризация автоматически включают параллелизм на уровне строк.

В Databricks Runtime 14.0 и 14.1 можно включить параллелизм на уровне строк для таблиц с векторами удаления, задав следующую конфигурацию для кластера или SparkSession:

spark.databricks.delta.rowLevelConcurrencyPreview = true

В Databricks Runtime 14.1 и ниже вычисления, отличные от Photon, поддерживают только параллелизм на уровне строк для DELETE операций.