Niveles de aislamiento y conflictos de escritura en Azure Databricks

El nivel de aislamiento de una tabla define el grado en el que debe aislarse una transacción de las modificaciones que realicen las operaciones simultáneas. Los conflictos de escritura en Azure Databricks dependen del nivel de aislamiento.

Delta Lake proporciona garantías de transacción ACID entre lecturas y escrituras. Esto significa que:

  • Varios escritores de varios clústeres pueden modificar simultáneamente una partición de tabla. Los escritores consultan una vista instantánea coherente de la tabla y las escrituras se producen en un orden de serie.
    • Los lectores siguen teniendo una vista de instantánea coherente de la tabla con la que se inició el trabajo de Azure Databricks, incluso cuando se modifica una tabla durante un trabajo.

Consulte ¿Cuáles son las garantías ACID en Azure Databricks?.

Nota

De manera predeterminada, Azure Databricks usa Delta Lake para todas las tablas. En este artículo se describe el comportamiento de Delta Lake en Azure Databricks.

Importante

Los cambios de metadatos hacen que se produzcan errores en todas las operaciones de escritura simultáneas. Estas operaciones incluyen cambios en el protocolo de tabla, las propiedades de tabla o el esquema de datos.

Se produce un error en las lecturas de transmisión cuando encuentran una confirmación que cambia los metadatos de la tabla. Si quiere que la transmisión continúe, debe reiniciarla. Para conocer los métodos más recomendables, consulte Consideraciones de producción para Structured Streaming.

A continuación se muestran ejemplos de consultas que cambian los metadatos:

-- Set a table property.
ALTER TABLE table-name SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

-- Enable a feature using a table property and update the table protocol.
ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);

-- Drop a table feature.
ALTER TABLE table_name DROP FEATURE deletionVectors;

-- Upgrade to UniForm.
REORG TABLE table_name APPLY (UPGRADE UNIFORM(ICEBERG_COMPAT_VERSION=2));

-- Update the table schema.
ALTER TABLE table_name ADD COLUMNS (col_name STRING);

Conflictos de escritura con simultaneidad de nivel de fila

La simultaneidad de nivel de fila reduce los conflictos entre las operaciones de escritura simultáneas mediante la detección de cambios en el nivel de fila y la resolución automática de los conflictos que se producen cuando operaciones de escritura simultáneas actualizan o eliminan filas diferentes en el mismo archivo de datos.

La simultaneidad de nivel de fila está disponible con carácter general en Databricks Runtime 14.2 y versiones posteriores. La simultaneidad de nivel de fila se admite de manera predeterminada para las condiciones siguientes:

  • Tablas con vectores de eliminación habilitados y sin particiones.
  • Tablas con agrupación en clústeres líquidos, a menos que haya deshabilitado los vectores de eliminación.

Las tablas con particiones no admiten la simultaneidad de nivel de fila, pero pueden evitar conflictos entre OPTIMIZE y todas las demás operaciones de escritura cuando se habilitan los vectores de eliminación. Consulte Limitaciones para la simultaneidad de nivel de fila.

Para obtener otras versiones de Databricks Runtime, vea Comportamiento de la versión preliminar de la simultaneidad de nivel de fila (heredado).

En la tabla siguiente se describen los pares de operaciones de escritura que pueden entrar en conflicto en cada nivel de aislamiento con simultaneidad de nivel de fila habilitada.

Nota:

Las tablas con columnas de identidad no admiten transacciones simultáneas. Consulte Uso de columnas de identidad en Delta Lake.

INSERT (1) UPDATE, DELETE y MERGE INTO OPTIMIZE
INSERT Sin riesgo de conflicto
UPDATE, DELETE y MERGE INTO No puede entrar en conflicto en WriteSerializable. Puede entrar en conflicto en Serializable al modificar la misma fila; vea Limitaciones para la simultaneidad de nivel de fila. MERGE INTO necesita Photon para la resolución de conflictos de nivel de fila. Puede entrar al modificar la misma fila; vea Limitaciones para la simultaneidad de nivel de fila.
OPTIMIZE Sin riesgo de conflicto Sin riesgo de conflicto Sin riesgo de conflicto

Importante

(1) Todas las operaciones de INSERT de las tablas anteriores describen las operaciones de adición que no leen datos de la misma tabla antes de confirmar. Las operaciones de INSERT que contienen subconsultas que leen la misma tabla admiten la misma simultaneidad que MERGE.

Conflictos de escritura sin simultaneidad de nivel de fila

En la tabla siguiente se describen los pares de operaciones de escritura que pueden estar en conflicto en cada nivel de aislamiento.

Las tablas no admiten la simultaneidad de nivel de fila si tienen particiones definidas o no tienen habilitados vectores de eliminación. Se necesita Databricks Runtime 14.2 o superior para la simultaneidad de nivel de fila.

Nota:

Las tablas con columnas de identidad no admiten transacciones simultáneas. Consulte Uso de columnas de identidad en Delta Lake.

INSERT (1) UPDATE, DELETE y MERGE INTO OPTIMIZE
INSERT Sin riesgo de conflicto
UPDATE, DELETE y MERGE INTO No puede entrar en conflicto en WriteSerializable. Puede entrar en conflicto en Serializable; vea Evitar conflictos con particiones. Puede entrar en conflicto en Serializable y WriteSerializable; vea Evitar conflictos con particiones.
OPTIMIZE Sin riesgo de conflicto No puede entrar en conflicto en tablas con vectores de eliminación habilitados. De lo contrario, puede entrar en conflicto. No puede entrar en conflicto en tablas con vectores de eliminación habilitados. De lo contrario, puede entrar en conflicto.

Importante

(1) Todas las operaciones de INSERT de las tablas anteriores describen las operaciones de adición que no leen datos de la misma tabla antes de confirmar. Las operaciones de INSERT que contienen subconsultas que leen la misma tabla admiten la misma simultaneidad que MERGE.

Limitaciones de la simultaneidad de nivel de fila

Algunas limitaciones se aplican a la simultaneidad de nivel de fila. Para las siguientes operaciones, la resolución de conflictos sigue la simultaneidad normal para los conflictos de escritura en Azure Databricks. Consulte Escritura de conflictos sin simultaneidad de nivel de fila.

  • Comandos OPTIMIZE con ZORDER BY.
  • Comandos con cláusulas condicionales complejas, incluidas las siguientes:
    • Condiciones en tipos de datos complejos, como estructuras, matrices o mapas.
    • Condiciones que usan expresiones y subconsultas no deterministas.
    • Condiciones que contienen subconsultas correlacionadas.
  • Para los comandos MERGE, debe usar un predicado explícito en la tabla de destino para filtrar las filas que coinciden con la tabla de origen. Para la resolución de combinación, el filtro solo se usa para examinar las filas que podrían entrar en conflicto en función de las condiciones de filtro en operaciones simultáneas.

Nota:

La detección de conflictos de nivel de fila puede aumentar el tiempo total de ejecución. En el caso de muchas transacciones simultáneas, el escritor da prioridad a la latencia sobre la resolución de conflictos y pueden producirse conflictos.

También se aplican todas las limitaciones de los vectores de eliminación. Consulte Limitaciones.

¿Cuándo se confirma Delta Lake sin leer la tabla?

Las operaciones de anexión o INSERT de Delta Lake no leen el estado de la tabla antes de confirmar si se cumplen las condiciones siguientes:

  1. La lógica se expresa mediante la lógica SQL o el modo de anexión de INSERT.
  2. La lógica no contiene subconsultas ni condicionales que hacen referencia a la tabla destino de la operación de escritura.

Como sucede en otras confirmaciones, Delta Lake valida y resuelve las versiones de la tabla en la confirmación mediante metadatos en el registro de transacciones, pero no se lee realmente ninguna versión de la tabla.

Nota

Muchos patrones comunes usan operaciones MERGE para insertar datos en función de las condiciones de la tabla. Aunque puede volver a escribir esta lógica mediante instrucciones INSERT, si alguna expresión condicional hace referencia a una columna de la tabla de destino, estas instrucciones tienen las mismas limitaciones de simultaneidad que MERGE.

Niveles de aislamiento Serializable y WriteSerializable

El nivel de aislamiento de una tabla define el grado en el que debe aislarse una transacción de las modificaciones que realicen las transacciones simultáneas. Delta Lake en Azure Databricks admite dos niveles de aislamiento: Serializable y WriteSerializable.

  • Serializable: el nivel de aislamiento más fuerte. Garantiza que las operaciones de escritura y todas las lecturas se pueden serializar. Las operaciones se permiten siempre que exista una secuencia en serie para ejecutarlas de una en una que genere el mismo resultado que el que se ve en la tabla. Para las operaciones de escritura, la secuencia en serie es exactamente la misma que la que se ve en el historial de la tabla.

  • WriteSerializable (valor predeterminado): un nivel de aislamiento más débil que el Serializable. Solo garantiza que las operaciones de escritura (es decir, no las lecturas) se pueden serializar. Aun así, sigue siendo más fuerte que el aislamiento de instantáneas. WriteSerializable es el nivel de aislamiento predeterminado porque proporciona un gran equilibrio de coherencia de datos y disponibilidad para las operaciones más comunes.

    En este modo, el contenido de la tabla Delta podría ser diferente del esperado de la secuencia de operaciones que se ve en el historial de tablas. Esto se debe a que este modo permite que determinados pares de operaciones de escritura simultáneas (por ejemplo, las operaciones X e Y) sigan adelante de forma que el resultado fuese como si Y se realizara antes que X (es decir, serializable entre ellas) aunque el historial mostrara que Y se había confirmado después de X. Para no permitir esta reordenación, establezca el nivel de aislamiento de tabla en Serializable para evitar que se puedan realizar estas transacciones.

Las operaciones de lectura siempre usan el aislamiento de instantáneas. El nivel de aislamiento de escritura determina si es posible que un lector vea o no una instantánea de una tabla que, según el historial, "nunca existió".

En el nivel Serializable, un lector siempre ve solo las tablas que se ajustan al historial. Para el nivel WriteSerializable, un lector podría ver una tabla que no existe en el registro Delta.

Por ejemplo, considere txn1, una eliminación de larga duración y txn2, que inserta los datos eliminados por txn1. Txn2 y txn1 se completan y se registran en ese orden en el historial. Según el historial, los datos insertados en txn2 no deben existir en la tabla. En el nivel serializable, un lector nunca vería los datos insertados por txn2, pero, en el nivel WriteSerializable, un lector podría ver en algún momento los datos insertados por txn2.

Para obtener más información sobre qué tipos de operaciones pueden entrar en conflicto entre sí en cada nivel de aislamiento y los posibles errores, vea Evitar los conflictos mediante el uso de particiones y condiciones de comandos separados.

Establecimiento del nivel de aislamiento

El nivel de aislamiento se establece mediante el comando ALTER TABLE:

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = <level-name>)

donde <level-name> es Serializable o WriteSerializable.

Por ejemplo, para cambiar el nivel de aislamiento del valor predeterminado WriteSerializable a Serializable, ejecute lo siguiente:

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

Evitar los conflictos mediante el uso de particiones y condiciones de comandos separados

En todos los casos marcados como "posible conflicto", el hecho de que las dos operaciones entren en conflicto depende de si funcionan en el mismo conjunto de archivos. Puede hacer que los dos conjuntos de archivos no se unan creando particiones de la tabla por las mismas columnas que las usadas en las condiciones de las operaciones. Por ejemplo, los dos comandos, UPDATE table WHERE date > '2010-01-01' ... y DELETE table WHERE date < '2010-01-01', estarán en conflicto si la tabla no tiene particiones por fecha, ya que ambos pueden intentar modificar el mismo conjunto de archivos. La creación de particiones de la tabla por date evitará el conflicto. Por lo tanto, la creación de particiones de una tabla según las condiciones que se usan habitualmente en el comando puede reducir significativamente los conflictos. Sin embargo, la creación de particiones de una tabla por una columna con una cardinalidad alta puede provocar otros problemas de rendimiento debido a la cantidad de subdirectorios.

Excepciones de conflicto

Cuando se produce un conflicto de transacciones, observará una de las siguientes excepciones:

ConcurrentAppendException

Esta excepción se produce cuando una operación simultánea agrega archivos en la misma partición (o en cualquier lugar de una tabla sin particiones) y la operación los lee. Las adiciones de archivos pueden deberse a operaciones INSERT, DELETE, UPDATE o MERGE.

Con el nivel de aislamiento predeterminado WriteSerializable, los archivos agregados por operaciones a ciegasINSERT (es decir, las que anexan datos a ciegas sin leer ningún dato) no entra en conflicto con ninguna operación, incluso si entran en contacto con la misma partición (o con cualquier parte de una tabla sin particiones). Si el nivel de aislamiento se establece en Serializable, los anexos a ciegas pueden estar en conflicto.

A menudo, esta excepción se produce durante las operaciones simultáneas DELETE, UPDATE o MERGE. Aunque las operaciones simultáneas puedan actualizar físicamente distintos directorios de partición, uno de ellos podría leer la misma partición que la otra actualiza simultáneamente, lo que provocaría un conflicto. Para evitarlo, haga que la separación sea explícita en la condición de operación. Considere el ejemplo siguiente.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Supongamos que ejecuta el código anterior simultáneamente para diferentes fechas o países. Dado que cada trabajo está trabajando en una partición independiente en la tabla de Delta de destino, no se espera ningún conflicto. Sin embargo, la condición no es lo suficientemente explícita y puede examinar toda la tabla, además de estar en conflicto con las operaciones simultáneas que actualizan cualquier otra partición. En su lugar, puede reescribir la instrucción para agregar una fecha y un país específicos a la condición de combinación, tal como se muestra en el ejemplo siguiente.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Ahora es seguro ejecutar esta operación simultáneamente en diferentes fechas y países.

ConcurrentDeleteReadException

Esta excepción se produce cuando una operación simultánea elimina un archivo que la operación lee. Las causas comunes son operaciones DELETE, UPDATE o MERGE que reescriben archivos.

ConcurrentDeleteDeleteException

Esta excepción se produce cuando una operación simultánea elimina un archivo que la operación también elimina. Esto puede deberse a que dos operaciones de compactación simultáneas reescriban los mismos archivos.

MetadataChangedException

Esta excepción se produce cuando una transacción simultánea actualiza los metadatos de una tabla de Delta. Las causas comunes son operaciones ALTER TABLE o escrituras en la tabla de Delta que actualizan el esquema de la tabla.

ConcurrentTransactionException

Si una consulta de streaming en la que se usa la misma ubicación de punto de control se inicia varias veces simultáneamente y esta intenta escribir en la tabla de Delta al mismo tiempo. Nunca debe hacer que dos consultas de streaming usen la misma ubicación de punto de control y se ejecuten al mismo tiempo.

ProtocolChangedException

Esta excepción puede producirse en los casos siguientes:

  • Cuando su tabla Delta se actualice a una nueva versión del protocolo. Para que las operaciones futuras tengan éxito puede que necesite actualizar su Databricks Runtime.
  • Cuando varios escritores crean o reemplazan una tabla al mismo tiempo.
  • Cuando varios escritores escriben en una ruta de acceso vacía al mismo tiempo.

Consulte ¿Cómo administra Azure Databricks la compatibilidad de características de Delta Lake? para obtener más información.

Comportamiento de la versión preliminar de la simultaneidad de nivel de fila (heredado)

En esta sección se describen los comportamientos de versión preliminar de la simultaneidad de nivel de fila en Databricks Runtime 14.1 y versiones posteriores. La simultaneidad de nivel de fila siempre necesita vectores de eliminación.

En Databricks Runtime 13.3 LTS y versiones posteriores, las tablas con la agrupación en clústeres líquidos habilitada habilitan automáticamente la simultaneidad de nivel de fila.

En Databricks Runtime 14.0 y 14.1, puede habilitar la simultaneidad de nivel de fila para las tablas con vectores de eliminación si establece la siguiente configuración para el clúster o SparkSession:

spark.databricks.delta.rowLevelConcurrencyPreview = true

En Databricks Runtime 14.1 y versiones posteriores, el proceso que no es Photon solo admite la simultaneidad de nivel de fila para las operaciones DELETE.