Información general acerca de Apache Spark Structured StreamingOverview of Apache Spark Structured Streaming

Apache Spark Structured Streaming permite implementar aplicaciones escalables, de alto rendimiento y tolerantes a errores para el procesamiento de flujos de datos.Apache Spark Structured Streaming enables you to implement scalable, high-throughput, fault-tolerant applications for processing data streams. Structured Streaming se basa en el motor de Spark SQL y mejora las construcciones de las tramas de datos y los conjuntos de datos de Spark SQL, lo que permite escribir consultas de streaming de la misma manera que se escribirían las consultas por lotes.Structured Streaming is built upon the Spark SQL engine, and improves upon the constructs from Spark SQL Data Frames and Datasets so you can write streaming queries in the same way you would write batch queries.

Las aplicaciones de Structured Streaming se ejecutan en clústeres de HDInsight Spark y se conectan a los datos de streaming de Apache Kafka, un socket de TCP (para la depuración), Azure Storage o Azure Data Lake Storage.Structured Streaming applications run on HDInsight Spark clusters, and connect to streaming data from Apache Kafka, a TCP socket (for debugging purposes), Azure Storage, or Azure Data Lake Storage. Las dos últimas opciones, que usan servicios de almacenamiento externo, permiten inspeccionar los nuevos archivos agregados al almacenamiento y procesar su contenido como si se transmitieran en secuencias.The latter two options, which rely on external storage services, enable you to watch for new files added into storage and process their contents as if they were streamed.

Structured Streaming crea una consulta de ejecución prolongada durante la que se aplican operaciones a los datos de entrada, como la selección, proyección, agregación, división en ventanas y la combinación de la trama de datos de streaming con las tramas de datos de referencia.Structured Streaming creates a long-running query during which you apply operations to the input data, such as selection, projection, aggregation, windowing, and joining the streaming DataFrame with reference DataFrames. A continuación, se generan los resultados en el almacenamiento de archivos (Azure Storage Blob o Data Lake Storage) o en cualquier almacén de datos mediante el uso de código personalizado (como SQL Database o Power BI).Next, you output the results to file storage (Azure Storage Blobs or Data Lake Storage) or to any datastore by using custom code (such as SQL Database or Power BI). Structured Streaming también proporciona salida a la consola para su depuración local y a una tabla en memoria para que se puedan ver los datos generados para la depuración en HDInsight.Structured Streaming also provides output to the console for debugging locally, and to an in-memory table so you can see the data generated for debugging in HDInsight.

Procesamiento de secuencias con HDInsight y Spark Structured Streaming

Nota

Spark Structured Streaming reemplaza a Spark Streaming (DStreams).Spark Structured Streaming is replacing Spark Streaming (DStreams). En el futuro, Structured Streaming recibirá mejoras y el mantenimiento, mientras que DStreams estará solo en modo de mantenimiento.Going forward, Structured Streaming will receive enhancements and maintenance, while DStreams will be in maintenance mode only. En la actualidad, Structured Streaming no tiene tantas características como DStreams para los orígenes y receptores que admite de forma estándar, así que es aconsejable evaluar los requisitos para elegir la opción de procesamiento de secuencias de Spark adecuada.Structured Streaming is currently not as feature-complete as DStreams for the sources and sinks that it supports out of the box, so evaluate your requirements to choose the appropriate Spark stream processing option.

Secuencias como tablasStreams as tables

Spark Structured Streaming representa una secuencia de datos como una tabla cuya profundidad es ilimitada, es decir, la tabla no para de crecer mientras lleguen datos nuevos.Spark Structured Streaming represents a stream of data as a table that is unbounded in depth, that is, the table continues to grow as new data arrives. Esta tabla de entrada la procesa continuamente una consulta de ejecución prolongada y los resultados se envían a una tabla de salida:This input table is continuously processed by a long-running query, and the results sent to an output table:

Concepto de Structured Streaming

En Structured Streaming, los datos llegan al sistema, se ingieran de inmediato y se colocan en una tabla de entrada.In Structured Streaming, data arrives at the system and is immediately ingested into an input table. Se escriben consultas (mediante las API de DataFrame y Dataset) que realizan operaciones en esta tabla de entrada.You write queries (using the DataFrame and Dataset APIs) that perform operations against this input table. La salida de la consulta genera otra tabla, la tabla de resultados.The query output yields another table, the results table. La tabla de resultados contiene los resultados de la consulta, a partir de la cual se extraen datos para un almacén de datos externo, como una base de datos relacional.The results table contains the results of your query, from which you draw data for an external datastore, such a relational database. El intervalo del desencadenador controla el momento en el que se procesan los datos desde la tabla de entrada.The timing of when data is processed from the input table is controlled by the trigger interval. De manera predeterminada, el valor del intervalo del desencadenador es cero, por lo que Structured Streaming intenta procesar los datos en cuanto llegan.By default, the trigger interval is zero, so Structured Streaming tries to process the data as soon as it arrives. En la práctica, esto significa que en cuanto Structured Streaming finaliza el procesamiento de la ejecución de la consulta anterior, inicia otro procesamiento en el que se ejecutan los datos recién recibidos.In practice, this means that as soon as Structured Streaming is done processing the run of the previous query, it starts another processing run against any newly received data. Puede configurar el desencadenador para que se ejecute en un intervalo, con el fin de que los datos de streaming se procesen en lotes temporales.You can configure the trigger to run at an interval, so that the streaming data is processed in time-based batches.

Los datos de la tabla de resultados pueden contener solo los datos nuevos, con respecto a la última vez que se procesó la consulta (modo de anexión), o bien se puede actualizar la tabla cada vez que haya nuevos datos, con el fin de que incluya todos los datos de salida desde el inicio de la consulta de streaming (modo completo).The data in the results tables may contain only the data that is new since the last time the query was processed (append mode), or the table may be refreshed every time there's new data so the table includes all of the output data since the streaming query began (complete mode).

Modo de anexiónAppend mode

En este modo, las filas que se han agregado a la tabla de resultados desde la última vez que se ejecutó la consulta son las únicas que aparecen en la tabla de resultados y que se escriben en un almacenamiento externo.In append mode, only the rows added to the results table since the last query run are present in the results table and written to external storage. Por ejemplo, la consulta más sencilla sólo copia todos los datos de la tabla de entrada en la tabla de resultados sin modificaciones.For example, the simplest query just copies all data from the input table to the results table unaltered. Cada vez que transcurre un intervalo del desencadenador, se procesan los datos nuevos y las filas que representa dichos datos aparecen en la tabla de resultados.Each time a trigger interval elapses, the new data is processed and the rows representing that new data appear in the results table.

Considere un escenario en el que procesa la telemetría de los sensores de temperatura, como un termostato.Consider a scenario where you're processing telemetry from temperature sensors, such as a thermostat. Suponga que el primer desencadenador procesó un evento a la hora 00:01 para el dispositivo 1 con una lectura de temperatura de 95 grados.Assume the first trigger processed one event at time 00:01 for device 1 with a temperature reading of 95 degrees. En el primer desencadenador de la consulta, la fila con la hora 00:01 es la única que aparece en la tabla de resultados.In the first trigger of the query, only the row with time 00:01 appears in the results table. En la hora 00:02, cuando se recibe otro evento, la fila nueva es la fila con la hora 00:02 y, por tanto, la tabla de resultados contendrá solo esa fila.At time 00:02 when another event arrives, the only new row is the row with time 00:02 and so the results table would contain only that one row.

Modo de anexión de Structured Streaming

Si se usa el modo de anexión, la consulta aplicará proyecciones (se seleccionan las columnas implicadas), filtro (se eligen solo las filas que cumplen ciertas condiciones) o combinación (aumenta los datos con data de una tabla de consulta estática).When using append mode, your query would be applying projections (selecting the columns it cares about), filtering (picking only rows that match certain conditions) or joining (augmenting the data with data from a static lookup table). El modo de anexión facilita la inserción exclusiva de los puntos de datos nuevos relevantes en el almacenamiento externo.Append mode makes it easy to push only the relevant new data points out to external storage.

Modo completoComplete mode

Piense en el mismo escenario, pero esta vez se utiliza el modo completo.Consider the same scenario, this time using complete mode. En el modo completo, toda la tabla de resultados completo se actualiza en cada desencadenador para que la tabla no incluya solo los datos de la última vez que se ejecutó el desencadenador, sino de todas las ejecuciones.In complete mode, the entire output table is refreshed on every trigger so the table includes data not just from the most recent trigger run, but from all runs. El modo completo se puede usar para copiar los datos sin modificaciones de la tabla de entrada a la tabla de resultados.You could use complete mode to copy the data unaltered from the input table to the results table. En cada ejecución desencadenada, las nuevas filas de resultados aparecen junto con todas las anteriores.On every triggered run, the new result rows appear along with all the previous rows. La tabla de resultados de salida acabará almacenando todos los datos recopilados desde que comenzó la consulta y, finalmente, se quedaría sin memoria.The output results table will end up storing all of the data collected since the query began, and you would eventually run out of memory. El modo completo está pensado para usarlo con consultas agregadas que resumen los datos de entrada de alguna manera, por lo que en cada desencadenador la tabla de resultados se actualiza con un resumen nuevo.Complete mode is intended for use with aggregate queries that summarize the incoming data in some way, so on every trigger the results table is updated with a new summary.

Suponga que hasta ahora hay datos procesados correspondientes a cinco segundos y es el momento de procesar los datos del sexto segundo.Assume so far there are five seconds' worth of data already processed, and it's time to process the data for the sixth second. La tabla de entrada tiene eventos de la hora 00:01 y la hora 00:03.The input table has events for time 00:01 and time 00:03. El objetivo de esta consulta de ejemplo es dar la temperatura media del dispositivo cada cinco segundos.The goal of this example query is to give the average temperature of the device every five seconds. La implementación de esta consulta aplica una agregado que toma todos los valores que se encuentren en cada periodo de 5 segundos, calcula el promedio de la temperatura y genera una fila para la temperatura media en ese intervalo.The implementation of this query applies an aggregate that takes all of the values that fall within each 5-second window, averages the temperature, and produces a row for the average temperature over that interval. Al final del primera periodo de 5 segundos, hay dos tuplas: (00:01, 1, 95) y (00:03, 1, 98).At the end of the first 5-second window, there are two tuples: (00:01, 1, 95) and (00:03, 1, 98). Por tanto en el periodo 00:00-00:05 la agregación genera una tupla con la temperatura media de 96,5 grados.So for the window 00:00-00:05 the aggregation produces a tuple with the average temperature of 96.5 degrees. En el siguiente periodo de 5 segundos, hay solo un punto de datos en la hora 00:06, por lo que la temperatura media resultante es 98 grados.In the next 5-second window, there's only one data point at time 00:06, so the resulting average temperature is 98 degrees. En la hora 00:10, utilizando el modo completo, la tabla de resultados tiene las filas de ambos periodos 00:00-00:05 y 05:00-00:10, porque la consulta devuelve todas las filas agregadas, no solo las nuevas.At time 00:10, using complete mode, the results table has the rows for both windows 00:00-00:05 and 00:05-00:10 because the query outputs all the aggregated rows, not just the new ones. Por consiguiente, la tabla de resultados sigue creciendo a medida que se agregan nuevas periodos.Therefore the results table continues to grow as new windows are added.

Modo completo de Structured Streaming

No todas las consultas que utilicen el modo completo harán que la tabla crezca sin límites.Not all queries using complete mode will cause the table to grow without bounds. Suponga que, en el ejemplo anterior, en lugar de calcular el promedio de la temperatura por período de tiempo, lo hace por identificador de dispositivo.Consider in the previous example that rather than averaging the temperature by time window, it averaged instead by device ID. La tabla de resultados contiene un número fijo de filas (una por dispositivo) con la temperatura promedio del dispositivo en todos los puntos de datos procedentes del mismo.The result table contains a fixed number of rows (one per device) with the average temperature for the device across all data points received from that device. Cuando se reciben nuevas temperaturas, se actualiza la tabla de resultados para que sus medias estén siempre actualizadas.As new temperatures are received, the results table is updated so that the averages in the table are always current.

Componentes de una aplicación de Spark Structured StreamingComponents of a Spark Structured Streaming application

Una consulta de ejemplo simple puede resumir las lecturas de temperatura por periodos de una hora.A simple example query can summarize the temperature readings by hour-long windows. En este caso, los datos se almacenan en archivos JSON en Azure Storage (que se asocia como almacenamiento predeterminado para el clúster de HDInsight):In this case, the data is stored in JSON files in Azure Storage (attached as the default storage for the HDInsight cluster):

{"time":1469501107,"temp":"95"}
{"time":1469501147,"temp":"95"}
{"time":1469501202,"temp":"95"}
{"time":1469501219,"temp":"95"}
{"time":1469501225,"temp":"95"}

Estos archivos JSON se almacenan en la subcarpeta temps debajo del contenedor del clúster de HDInsight.These JSON files are stored in the temps subfolder underneath the HDInsight cluster's container.

Definición del origen de entradaDefine the input source

En primer lugar, configure una instancia de DataFrame que describa el origen de los datos y todos los valores que requiera dicho origen.First configure a DataFrame that describes the source of the data and any settings required by that source. En este ejemplo se extraen los archivos JSON de Azure Storage y se les aplica un esquema en el momento en que se leen.This example draws from the JSON files in Azure Storage and applies a schema to them at read time.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

//Cluster-local path to the folder containing the JSON files
val inputPath = "/temps/" 

//Define the schema of the JSON files as having the "time" of type TimeStamp and the "temp" field of type String
val jsonSchema = new StructType().add("time", TimestampType).add("temp", StringType)

//Create a Streaming DataFrame by calling readStream and configuring it with the schema and path
val streamingInputDF = spark.readStream.schema(jsonSchema).json(inputPath)

Aplicación de la consultaApply the query

A continuación, aplique una consulta que contenga las operaciones deseadas a la instancia de DataFrame de Streaming.Next, apply a query that contains the desired operations against the Streaming DataFrame. En este caso, una agregación agrupa todas las filas en periodos de 1 hora y, a continuación, calcula las temperaturas mínima, máxima y media en el periodo de 1 hora.In this case, an aggregation groups all the rows into 1-hour windows, and then computes the minimum, average, and maximum temperatures in that 1-hour window.

val streamingAggDF = streamingInputDF.groupBy(window($"time", "1 hour")).agg(min($"temp"), avg($"temp"), max($"temp"))

Definición del receptor de salidaDefine the output sink

A continuación, defina el destino de las filas que se agregan a la tabla de resultados en cada intervalo del desencadenador.Next, define the destination for the rows that are added to the results table within each trigger interval. En este ejemplo solo se generan todas las filas en una tabla en memoria temps que posteriormente se puede consultar con SparkSQL.This example just outputs all rows to an in-memory table temps that you can later query with SparkSQL. El modo de salida completa garantiza que siempre se generan todas las filas de todos los periodos.Complete output mode ensures that all rows for all windows are output every time.

val streamingOutDF = streamingAggDF.writeStream.format("memory").queryName("temps").outputMode("complete")

Inicio de la consultaStart the query

Inicie la consulta de streaming y ejecútela hasta que reciba una señal de finalización.Start the streaming query and run until a termination signal is received.

val query = streamingOutDF.start() 

Visualización de los resultadosView the results

Mientras se ejecuta la consulta, en la misma sesión de Spark, puede ejecutar una consulta de SparkSQL en el tempstabla en que se almacenan los resultados de la consulta.While the query is running, in the same SparkSession, you can run a SparkSQL query against the temps table where the query results are stored.

select * from temps

Esta consulta devuelve resultados similares a los siguientes:This query yields results similar to the following:

periodowindow mín. (temp.)min(temp) media (temp.)avg(temp) máx. (temp.)max(temp)
{u'start': u'2016-07-26T02:00:00.000Z', u'end'...{u'start': u'2016-07-26T02:00:00.000Z', u'end'... 9595 95,23157995.231579 9999
{u'start': u'2016-07-26T03:00:00.000Z', u'end'...{u'start': u'2016-07-26T03:00:00.000Z', u'end'... 9595 96,02304896.023048 9999
{u'start': u'2016-07-26T04:00:00.000Z', u'end'...{u'start': u'2016-07-26T04:00:00.000Z', u'end'... 9595 96,79713396.797133 9999
{u'start': u'2016-07-26T05:00:00.000Z', u'end'...{u'start': u'2016-07-26T05:00:00.000Z', u'end'... 9595 96,98463996.984639 9999
{u'start': u'2016-07-26T06:00:00.000Z', u'end'...{u'start': u'2016-07-26T06:00:00.000Z', u'end'... 9595 97,01474997.014749 9999
{u'start': u'2016-07-26T07:00:00.000Z', u'end'...{u'start': u'2016-07-26T07:00:00.000Z', u'end'... 9595 96,98097196.980971 9999
{u'start': u'2016-07-26T08:00:00.000Z', u'end'...{u'start': u'2016-07-26T08:00:00.000Z', u'end'... 9595 96,96599796.965997 9999

Para más información acerca de la API de Spark Structured Streaming, además de los orígenes de datos de entrada, las operaciones y los receptores de salida que admite, consulte Apache Spark Structured Streaming Programming Guide (Guía de programación de Apache Spark Structured Streaming).For details on the Spark Structured Stream API, along with the input data sources, operations, and output sinks it supports, see Apache Spark Structured Streaming Programming Guide.

Puntos de control y registros de escritura previaCheckpointing and write-ahead logs

Para proporcionar resistencia y tolerancia a errores, Structured Streaming utiliza los puntos de control para garantizar que el procesamiento de secuencias puede continuar sin interrupciones, aunque se produzcan errores en el nodo.To deliver resiliency and fault tolerance, Structured Streaming relies on checkpointing to ensure that stream processing can continue uninterrupted, even with node failures. En HDInsight, Spark crea puntos de control en un almacenamiento duradero, Azure Storage o Data Lake Storage.In HDInsight, Spark creates checkpoints to durable storage, either Azure Storage or Data Lake Storage. Dichos puntos almacenan la información de progreso de la consulta de streaming.These checkpoints store the progress information about the streaming query. Además, Structured Streaming usa un registro de escritura previa (WAL)In addition, Structured Streaming uses a write-ahead log (WAL). que captura los datos ingeridos que se han recibido, pero que aún no ha procesado una consulta.The WAL captures ingested data that has been received but not yet processed by a query. Si se produce un error y se reinicia el procesamiento desde el WAL, los eventos recibidos del origen no se pierden.If a failure occurs and processing is restarted from the WAL, any events received from the source aren't lost.

Implementación de aplicaciones de Spark StreamingDeploying Spark Streaming applications

Normalmente, las aplicaciones de Spark Streaming se compilan localmente en un archivo JAR y, luego, se implementan en Spark en HDInsight, para lo que se copia el archivo JAR en el almacenamiento predeterminado asociado al clúster de HDInsight.You typically build a Spark Streaming application locally into a JAR file and then deploy it to Spark on HDInsight by copying the JAR file to the default storage attached to your HDInsight cluster. Después, puede iniciar la aplicación mediante las API REST de Apache Livy disponibles en el clúster mediante una operación POST.You can start your application with the Apache Livy REST APIs available from your cluster using a POST operation. El cuerpo de la operación POST incluye un documento JSON que proporciona la ruta de acceso al archivo JAR, el nombre de la clase cuyo método principal define y ejecuta la aplicación de streaming y, opcionalmente, los requisitos de recursos del trabajo (por ejemplo, el número de ejecutores, la memoria y los núcleos) y los valores de configuración que requiere el código de la aplicación.The body of the POST includes a JSON document that provides the path to your JAR, the name of the class whose main method defines and runs the streaming application, and optionally the resource requirements of the job (such as the number of executors, memory and cores), and any configuration settings your application code requires.

Implementación de una aplicación de Spark Streaming

También se puede comprobar el estado de todas las aplicaciones con una solicitud GET en un punto de conexión de LIVY.The status of all applications can also be checked with a GET request against a LIVY endpoint. Por último, para finalizar una aplicación en ejecución, emita una solicitud DELETE en el punto de conexión de LIVY.Finally, you can terminate a running application by issuing a DELETE request against the LIVY endpoint. Para más información sobre la API de LIVY, consulte Trabajos remotos con Apache LIVY.For details on the LIVY API, see Remote jobs with Apache LIVY

Pasos siguientesNext steps