Procedimientos recomendados: Delta Lake

En este artículo se describen los procedimientos recomendados al usar Delta Lake.

Databricks recomienda usar la optimización predictiva. Consulte Optimización predictiva para Delta Lake.

Al eliminar y volver a crear una tabla en la misma ubicación, siempre debe usar una instrucción CREATE OR REPLACE TABLE. Consulte Quitar o reemplazar una tabla Delta.

Uso de agrupaciones de clústeres líquidos para omitir datos optimizados

Databricks recomienda usar la agrupación de clústeres líquidos en lugar de la partición, el orden en Z u otras estrategias de organización de datos para optimizar el diseño de los datos para su omisión. Consulte Uso de clústeres líquidos para tablas Delta.

Archivos compactos

La optimización predictiva ejecuta automáticamente los comandos OPTIMIZE y VACUUM en las tablas administradas por Unity Catalog. Consulte Optimización predictiva para Delta Lake.

Databricks recomienda ejecutar con frecuencia el comando OPTIMIZE para compactar los archivos pequeños.

Nota:

Esta operación no quita los archivos antiguos. Para quitarlos, ejecute el comando VACUUM.

Sustitución del contenido o del esquema de una tabla

A veces puede que desee reemplazar una tabla Delta. Por ejemplo:

  • Puede que detecte que los datos de la tabla son incorrectos y quiera reemplazar el contenido.
  • Puede que quiera volver a escribir toda la tabla para realizar cambios de esquema incompatibles (cambiar tipos de columna).

Aunque puede eliminar todo el directorio de una tabla Delta y crear una tabla en la misma ruta de acceso, no se recomienda por los siguientes motivos:

  • Eliminar un directorio no es productivo. Un directorio que contiene archivos muy grandes puede tardar horas o incluso días en eliminarse.
  • Además, se pierde todo el contenido de los archivos eliminados; es difícil recuperarlo si elimina la tabla incorrecta.
  • La eliminación del directorio no es atómica. Mientras elimina la tabla, una consulta simultánea que la está leyendo puede producir un error o ver una tabla parcial.

Si no necesita cambiar el esquema de tabla, puede eliminar datos de una tabla Delta e insertar los nuevos datos, o actualizar la tabla para corregir los valores incorrectos.

Si quiere cambiar el esquema de tabla, puede reemplazar toda la tabla de forma atómica. Por ejemplo:

Python

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("<your-table>") # Managed table

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("path", "<your-table-path>") \
  .saveAsTable("<your-table>") # External table

SQL

REPLACE TABLE <your-table> USING DELTA AS SELECT ... -- Managed table
REPLACE TABLE <your-table> USING DELTA LOCATION "<your-table-path>" AS SELECT ... -- External table

Scala

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("<your-table>") // Managed table

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .option("path", "<your-table-path>")
  .saveAsTable("<your-table>") // External table

Este enfoque tiene varias ventajas:

  • Sobrescribir una tabla es mucho más rápido porque no es necesario enumerar el directorio de forma recursiva ni eliminar archivos.
  • La versión anterior de la tabla se conserva. Si elimina la tabla incorrecta, puede recuperar fácilmente los datos antiguos mediante viaje en el tiempo. Consulte Trabajar con el historial de tablas de Delta Lake.
  • Es una operación atómica. Las consultas simultáneas todavía pueden leer la tabla mientras se está eliminando.
  • Debido a las garantías de transacción ACID de Delta Lake, si se produce un error en la sobrescritura de la tabla, la tabla estará en su estado anterior.

Además, si quiere eliminar archivos antiguos para ahorrar costos de almacenamiento después de sobrescribir la tabla, puede usar VACUUM para eliminarlos. Está optimizado para la eliminación de archivos y normalmente es más rápido que eliminar todo el directorio.

Almacenamiento en caché de Spark

Databricks no recomienda usar el almacenamiento en caché de Spark por los siguientes motivos:

  • Se perderá cualquier omisión de datos que pueda proceder de filtros adicionales agregados sobre el elemento DataFrame almacenado en caché.
  • Es posible que los datos que se almacenan en caché no se actualicen si se tiene acceso a la tabla mediante un identificador diferente.

Diferencias entre Delta Lake y Parquet en Apache Spark

Delta Lake controla las siguientes operaciones automáticamente. Nunca debe realizar estas operaciones manualmente:

  • REFRESH TABLE: las tablas Delta siempre devuelven la información más actualizada, por lo que no es necesario llamar a REFRESH TABLE manualmente después de los cambios.
  • Agregar y quitar particiones: Delta Lake hace un seguimiento automático del conjunto de particiones presente en una tabla y actualiza la lista a medida que se agregan o quitan datos. Como resultado, no es necesario ejecutar ALTER TABLE [ADD|DROP] PARTITION ni MSCK.
  • Cargar una sola partición: no es necesario leer particiones directamente. Por ejemplo, no es necesario ejecutar spark.read.format("parquet").load("/data/date=2017-01-01"). En su lugar, use una cláusula WHERE para omitir datos, como spark.read.table("<table-name>").where("date = '2017-01-01'").
  • No modifique archivos de datos manualmente: Delta Lake usa el registro de transacciones para confirmar de manera atómica los cambios en la tabla. No modifique, agregue ni elimine directamente archivos de datos de Parquet en una tabla Delta, ya que esto puede provocar daños en las tablas o datos perdidos.

Mejora del rendimiento de la combinación de Delta Lake

Puede reducir el tiempo que necesita la combinación mediante los enfoques siguientes:

  • Reducir el espacio de búsqueda de coincidencias: de forma predeterminada, la operación merge busca coincidencias en toda la tabla Delta en la tabla de origen. Una manera de acelerar merge es reducir el espacio de búsqueda agregando restricciones conocidas en la condición de coincidencia. Por ejemplo, suponga que tiene una tabla cuyas particiones crean country y date, y que quiere usar merge para actualizar la información del último día y un país específico. Agregar la siguiente condición hace que la consulta sea más rápida, ya que busca coincidencias solo en las particiones pertinentes:

    events.date = current_date() AND events.country = 'USA'
    

    Además, también reducirá las posibilidades de conflictos con otras operaciones simultáneas. Consulte Niveles de aislamiento y conflictos de escritura en Azure Databricks para más información.

  • Compactar archivos: si los datos se almacenan en muchos archivos pequeños, la lectura de los datos para buscar coincidencias puede resultar lenta. Puede compactar archivos pequeños en archivos más grandes para mejorar el rendimiento de lectura. Consulte Compactar archivos de datos con optimización en Delta Lake para más información.

  • Controlar las particiones aleatorias para escrituras: la operación merge selecciona aleatoriamente los datos varias veces para calcular y escribir los datos actualizados. El número de tareas que se usan pata seleccionar automáticamente se controla mediante la configuración spark.sql.shuffle.partitions de la sesión de Spark. Si se establece este parámetro, no solo se controla el paralelismo, sino que también se determina el número de archivos de salida. Al aumentar el valor aumenta el paralelismo, pero también se genera un mayor número de archivos de datos pequeños.

  • Habilitar escrituras optimizadas: en el caso de las tablas con particiones, merge puede generar un número mucho mayor de archivos pequeños que el número de particiones aleatorias. Esto se debe a que cada tarea aleatoria puede escribir varios archivos en varias particiones y puede convertirse en un cuello de botella de rendimiento. Puede reducir el número de archivos habilitando las escrituras optimizadas. Consulte Escrituras optimizadas para Delta Lake en Azure Databricks.

  • Ajustar los tamaños de archivo de la tabla: Azure Databricks puede detectar automáticamente si una tabla Delta tiene operaciones frecuentes de merge que reescriben archivos y pueden optar por reducir el tamaño de los archivos reescritos en anticipación de las reescrituras de archivos en el futuro. Para más información, consulte la sección sobre el ajustes de los tamaños de los archivos.

  • Combinación de orden aleatorio bajo: Combinación de orden aleatorio bajo proporciona una implementación optimizada de MERGE que proporciona un mejor rendimiento para las cargas de trabajo más comunes. Además, conserva las optimizaciones de diseño de datos existentes, como la ordenación Z en datos sin modificar.

Administración de la novedad de los datos

Al principio de cada consulta, las tablas de Delta se actualizan automáticamente a la versión más reciente de la tabla. Este proceso se puede observar en cuadernos cuando el estado del comando informa: Updating the Delta table's state. Sin embargo, al ejecutar análisis históricos en una tabla, es posible que no tenga que tener necesariamente datos totalmente actualizados, en especial para las tablas en las que los datos de streaming se ingieren con frecuencia. En estos casos, las consultas se pueden ejecutar en instantáneas obsoletas de la tabla de Delta. Este enfoque puede reducir la latencia en la obtención de resultados de las consultas.

Puede configurar la tolerancia para los datos obsoletos estableciendo la configuración de sesión de Spark spark.databricks.delta.stalenessLimit con un valor de cadena de tiempo, como 1h o 15m (durante 1 hora o 15 minutos, respectivamente). Esta configuración es específica de la sesión y no afecta a otros clientes que accedan a la tabla. Si el estado de la tabla se ha actualizado dentro del límite de obsolescencia, una consulta en la tabla devuelve resultados sin esperar a la actualización de la tabla más reciente. Esta configuración nunca impide que la tabla se actualice y, cuando se devuelven datos obsoletos, la actualización se procesa en segundo plano. Si la última actualización de la tabla es anterior al límite de obsolescencia, la consulta no devolverá resultados hasta que se complete la actualización del estado de la tabla.

Puntos de control mejorados para consultas de baja latencia

Delta Lake escribe puntos de control como estado agregado de una tabla de Delta con una frecuencia optimizada. Estos puntos de control sirven como punto de partida para calcular el estado más reciente de la tabla. Sin puntos de control, Delta Lake tendría que leer una gran colección de archivos JSON (archivos "delta") que representan confirmaciones en el registro de transacciones para calcular el estado de una tabla. Además, las estadísticas a nivel de columna que Delta Lake usa para la omisión de datos se almacenan en el punto de control.

Importante

Los puntos de control de Delta Lake son diferentes de los puntos de control de Structured Streaming.

Las estadísticas de nivel de columna se almacenan como una estructura y un JSON (para la compatibilidad con versiones anteriores). El formato de estructura hace que Delta Lake lea mucho más rápido, ya que:

  • Delta Lake no realiza análisis JSON costoso para obtener estadísticas a nivel de columna.
  • Las funcionalidades de recorte de columnas de Parquet reducen significativamente las E/S necesarias para leer las estadísticas de una columna.

El formato de estructura hace posible una colección de optimizaciones que reducen la sobrecarga de las operaciones de lectura de Delta Lake de segundos a decenas de milisegundos, lo que reduce significativamente la latencia de las consultas cortas.

Administración de estadísticas a nivel de columna en puntos de control

Puede administrar cómo se escriben las estadísticas en puntos de control mediante las propiedades de tabla delta.checkpoint.writeStatsAsJson y delta.checkpoint.writeStatsAsStruct. Si ambas propiedades de tabla son false, Delta Lake no puede realizar la omisión de datos.

  • Las escrituras en lote escriben las estadísticas en formato JSON y estructura. delta.checkpoint.writeStatsAsJson es true.
  • delta.checkpoint.writeStatsAsStruct no está definido de manera predeterminada.
  • Los lectores usan la columna de estructura cuando está disponible y, en caso contrario, revierten al uso de la columna JSON.

Importante

Los puntos de control mejorados no interrumpen la compatibilidad con los lectores de código abierto de Delta Lake. Sin embargo, establecer delta.checkpoint.writeStatsAsJson en false puede tener consecuencias en los lectores de propiedad de Delta Lake. Póngase en contacto con sus proveedores para obtener más información sobre las consecuencias en el rendimiento.

Habilitación de puntos de control mejorados para consultas de Structured Streaming

Si las cargas de trabajo de Structured Streaming no tienen requisitos de latencia baja (latencias inferiores al minuto), puede habilitar puntos de control mejorados mediante la ejecución del comando SQL siguiente:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

También puede mejorar la latencia de escritura de los puntos de control si establece las siguientes propiedades de tabla:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

Si la omisión de datos no es útil en su aplicación, puede establecer ambas propiedades en false. Entonces, no se recopilan ni escriben estadísticas. Databricks no recomienda esta configuración.