Ejecute su primera carga de trabajo de flujo estructurado

En este artículo, se proporcionan ejemplos de código y una explicación de los conceptos básicos necesarios para ejecutar las primeras consultas de flujo estructurado de Azure Databricks. Puede usar flujos estructurados para cargas de trabajo de procesamiento casi en tiempo real e incrementales.

Los flujos estructurados son una de las diversas tecnologías que potencian las tablas de streaming en Delta Live Tables. Databricks recomienda usar Delta Live Tables para todas las nuevas cargas de trabajo de ETL, ingesta y flujo estructurado. Consulte ¿Qué es Delta Live Tables?

Nota:

Aunque Delta Live Tables proporciona una sintaxis ligeramente modificada para declarar tablas de streaming, la sintaxis general para configurar las transformaciones y lecturas de streaming se aplica a todos los casos de uso de streaming de Azure Databricks. Delta Live Tables también simplifica el streaming mediante la administración de la información de estado, los metadatos y numerosas configuraciones.

Lectura desde un flujo de datos

Use el flujo estructurado para ingerir datos de forma incremental desde orígenes de datos admitidos. Algunos de los orígenes de datos más comunes que se usan en las cargas de trabajo de flujo estructurado de Azure Databricks incluyen lo siguiente:

  • Archivos de datos en el almacenamiento de objetos en la nube
  • Colas y buses de mensajes
  • Delta Lake

Databricks recomienda usar el cargador automático para la ingesta de streaming del almacenamiento de objetos en la nube. El cargador automático admite la mayoría de los formatos de archivo compatibles con el flujo estructurado. Consulte ¿Qué es Auto Loader?.

Cada origen de datos proporciona una serie de opciones para especificar cómo cargar lotes de datos. Durante la configuración del lector, es posible que tenga que establecer las opciones principales en las siguientes categorías:

  • Opciones que especifican el origen de datos o el formato (por ejemplo, el tipo de archivo, los delimitadores y el esquema).
  • Opciones que configuran el acceso a los sistemas de origen (por ejemplo, la configuración de puerto y las credenciales).
  • Opciones que especifican dónde iniciar una secuencia (por ejemplo, desplazamientos de Kafka o lectura de todos los archivos existentes).
  • Opciones que controlan la cantidad de datos que se procesan en cada lote (por ejemplo, desplazamientos máximos, archivos o bytes por lote).

Uso del cargador automático para leer datos de streaming desde el almacenamiento de objetos

En el ejemplo siguiente, se muestra cómo cargar datos JSON con el cargador automático, que usa cloudFiles para indicar el formato y las opciones. La opción schemaLocation habilita la inferencia y la evolución del esquema. Pegue el código siguiente en una celda del cuaderno de Databricks y ejecute la celda para crear un DataFrame de streaming denominado raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Al igual que otras operaciones de lectura de Azure Databricks, la configuración de una lectura de streaming no carga realmente los datos. Es necesario desencadenar una acción en los datos antes de que comience la transmisión.

Nota:

Llamar a display() en un DataFrame de streaming inicia un trabajo de streaming. Para la mayoría de los casos de uso de flujo estructurado, la acción que desencadena la transmisión debería escribir datos en un receptor. Consulte Preparación del código de flujo estructurado para producción.

Realización de una transformación de streaming

El flujo estructurado admite la mayoría de las transformaciones disponibles en Azure Databricks y Spark SQL. Incluso es posible cargar modelos de MLflow como UDF y realizar predicciones de streaming como transformación.

En el ejemplo de código siguiente se completa una transformación sencilla para enriquecer los datos JSON ingeridos con información adicional mediante funciones de Spark SQL:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

El transformed_df resultante contiene instrucciones de consulta para cargar y transformar cada registro a medida que llega al origen de datos.

Nota:

El flujo estructurado trata los orígenes de datos como conjuntos de datos ilimitados o infinitos. Por lo tanto, algunas transformaciones no se admiten en cargas de trabajo de flujo estructurado porque requerirían ordenar un número infinito de elementos.

La mayoría de las agregaciones y muchas combinaciones requieren la administración de información de estado con marcas de agua, ventanas y modo de salida. Consulte Aplicación de marcas de agua para controlar los umbrales de procesamiento de datos.

Escribir en un receptor de datos

Un receptor de datos es el destino de una operación de escritura de streaming. Entre los receptores más comunes que se usan en las cargas de trabajo de streaming de Azure Databricks se incluyen los siguientes:

  • Delta Lake
  • Colas y buses de mensajes
  • Bases de datos clave-valor

Al igual que con los orígenes de datos, la mayoría de los receptores de datos proporcionan una serie de opciones para controlar cómo se escriben los datos en el sistema de destino. Durante la configuración del escritor, es posible que tenga que establecer las opciones principales en las siguientes categorías:

  • Modo de salida (anexar de forma predeterminada).
  • Una ubicación de punto de control (necesaria para cada escritor).
  • Intervalo del desencadenador: consulte Configurar intervalos del desencadenador de flujo estructurado.
  • Opciones que especifican el receptor de datos o el formato (por ejemplo: el tipo de archivo, los delimitadores y el esquema).
  • Opciones que configuran el acceso a los sistemas de destino (por ejemplo: la configuración de puerto y las credenciales).

Realización de una escritura por lotes incremental en Delta Lake

En el ejemplo siguiente, se escribe en Delta Lake mediante un punto de control y una ruta de acceso de archivo especificados.

Importante

Asegúrese siempre de especificar una ubicación de punto de control única para cada escritor de streaming que configure. El punto de control proporciona la identidad única de la transmisión, realizando un seguimiento de todos los registros procesados y la información de estado asociada a la consulta de streaming.

La configuración de availableNow para el desencadenador indica al flujo estructurado que procese todos los registros no procesados previamente del conjunto de datos de origen y, a continuación, se apague, por lo que puede ejecutar de forma segura el código siguiente sin preocuparse por dejar una secuencia en ejecución:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

En este ejemplo, no llegan nuevos registros al origen de datos, por lo que la ejecución repetida de este código no ingiere nuevos registros.

Advertencia

La ejecución de flujo estructurado puede impedir que la terminación automática apague los recursos de proceso. Para evitar costes inesperados, asegúrese de finalizar las consultas de streaming.

Preparación del código de flujo estructurado para producción

Databricks recomienda usar Delta Live Tables para la mayoría de las cargas de trabajo de flujo estructurado. Las siguientes recomendaciones proporcionan un punto de partida para preparar cargas de trabajo de flujo estructurado para producción:

  • Quite el código innecesario de los cuadernos que pudieran devolver resultados, como display y count.
  • No ejecute cargas de trabajo de flujo estructurado en clústeres interactivos. Programe siempre secuencias como trabajos.
  • Para ayudar a que los trabajos de streaming se recuperen automáticamente, configure trabajos con reintentos infinitos.
  • No use el escalado automático para cargas de trabajo con flujo estructurado.

Para obtener más recomendaciones, consulte Consideraciones de producción para flujo estructurado.

Leer datos de Delta Lake, transformar y escribir en Delta Lake

Delta Lake tiene una amplia compatibilidad para trabajar con flujo estructurado como origen y receptor. Consulte Lecturas y escrituras en streaming de tablas delta.

En el ejemplo siguiente, se muestra una sintaxis de ejemplo para cargar incrementalmente todos los registros nuevos de una tabla Delta, combinarlos con una instantánea de otra tabla Delta y escribirlos en una tabla Delta:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Es necesario tener los permisos adecuados configurados para leer las tablas de origen y escribir en las tablas de destino, así como la ubicación de punto de control especificada. Rellene todos los parámetros indicados con corchetes angulares (<>) con los valores pertinentes para los orígenes de datos y los receptores.

Nota:

Delta Live Tables proporciona una sintaxis totalmente declarativa para crear canalizaciones de Delta Lake y administra propiedades, como desencadenadores y puntos de control automáticamente. Consulte ¿Qué es Delta Live Tables?

Leer datos de Kafka, transformar y escribir en Kafka

Apache Kafka y otros buses de mensajería proporcionan algunas de las latencias más bajas disponibles para grandes conjuntos de datos. Use Azure Databricks para aplicar transformaciones a los datos ingeridos desde Kafka y, a continuación, vuelva a escribir datos en Kafka.

Nota:

La escritura de datos en el almacenamiento de objetos en la nube agrega una sobrecarga de latencia adicional. Si desea almacenar datos de un bus de mensajería en Delta Lake, pero requiere la menor latencia posible para las cargas de trabajo de streaming, Databricks recomienda configurar trabajos de streaming independientes para ingerir datos en el almacén de lago y aplicar transformaciones casi en tiempo real para receptores de bus de mensajería descendente.

En el ejemplo de código siguiente, se muestra un patrón sencillo para enriquecer datos de Kafka al combinarlos con datos de una tabla Delta y, a continuación, volver a escribir en Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Es necesario tener los permisos adecuados configurados para acceder al servicio Kafka. Rellene todos los parámetros indicados con corchetes angulares (<>) con los valores pertinentes para los orígenes de datos y los receptores. Consulte Procesamiento de flujos con Apache Kafka y Azure Databricks.