Canalización de procesamiento de flujos con Azure Stream AnalyticsStream processing pipeline with Azure Stream Analytics

Esta arquitectura de referencia muestra una canalización de procesamiento de flujos de datos de un extremo a otro.This reference architecture shows an end-to-end stream processing pipeline. La canalización ingiere los datos de dos orígenes, correlaciona los registros de los dos flujos de datos y calcula una media acumulada en un intervalo de tiempo.The pipeline ingests data from two sources, correlates records in the two streams, and calculates a rolling average across a time window. Los resultados se almacenan para su posterior análisis.The results are stored for further analysis.

Logotipo de github una implementación de referencia para esta arquitectura está disponible en GitHub.GitHub logo A reference implementation for this architecture is available on GitHub.

Arquitectura de referencia para la creación de una canalización de procesamiento de flujos de datos con Azure Stream Analytics

Escenario: Una empresa de taxi recopila los datos acerca de cada carrera de taxi.Scenario: A taxi company collects data about each taxi trip. En este escenario, se supone que hay dos dispositivos independientes que envían datos.For this scenario, we assume there are two separate devices sending data. El taxi tienen un medidor que envía la información acerca de cada carrera: duración, distancia y ubicaciones de recogida y destino.The taxi has a meter that sends information about each ride — the duration, distance, and pickup and dropoff locations. Un dispositivo independiente acepta los pagos de clientes y envía los datos sobre las tarifas.A separate device accepts payments from customers and sends data about fares. La empresa de taxi desea calcular el promedio de propinas por milla conducida, en tiempo real, con el fin de identificar las tendencias.The taxi company wants to calculate the average tip per mile driven, in real time, in order to spot trends.

ArquitecturaArchitecture

La arquitectura consta de los siguientes componentes:The architecture consists of the following components.

Orígenes de datos.Data sources. En esta arquitectura, hay dos orígenes de datos que generan flujos de datos en tiempo real.In this architecture, there are two data sources that generate data streams in real time. El primer flujo de datos contiene información sobre la carrera y, el segundo, contiene información sobre las tarifas.The first stream contains ride information, and the second contains fare information. La arquitectura de referencia incluye un generador de datos simulados que lee un conjunto de archivos estáticos e inserta los datos en Event Hubs.The reference architecture includes a simulated data generator that reads from a set of static files and pushes the data to Event Hubs. En una aplicación real, los orígenes de datos serían los dispositivos instalados en el taxi.In a real application, the data sources would be devices installed in the taxi cabs.

Azure Event Hubs.Azure Event Hubs. Event Hubs es un servicio de ingesta de eventos.Event Hubs is an event ingestion service. Esta arquitectura emplea dos instancias de centro de eventos, uno para cada origen de datos.This architecture uses two event hub instances, one for each data source. Cada origen de datos envía un flujo de datos al centro de eventos asociado.Each data source sends a stream of data to the associated event hub.

Azure Stream Analytics.Azure Stream Analytics. Stream Analytics es un motor de procesamiento de eventos.Stream Analytics is an event-processing engine. Un trabajo de Stream Analytics lee los flujos de datos desde los dos centros de eventos y realiza el procesamiento de los flujos.A Stream Analytics job reads the data streams from the two event hubs and performs stream processing.

Cosmos DB.Cosmos DB. La salida del trabajo de Stream Analytics es una serie de registros que se escriben como documentos JSON en una base de datos de documentos de Cosmos DB.The output from the Stream Analytics job is a series of records, which are written as JSON documents to a Cosmos DB document database.

Microsoft Power BI.Microsoft Power BI. Power BI es un conjunto de herramientas de análisis de negocios que sirve para analizar datos con el fin de obtener perspectivas empresariales.Power BI is a suite of business analytics tools to analyze data for business insights. En esta arquitectura, carga los datos desde Cosmos DB.In this architecture, it loads the data from Cosmos DB. Esto permite a los usuarios analizar el conjunto completo de los datos históricos que se han recopilado.This allows users to analyze the complete set of historical data that's been collected. También podría transmitir los resultados directamente desde Stream Analytics a Power BI para obtener una vista en tiempo real de los datos.You could also stream the results directly from Stream Analytics to Power BI for a real-time view of the data. Para más información, consulte Streaming en tiempo real en Power BI.For more information, see Real-time streaming in Power BI.

Azure Monitor.Azure Monitor. Azure Monitor recopila métricas de rendimiento sobre los servicios de Azure implementados en la solución.Azure Monitor collects performance metrics about the Azure services deployed in the solution. Puede visualizarlos en un panel para obtener información acerca del estado de la solución.By visualizing these in a dashboard, you can get insights into the health of the solution.

Ingesta de datosData ingestion

Para simular un origen de datos, esta arquitectura de referencia usa el conjunto de datos New York City Taxi Data[1].To simulate a data source, this reference architecture uses the New York City Taxi Data dataset[1]. Este conjunto de datos contiene datos sobre viajes de taxi en la ciudad de Nueva York durante un período–de cuatro años (2010 2013).This dataset contains data about taxi trips in New York City over a four-year period (2010–2013). Contiene dos tipos de registro: los datos de entrada y los datos de tarifas.It contains two types of record: ride data and fare data. Los datos de carreras incluyen la duración del viaje, la distancia de viaje y la ubicación de recogida y destino.Ride data includes trip duration, trip distance, and pickup and dropoff location. Los datos de tarifas incluyen las tarifas, los impuestos y las propinas.Fare data includes fare, tax, and tip amounts. Los campos comunes en ambos tipos de registro son la placa y el número de licencia, y el identificador del proveedor.Common fields in both record types include medallion number, hack license, and vendor ID. Juntos, estos tres campos identifican un taxi además del conductor.Together these three fields uniquely identify a taxi plus a driver. Los datos se almacenan en formato CSV.The data is stored in CSV format.

[1] Donovan, Brian; Trabajo, Dan (2016): New York City Taxi Trip Data (2010-2013).[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Universidad de Illinois en Urbana-Champaign.University of Illinois at Urbana-Champaign. https://doi.org/10.13012/J8PN93H8https://doi.org/10.13012/J8PN93H8

El generador de datos es una aplicación de .NET Core que lee los registros y los envía a Azure Event Hubs.The data generator is a .NET Core application that reads the records and sends them to Azure Event Hubs. El generador envía los datos de carreras en formato JSON y los datos de tarifas en formato CSV.The generator sends ride data in JSON format and fare data in CSV format.

Event Hubs usa particiones para segmentar los datos.Event Hubs uses partitions to segment the data. Las particiones permiten a los consumidores leer cada partición en paralelo.Partitions allow a consumer to read each partition in parallel. Cuando se envían datos a Event Hubs, puede especificar explícitamente la clave de partición.When you send data to Event Hubs, you can specify the partition key explicitly. En caso contrario, los registros se asignan a las particiones en modo round-robin.Otherwise, records are assigned to partitions in round-robin fashion.

En este escenario en particular, los datos de carreras y los datos de tarifas deben terminar con el mismo identificador de partición para un taxi determinado.In this particular scenario, ride data and fare data should end up with the same partition ID for a given taxi cab. Esto permite a Stream Analytics aplicar un cierto paralelismo cuando se establece una correlación entre los dos flujos.This enables Stream Analytics to apply a degree of parallelism when it correlates the two streams. Un registro en la partición n de los datos de carreras coincidirá con un registro en la partición de datos n de los datos de tarifas.A record in partition n of the ride data will match a record in partition n of the fare data.

Diagrama de procesamiento de flujos de datos con Azure Stream Analytics y Event Hubs

En el generador de datos, el modelo de datos común para ambos tipos de registro tiene una propiedad PartitionKey que es la concatenación de Medallion, HackLicense y VendorId.In the data generator, the common data model for both record types has a PartitionKey property which is the concatenation of Medallion, HackLicense, and VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Esta propiedad se utiliza para proporcionar una clave de partición explícita cuando se realizan envíos a Event Hubs:This property is used to provide an explicit partition key when sending to Event Hubs:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Procesamiento de flujosStream processing

El trabajo de procesamiento de flujos se define mediante una consulta SQL con diferentes pasos.The stream processing job is defined using a SQL query with several distinct steps. Los dos primeros pasos simplemente seleccionan los registros de los dos flujos de entrada.The first two steps simply select records from the two input streams.

WITH
Step1 AS (
    SELECT PartitionId,
           TRY_CAST(Medallion AS nvarchar(max)) AS Medallion,
           TRY_CAST(HackLicense AS nvarchar(max)) AS HackLicense,
           VendorId,
           TRY_CAST(PickupTime AS datetime) AS PickupTime,
           TripDistanceInMiles
    FROM [TaxiRide] PARTITION BY PartitionId
),
Step2 AS (
    SELECT PartitionId,
           medallion AS Medallion,
           hack_license AS HackLicense,
           vendor_id AS VendorId,
           TRY_CAST(pickup_datetime AS datetime) AS PickupTime,
           tip_amount AS TipAmount
    FROM [TaxiFare] PARTITION BY PartitionId
),

El paso siguiente combina los dos flujos de entrada para seleccionar los registros coincidentes de cada flujo.The next step joins the two input streams to select matching records from each stream.

Step3 AS (
  SELECT
         tr.Medallion,
         tr.HackLicense,
         tr.VendorId,
         tr.PickupTime,
         tr.TripDistanceInMiles,
         tf.TipAmount
    FROM [Step1] tr
    PARTITION BY PartitionId
    JOIN [Step2] tf PARTITION BY PartitionId
      ON tr.Medallion = tf.Medallion
     AND tr.HackLicense = tf.HackLicense
     AND tr.VendorId = tf.VendorId
     AND tr.PickupTime = tf.PickupTime
     AND tr.PartitionId = tf.PartitionId
     AND DATEDIFF(minute, tr, tf) BETWEEN 0 AND 15
)

Esta consulta combina los registros en un conjunto de campos que identifican los registros coincidentes de forma única (Medallion, HackLicense, VendorId y PickupTime).This query joins records on a set of fields that uniquely identify matching records (Medallion, HackLicense, VendorId, and PickupTime). La instrucción JOIN también incluye el identificador de partición.The JOIN statement also includes the partition ID. Como ya se mencionó, esto aprovecha el hecho de que los registros coincidentes siempre tienen el mismo identificador de partición en este escenario.As mentioned, this takes advantage of the fact that matching records always have the same partition ID in this scenario.

En Stream Analytics, las combinaciones son temporales, lo que significa que los registros se combinan dentro de un intervalo de tiempo determinado.In Stream Analytics, joins are temporal, meaning records are joined within a particular window of time. De caso contrario, el trabajo podría tener que esperar indefinidamente a una coincidencia.Otherwise, the job might need to wait indefinitely for a match. La función DATEDIFF especifica la separación máxima en el tiempo de dos registros coincidentes para considerarlo una coincidencia.The DATEDIFF function specifies how far two matching records can be separated in time for a match.

El último paso del trabajo calcula la media de propinas por milla, agrupada por una ventana de salto de 5 minutos.The last step in the job computes the average tip per mile, grouped by a hopping window of 5 minutes.

SELECT System.Timestamp AS WindowTime,
       SUM(tr.TipAmount) / SUM(tr.TripDistanceInMiles) AS AverageTipPerMile
  INTO [TaxiDrain]
  FROM [Step3] tr
  GROUP BY HoppingWindow(Duration(minute, 5), Hop(minute, 1))

Stream Analytics proporciona varias funciones de intervalos de tiempo.Stream Analytics provides several windowing functions. Una ventana de salto avanza en el tiempo un período fijo; en este caso, 1 minuto por salto.A hopping window moves forward in time by a fixed period, in this case 1 minute per hop. El objetivo es calcular una media móvil en los últimos cinco minutos.The result is to calculate a moving average over the past 5 minutes.

En la arquitectura que se muestra a continuación, solo se guardan los resultados del trabajo de Stream Analytics en Cosmos DB.In the architecture shown here, only the results of the Stream Analytics job are saved to Cosmos DB. En un escenario de macrodatos, considere la posibilidad de usar también Event Hubs Capture para guardar los datos de eventos sin procesar en Azure Blob Storage.For a big data scenario, consider also using Event Hubs Capture to save the raw event data into Azure Blob storage. Mantener los datos sin procesar le permitirá ejecutar consultas por lotes en los datos históricos en el futuro, con el fin de obtener información nueva a partir de los datos.Keeping the raw data will allow you to run batch queries over your historical data at later time, in order to derive new insights from the data.

Consideraciones sobre escalabilidadScalability considerations

Event HubsEvent Hubs

La capacidad de procesamiento de Event Hubs se mide en unidades de procesamiento.The throughput capacity of Event Hubs is measured in throughput units. Para escalar un centro de eventos automáticamente, puede habilitar el inflado automático, que permite escalar las unidades de procesamiento en función del tráfico, hasta un máximo configurado.You can autoscale an event hub by enabling auto-inflate, which automatically scales the throughput units based on traffic, up to a configured maximum.

Stream AnalyticsStream Analytics

Para Stream Analytics, los recursos de proceso que se asignan a un trabajo se miden en unidades de streaming.For Stream Analytics, the computing resources allocated to a job are measured in Streaming Units. Los trabajos de Stream Analytics escalan mejor si el trabajo se puede ejecutar en paralelo.Stream Analytics jobs scale best if the job can be parallelized. De este modo, Stream Analytics puede distribuir el trabajo entre varios nodos de proceso.That way, Stream Analytics can distribute the job across multiple compute nodes.

Para los datos de entrada de Event Hubs, utilice la palabra clave PARTITION BY para dividir el trabajo de Stream Analytics en particiones.For Event Hubs input, use the PARTITION BY keyword to partition the Stream Analytics job. Los datos se dividirán en subconjuntos en función de las particiones de Event Hubs.The data will be divided into subsets based on the Event Hubs partitions.

Las funciones de intervalos de tiempo y las combinaciones temporales requieren unidades de streaming adicionales.Windowing functions and temporal joins require additional SU. Cuando sea posible, use PARTITION BY para que cada partición se procese por separado.When possible, use PARTITION BY so that each partition is processed separately. Para más información, consulte Descripción y ajuste de las unidades de streaming.For more information, see Understand and adjust Streaming Units.

Si no es posible ejecutar todo el trabajo de Stream Analytics en paralelo, intente dividirlo en varios pasos, empezando con uno o varios pasos en paralelo.If it's not possible to parallelize the entire Stream Analytics job, try to break the job into multiple steps, starting with one or more parallel steps. De este modo, los primeros pasos que pueden ejecutar en paralelo.That way, the first steps can run in parallel. Por ejemplo, en esta arquitectura de referencia:For example, in this reference architecture:

  • Los pasos 1 y 2 son instrucciones SELECT simples que seleccionan registros dentro de una sola partición.Steps 1 and 2 are simple SELECT statements that select records within a single partition.
  • El paso 3 realiza una combinación con particiones en los dos flujos de entrada.Step 3 performs a partitioned join across two input streams. Este paso aprovecha del hecho de que los registros que coinciden comparten la misma clave de partición y, por lo tanto, garantiza que tendrán el mismo identificador de partición en cada flujo de entrada.This step takes advantage of the fact that matching records share the same partition key, and so are guaranteed to have the same partition ID in each input stream.
  • El paso 4 realiza la agregación en todas las particiones.Step 4 aggregates across all of the partitions. Este paso no se puede ejecutar en paralelo.This step cannot be parallelized.

Use el diagrama de trabajo de Stream Analytics para ver cuántas particiones se asignan a cada paso del trabajo.Use the Stream Analytics job diagram to see how many partitions are assigned to each step in the job. El siguiente diagrama muestra el diagrama de trabajo de esta arquitectura de referencia:The following diagram shows the job diagram for this reference architecture:

Diagrama de trabajo

Cosmos DBCosmos DB

La capacidad de rendimiento de Cosmos DB se mide en unidades de solicitud (RU).Throughput capacity for Cosmos DB is measured in Request Units (RU). Para escalar un contenedor de Cosmos DB más allá de 10 000 RU, debe especificar una clave de partición al crear el contenedor, e incluir la clave de partición en todos los documentos.In order to scale a Cosmos DB container past 10,000 RU, you must specify a partition key when you create the container, and include the partition key in every document.

En esta arquitectura de referencia, se crean nuevos documentos solo una vez por minuto (el intervalo de la ventana salto), por lo que los requisitos de procesamiento son bastante bajos.In this reference architecture, new documents are created only once per minute (the hopping window interval), so the throughput requirements are quite low. Por ese motivo, no es necesario asignar una clave de partición en este escenario.For that reason, there's no need to assign a partition key in this scenario.

Consideraciones sobre supervisiónMonitoring considerations

Con cualquier solución de procesamiento de flujos, es importante supervisar el rendimiento y el estado del sistema.With any stream processing solution, it's important to monitor the performance and health of the system. Azure Monitor recopila métricas y registros de diagnóstico para los servicios de Azure que se utilizan en la arquitectura.Azure Monitor collects metrics and diagnostics logs for the Azure services used in the architecture. Azure Monitor se integra en la plataforma de Azure y no requiere ningún código adicional en la aplicación.Azure Monitor is built into the Azure platform and does not require any additional code in your application.

Cualquiera de las siguientes señales de advertencia indica que debe escalar horizontalmente el recurso de Azure correspondiente:Any of the following warning signals indicate that you should scale out the relevant Azure resource:

  • Event Hubs limita las solicitudes o está cerca de la cuota diaria de mensajes.Event Hubs throttles requests or is close to the daily message quota.
  • El trabajo de Stream Analytics utiliza sistemáticamente más del 80 % de las unidades de streaming (SU) asignadas.The Stream Analytics job consistently uses more than 80% of allocated Streaming Units (SU).
  • Cosmos DB comienza a limitar las solicitudes.Cosmos DB begins to throttle requests.

La arquitectura de referencia incluye un panel personalizado, que se implementa en Azure Portal.The reference architecture includes a custom dashboard, which is deployed to the Azure portal. Después de implementar la arquitectura, puede ver el panel abriendo el Azure portal y seleccionando TaxiRidesDashboard en la lista de paneles.After you deploy the architecture, you can view the dashboard by opening the Azure portal and selecting TaxiRidesDashboard from list of dashboards. Para más información sobre cómo crear e implementar paneles personalizados en Azure Portal, consulte Creación mediante programación de paneles de Azure.For more information about creating and deploying custom dashboards in the Azure portal, see Programmatically create Azure Dashboards.

La siguiente imagen muestra el panel después de que el trabajo de Stream Analytics se ejecutara durante una hora aproximadamente.The following image shows the dashboard after the Stream Analytics job ran for about an hour.

Captura de pantalla del panel de carreras de taxi

El panel de la parte inferior izquierda muestra que el consumo de unidades de streaming para el trabajo de Stream Analytics aumenta durante los primeros 15 minutos y después se nivela.The panel on the lower left shows that the SU consumption for the Stream Analytics job climbs during the first 15 minutes and then levels off. Este es un patrón típico mientras el trabajo alcanza un estado estable.This is a typical pattern as the job reaches a steady state.

Tenga en cuenta que Event Hubs limita las solicitudes, tal y como se muestra en el panel superior derecho.Notice that Event Hubs is throttling requests, shown in the upper right panel. Un solicitud limitada de vez en cuanto no es un problema, porque el SDK de cliente de Event Hubs realiza reintentos automáticamente cuando recibe un error de limitación.An occasional throttled request is not a problem, because the Event Hubs client SDK automatically retries when it receives a throttling error. Sin embargo, si se producen errores de limitación sistemáticamente, significa que el centro de eventos necesita más unidades de procesamiento.However, if you see consistent throttling errors, it means the event hub needs more throughput units. El gráfico siguiente muestra una ejecución de prueba con la característica de inflado automático de Event Hubs, que escala horizontalmente las unidades de procesamiento automáticamente cuando es necesario.The following graph shows a test run using the Event Hubs auto-inflate feature, which automatically scales out the throughput units as needed.

Captura de pantalla del escalado automático de Event Hubs

El inflado automático se habilitó en la marca de 06:35 aproximadamente.Auto-inflate was enabled at about the 06:35 mark. Puede ver la caída de solicitudes limitadas, porque Event Hubs escaló automáticamente a 3 unidades de procesamiento.You can see the p drop in throttled requests, as Event Hubs automatically scaled up to 3 throughput units.

Curiosamente, esto tuvo el efecto secundario de aumentar el uso de las unidades de streaming en el trabajo de Stream Analytics.Interestingly, this had the side effect of increasing the SU utilization in the Stream Analytics job. Con la limitación, Event Hubs redujo artificialmente la tasa de ingesta del trabajo de Stream Analytics.By throttling, Event Hubs was artificially reducing the ingestion rate for the Stream Analytics job. Es bastante habitual que, al resolver un cuello de botella de rendimiento, se produzca otro.It's actually common that resolving one performance bottleneck reveals another. En este caso, asignar unidades de streaming adicionales para el trabajo de Stream Analytics resolvió el problema.In this case, allocating additional SU for the Stream Analytics job resolved the issue.

Implementación de la soluciónDeploy the solution

Para la implementación y la ejecución de la implementación de referencia, siga los pasos del archivo Léame de GitHub.To the deploy and run the reference implementation, follow the steps in the GitHub readme.

Puede examinar los siguientes escenarios de ejemplo de Azure que muestran soluciones específicas que usan algunas tecnologías similares:You may wish to review the following Azure example scenarios that demonstrate specific solutions using some of the same technologies: