Transformación de datos con Delta Live Tables

En este artículo se describe cómo puede usar Delta Live Tables para declarar transformaciones en conjuntos de datos y especificar cómo se procesan los registros a través de la lógica de consulta. También contiene algunos ejemplos de patrones de transformación comunes que pueden ser útiles al compilar canalizaciones de Delta Live Tables.

Puede definir un conjunto de datos en cualquier consulta que devuelva un dataframe. Puede usar las operaciones integradas de Apache Spark, las UDF, la lógica personalizada y los modelos de MLflow como transformaciones en la canalización de Delta Live Tables. Una vez que los datos se han ingerido en la canalización de Delta Live Tables, puede definir nuevos conjuntos de datos en orígenes ascendentes para crear nuevas tablas de streaming, vistas materializadas y vistas.

Para obtener información sobre cómo realizar eficazmente el procesamiento con estado con tablas dinámicas diferenciales, consulte Optimizar el procesamiento con estado en Tablas dinámicas diferenciales con marcas de agua.

Cuándo usar vistas, vistas materializadas y tablas de streaming

Para garantizar que sus pipelines sean eficientes y mantenibles, elija el mejor tipo de conjunto de datos cuando implemente las consultas de sus pipelines.

Considere el uso de una vista cuando:

  • Tenga una consulta grande o compleja que quiera dividir en consultas más fáciles de administrar.
  • Quiera validar resultados intermedios mediante expectativas.
  • Quiera reducir los costos de almacenamiento y proceso y no necesite la materialización de los resultados de las consultas. Como las tablas se materializan, requieren recursos adicionales de cálculo y almacenamiento.

Considere la posibilidad de usar una vista materializada cuando:

  • Haya varias consultas de bajada que consuman la tabla. Dado que las vistas se calculan bajo demanda, la vista se recalcula cada vez que se consulta.
  • Otras canalizaciones, trabajos o consultas consumen la tabla. Dado que las vistas no se materializan, solo puede usarlas en la misma canalización.
  • Quiera ver los resultados de una consulta durante el desarrollo. Dado que las tablas se materializan y se pueden ver y consultar fuera de la canalización, el uso de tablas durante el desarrollo puede ayudar a validar la exactitud de los cálculos. Después de validar, convierta en vistas las consultas que no requieran materialización.

Considere la posibilidad de usar una tabla de streaming cuando:

  • Una consulta se define en un origen de datos que crece continua o incrementalmente.
  • Los resultados de la consulta deben calcularse incrementalmente.
  • Se desea un alto rendimiento y una latencia baja para la canalización.

Nota:

Las tablas de streaming siempre se definen en orígenes de streaming. También puede utilizar fuentes de streaming con APPLY CHANGES INTO para aplicar las actualizaciones de los feeds CDC. Consulte APPLY CHANGES API: simplificación de la captura de datos modificados en Delta Live Tables.

Combinación de tablas de streaming y vistas materializadas en una sola canalización

Las tablas de streaming heredan las garantías de procesamiento de Apache Spark Structured Streaming y están configuradas para procesar consultas de orígenes de datos de solo anexión, donde las nuevas filas siempre se insertan en la tabla de origen en lugar de modificarse.

Nota:

Aunque, de forma predeterminada, las tablas de flujo de datos requieren orígenes de datos de solo anexión, cuando un origen de flujo es otra tabla de flujo de datos que requiere actualizaciones o eliminaciones, puede invalidar este comportamiento con la marca skipChangeCommits.

Un patrón de streaming común incluye la ingesta de datos de origen para crear los conjuntos de datos iniciales en una canalización. Estos conjuntos de datos iniciales se denominan comúnmente tablas de bronce, y suelen realizar transformaciones simples.

Por el contrario, las tablas finales de una canalización, comúnmente denominadas tablas de oro, a menudo requieren complicadas agregaciones o se leen desde fuentes que son los objetivos de una operación APPLY CHANGES INTO. Dado que estas operaciones crean actualizaciones inherentemente en lugar de anexadas, no se admiten como entradas para transmitir tablas en directo. Estas transformaciones son más adecuadas para vistas materializadas.

Al mezclar tablas de flujo y vistas materializadas en una única canalización, puede simplificar su canalización, evitar la costosa reintroducción o reprocesamiento de datos sin procesar y disponer de toda la potencia de SQL para calcular agregaciones complejas sobre un conjunto de datos codificados y filtrados de forma eficaz. En el ejemplo siguiente se muestra este tipo de procesamiento mixto:

Nota:

En estos ejemplos se usa el cargador automático para cargar archivos desde el almacenamiento en la nube. Para cargar archivos con el cargador automático en una canalización habilitada para el catálogo de Unity, debe usar ubicaciones externas. Para obtener más información sobre el uso del catálogo de Unity con Delta Live Tables, consulte Uso del catálogo de Unity con las canalizaciones de Delta Live Tables.

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

Más información sobre el uso de Auto Loader para leer archivos JSON de forma eficiente desde Azure Storage para el procesamiento incremental.

Combinaciones estáticas de secuencias

Las combinaciones de flujo por lotes son una buena opción cuando se desnormaliza un flujo continuo de datos de tipo apéndice con una tabla de dimensiones principalmente estática.

Con cada actualización de canalización, los nuevos registros de la secuencia se unen con la instantánea más actual de la tabla estática. Si los registros se agregan o actualizan en la tabla estática una vez procesados los datos correspondientes de la tabla de streaming, los registros resultantes no se vuelven a calcular a menos que se realice una actualización completa.

En las canalizaciones configuradas para la ejecución desencadenada, la tabla estática devuelve resultados a partir de la hora en que se inició la actualización. En las canalizaciones configuradas para la ejecución continua, cada vez que la tabla procesa una actualización, se consulta la versión más reciente de la tabla estática.

A continuación se muestra un ejemplo de una combinación de secuencia estática:

Python

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

Calcule los agregados de forma eficaz

Puede utilizar tablas de flujo para calcular incrementalmente agregados distributivos simples como recuento, mín., máx. o suma, y agregados algebraicos como media o desviación estándar. Databricks recomienda la agregación incremental en el caso de consultas con un número limitado de grupos, por ejemplo, una consulta con una cláusula GROUP BY country. Solo se leen nuevos datos de entrada con cada actualización.

Para más información sobre cómo escribir consultas de Delta Live Tables que realizan agregaciones incrementales, vea Agregaciones en ventanas con marcas de agua.

Usar modelos de MLFlow en una canalización de Delta Live Tables

Nota:

Para usar modelos MLflow en una canalización habilitada para catálogo de Unity, la canalización debe configurarse para usar el canal preview. Para usar el canal current, debe configurar la canalización para publicarla en el metastore de Hive.

Puede usar modelos entrenados por MLflow en canalizaciones de Delta Live Tables. Los modelos de MLflow se tratan como transformaciones en Azure Databricks, lo que significa que actúan sobre una entrada de DataFrame de Spark y devuelven resultados como dataFrame de Spark. Dado que Delta Live Tables define conjuntos de datos en DataFrames, puede convertir cargas de trabajo de Apache Spark que aprovechan MLflow en Delta Live Tables con solo unas pocas líneas de código. Para obtener más información sobre MLflow, consulte Administración del ciclo de vida de Machine Learning mediante MLflow.

Si ya tiene un cuaderno de Python que llama a un modelo de MLflow, puede adaptar este código a Delta Live Tables mediante el @dlt.table decorador y asegurarse de que las funciones se definen para devolver resultados de transformación. Delta Live Tables no instala MLflow de forma predeterminada, por lo que asegúrese de importarlo %pip install mlflow e importarlo mlflow en dlt la parte superior del cuaderno. Para obtener una introducción a la sintaxis de Delta Live Tables, consulte Ejemplo: ingesta y procesamiento de los datos de nombres de bebés de Nueva York.

Para usar modelos de MLflow en Delta Live Tables, complete los pasos siguientes:

  1. Obtenga el id. de ejecución y el nombre de modelo del modelo de MLFlow. El id. de ejecución y el nombre de modelo se usan para construir el URI del modelo de MLFlow.
  2. Use el URI para definir una UDF de Spark para cargar el modelo de MLFlow.
  3. Llame a la UDF en las definiciones de tabla para usar el modelo de MLFlow.

En el ejemplo siguiente se muestra la sintaxis básica de este patrón:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return dlt.read(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

En el ejemplo siguiente se define una UDF de Spark denominada loaded_model_udf, que carga un modelo de MLFlow entrenado con datos de riesgo de préstamos. Las columnas de datos usadas para realizar la predicción se pasan como argumento a la UDF. La tabla loan_risk_predictions calcula las predicciones de cada fila de loan_risk_input_data.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return dlt.read("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Conservación de las eliminaciones o actualizaciones manuales

Delta Live Tables permite eliminar o actualizar manualmente los registros de una tabla y realizar una operación de actualización para volver a calcular las tablas de bajada.

De forma predeterminada, Delta Live Tables vuelve a calcular los resultados de la tabla en función de los datos de entrada cada vez que se actualiza una canalización, por lo que debe asegurarse de que el registro eliminado no se vuelve a cargar desde los datos de origen. Establecer la pipelines.reset.allowedpropiedad de la tablafalse a impide las actualizaciones de una tabla, pero no impide las escrituras incrementales en las tablas ni impide que fluyan nuevos datos a la tabla.

En el diagrama siguiente se muestra un ejemplo con dos tablas de streaming:

  • raw_user_table ingiere un conjunto de datos de usuario, sin procesar, de un origen.
  • bmi_table calcula incrementalmente los niveles de IMC, mediante el peso y la altura de raw_user_table.

Desea eliminar o actualizar manualmente los registros de usuario de raw_user_table y volver a calcular el bmi_table.

Diagrama de retención de datos

El código siguiente demuestra la configuración de la propiedad de tabla de pipelines.reset.allowed a false para desactivar la actualización completa para raw_user_table de manera que los cambios previstos se conserven en el tiempo, pero las tablas descendentes se vuelvan a calcular cuando se ejecuta una actualización de canalización:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);