Compartir a través de


Carga y procesamiento de datos de forma incremental con flujos de Delta Live Tables

En este artículo se explica qué son los flujos y cómo puede usar flujos en canalizaciones de Delta Live Tables para procesar incrementalmente los datos de un origen a una tabla de streaming de destino. En Delta Live Tables, los flujos se definen de dos maneras:

  1. Un flujo se define automáticamente cuando se crea una consulta que actualiza una tabla de streaming.
  2. Delta Live Tables también proporciona funcionalidad para definir explícitamente flujos para un procesamiento más complejo, como anexar a una tabla de streaming desde varios orígenes de streaming.

En este artículo se describen los flujos implícitos que se crean al definir una consulta para actualizar una tabla de streaming y, a continuación, se proporcionan detalles sobre la sintaxis para definir flujos más complejos.

¿Qué es un flujo?

En Delta Live Tables, un flujo es una consulta de streaming que procesa los datos de origen de forma incremental para actualizar una tabla de streaming de destino. La mayoría de los conjuntos de datos de Delta Live Tables que se crean en una canalización definen el flujo como parte de la consulta y no requieren definir explícitamente el flujo. Por ejemplo, puede crear una tabla de streaming en Delta Live Tables en un único comando DDL en lugar de usar instrucciones de flujo y tabla independientes para crear la tabla de streaming:

Nota:

Este ejemplo de CREATE FLOW se proporciona solo con fines ilustrativos e incluye palabras clave que no son sintaxis válida de Delta Live Tables.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Además del flujo predeterminado definido por una consulta, las interfaces de Python y SQL de Delta Live Tables proporcionan funcionalidad de flujo de anexión. El flujo de anexión admite el procesamiento que requiere leer datos de varios orígenes de streaming para actualizar una sola tabla de streaming. Por ejemplo, puede usar la funcionalidad de flujo de anexión cuando tenga una tabla de streaming y un flujo existentes y quiera agregar un nuevo origen de streaming que escriba en esta tabla de streaming existente.

Usar un flujo de anexión para escribir en una tabla de streaming desde varias secuencias de origen

Nota:

Para usar el procesamiento de flujo de anexión, la canalización debe configurarse para usar el canal de vista previa.

Use el decorador @append_flow en la interfaz de Python o la cláusula CREATE FLOW de la interfaz SQL para escribir en una tabla de streaming desde varios orígenes de streaming. Use el flujo de anexión para procesar tareas como las siguientes:

  • Agregar orígenes de streaming que anexan datos a una tabla de streaming existente sin necesidad de una actualización completa. Por ejemplo, puede tener una tabla que combine datos regionales de cada región en la que esté trabajando. A medida que se implementan nuevas regiones, puede agregar los nuevos datos de región a la tabla sin realizar una actualización completa. Consulte Ejemplo: escritura en una tabla de streaming desde varios temas de Kafka.
  • Actualizar una tabla de streaming anexando datos históricos que faltan (reposición). Por ejemplo, tiene una tabla de streaming existente escrita en un tema de Apache Kafka. También tiene datos históricos almacenados en una tabla que necesita insertar exactamente una vez en la tabla de streaming y no puede transmitir los datos porque su procesamiento incluye realizar una agregación compleja antes de insertar los datos. Consulte Ejemplo: ejecutar un reposición de datos único.
  • Combine datos de varios orígenes y escriba en una sola tabla de streaming en lugar de usar la cláusula UNION en una consulta. El uso del procesamiento de flujo de anexión en lugar de UNION permite actualizar la tabla de destino de forma incremental sin ejecutar una actualización completa. Consulte Ejemplo: usar el procesamiento de flujo de anexión en lugar de UNION.

El destino de los registros que genera el procesamiento del flujo de anexión puede ser una tabla existente o una tabla nueva. Para las consultas de Python, use la función create_streaming_table() para crear una tabla de destino.

Importante

  • Si necesita definir restricciones de calidad de datos con expectativas, defina las expectativas en la tabla de destino como parte de la función create_streaming_table() o una definición de tabla existente. No se pueden definir expectativas en la definición de @append_flow.
  • Los flujos se identifican mediante un nombre de flujo y este nombre se usa para identificar los puntos de control de streaming. El uso del nombre del flujo para identificar el punto de control significa lo siguiente:
    • Si se cambia el nombre de un flujo existente en una canalización, el punto de control no se lleva a cabo y el flujo cuyo nombre ha cambiado es en realidad un flujo completamente nuevo.
    • No puede reutilizar un nombre de flujo en una canalización, ya que el punto de control existente no coincidirá con la nueva definición de flujo.

A continuación se muestra la sintaxis de @append_flow:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

Ejemplo: escritura en una tabla de streaming desde varios temas de Kafka

En el ejemplo siguiente se crea una tabla de streaming denominada kafka_target y se escribe en esa tabla de streaming desde dos temas de Kafka:

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Para obtener más información sobre la función con valores de tabla read_kafka() usada en las consultas SQL, consulte read_kafka en la referencia del lenguaje SQL.

Ejemplo: ejecutar un reposición de datos único

En el ejemplo siguiente se ejecuta una consulta para anexar datos históricos a una tabla de streaming:

Nota:

Para asegurarse de un rellenado único verdadero cuando la consulta de reposición forma parte de una canalización que se ejecuta de forma programada o continua, quite la consulta después de ejecutar la canalización una vez. Para anexar nuevos datos si llega al directorio de reposición, deje la consulta en su lugar.

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  cloud_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  cloud_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Ejemplo: usar el procesamiento de flujo de anexión en lugar de UNION

En lugar de usar una consulta con una cláusula UNION, puede usar consultas de flujo de anexión para combinar varios orígenes y escribir en una sola tabla de streaming. El uso de consultas de flujo de anexión en lugar de UNION permite anexar a una tabla de streaming desde varios orígenes sin ejecutar una actualización completa.

En el ejemplo de Python siguiente se incluye una consulta que combina varios orígenes de datos con una cláusula UNION:

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

En los ejemplos siguientes se reemplaza la consulta de UNION por las consultas de flujo de anexión:

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/apac",
    "csv"
  );