Processamento de fluxos com o Azure Stream Analytics

Cosmos DB
Hubs de Eventos
Monitor
Stream Analytics

Esta arquitetura de referência mostra um gasoduto de processamento de fluxo de ponta a ponta. O gasoduto ingere dados de duas fontes, correlaciona os registos nos dois fluxos, e calcula uma média de rolamento através de uma janela de tempo. Os resultados são armazenados para análise posterior.

GitHub logotipo Uma implementação de referência para esta arquitetura está disponível em GitHub.

Arquitetura de referência para criar um pipeline de processamento de fluxo com Azure Stream Analytics

Cenário: Uma empresa de táxis recolhe dados sobre cada viagem de táxi. Para este cenário, assumimos que existem dois dispositivos separados que enviam dados. O táxi tem um medidor que envia informações sobre cada passeio - a duração, distância e locais de recolha e entrega. Um dispositivo separado aceita pagamentos de clientes e envia dados sobre tarifas. A empresa de táxis quer calcular a gorjeta média por milha percorrido, em tempo real, de modo a detetar tendências.

Arquitetura

A arquitetura é composta pelos seguintes componentes.

Origens de dados. Nesta arquitetura, existem duas fontes de dados que geram fluxos de dados em tempo real. O primeiro fluxo contém informações sobre o passeio, e o segundo contém informações sobre tarifas. A arquitetura de referência inclui um gerador de dados simulado que lê a partir de um conjunto de ficheiros estáticos e empurra os dados para os Centros de Eventos. Numa aplicação real, as fontes de dados seriam dispositivos instalados nos táxis.

Azure Event Hubs. O Event Hubs é um serviço de ingestão de eventos. Esta arquitetura usa dois casos de centro de eventos, um para cada fonte de dados. Cada fonte de dados envia um fluxo de dados para o centro de eventos associado.

Azure Stream Analytics. Stream Analytics é um motor de processamento de eventos. Um trabalho do Stream Analytics lê os fluxos de dados dos dois centros de eventos e executa o processamento de fluxo.

Cosmos DB. A saída do trabalho stream Analytics é uma série de registos, que são escritos como documentos JSON para uma base de dados de documentos da Cosmos DB.

Microsoft Power BI. Power BI é um conjunto de ferramentas de análise de negócios para analisar dados para insights de negócio. Nesta arquitetura, carrega os dados da Cosmos DB. Isto permite que os utilizadores analisem o conjunto completo de dados históricos que foram recolhidos. Também pode transmitir os resultados diretamente do Stream Analytics para Power BI para uma visão em tempo real dos dados. Para obter mais informações, veja Transmissão em tempo real no Power BI.

Azure Monitor. O Azure Monitor recolhe métricas de desempenho sobre os serviços Azure implantados na solução. Ao visualizar estes num dashboard, você pode obter insights sobre a saúde da solução.

Ingestão de dados

Para simular uma fonte de dados, esta arquitetura de referência utiliza o conjunto de dados de dados de táxi da cidade de Nova Iorque[1]. Este conjunto de dados contém dados sobre viagens de táxi em Nova Iorque durante um período de quatro anos (2010-2013). Contém dois tipos de registo: dados de passeio e dados de tarifas. Os dados do passeio incluem a duração da viagem, a distância da viagem e o local de recolha e entrega. Os dados das tarifas incluem tarifas, impostos e valores de gorjeta. Os campos comuns em ambos os tipos de registo incluem o número de medalhão, a licença de hack e a identificação do fornecedor. Juntos, estes três campos identificam exclusivamente um táxi mais um motorista. Os dados são armazenados em formato CSV.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Universidade de Illinois em Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

O gerador de dados é uma aplicação .NET Core que lê os registos e os envia para os Azure Event Hubs. O gerador envia dados de viagem no formato JSON e dados de tarifas em formato CSV.

O Event Hubs utiliza divisórias para segmentar os dados. As divisórias permitem ao consumidor ler cada divisória em paralelo. Quando envia dados para Os Centros de Eventos, pode especificar explicitamente a chave de partição. Caso contrário, os registos são atribuídos a divisórias em forma de rodapé.

Neste cenário específico, os dados do passeio e os dados das tarifas devem acabar com a mesma identificação de partição para um determinado táxi. Isto permite que o Stream Analytics aplique um certo paralelismo quando correlaciona os dois fluxos. Um recorde na partição n dos dados do passeio corresponderá a um recorde na partição n dos dados da tarifa.

Diagrama de processamento de fluxo com Azure Stream Analytics e Centros de Eventos

No gerador de dados, o modelo comum de dados para ambos os tipos de registos tem uma PartitionKey propriedade que é a concatenação MedallionHackLicense de, e VendorId .

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Esta propriedade é usada para fornecer uma chave de partição explícita ao enviar para Os Centros de Eventos:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Processamento de fluxos

O trabalho de processamento de fluxo é definido usando uma consulta SQL com vários passos distintos. Os dois primeiros passos simplesmente selecionam registos dos dois fluxos de entrada.

WITH
Step1 AS (
    SELECT PartitionId,
           TRY_CAST(Medallion AS nvarchar(max)) AS Medallion,
           TRY_CAST(HackLicense AS nvarchar(max)) AS HackLicense,
           VendorId,
           TRY_CAST(PickupTime AS datetime) AS PickupTime,
           TripDistanceInMiles
    FROM [TaxiRide] PARTITION BY PartitionId
),
Step2 AS (
    SELECT PartitionId,
           medallion AS Medallion,
           hack_license AS HackLicense,
           vendor_id AS VendorId,
           TRY_CAST(pickup_datetime AS datetime) AS PickupTime,
           tip_amount AS TipAmount
    FROM [TaxiFare] PARTITION BY PartitionId
),

O passo seguinte junta-se aos dois fluxos de entrada para selecionar registos correspondentes de cada fluxo.

Step3 AS (
  SELECT tr.TripDistanceInMiles,
         tf.TipAmount
    FROM [Step1] tr
    PARTITION BY PartitionId
    JOIN [Step2] tf PARTITION BY PartitionId
      ON tr.PartitionId = tf.PartitionId
     AND tr.PickupTime = tf.PickupTime
     AND DATEDIFF(minute, tr, tf) BETWEEN 0 AND 15
)

Esta consulta junta-se a registos num conjunto de campos que identificam exclusivamente registos correspondentes PartitionIdPickupTime (e).

Nota

Queremos que os TaxiRideTaxiFare e riachos se juntem à combinação única MedallionHackLicense de, e VendorIdPickupTime . Neste caso, as PartitionId coberturas Medallion e os HackLicenseVendorId campos, mas isso não deve ser tomado como o caso em geral.

No Stream Analytics, as juntas são temporais,o que significa que os registos são unidos dentro de uma determinada janela de tempo. Caso contrário, o trabalho pode ter de esperar indefinidamente por uma correspondência. A função DATEDIFF especifica até que ponto dois registos correspondentes podem ser separados a tempo de uma partida.

O último passo no trabalho calcula a inclinação média por milha, agrupada por uma janela de 5 minutos.

SELECT System.Timestamp AS WindowTime,
       SUM(tr.TipAmount) / SUM(tr.TripDistanceInMiles) AS AverageTipPerMile
  INTO [TaxiDrain]
  FROM [Step3] tr
  GROUP BY HoppingWindow(Duration(minute, 5), Hop(minute, 1))

O Stream Analytics fornece várias funções de janela. Uma janela de salto avança no tempo por um período fixo, neste caso 1 minuto por salto. O resultado é calcular uma média móvel nos últimos 5 minutos.

Na arquitetura aqui mostrada, apenas os resultados do trabalho stream Analytics são guardados para Cosmos DB. Para um grande cenário de dados, considere também usar o Event Hubs Capture para guardar os dados brutos do evento no armazenamento de Azure Blob. A manutenção dos dados brutos permitir-lhe-á executar consultas sobre os seus dados históricos mais tarde, de forma a obter novos conhecimentos a partir dos dados.

Considerações de escalabilidade

Hubs de Eventos

A capacidade de produção dos Centros de Eventos é medida em unidades de produção. Pode autoescalar um hub de eventos ativando a auto-insuflado,que escala automaticamente as unidades de produção com base no tráfego, até um máximo configurado.

Stream Analytics

Para o Stream Analytics, os recursos de computação atribuídos a um trabalho são medidos em Unidades de Streaming. Os trabalhos stream Analytics são melhores se o trabalho puder ser paralelo. Dessa forma, o Stream Analytics pode distribuir o trabalho por vários nós de computação.

Para a entrada de Centros de Eventos, utilize a PARTITION BY palavra-chave para dividir o trabalho Stream Analytics. Os dados serão divididos em subconjuntos com base nas divisórias Do Event Hubs.

As funções de janela e as juntas temporais requerem SU adicional. Quando possível, utilize PARTITION BY de modo a que cada divisória seja processada separadamente. Para obter mais informações, consulte Compreender e ajustar Unidades de Streaming.

Se não for possível paralelizar todo o trabalho stream Analytics, tente dividir o trabalho em vários passos, começando com um ou mais passos paralelos. Assim, os primeiros passos podem correr em paralelo. Por exemplo, nesta arquitetura de referência:

  • Os passos 1 e 2 são declarações simples SELECT que selecionam registos dentro de uma única partição.
  • O passo 3 executa uma junção dividida através de dois fluxos de entrada. Este passo tira partido do facto de os registos correspondentes partilharem a mesma chave de partição, pelo que é garantido ter o mesmo ID de partição em cada fluxo de entrada.
  • Passo 4 agrega-se em todas as divisórias. Este passo não pode ser paralelo.

Utilize o diagrama de trabalho stream Analytics para ver quantas divisórias são atribuídas a cada passo no trabalho. O diagrama a seguir mostra o diagrama de trabalho desta arquitetura de referência:

Diagrama de trabalho

Cosmos DB

A capacidade de produção para Cosmos DB é medida em Unidades de Pedido (RU). Para escalar um recipiente Cosmos DB passado 10.000 RU, você deve especificar uma chave de partição quando você criar o recipiente, e incluir a chave de partição em cada documento.

Nesta arquitetura de referência, novos documentos são criados apenas uma vez por minuto (o intervalo da janela de salto), pelo que os requisitos de produção são bastante baixos. Por isso, não há necessidade de atribuir uma chave de partição neste cenário.

Considerações de monitorização

Com qualquer solução de processamento de fluxo, é importante monitorizar o desempenho e a saúde do sistema. O Azure Monitor recolhe métricas e registos de diagnóstico para os serviços Azure utilizados na arquitetura. O Azure Monitor está integrado na plataforma Azure e não requer nenhum código adicional na sua aplicação.

Qualquer um dos seguintes sinais de aviso indica que deve reduzir o recurso Azure relevante:

  • O Event Hubs acelera os pedidos ou está próximo da quota de mensagem diária.
  • O trabalho stream Analytics utiliza consistentemente mais de 80% das Unidades de Streaming (SU) atribuídas.
  • Cosmos DB começa a acelerar os pedidos.

A arquitetura de referência inclui um dashboard personalizado, que é implantado no portal Azure. Depois de implementar a arquitetura, pode ver o painel de instrumentos abrindo o portal Azure e selecionando a partir da lista de dashboards. Para obter mais informações sobre a criação e implementação de dashboards personalizados no portal Azure, consulte Programmaticamente criar Dashboards Azure.

A imagem que se segue mostra o painel de instrumentos depois do trabalho do Stream Analytics ter funcionado durante cerca de uma hora.

Screenshot do painel de táxi rides

O painel na parte inferior esquerda mostra que o consumo de SU para o trabalho stream Analytics sobe durante os primeiros 15 minutos e, em seguida, níveis fora. Este é um padrão típico à medida que o trabalho atinge um estado estável.

Note que os Centros de Eventos estão a estrangular os pedidos, mostrados no painel superior direito. Um pedido ocasional de aceleração não é um problema, porque o cliente do Event Hubs SDK automaticamente recauchutado quando recebe um erro de estrangulamento. No entanto, se vir erros de estrangulamento consistentes, significa que o centro de eventos precisa de mais unidades de produção. O gráfico seguinte mostra um teste executado utilizando a função de insuflação automática do Event Hubs, que escala automaticamente as unidades de produção conforme necessário.

Screenshot do evento Hubs autoscaling

A auto-insuflado foi ativada por volta das 06:35. Pode ver a queda p em pedidos acelerados, uma vez que os Event Hubs aumentaram automaticamente até 3 unidades de produção.

Curiosamente, isto teve o efeito colateral de aumentar a utilização de SU no trabalho stream Analytics. Ao acelerar, o Event Hubs estava a reduzir artificialmente a taxa de ingestão para o trabalho de Stream Analytics. É comum que a resolução de um estrangulamento de desempenho revele outra. Neste caso, a atribuição de SU adicional para o trabalho stream Analytics resolveu a questão.

Considerações de custos

Utilize a calculadora de preços do Azure para prever os custos. Aqui estão algumas considerações para os serviços utilizados nesta arquitetura de referência.

Azure Stream Analytics

O Azure Stream Analytics tem o preço do número de unidades de streaming ($0,11/hora) necessárias para processar os dados no serviço.

O Stream Analytics pode ser caro se não estiver a processar os dados em tempo real ou em pequenas quantidades de dados. Para esses casos de utilização, considere usar as Funções Azure ou aplicações lógicas para transferir dados do Azure Event Hubs para uma loja de dados.

Azure Event Hubs e Azure Cosmos DB

Para considerações de custos sobre Azure Event Hubs e Cosmos DB, consulte considerações de custo ver o processamento do Stream com a arquitetura de referência Azure Databricks.

Implementar a solução

Para implementar e executar a implementação de referência, siga os passos na leitura GitHub.

Considerações de DevOps

  • Criar grupos de recursos separados para ambientes de produção, desenvolvimento e teste. A utilização de grupos de recursos separados torna mais fácil gerir as implementações, eliminar as implementações de teste e atribuir direitos de acesso.

  • Utilize o modelo do Gestor de Recursos Azure para implantar os recursos Azure seguindo a infraestrutura como Processo código (IaC). Com modelos, automatizar implementações utilizando serviços Azure DevOps,ou outras soluções ci/CD é mais fácil.

  • Coloque cada carga de trabalho num modelo de implantação separado e guarde os recursos em sistemas de controlo de fontes. Pode implementar os modelos em conjunto ou individualmente como parte de um processo de CI/CD, facilitando o processo de automatização.

    Nesta arquitetura, os Azure Event Hubs, Log Analytics e Cosmos DB são identificados como uma única carga de trabalho. Estes recursos estão incluídos num único modelo ARM.

  • Considere encenar as suas cargas de trabalho. Desloque-se para várias fases e efetue verificações de validação em cada fase antes de passar para a fase seguinte. Desta forma, pode impulsionar as atualizações para os seus ambientes de produção de forma altamente controlada e minimizar problemas de implantação imprevistos.

  • Considere utilizar o Azure Monitor para analisar o desempenho do seu pipeline de processamento de fluxo. Para obter mais informações, consulte Monitoring Azure Databricks.

Para mais informações, consulte a secção DevOps no Quadro Microsoft Azure Well-Architected.

Pode desejar rever os seguintes cenários de exemplo Azure que demonstrem soluções específicas utilizando algumas das mesmas tecnologias: