Criar um pipeline de processamento de fluxo com o Azure Stream AnalyticsCreate a stream processing pipeline with Azure Stream Analytics

Esta arquitetura de referência mostra uma ponto a ponto processamento de fluxo pipeline.This reference architecture shows an end-to-end stream processing pipeline. O pipeline ingere dados de duas origens, correlaciona os registos em dois fluxos e calcula uma média móvel numa janela de tempo.The pipeline ingests data from two sources, correlates records in the two streams, and calculates a rolling average across a time window. Os resultados são armazenados para análise adicional.The results are stored for further analysis.

Logótipo do GitHub uma implementação de referência para esta arquitetura está disponível no GitHub.GitHub logo A reference implementation for this architecture is available on GitHub.

Arquitetura de referência para a criação de um pipeline de processamento de fluxo com o Azure Stream Analytics

Cenário: Uma empresa de táxis recolhe dados sobre cada viagem de táxis.Scenario: A taxi company collects data about each taxi trip. Para este cenário, partimos do princípio existem dois dispositivos separados, o envio de dados.For this scenario, we assume there are two separate devices sending data. A táxis tem um medidor que envia informações sobre cada jornada — a duração, distância e localizações de recolha e de redução.The taxi has a meter that sends information about each ride — the duration, distance, and pickup and dropoff locations. Um dispositivo separado aceita pagamentos de clientes e envia dados sobre fares.A separate device accepts payments from customers and sends data about fares. A empresa de táxis pretende calcular a média dica por quilómetro condicionada por em tempo real, por ordem para detetar tendências.The taxi company wants to calculate the average tip per mile driven, in real time, in order to spot trends.

ArquiteturaArchitecture

A arquitetura é composta pelos seguintes componentes.The architecture consists of the following components.

Origens de dados.Data sources. Nesta arquitetura, existem duas origens de dados que geram fluxos de dados em tempo real.In this architecture, there are two data sources that generate data streams in real time. O primeiro fluxo contém informações de jornada, e o segundo contém informações de Europeia.The first stream contains ride information, and the second contains fare information. A arquitetura de referência inclui um gerador de dados simulados que lê a partir de um conjunto de ficheiros estáticos e envia os dados para os Hubs de eventos.The reference architecture includes a simulated data generator that reads from a set of static files and pushes the data to Event Hubs. Num aplicativo real, as origens de dados seria dispositivos instalados em cabs a táxis.In a real application, the data sources would be devices installed in the taxi cabs.

Hubs de Eventos do Azure.Azure Event Hubs. Os Hubs de eventos é um serviço de ingestão de eventos.Event Hubs is an event ingestion service. Esta arquitetura utiliza duas instâncias de hub de eventos, um para cada origem de dados.This architecture uses two event hub instances, one for each data source. Cada origem de dados envia um fluxo de dados para o hub de eventos associados.Each data source sends a stream of data to the associated event hub.

Azure Stream Analytics.Azure Stream Analytics. Stream Analytics é um motor de processamento de eventos.Stream Analytics is an event-processing engine. Uma tarefa do Stream Analytics lê os dados transmite a dois dos hubs de eventos e efetua o processamento de fluxo.A Stream Analytics job reads the data streams from the two event hubs and performs stream processing.

O cosmos DB.Cosmos DB. A saída da tarefa do Stream Analytics é uma série de registos, que são escritos como documentos JSON numa base de dados de documento do Cosmos DB.The output from the Stream Analytics job is a series of records, which are written as JSON documents to a Cosmos DB document database.

Microsoft Power BI.Microsoft Power BI. O Power BI é um conjunto de ferramentas de análise de negócio para analisar dados de informações empresariais.Power BI is a suite of business analytics tools to analyze data for business insights. Nesta arquitetura, ele carrega os dados do Cosmos DB.In this architecture, it loads the data from Cosmos DB. Isso permite que os utilizadores analisar o conjunto completo de dados históricos que tenham sido recolhidos.This allows users to analyze the complete set of historical data that's been collected. Também poderia transmitir os resultados diretamente a partir do Stream Analytics para o Power BI para uma visão em tempo real dos dados.You could also stream the results directly from Stream Analytics to Power BI for a real-time view of the data. Para obter mais informações, consulte transmissão em fluxo em tempo real no Power BI.For more information, see Real-time streaming in Power BI.

O Azure Monitor.Azure Monitor. O Azure Monitor recolhe métricas de desempenho sobre os serviços do Azure implementados na solução.Azure Monitor collects performance metrics about the Azure services deployed in the solution. Ao visualizar estes num dashboard, pode obter informações sobre o estado de funcionamento da solução.By visualizing these in a dashboard, you can get insights into the health of the solution.

Ingestão de dadosData ingestion

Para simular uma origem de dados, esta arquitetura de referência utiliza o dados de táxis da cidade de Nova Iorque conjunto de dados[1].To simulate a data source, this reference architecture uses the New York City Taxi Data dataset[1]. Este conjunto de dados contém dados sobre viagens de táxis de nova York durante um período de 4 anos (2010 – 2013).This dataset contains data about taxi trips in New York City over a 4-year period (2010 – 2013). Ele contém dois tipos de registo: Enfrente os dados e se comportarão de dados.It contains two types of record: Ride data and fare data. Dados de jornada incluem a duração da viagem, a distância de viagem e a localização de recolha e de redução.Ride data includes trip duration, trip distance, and pickup and dropoff location. Dados de Europeia incluem Europeia, o imposto e o tip quantidades.Fare data includes fare, tax, and tip amounts. Campos comuns em ambos os tipos de registo incluem o número de medallion, licença de acesso e ID do fornecedor.Common fields in both record types include medallion number, hack license, and vendor ID. Em conjunto estes três campos de identificam exclusivamente um táxi além de um driver.Together these three fields uniquely identify a taxi plus a driver. Os dados são armazenados no formato CSV.The data is stored in CSV format.

[1] Donovan, Brian; Work, Dan (2016): Dados de viagens de táxis da cidade de nova York (2010-2013).[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Universidade de Illinois em Urbana Champaign.University of Illinois at Urbana-Champaign. https://doi.org/10.13012/J8PN93H8https://doi.org/10.13012/J8PN93H8

O gerador de dados é uma aplicação .NET Core que lê os registos e as envia para os Hubs de eventos do Azure.The data generator is a .NET Core application that reads the records and sends them to Azure Event Hubs. O gerador de envia os dados de jornada em dados Europeia formato JSON no formato CSV.The generator sends ride data in JSON format and fare data in CSV format.

Os Event Hubs utilizam partições para segmentar os dados.Event Hubs uses partitions to segment the data. As partições permitem um consumidor ler cada partição em paralelo.Partitions allow a consumer to read each partition in parallel. Quando envia dados para os Hubs de eventos, pode especificar explicitamente a chave de partição.When you send data to Event Hubs, you can specify the partition key explicitly. Caso contrário, os registos são atribuídos a partições em rodízio.Otherwise, records are assigned to partitions in round-robin fashion.

Neste cenário específico, é garantida dados e dados de Europeia devem acabar com o mesmo ID de partição para um cab de táxis determinado.In this particular scenario, ride data and fare data should end up with the same partition ID for a given taxi cab. Isto permite que o Stream Analytics aplicar um grau de paralelismo quando ele correlaciona os dois fluxos.This enables Stream Analytics to apply a degree of parallelism when it correlates the two streams. Um registo numa partição n de simultaneamente dados corresponderá um registo numa partição n dos dados Europeia.A record in partition n of the ride data will match a record in partition n of the fare data.

Diagrama de processamento com o Azure Stream Analytics e Hubs de eventos de fluxo

O gerador de dados, o modelo de dados comum para ambos os tipos de registo tem um PartitionKey propriedade que é a concatenação de Medallion, HackLicense, e VendorId.In the data generator, the common data model for both record types has a PartitionKey property which is the concatenation of Medallion, HackLicense, and VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Esta propriedade é utilizada para fornecer uma chave de partição explícita ao enviar para os Hubs de eventos:This property is used to provide an explicit partition key when sending to Event Hubs:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Processamento de fluxosStream processing

A tarefa de processamento de fluxo é definida usando uma consulta SQL com várias etapas distintas.The stream processing job is defined using a SQL query with several distinct steps. Os primeiros dois passos, basta selecionar registos de dois fluxos de entrada.The first two steps simply select records from the two input streams.

WITH
Step1 AS (
    SELECT PartitionId,
           TRY_CAST(Medallion AS nvarchar(max)) AS Medallion,
           TRY_CAST(HackLicense AS nvarchar(max)) AS HackLicense,
           VendorId,
           TRY_CAST(PickupTime AS datetime) AS PickupTime,
           TripDistanceInMiles
    FROM [TaxiRide] PARTITION BY PartitionId
),
Step2 AS (
    SELECT PartitionId,
           medallion AS Medallion,
           hack_license AS HackLicense,
           vendor_id AS VendorId,
           TRY_CAST(pickup_datetime AS datetime) AS PickupTime,
           tip_amount AS TipAmount
    FROM [TaxiFare] PARTITION BY PartitionId
),

O passo seguinte associa os dois fluxos de entrada para selecionar registos correspondentes de cada fluxo.The next step joins the two input streams to select matching records from each stream.

Step3 AS (
  SELECT
         tr.Medallion,
         tr.HackLicense,
         tr.VendorId,
         tr.PickupTime,
         tr.TripDistanceInMiles,
         tf.TipAmount
    FROM [Step1] tr
    PARTITION BY PartitionId
    JOIN [Step2] tf PARTITION BY PartitionId
      ON tr.Medallion = tf.Medallion
     AND tr.HackLicense = tf.HackLicense
     AND tr.VendorId = tf.VendorId
     AND tr.PickupTime = tf.PickupTime
     AND tr.PartitionId = tf.PartitionId
     AND DATEDIFF(minute, tr, tf) BETWEEN 0 AND 15
)

Esta consulta associa registos num conjunto de campos que identificam os registos correspondentes (Medallion, HackLicense, VendorId e PickupTime).This query joins records on a set of fields that uniquely identify matching records (Medallion, HackLicense, VendorId, and PickupTime). O JOIN instrução também inclui o ID de partição.The JOIN statement also includes the partition ID. Conforme mencionado, isso se beneficia do fato de que registos correspondentes tenham sempre o mesmo ID de partição neste cenário.As mentioned, this takes advantage of the fact that matching records always have the same partition ID in this scenario.

No Stream Analytics, as associações são temporal, registos de significado estão associados dentro de um determinado período de tempo.In Stream Analytics, joins are temporal, meaning records are joined within a particular window of time. Caso contrário, a tarefa poderá ter de aguardar indefinidamente que uma correspondência.Otherwise, the job might need to wait indefinitely for a match. O DATEDIFF função especifica até dois registos correspondentes podem ser separados em tempo para uma correspondência.The DATEDIFF function specifies how far two matching records can be separated in time for a match.

A última etapa na tarefa calcula a média dica por lá, agrupado por uma janela de salto de 5 minutos.The last step in the job computes the average tip per mile, grouped by a hopping window of 5 minutes.

SELECT System.Timestamp AS WindowTime,
       SUM(tr.TipAmount) / SUM(tr.TripDistanceInMiles) AS AverageTipPerMile
  INTO [TaxiDrain]
  FROM [Step3] tr
  GROUP BY HoppingWindow(Duration(minute, 5), Hop(minute, 1))

Stream Analytics proporciona várias modos.Stream Analytics provides several windowing functions. Uma janela de salto que remonte há avança no tempo por um período fixo, neste caso 1 minuto para cada salto.A hopping window moves forward in time by a fixed period, in this case 1 minute per hop. O resultado é calcular uma média móvel nos últimos 5 minutos.The result is to calculate a moving average over the past 5 minutes.

Na arquitetura mostrada aqui, apenas os resultados da tarefa do Stream Analytics são guardados do Cosmos DB.In the architecture shown here, only the results of the Stream Analytics job are saved to Cosmos DB. Para um cenário de grandes volumes de dados, considere a utilização de também captura de Hubs de eventos para guardar os dados de eventos não processados para o armazenamento de Blobs do Azure.For a big data scenario, consider also using Event Hubs Capture to save the raw event data into Azure Blob storage. Manter os dados não processados, poderá executar consultas em lote em seus dados históricos momento posterior, para derivarem novas informações a partir dos dados.Keeping the raw data will allow you to run batch queries over your historical data at later time, in order to derive new insights from the data.

Considerações de escalabilidadeScalability considerations

Hubs de EventosEvent Hubs

A capacidade de débito dos Hubs de eventos é medida em unidades de débito.The throughput capacity of Event Hubs is measured in throughput units. Pode dimensionar automaticamente um hub de eventos, ativando ampliação automática, que dimensiona automaticamente as unidades de débito com base no tráfego, até um máximo configurado.You can autoscale an event hub by enabling auto-inflate, which automatically scales the throughput units based on traffic, up to a configured maximum.

Stream AnalyticsStream Analytics

Para o Stream Analytics, os recursos de computação alocados a um trabalho são medidos em unidades de transmissão em fluxo.For Stream Analytics, the computing resources allocated to a job are measured in Streaming Units. Tarefas do Stream Analytics Dimensionar melhor se a tarefa pode ser colocado em paralelo.Stream Analytics jobs scale best if the job can be parallelized. Dessa forma, o Stream Analytics pode distribuir o trabalho em vários nós de computação.That way, Stream Analytics can distribute the job across multiple compute nodes.

Para a entrada de Hubs de eventos, utilize o PARTITION BY palavra-chave para particionar a tarefa do Stream Analytics.For Event Hubs input, use the PARTITION BY keyword to partition the Stream Analytics job. Os dados serão subdivididos em subconjuntos com base em partições dos Hubs de eventos.The data will be divided into subsets based on the Event Hubs partitions.

Funções de janelas e associações temporais requerem SU adicional.Windowing functions and temporal joins require additional SU. Sempre que possível, utilize PARTITION BY para que cada partição é processada separadamente.When possible, use PARTITION BY so that each partition is processed separately. Para obter mais informações, consulte compreender e ajustar as unidades transmissão em fluxo.For more information, see Understand and adjust Streaming Units.

Se não for possível paralelizar o trabalho inteiro do Stream Analytics, experimente dividir o trabalho em várias etapas, começando com um ou mais passos paralelos.If it's not possible to parallelize the entire Stream Analytics job, try to break the job into multiple steps, starting with one or more parallel steps. Dessa forma, os primeiros passos podem ser executadas em paralelo.That way, the first steps can run in parallel. Por exemplo, nesta arquitetura de referência:For example, in this reference architecture:

  • Os passos 1 e 2 são simples SELECT instruções que selecionar registos dentro de uma única partição.Steps 1 and 2 are simple SELECT statements that select records within a single partition.
  • Passo 3 executa uma junção particionada em dois fluxos de entrada.Step 3 performs a partitioned join across two input streams. Essa etapa tira proveito do fato de que os registos correspondentes partilham a mesma chave de partição e então são garantidos para ter o mesmo ID de partição em cada fluxo de entrada.This step takes advantage of the fact that matching records share the same partition key, and so are guaranteed to have the same partition ID in each input stream.
  • Passo 4 agregados em todas as partições.Step 4 aggregates across all of the partitions. Este passo não pode ser colocado em paralelo.This step cannot be parallelized.

Utilizar o Stream Analytics diagrama de tarefas para ver quantas partições são atribuídas a cada passo da tarefa.Use the Stream Analytics job diagram to see how many partitions are assigned to each step in the job. O diagrama seguinte mostra o diagrama de tarefas para esta arquitetura de referência:The following diagram shows the job diagram for this reference architecture:

Diagrama de tarefas

BD do CosmosCosmos DB

Capacidade de débito para o Cosmos DB é medida em unidades de pedido (RU).Throughput capacity for Cosmos DB is measured in Request Units (RU). Para dimensionar um contentor do Cosmos DB últimos 10 000 RU, tem de especificar um chave de partição quando criar o contentor e incluir a chave de partição em todos os documentos.In order to scale a Cosmos DB container past 10,000 RU, you must specify a partition key when you create the container, and include the partition key in every document.

Nesta arquitetura de referência, os novos documentos são criados apenas uma vez por minuto (o salto que remonte há janela intervalo), para que os requisitos de débito são bastante baixos.In this reference architecture, new documents are created only once per minute (the hopping window interval), so the throughput requirements are quite low. Por esse motivo, não é necessário atribuir uma chave de partição neste cenário.For that reason, there's no need to assign a partition key in this scenario.

Considerações sobre a monitorizaçãoMonitoring considerations

Com qualquer solução de processamento de fluxos, é importante monitorizar o desempenho e estado de funcionamento do sistema.With any stream processing solution, it's important to monitor the performance and health of the system. O Azure Monitor recolhe registos de diagnóstico e métricas para os serviços do Azure utilizados na arquitetura.Azure Monitor collects metrics and diagnostics logs for the Azure services used in the architecture. Monitor do Azure baseia-se na plataforma do Azure e não requer qualquer código adicional na sua aplicação.Azure Monitor is built into the Azure platform and does not require any additional code in your application.

Qualquer um dos sinais de aviso seguintes indicam que deve aumentar horizontalmente o recurso do Azure relevante:Any of the following warning signals indicate that you should scale out the relevant Azure resource:

  • Os Hubs de eventos limita os pedidos ou próximo a quota de mensagens diária.Event Hubs throttles requests or is close to the daily message quota.
  • A tarefa do Stream Analytics utiliza consistentemente superior a 80% de alocado de transmissão em fluxo unidades (SU).The Stream Analytics job consistently uses more than 80% of allocated Streaming Units (SU).
  • O cosmos DB começa a limitar os pedidos.Cosmos DB begins to throttle requests.

A arquitetura de referência inclui um dashboard personalizado, o que é implementado para o portal do Azure.The reference architecture includes a custom dashboard, which is deployed to the Azure portal. Depois de implementar a arquitetura, pode ver o dashboard, abrindo o Portal do Azure e selecionando TaxiRidesDashboard da lista de dashboards.After you deploy the architecture, you can view the dashboard by opening the Azure Portal and selecting TaxiRidesDashboard from list of dashboards. Para obter mais informações sobre como criar e implementar dashboards personalizados no portal do Azure, consulte criar programaticamente Dashboards do Azure.For more information about creating and deploying custom dashboards in the Azure portal, see Programmatically create Azure Dashboards.

A imagem seguinte mostra o dashboard depois da tarefa do Stream Analytics for executada durante menos de uma hora.The following image shows the dashboard after the Stream Analytics job ran for about an hour.

Captura de ecrã do dashboard anda de táxis

O painel na parte inferior esquerda mostra que o consumo de SU para a tarefa do Stream Analytics climbs durante os primeiros 15 minutos e, em seguida, os níveis de.The panel on the lower left shows that the SU consumption for the Stream Analytics job climbs during the first 15 minutes and then levels off. Este é um padrão típico quando a tarefa atinge um estado estável.This is a typical pattern as the job reaches a steady state.

Tenha em atenção que os Hubs de eventos é limitação de pedidos, mostrados no painel à direita superior.Notice that Event Hubs is throttling requests, shown in the upper right panel. Um pedido de limitados ocasional não é um problema, porque o SDK do cliente dos Hubs de eventos repete automaticamente quando recebe um erro de limitação.An occasional throttled request is not a problem, because the Event Hubs client SDK automatically retries when it receives a throttling error. No entanto, se vir erros de limitação consistentes, significa que o hub de eventos tem de obter mais unidades de débito.However, if you see consistent throttling errors, it means the event hub needs more throughput units. O gráfico seguinte mostra um teste executado com os Event Hubs a ampliação automática recurso, que automaticamente aumenta horizontalmente as unidades de débito, conforme necessário.The following graph shows a test run using the Event Hubs auto-inflate feature, which automatically scales out the throughput units as needed.

Dimensionamento automático de captura de ecrã dos Hubs de eventos

Ampliação automática foi ativado nas sobre 06:35 marcar.Auto-inflate was enabled at about the 06:35 mark. Pode ver o p soltar na pedidos limitados, como os Hubs de eventos aumentados automaticamente até 3 unidades de débito.You can see the p drop in throttled requests, as Event Hubs automatically scaled up to 3 throughput units.

Curiosamente, isso tinha o efeito colateral de aumento da utilização de SU da tarefa do Stream Analytics.Interestingly, this had the side effect of increasing the SU utilization in the Stream Analytics job. Através da limitação, os Hubs de eventos foi, artificialmente, reduzindo a velocidade de ingestão para a tarefa do Stream Analytics.By throttling, Event Hubs was artificially reducing the ingestion rate for the Stream Analytics job. É comum que, na verdade, resolução de um estrangulamento de desempenho revela que outra.It's actually common that resolving one performance bottleneck reveals another. Neste caso, a alocação de SU adicional para a tarefa do Stream Analytics resolvido o problema.In this case, allocating additional SU for the Stream Analytics job resolved the issue.

Implementar a soluçãoDeploy the solution

Para a implementar e executar a implementação de referência, siga os passos a Leiame do GitHub.To the deploy and run the reference implementation, follow the steps in the GitHub readme.

Reveja o seguinte pode ser útil cenários de exemplo do Azure que demonstram a soluções específicas com algumas das mesmas tecnologias:You may wish to review the following Azure example scenarios that demonstrate specific solutions using some of the same technologies: