Introducción a Apache Spark StreamingOverview of Apache Spark Streaming

Apache Spark Streaming proporciona procesamiento de flujo de datos en clústeres de HDInsight Spark.Apache Spark Streaming provides data stream processing on HDInsight Spark clusters. Garantiza que todos los eventos de entrada se procesan exactamente una vez, incluso si se produce un error del nodo.With a guarantee that any input event is processed exactly once, even if a node failure occurs. Un flujo de Spark es un trabajo de ejecución prolongada que recibe datos de entrada de una amplia variedad de orígenes, incluido Azure Event Hubs.A Spark Stream is a long-running job that receives input data from a wide variety of sources, including Azure Event Hubs. Asimismo, Azure IoT Hub, Apache Kafka, Apache Flume, Twitter, ZeroMQ, sockets TCP sin formato o desde la supervisión de los sistemas de archivo YARN de Apache Hadoop.Also: Azure IoT Hub, Apache Kafka, Apache Flume, Twitter, ZeroMQ, raw TCP sockets, or from monitoring Apache Hadoop YARN filesystems. A diferencia de un proceso exclusivamente orientado a eventos, un flujo de Spark procesa por lotes los datos de entrada en ventanas de tiempo.Unlike a solely event-driven process, a Spark Stream batches input data into time windows. Por ejemplo, como un segmento de 2 segundos, y luego transforma cada lote de datos utilizando operaciones de asignación, reducción, unión y extracción.Such as a 2-second slice, and then transforms each batch of data using map, reduce, join, and extract operations. Spark Streaming después escribe los datos transformados en sistemas de archivos, bases de datos, paneles y la consola.The Spark Stream then writes the transformed data out to filesystems, databases, dashboards, and the console.

Procesamiento de flujos con Spark Streaming y HDInsight

Las aplicaciones de Spark Streaming deben esperar una fracción de segundo para recopilar cada micro-batch de eventos antes de enviar ese lote para procesarlo.Spark Streaming applications must wait a fraction of a second to collect each micro-batch of events before sending that batch on for processing. En cambio, una aplicación controlada por eventos procesa cada evento inmediatamente.In contrast, an event-driven application processes each event immediately. La latencia de Spark Streaming suele ser de unos segundos.Spark Streaming latency is typically under a few seconds. Las ventajas del enfoque de los microlotes son una mayor eficacia del procesamiento de datos y cálculos agregados más sencillos.The benefits of the micro-batch approach are more efficient data processing and simpler aggregate calculations.

Introducción a los flujos DStreamIntroducing the DStream

Spark Streaming representa un flujo continuo de datos entrantes que utilizan un flujo discretizado llamado "DStream".Spark Streaming represents a continuous stream of incoming data using a discretized stream called a DStream. Se puede crear un flujo DStream a partir de orígenes de entrada como Event Hubs o Kafka.A DStream can be created from input sources such as Event Hubs or Kafka. O aplicando transformaciones en otro DStream.Or by applying transformations on another DStream.

Un flujo DStream proporciona una capa de abstracción basada en los datos del evento sin procesar.A DStream provides a layer of abstraction on top of the raw event data.

Comience con un solo evento, por ejemplo, una lectura de temperatura en un termostato conectado.Start with a single event, say a temperature reading from a connected thermostat. Cuando este evento llega a la aplicación de Spark Streaming, se almacena de forma confiable, donde se replica en varios nodos.When this event arrives at your Spark Streaming application, the event is stored in a reliable way, where it's replicated on multiple nodes. Esta tolerancia a errores garantiza que un error de un solo nodo no provocará la pérdida del evento.This fault-tolerance ensures that the failure of any single node won't result in the loss of your event. Spark Core usa una estructura de datos que distribuye los datos entre varios nodos del clúster.The Spark core uses a data structure that distributes data across multiple nodes in the cluster. En este caso, en general, cada nodo mantiene sus propios datos en memoria para obtener el mejor rendimiento.Where each node generally maintains its own data in-memory for best performance. Esta estructura de datos se denomina conjunto de datos distribuido resistente (RDD).This data structure is called a resilient distributed dataset (RDD).

Cada RDD representa eventos recopilados a través de un período definido por el usuario llamado intervalo entre lotes.Each RDD represents events collected over a user-defined timeframe called the batch interval. Cuando transcurre cada intervalo entre lotes, se genera un nuevo diseño que contiene todos los datos de ese intervalo.As each batch interval elapses, a new RDD is produced that contains all the data from that interval. El conjunto continuo de datos distribuidos resistentes se recopila en un flujo DStream.The continuous set of RDDs is collected into a DStream. Por ejemplo, si el intervalo entre lotes es de un segundo, el flujo DStream emite un lote cada segundo con un RDD que contiene todos los datos ingeridos durante ese segundo.For example, if the batch interval is one second long, your DStream emits a batch every second containing one RDD that contains all the data ingested during that second. Al procesar el flujo DStream, el evento de temperatura se muestra en uno de estos lotes.When processing the DStream, the temperature event appears in one of these batches. Una aplicación de Spark Streaming procesa los lotes que contienen los eventos y, en última instancia, actúa en los datos almacenados en cada RDD.A Spark Streaming application processes the batches that contain the events and ultimately acts on the data stored in each RDD.

Flujo DStream de ejemplo con eventos de temperatura

Estructura de una aplicación de Spark StreamingStructure of a Spark Streaming application

Una aplicación de Spark Streaming es una aplicación de ejecución prolongada que recibe datos de orígenes de ingesta.A Spark Streaming application is a long-running application that receives data from ingest sources. Aplica las transformaciones para procesar los datos y, a continuación, envía los datos a uno o varios destinos.Applies transformations to process the data, and then pushes the data out to one or more destinations. La estructura de una aplicación de Spark Streaming tiene una parte estática y otra dinámica.The structure of a Spark Streaming application has a static part and a dynamic part. La parte estática define el origen de los datos y qué procesamiento hay que realizar en ellos.The static part defines where the data comes from, what processing to do on the data. Además, dónde deben ir los resultados.And where the results should go. La parte dinámica ejecuta la aplicación de forma indefinida y espera una señal de detención.The dynamic part is running the application indefinitely, waiting for a stop signal.

Por ejemplo, la siguiente aplicación sencilla recibe una línea de texto a través de un socket TCP y cuenta el número de veces que aparece cada palabra.For example, the following simple application receives a line of text over a TCP socket and counts the number of times each word appears.

Definición de la aplicaciónDefine the application

La definición de la lógica de la aplicación consta de cuatro pasos:The application logic definition has four steps:

  1. Crear un objeto StreamingContextCreate a StreamingContext.
  2. Crear un flujo DStream a partir del objeto StreamingContextCreate a DStream from the StreamingContext.
  3. Aplicar transformaciones al flujo DStreamApply transformations to the DStream.
  4. Generar los resultadosOutput the results.

Esta definición es estática y no se procesa ningún dato hasta que se ejecuta la aplicación.This definition is static, and no data is processed until you run the application.

Creación de un objeto StreamingContextCreate a StreamingContext

Cree un objeto StreamingContext desde el objeto SparkContext que apunta al clúster.Create a StreamingContext from the SparkContext that points to your cluster. Al crear un objeto StreamingContext, especifique el tamaño del lote en segundos, por ejemplo:When creating a StreamingContext, you specify the size of the batch in seconds, for example:

import org.apache.spark._
import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(1))

Creación de un flujo DStreamCreate a DStream

Con la instancia del objeto StreamingContext, cree un flujo DStream de entrada para el origen de entrada.With the StreamingContext instance, create an input DStream for your input source. En este caso, la aplicación se fija en la apariencia de los archivos nuevos en el almacenamiento asociado predeterminado.In this case, the application is watching for the appearance of new files in the default attached storage.

val lines = ssc.textFileStream("/uploads/Test/")

Aplicación de transformacionesApply transformations

Implemente el procesamiento aplicando transformaciones en el flujo DStream.You implement the processing by applying transformations on the DStream. Esta aplicación recibe una línea de texto a la vez desde el archivo y divide cada línea en palabras.This application receives one line of text at a time from the file, splits each line into words. Y, a continuación, usa un patrón map-reduce para contar el número de veces que aparece cada palabra.And then uses a map-reduce pattern to count the number of times each word appears.

val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

Generación de los resultadosOutput results

Envíe los resultados de transformación a los sistemas de destino aplicando operaciones de salida.Push the transformation results out to the destination systems by applying output operations. En este caso, el resultado de cada ejecución durante el cálculo se imprime en la salida de consola.In this case, the result of each run through the computation is printed in the console output.

wordCounts.print()

Ejecución de la aplicaciónRun the application

Inicie la aplicación de streaming y ejecútela hasta que se reciba una señal de finalización.Start the streaming application and run until a termination signal is received.

ssc.start()
ssc.awaitTermination()

Para obtener más información sobre la API de Spark Streaming, consulte la guía de programación de Apache Spark Streaming.For details on the Spark Stream API, see Apache Spark Streaming Programming Guide.

La siguiente aplicación de ejemplo es independiente, así que puede ejecutarla en un cuaderno de Jupyter Notebook.The following sample application is self-contained, so you can run it inside a Jupyter Notebook. En este ejemplo, se crea un origen de datos ficticio en la clase DummySource que genera el valor de un contador y la hora actual en milisegundos cada cinco segundos.This example creates a mock data source in the class DummySource that outputs the value of a counter and the current time in milliseconds every five seconds. Un nuevo objeto StreamingContext tiene un intervalo entre lotes de treinta segundos.A new StreamingContext object has a batch interval of 30 seconds. Cada vez que se crea un lote, la aplicación de streaming examina el RDD generado.Every time a batch is created, the streaming application examines the RDD produced. A continuación, convierte el RDD en un DataFrame de Spark y crea una tabla temporal sobre el DataFrame.Then converts the RDD to a Spark DataFrame, and creates a temporary table over the DataFrame.

class DummySource extends org.apache.spark.streaming.receiver.Receiver[(Int, Long)](org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2) {

    /** Start the thread that simulates receiving data */
    def onStart() {
        new Thread("Dummy Source") { override def run() { receive() } }.start()
    }

    def onStop() {  }

    /** Periodically generate a random number from 0 to 9, and the timestamp */
    private def receive() {
        var counter = 0  
        while(!isStopped()) {
            store(Iterator((counter, System.currentTimeMillis)))
            counter += 1
            Thread.sleep(5000)
        }
    }
}

// A batch is created every 30 seconds
val ssc = new org.apache.spark.streaming.StreamingContext(spark.sparkContext, org.apache.spark.streaming.Seconds(30))

// Set the active SQLContext so that we can access it statically within the foreachRDD
org.apache.spark.sql.SQLContext.setActive(spark.sqlContext)

// Create the stream
val stream = ssc.receiverStream(new DummySource())

// Process RDDs in the batch
stream.foreachRDD { rdd =>

    // Access the SQLContext and create a table called demo_numbers we can query
    val _sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
    _sqlContext.createDataFrame(rdd).toDF("value", "time")
        .registerTempTable("demo_numbers")
}

// Start the stream processing
ssc.start()

Espere aproximadamente 30 segundos después de iniciar la aplicación anterior.Wait for about 30 seconds after starting the application above. Después, puede consultar periódicamente el objeto DataFrame para ver el conjunto actual de valores presentes en el lote, por ejemplo, con el uso de esta consulta SQL:Then, you can query the DataFrame periodically to see the current set of values present in the batch, for example using this SQL query:

%%sql
SELECT * FROM demo_numbers

El resultado tendrá un aspecto similar al siguiente:The resulting output looks like the following output:

valuevalue timetime
1010 14973144652561497314465256
1111 14973144702721497314470272
1212 14973144752891497314475289
1313 14973144803101497314480310
1414 14973144853271497314485327
1515 14973144903461497314490346

Hay seis valores, ya que el objeto DummySource crea un valor cada cinco segundos y la aplicación emite un lote cada treinta segundos.There are six values, since the DummySource creates a value every 5 seconds and the application emits a batch every 30 seconds.

Ventanas deslizantesSliding windows

Para realizar cálculos agregados en el flujo DStream durante un período concreto, por ejemplo, para obtener una temperatura media durante los últimos dos segundos, use las operaciones de sliding window incluidas con Spark Streaming.To do aggregate calculations on your DStream over some time period, for example to get an average temperature over the last two seconds, use the sliding window operations included with Spark Streaming. Una ventana deslizante tiene una duración (la longitud de la ventana) y un intervalo durante el cual se evalúa su contenido (el intervalo de deslizamiento).A sliding window has a duration (the window length) and the interval during which the window's contents are evaluated (the slide interval).

Las ventanas deslizantes se pueden superponer; por ejemplo, puede definir una ventana con una longitud de dos segundos, que se desliza cada segundo.Sliding windows can overlap, for example, you can define a window with a length of two seconds, that slides every one second. Esta acción implica que cada vez que realice un cálculo de agregación, la ventana incluirá los datos del último segundo de la ventana anterior.This action means every time you do an aggregation calculation, the window will include data from the last one second of the previous window. Además de cualquier dato nuevo del siguiente segundo.And any new data in the next one second.

Ejemplo de ventana inicial con eventos de temperatura

Ventana de ejemplo con eventos de temperatura después del deslizamiento

En el ejemplo siguiente, se actualiza el código que utiliza el objeto DummySource para recopilar los lotes en una ventana con una duración de un minuto y un deslizamiento de un minuto.The following example updates the code that uses the DummySource, to collect the batches into a window with a one-minute duration and a one-minute slide.

class DummySource extends org.apache.spark.streaming.receiver.Receiver[(Int, Long)](org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2) {

    /** Start the thread that simulates receiving data */
    def onStart() {
        new Thread("Dummy Source") { override def run() { receive() } }.start()
    }

    def onStop() {  }

    /** Periodically generate a random number from 0 to 9, and the timestamp */
    private def receive() {
        var counter = 0  
        while(!isStopped()) {
            store(Iterator((counter, System.currentTimeMillis)))
            counter += 1
            Thread.sleep(5000)
        }
    }
}

// A batch is created every 30 seconds
val ssc = new org.apache.spark.streaming.StreamingContext(spark.sparkContext, org.apache.spark.streaming.Seconds(30))

// Set the active SQLContext so that we can access it statically within the foreachRDD
org.apache.spark.sql.SQLContext.setActive(spark.sqlContext)

// Create the stream
val stream = ssc.receiverStream(new DummySource())

// Process batches in 1 minute windows
stream.window(org.apache.spark.streaming.Minutes(1)).foreachRDD { rdd =>

    // Access the SQLContext and create a table called demo_numbers we can query
    val _sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
    _sqlContext.createDataFrame(rdd).toDF("value", "time")
    .registerTempTable("demo_numbers")
}

// Start the stream processing
ssc.start()

Después del primer minuto, hay doce entradas: seis entradas de cada uno de los dos lotes recopilados en la ventana.After the first minute, there are 12 entries - six entries from each of the two batches collected in the window.

valuevalue timetime
11 14973162941391497316294139
22 14973162991581497316299158
33 14973163041781497316304178
44 14973163092041497316309204
55 14973163142241497316314224
66 14973163192431497316319243
77 14973163242601497316324260
88 14973163292781497316329278
99 14973163342931497316334293
1010 14973163393141497316339314
1111 14973163443391497316344339
1212 14973163493611497316349361

Las funciones de ventana deslizante disponibles en la API de Spark Streaming incluyen las funciones window, countByWindow, reduceByWindow y countByValueAndWindow.The sliding window functions available in the Spark Streaming API include window, countByWindow, reduceByWindow, and countByValueAndWindow. Para obtener más información sobre estas funciones, consulte Transformations on DStreams (Transformaciones en flujos DStream).For details on these functions, see Transformations on DStreams.

Puntos de controlCheckpointing

Para proporcionar resistencia y tolerancia a errores, Spark Streaming se basa en los puntos de control para garantizar que el procesamiento de flujos puede continuar sin interrupciones, aunque se produzcan errores de nodo.To deliver resiliency and fault tolerance, Spark Streaming relies on checkpointing to ensure that stream processing can continue uninterrupted, even in the face of node failures. Spark crea puntos de control en un almacenamiento duradero (Azure Storage o Data Lake Storage).Spark creates checkpoints to durable storage (Azure Storage or Data Lake Storage). Estos puntos de control almacenan metadatos de la aplicación de streaming, como la configuración y las operaciones definidas por la aplicación.These checkpoints store streaming application metadata such as the configuration, and the operations defined by the application. Además, todos los lotes que se pusieron en cola pero que todavía no se han procesado.Also, any batches that were queued but not yet processed. En algunos casos, los puntos de control también guardarán los datos en los RDD para recompilar con más rapidez el estado de los datos a partir de lo que haya presente en los RDD que administra Spark.Sometimes, the checkpoints will also include saving the data in the RDDs to more quickly rebuild the state of the data from what is present in the RDDs managed by Spark.

Implementación de aplicaciones de Spark StreamingDeploying Spark Streaming applications

Normalmente se crea una aplicación de Spark Streaming localmente en un archivo JAR.You typically build a Spark Streaming application locally into a JAR file. A continuación, se implementa en Spark en HDInsight copiando el archivo JAR en el almacenamiento conectado predeterminado.Then deploy it to Spark on HDInsight by copying the JAR file to the default attached storage. Después, puede iniciar la aplicación mediante las REST API de LIVY disponibles en el clúster mediante una operación POST.You can start your application with the LIVY REST APIs available from your cluster using a POST operation. El cuerpo de POST incluye un documento JSON que proporciona la ruta de acceso al archivo JAR.The body of the POST includes a JSON document that provides the path to your JAR. Y el nombre de la clase cuyo método principal define y ejecuta la aplicación de streaming y, opcionalmente, los requisitos de recursos del trabajo (por ejemplo, el número de ejecutores, la memoria y los núcleos).And the name of the class whose main method defines and runs the streaming application, and optionally the resource requirements of the job (such as the number of executors, memory, and cores). Además, todos los valores de configuración que requiere el código de la aplicación.Also, any configuration settings your application code requires.

Implementación de una aplicación de Spark Streaming

También se puede comprobar el estado de todas las aplicaciones con una solicitud GET en un punto de conexión de LIVY.The status of all applications can also be checked with a GET request against a LIVY endpoint. Por último, puede finalizar una aplicación en ejecución emitiendo una solicitud DELETE en el punto de conexión de LIVY.Finally, you can end a running application by issuing a DELETE request against the LIVY endpoint. Para más información sobre la API de LIVY, consulte Trabajos remotos con Apache LIVY.For details on the LIVY API, see Remote jobs with Apache LIVY

Pasos siguientesNext steps