Usar a paralelização de consultas no Azure Stream Analytics

Este artigo mostra como tirar proveito da paralelização no Azure Stream Analytics. Aprenda a dimensionar trabalhos do Stream Analytics configurando partições de entrada e ajustando a definição da consulta de análise.

Como pré-requisito, talvez você queira conhecer a noção de unidade de streaming descrita em Compreender e ajustar unidades de streaming.

Quais são as partes de um trabalho do Stream Analytics?

Uma definição de trabalho do Stream Analytics inclui pelo menos uma entrada, uma consulta e uma saída de streaming. As entradas são de onde o trabalho lê o fluxo de dados. A consulta é usada para transformar o fluxo de entrada de dados e a saída é para onde são enviados os resultados do trabalho.

Partições em entradas e saídas

Particionamento permite que você divida dados em subconjuntos com base em uma chave de partição. Se a sua entrada (por exemplo, Hubs de Eventos) for particionada por uma chave, recomendamos que você especifique a chave de partição ao adicionar uma entrada ao trabalho do Stream Analytics. O dimensionamento de um trabalho do Stream Analytics utiliza partições na entrada e saída. Um trabalho do Stream Analytics pode consumir e gravar diferentes partições em paralelo, o que aumenta a taxa de transferência.

Entradas

Todas as entradas de streaming do Azure Stream Analytics podem aproveitar o particionamento: Hubs de Eventos, Hub IoT, armazenamento Blobs, Azure Data Lake Storage Gen2.

Observação

Para o nível de compatibilidade 1.2 e superior, a chave de partição deve ser definida como uma propriedade de entrada, sem a necessidade da palavra-chave PARTITION BY na consulta. Para o nível de compatibilidade 1.1 e inferior, a chave de partição precisa ser definida com a palavra-chave PARTITION BY na consulta.

outputs

Quando você trabalha com o Stream Analytics, você pode tirar proveito do particionamento nas saídas:

  • Armazenamento do Azure Data Lake
  • Funções do Azure
  • tabela do Azure
  • Armazenamento de Blob (é possível definir a chave de partição explicitamente)
  • Azure Cosmos DB (é preciso definir explicitamente a chave de partição)
  • Hubs de Eventos (é preciso definir a chave de partição explicitamente)
  • Hub IoT (é preciso definir a chave de partição explicitamente)
  • Barramento de Serviço
  • SQL e Azure Synapse Analytics com o particionamento opcional: confira mais informações na página Saída do Banco de Dados SQL do Azure.

O Power BI não dá suporte ao particionamento. No entanto, ainda é possível particionar a entrada conforme descrito nesta seção.

Para obter mais informações sobre partições, consulte os seguintes artigos:

Consulta

Para que um trabalho seja paralelo, as chaves de partição precisam estar alinhadas entre todas as entradas, todas as etapas da lógica de consulta e todas as saídas. O particionamento lógico de consulta é determinado pelas chaves usadas para junções e agregações (GROUP BY). Esse último requisito poderá ser ignorado se a lógica de consulta não for chaveada (projeção, filtros, junções referenciais...).

  • Se uma entrada e uma saída forem particionadas por WarehouseId e a consulta for agrupada por ProductId sem WarehouseId, o trabalho não será paralelo.
  • Se duas entradas a serem unidas forem particionadas por chaves de partição diferentes (WarehouseId e ProductId), o trabalho não será paralelo.
  • Se dois ou mais fluxos de dados independentes estiverem contidos em um único trabalho, cada um com sua própria chave de partição, o trabalho não será paralelo.

Somente quando todas as entradas, saídas e etapas de consulta estiverem usando a mesma chave, o trabalho será paralelo.

Trabalhos embaraçosamente paralelos

Um trabalho embaraçosamente paralelo é o cenário mais escalonável no Azure Stream Analytics. Ele conecta uma partição da entrada para uma instância da consulta a uma partição da saída. Este paralelismo tem os seguintes requisitos:

  • Se sua lógica de consulta for dependente da mesma chave que está sendo processada pela mesma instância de consulta, você deverá garantir que os eventos sejam encaminhados para a mesma partição da entrada. Para Hubs de Eventos ou Hub IoT, isso significa que os dados do evento devem ter o valor PartitionKey definido. Como alternativa, você pode usar os remetentes particionados. Para armazenamento de Blobs, isso significa que os eventos são enviados à mesma pasta de partição. Um exemplo seria uma instância de consulta que agrega dados por userID em que o hub de eventos de entrada é particionado usando userID como chave de partição. Contudo, se a lógica de consulta não exigir que a mesma chave seja processada pela mesma instância de consulta, você poderá ignorar esse requisito. Um exemplo disso seria uma consulta simples de seleção/projeto/filtro.

  • A próxima etapa é fazer com que sua consulta seja particionada. Para tarefas com nível de compatibilidade 1.2 ou superior (recomendado), a coluna personalizada pode ser especificada como Chave de Partição nas configurações de entrada e o trabalho será paralelo automaticamente. Trabalhos com nível de compatibilidade 1.0 ou 1.1 exigem que você use PARTITION BY PartitionId em todas as etapas da sua consulta. Várias etapas são permitidas, mas todas elas devem ser particionadas pela mesma chave.

  • A maioria das saídas com suporte no Stream Analytics pode utilizar o particionamento. Se você usar um tipo de saída que não dá suporte ao particionamento, seu trabalho não será embaraçosamente paralelo. Para saída dos Hubs de Eventos, verifique se a Coluna de chave de partição está definida como a mesma chave de partição usada na consulta. Para obter mais informações, consulte a seção de saída.

  • O número de partições de entrada deve ser igual ao número de partições de saída. A saída do Armazenamento de Blob pode dar suporte a partições, e herda o esquema de particionamento da consulta de upstream. Quando uma chave de partição do Armazenamento de Blob for especificada, os dados serão particionados por partição de entrada, portanto, o resultado ainda será totalmente paralelo. Exemplos de valores de partição que permitem um trabalho totalmente paralelo:

    • Oito partições de entrada de Hubs de Eventos e oito partições de saída de Hubs de Eventos
    • Oito partições de entrada de Hubs de Eventos e Saída de armazenamento de Blobs
    • Oito partições de entrada do hub de eventos e a saída do armazenamento de blob particionadas por um campo personalizado com cardinalidade aleatória
    • Oito partições de entrada de armazenamento de Blobs e Saída de armazenamento de Blobs
    • Oito partições de entrada de armazenamento de Blobs e oito partições de saída de Hubs de Eventos

As seções a seguir discutem alguns cenários de exemplo que são embaraçosamente paralelos.

Consulta simples

  • Entrada: um hub de eventos com oito partições
  • Saída: um hub de eventos com oito partições ("Coluna de chave de partição" deve ser definido para usar PartitionId)

Consulta:

    --Using compatibility level 1.2 or above
    SELECT TollBoothId
    FROM Input1
    WHERE TollBoothId > 100
    
    --Using compatibility level 1.0 or 1.1
    SELECT TollBoothId
    FROM Input1 PARTITION BY PartitionId
    WHERE TollBoothId > 100

Essa consulta é um filtro simples. Portanto, nós não precisamos se preocupar sobre particionamento da entrada que está sendo enviada para o hub de eventos. Observe que os trabalhos com nível de compatibilidade anterior a 1.2 devem incluir a cláusula PARTITION BY PartitionId, portanto ele atende ao requisito nº 2 de anteriormente. Para a saída, é necessário configurar a saída de Hubs de Eventos no trabalho para ter a chave de partição definida como PartitionId. Uma última verificação é certificar-se de que o número de partições de entrada é igual ao número de partições de saída.

Consulta com chave de agrupamento

  • Entrada: Hub de eventos com oito partições
  • Saída: Armazenamento de blob

Consulta:

    --Using compatibility level 1.2 or above
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    
    --Using compatibility level 1.0 or 1.1
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Esta consulta tem uma chave de agrupamento. Portanto, os eventos agrupados devem ser enviados para a mesma partição dos Hubs de Eventos. Como neste exemplo agrupamos por TollBoothID, devemos nos certificar de que TollBoothID seja usado como chave de partição quando os eventos forem enviados aos Hubs de Eventos do Azure. Em seguida, no Azure Stream Analytics, você pode usar PARTITION BY PartitionId para herdar esse esquema de partição e habilitar a paralelização completa. Como a saída é o armazenamento de Blobs, não precisamos nos preocupar sobre como configurar um valor de chave de partição, de acordo com o requisito #4.

Exemplo de cenários que não são* perfeitamente paralelo

Na seção anterior, o artigo abordou alguns cenários perfeitamente paralelos. Nesta seção, você aprenderá sobre cenários que não atendem a todos os requisitos para serem perfeitamente paralelos.

Contagem de partições incompatível

  • Entrada: um hub de eventos com oito partições
  • Resultado: um hub de eventos com 32 partições

Se a contagem de partição de entrada não corresponder à contagem de partição de saída, a topologia não será embaraçosamente paralela, independentemente da consulta. No entanto, ainda podemos obter algum nível de paralelização.

Consultar usando a saída não particionada

  • Entrada: um hub de eventos com oito partições
  • Saída: Power BI

Atualmente, a saída do Power BI não suporta particionamento. Portanto, esse cenário não é embaraçosamente paralelo.

Consulta de várias etapas com diferentes valores de PARTITION BY

  • Entrada: Hub de eventos com oito partições
  • Saída: Hub de eventos com oito partições
  • Nível de compatibilidade: 1.0 ou 1.1

Consulta:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId, PartitionId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1 Partition By TollBoothId
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Como você pode ver, a segunda etapa usa TollBoothId como a chave de particionamento. Esta etapa não é igual à primeira etapa e, portanto, exige que façamos um embaralhamento.

Consulta de várias etapas com diferentes valores de PARTITION BY

  • Entrada: hub de eventos com oito partições ("Coluna de chave da partição" não definida, o padrão é "PartitionId")
  • Saída: O hub de eventos com oito partições ("Coluna de chave de partição" deve ser definida para usar "TollBoothId")
  • Nível de compatibilidade - 1.2 ou superior

Consulta:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

o nível de compatibilidade 1.2 ou superior permite a execução de consulta paralela por padrão. Por exemplo, a consulta da seção anterior será particionada, contanto que a coluna “TollBoothId” seja definida como a Chave de Partição de entrada. A cláusula PARTITION BY PartitionId não é necessária.

Calcule o máximo de unidades de um trabalho de streaming

O número total de unidades de streaming que pode ser usado por um trabalho de Análise de fluxo depende do número de etapas da consulta definida para o trabalho e o número de partições para cada etapa.

Etapas de uma consulta

Uma consulta pode ter uma ou mais etapas. Cada etapa é uma subconsulta definida usando a palavra-chave WITH. A única consulta que está fora da palavra-chave WITH também é contada como uma etapa. Por exemplo, a instrução SELECT na consulta a seguir:

Consulta:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )
    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute,3), TollBoothId

Esta consulta tem duas etapas.

Observação

Esta consulta é discutida em mais detalhes mais adiante neste artigo.

Uma etapa de partição

Particionamento de uma etapa exige as seguintes condições:

  • A fonte de entrada deve ser particionada.
  • A instrução SELECT da consulta deve ser lida de uma origem de entrada particionada.
  • A consulta dentro da etapa deve ter a palavra-chave PARTITION BY.

Quando uma consulta for particionada, os eventos de entrada serão processados e agregados em diferentes grupos de partição e saídas de eventos são geradas para cada um dos grupos. Se você quiser uma agregação combinada, deverá criar uma segunda etapa não particionada para agregar.

Calcule o máximo de unidades de um trabalho de streaming

Todas as etapas não particionadas juntas podem ser escaladas verticalmente até uma unidade de streaming (SU V2s) para um trabalho do Stream Analytics. Além disso, você pode adicionar uma SU V2 para cada partição em uma etapa particionada. Você pode ver alguns exemplos na tabela a seguir.

Consulta Máxima quantidade de SUs para o trabalho
  • A consulta contém uma única etapa.
  • A etapa não é particionada.
1 SU V2
  • O fluxo de dados de entrada é particionado por 16.
  • A consulta contém uma única etapa.
  • A etapa é particionada.
16 SUs V2 (1 * 16 partições)
  • A consulta contém duas etapas.
  • Nenhuma das etapas é particionada.
1 SU V2
  • O fluxo de dados de entrada é particionado por 3.
  • A consulta contém duas etapas. A etapa de entrada é particionada e a segunda etapa não é.
  • A instrução SELECT lê da entrada particionada.
4 SU V2s (3 para etapas particionadas + 1 para etapas não particionadas

Exemplos de escala

A consulta a seguir calcula o número de carros passando por um pedágio que tem três pedágios dentro de uma janela de três minutos. Essa consulta pode ser escalada verticalmente para uma SU V2.

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Para usar mais unidades de streaming para a consulta, tanto o fluxo de dados de entrada quanto a consulta devem ser particionados. Com a partição do fluxo de dados definida como 3, a seguinte consulta modificada pode ser escalada verticalmente para até 3 SUs V2:

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Quando uma consulta é particionada, os eventos de entrada são processados e agregados em grupos de partições separados. Eventos de saída também são gerados para cada um dos grupos. O particionamento pode causar alguns resultados inesperados quando o campo GROUP BY não for a chave da partição no fluxo de dados de entrada. Por exemplo, o campo TollBoothId na consulta de exemplo anterior não é a chave de partição de Input1. O resultado é que os dados do Pedágio #1 podem ser distribuídos em várias partições.

Cada uma das partições Entrada1 serão processadas separadamente pelo Stream Analytics. Como resultado, vários registros da contagem de carros para o mesmo pedágio na mesma janela em cascata serão criados. Se a chave de partição de entrada não puder ser alterada, esse problema pode ser resolvido adicionando uma etapa não particionada para agregar valores entre partições, como no exemplo a seguir:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Essa consulta pode ser escalada para 4 SU V2s.

Observação

Se você estiver unindo dois fluxos, certifique-se de eles sejam particionados por chave de partição da coluna que você usa para criar as junções. Além disso, certifique-se de que você tem o mesmo número de partições em ambos os fluxos.

Alcançar maior taxa de transferência em escala

Um trabalho embaraçosamente paralelo é necessário, mas não é suficiente para sustentar uma maior taxa de transferência em escala. Cada sistema de armazenamento e a saída do Stream Analytics correspondente têm variações sobre como obter a melhor taxa de transferência de gravação possível. Como em qualquer cenário em escala, há alguns desafios que podem ser resolvidos com o uso das configurações corretas. Esta seção discute as configurações de algumas saídas comuns e fornece amostras para sustentar taxas de ingestão de eventos de 1 mil, 5 mil e 10 mil por segundo.

As observações a seguir usam um trabalho do Stream Analytics com consulta sem estado (passagem), um UDF JavaScript básico que grava em Hubs de Eventos, SQL do Azure ou Azure Cosmos DB.

Hubs de Eventos

Taxa de ingestão (eventos por segundo) Unidades de streaming Recursos de Saída
1 mil 1/3 2 TU
5 mil 1 6 TU
10.000 2 10 TU

A solução dos Hubs de Eventos é dimensionada linearmente em termos de SU (unidades de streaming) e taxa de transferência, fazendo dela a maneira mais eficiente e de alto desempenho de analisar e transmitir dados do Stream Analytics. Os trabalhos podem ser escalados verticalmente para até 66 SUs V2, o que significa, aproximadamente, o processamento de até 400 MB/s ou 38 trilhões de eventos por dia.

SQL do Azure

Taxa de ingestão (eventos por segundo) Unidades de streaming Recursos de Saída
1 mil 2/3 S3
5 mil 3 P4
10.000 6 P6

O SQL do Azure dá suporte à gravação em paralelo, chamada Herdar Particionamento, mas não é habilitado por padrão. No entanto, a habilitação do Particionamento Herdado, juntamente com uma consulta totalmente paralela, pode não ser suficiente para obter taxas de transferência mais altas. As taxas de transferência de gravação do SQL dependem significativamente da configuração do banco de dados e do esquema de tabela. O artigo Desempenho de Saída do SQL tem mais detalhes sobre os parâmetros que podem maximizar a taxa de transferência de gravação. Conforme observado no artigo Saída do Azure Stream Analytics para o Banco de Dados SQL do Azure, essa solução não é dimensionada linearmente como um pipeline totalmente paralelo além de 8 partições e pode precisar de reparticionamento antes da saída do SQL (consulte INTO). Os SKUs Premium são necessários para sustentar altas taxas de E/S juntamente com a sobrecarga dos backups de log de transações ocorrendo a cada poucos minutos.

Azure Cosmos DB

Taxa de ingestão (eventos por segundo) Unidades de streaming Recursos de Saída
1 mil 2/3 RU de 20 K
5 mil 4 RU de 60 K
10.000 8 RU de 120 K

A saída do Azure Cosmos DB do Stream Analytics foi atualizada para usar a integração nativa no nível de compatibilidade 1.2. O nível de compatibilidade 1.2 permite uma taxa de transferência significativamente maior e reduz o consumo de RU em comparação com o 1.1, que é o nível de compatibilidade padrão para novos trabalhos. A solução usa contêineres do Azure Cosmos DB particionados em /deviceId e o restante da solução é configurado de maneira idêntica.

Todos os exemplos de transmissão em escala do Azure usam Hubs de Eventos como entrada que é alimentado por clientes de teste de simulação de carga. Cada evento de entrada é um documento JSON de 1 KB, que converte facilmente as taxas de ingestão configuradas em taxas de transferência (1 MB/s, 5 MB/s e 10 MB/s). Os eventos simulam um dispositivo de IoT enviando os seguintes dados JSON (em um formato abreviado) para até 1.000 dispositivos:

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 51.3068118685458,
        "moreData22": 45.34076957651598
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-1554",
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}

Observação

As configurações estão sujeitas a alteração devido aos vários componentes usados na solução. Para obter uma estimativa mais precisa, personalize os exemplos para se ajustarem ao seu cenário.

Identificar gargalos

Use o painel Métricas em seu trabalho do Azure Stream Analytics para identificar gargalos em seu pipeline. Examine Eventos de Entrada/Saída para taxa de transferência e "Atraso de Marca-d'água" ou Eventos Acumulados para ver se o trabalho está acompanhando a taxa de entrada. Para as métricas dos Hubs de Eventos, procure Solicitações Limitadas e ajuste as Unidades de Limite adequadamente. Para métricas do Azure Cosmos DB, examine Máximo de RU/s consumidas por intervalo de chaves de partição em Taxa de Transferência para verificar se os intervalos de chave de partição foram consumidos uniformemente. Para o BD SQL do Azure, monitore a E/S de Log e CPU.

Obter ajuda

Para obter mais assistência, confira nossa página de Perguntas e respostas do Microsoft do Azure Stream Analytics.

Próximas etapas