Descripción del flujo estructurado de Spark

Completado

El flujo estructurado de Spark es una plataforma popular para el procesamiento en memoria. Tiene un paradigma unificado para lotes y streaming. Todo lo que aprende y usa para el procesamiento por lotes puede utilizarlo para el streaming, por lo que la transición de un método al otro es sencilla. Spark Streaming es simplemente un motor que se ejecuta sobre Apache Spark.

What is Spark structured streaming

El flujo estructurado crea una consulta de larga ejecución durante la que se aplican operaciones a los datos de entrada, como las de selección, proyección, agregación, división en periodos de tiempo y la combinación de marcos de datos de streaming con marcos de datos de referencia. A continuación, se generan los resultados en el almacenamiento de archivos (Azure Storage Blob o Data Lake Storage) o en cualquier almacén de datos mediante el uso de código personalizado (como SQL Database o Power BI). Structured Streaming también proporciona salida a la consola para su depuración local y a una tabla en memoria para que se puedan ver los datos generados para la depuración en HDInsight.

Secuencias como tablas

El flujo estructurado de Spark representa un flujo de datos como una tabla cuya profundidad es ilimitada, es decir, la tabla no para de crecer mientras lleguen datos nuevos. Esta tabla de entrada la procesa continuamente una consulta de ejecución prolongada y los resultados se envían a una tabla de salida:

How Spark structured streaming represents data as tables

En Structured Streaming, los datos llegan al sistema, se ingieran de inmediato y se colocan en una tabla de entrada. Se escriben consultas (mediante las API de DataFrame y Dataset) que realizan operaciones en esta tabla de entrada. La salida de la consulta genera otra tabla, la de resultados. La tabla de resultados contiene los resultados de la consulta, a partir de la cual se extraen datos para un almacén de datos externo, como una base de datos relacional. El intervalo del desencadenador controla el momento en el que se procesan los datos desde la tabla de entrada. De manera predeterminada, el valor del intervalo del desencadenador es cero, por lo que Structured Streaming intenta procesar los datos en cuanto llegan. En la práctica, esto significa que en cuanto Structured Streaming finaliza el procesamiento de la ejecución de la consulta anterior, inicia otro procesamiento en el que se ejecutan los datos recién recibidos. Puede configurar el desencadenador para que se ejecute en un intervalo, con el fin de que los datos de streaming se procesen en lotes temporales.

Los datos de la tabla de resultados pueden contener solo los datos nuevos con respecto a la última vez que se ha procesado la consulta (modo de anexión), o bien se puede actualizar la tabla cada vez que haya datos nuevos, con el fin de que incluya todos los datos de salida desde el inicio de la consulta de streaming (modo completo).

Modo de anexión

En este modo, las filas que se han agregado a la tabla de resultados desde la última vez que se ejecutó la consulta son las únicas que aparecen en la tabla de resultados y que se escriben en un almacenamiento externo. Por ejemplo, la consulta más sencilla solo copia todos los datos de la tabla de entrada en la tabla de resultados, sin modificaciones. Cada vez que transcurre un intervalo del desencadenador, se procesan los datos nuevos y las filas que representa dichos datos aparecen en la tabla de resultados.

Considere un escenario en el que va a procesar datos de precios de acciones. Imagine que el primer desencadenador ha procesado un evento a las 00:01 para la acción MSFT con un valor de 95 dólares. En el primer desencadenador de la consulta, la fila con la hora 00:01 es la única que aparece en la tabla de resultados. A las 00:02, cuando se recibe otro evento, la única fila nueva es la fila con la hora 00:02 y, por tanto, la tabla de resultados contendrá solo esa fila.

How Spark structured streaming in append mode

Si se usa el modo de anexión, la consulta aplicará proyecciones (se seleccionan las columnas implicadas), filtro (se eligen solo las filas que cumplen ciertas condiciones) o combinación (aumenta los datos con data de una tabla de consulta estática). El modo de anexión facilita la inserción exclusiva de los puntos de datos nuevos relevantes en el almacenamiento externo.

Modo completo

Imagine el mismo escenario, pero esta vez con el modo completo. En el modo completo, toda la tabla de resultados completo se actualiza en cada desencadenador para que la tabla no incluya solo los datos de la última vez que se ejecutó el desencadenador, sino de todas las ejecuciones. El modo completo se puede usar para copiar los datos sin modificaciones de la tabla de entrada a la de resultados. En cada ejecución desencadenada, las filas de resultados nuevas aparecen junto con todas las anteriores. En la tabla de resultados de salida se almacenarán todos los datos recopilados desde que ha comenzado la consulta y acabará por quedarse sin memoria. El modo completo está pensado para usarlo con consultas agregadas que resumen los datos de entrada de alguna manera, por lo que en cada desencadenador la tabla de resultados se actualiza con un resumen nuevo.

Imagine que hasta ahora ya se han procesado cinco segundos de datos y es el momento de procesar los datos del sexto segundo. La tabla de entrada tiene eventos para las horas 00:01 y 00:03. El objetivo de esta consulta de ejemplo es proporcionar el precio medio de las acciones cada cinco segundos. La implementación de esta consulta aplica una agregado que toma todos los valores que se encuentren en cada periodo de cinco segundos, calcula el precio medio de las acciones y genera una fila para el precio medio durante ese intervalo. Al final del primera periodo de 5 segundos, hay dos tuplas: (00:01, 1, 95) y (00:03, 1, 98). Por tanto, para el periodo 00:00-00:05, la agregación genera una tupla con el precio medio de las acciones de 96,50 USD. En el siguiente periodo de 5 segundos, solo hay un punto de datos en la hora 00:06, por lo que el precio de las acciones resultante es de 98 USD. En la hora 00:10, utilizando el modo completo, la tabla de resultados tiene las filas de ambos periodos 00:00-00:05 y 05:00-00:10, porque la consulta devuelve todas las filas agregadas, no solo las nuevas. Por tanto, la tabla de resultados sigue creciendo a medida que se agregan nuevas periodos.

How Spark structured streaming in complete mode

No todas las consultas en las que se usa el modo completo hacen que la tabla crezca de forma ilimitada. Imagine que, en el ejemplo anterior, en lugar de calcular el precio medio de las acciones por período de tiempo, se calcula por acción. La tabla de resultados contiene un número fijo de filas (una por acción) con el precio medio de las acciones en todos los puntos de datos procedentes de ese dispositivo. A medida que se reciben nuevos precios de acciones, la tabla de resultados se actualiza para que los promedios estén siempre actualizados.

¿Cuáles son las ventajas del flujo estructurado de Spark?

En el sector financiero, el control de tiempo de las transacciones es muy importante. Por ejemplo, en el mercado bursátil, es muy importante la diferencia entre el momento en que se produce la compra-venta de acciones, cuándo se recibe la transacción o cuándo se leen los datos. Las instituciones financieras dependen de estos datos críticos y del control del tiempo asociado a ellos.

Hora del evento, datos tardíos y marcas de agua

El flujo estructurado de Spark conoce la diferencia entre la hora de un evento y la hora a la que el sistema lo ha procesado. Cada evento es una fila de la tabla y la hora del evento es un valor de columna en la fila. Esto permite que las agregaciones basadas en periodos de tiempo (por ejemplo, el número de eventos por minuto) sean simplemente una agrupación y agregación en la columna de hora del evento: cada periodo de tiempo es un grupo y cada fila puede pertenecer a varios grupos o periodos. Por tanto, estas consultas de agregación basadas en periodos de hora del evento se pueden definir de forma coherente tanto en un conjunto de datos estático como en un flujo de datos, lo que facilita enormemente el trabajo de un ingeniero de datos.

Además, este modelo controla de forma natural los datos que han llegado más tarde de lo esperado en función de su hora del evento. Spark tiene control total sobre la actualización de agregados antiguos cuando hay datos tardíos, además de limpiar los agregados antiguos para limitar el tamaño de los datos de estado intermedio. Además, a partir de la versión 2.1, Spark admite marcas de agua, lo que permite especificar el umbral de los datos tardíos y que el motor limpie el estado anterior según corresponda.

Flexibilidad para cargar datos recientes o todos los datos

Como se ha explicado en la unidad anterior, puede optar por usar el modo de anexión o el modo completo cuando trabaje con el flujo estructurado de Spark, para que la tabla de resultados incluya solo los datos más recientes o todos los datos.

Admite el cambio de microlotes a procesamiento continuo

Al cambiar el tipo de desencadenador de una consulta de Spark, puede pasar del procesamiento por microlotes al procesamiento continuo sin realizar ningún otro cambio en el marco. Estos son los distintos tipos de desencadenadores que admite Spark.

  • No especificado, es el valor predeterminado. Si no se establece ningún desencadenador de forma explícita, la consulta se ejecuta en microlotes y se procesará de forma continua.
  • Microlote de intervalo fijo. La consulta se inicia en los intervalos de repetición establecidos por el usuario. Si no se reciben datos nuevos, no se ejecuta ningún proceso de microlote.
  • Microlote de un sola uso. La consulta se ejecuta en un solo microlote y después se detiene. Esto resulta útil si quiere procesar todos los datos desde el microlote anterior y puede reducir el costo de los trabajos que no es necesario ejecutar de forma continuada.
  • Continuo con un intervalo de punto de control fijo. La consulta se ejecuta en un nuevo modo de procesamiento continuo y baja latencia que permite una latencia baja (aproximadamente 1 ms) de un extremo a otro con garantías de tolerancia a errores al menos una vez. Es similar al valor predeterminado, que puede lograr garantías de exactamente una vez, pero que, en el mejor de los casos, solo alcanza latencias de aproximadamente 100 ms.

Combinación de trabajos por lotes y en streaming

Además de simplificar el cambio de trabajos por lotes a streaming, también puede combinar los dos tipos. Esto es especialmente útil cuando se quieren usar datos históricos a largo plazo para predecir tendencias futuras mientras se procesa información en tiempo real. Para las acciones, es posible que quiera examinar su precio en los últimos cinco años, además del precio actual, para predecir los cambios realizados relacionados con anuncios de ingresos anuales o trimestrales.

Periodos de hora del evento

Es posible que quiera capturar datos en periodos de tiempo, como el precio máximo y mínimo de una acción en un período de un día o un minuto (el intervalo que decida), algo que también admite el flujo estructurado de Spark. También se admiten periodos de tiempo superpuestos.

Puntos de control para la recuperación de errores

En caso de que se produzca un error o un apagado intencionado, puede recuperar el progreso y el estado anteriores de una consulta previa y continuar desde donde se haya quedado. Esto se hace mediante puntos de control y registros de escritura previa. Puede configurar una consulta con una ubicación de punto de control y la consulta guardará toda la información de progreso (es decir, el rango de desplazamientos procesados en cada desencadenador) y los agregados en ejecución en la ubicación del punto de control. Esta ubicación de punto de control debe ser una ruta de acceso en un sistema de archivos compatible con HDFS y se puede establecer como una opción en DataStreamWriter al iniciar una consulta.