Share via


Agregar dados em um pipeline Processador de Dados da Internet das Coisas do Azure Versão Prévia

Importante

O recurso Pré-visualização de Operações do Azure IoT — habilitado pelo Azure Arc — está atualmente em VERSÃO PRÉVIA. Você não deve usar esse software em versão prévia em ambientes de produção.

Veja os Termos de Uso Complementares para Versões Prévias do Microsoft Azure para obter termos legais que se aplicam aos recursos do Azure que estão em versão beta, versão prévia ou que, de outra forma, ainda não foram lançados em disponibilidade geral.

O estágio de agregação é um estágio de pipeline opcional, configurável e intermediário que permite executar operações de envio em lote e redução de amostragem em streaming de dados do sensor em janelas de tempo definidas pelo usuário.

Use um estágio de agregação para acumular mensagens em uma janela definida e calcular valores de agregação de propriedades nas mensagens. O estágio emite os valores agregados como propriedades em uma única mensagem no final de cada janela de tempo.

  • Cada partição de pipeline realiza a agregação independentemente uma da outra.
  • A saída do estágio é uma única mensagem que contém todas as propriedades de agregação definidas.
  • O estágio descarta todas as outras propriedades. No entanto, você pode usar as funções Last, First ou Collect para preservar propriedades que, de outra forma, seriam descartadas pelo estágio durante a agregação.
  • Para que o estágio de agregação funcione, o estágio da fonte de dados no pipeline deve desserializar a mensagem de entrada.

Pré-requisitos

Para configurar e usar um estágio de pipeline agregado, você precisa de uma instância implantada do Processador de Dados da Internet das Coisas do Azure Versão Prévia.

Configurar o estágio

A configuração JSON do estágio agregado define os detalhes do estágio. Para criar a fase, você pode interagir com a interface do usuário baseada em formulários ou fornecer a configuração JSON na guia Avançado:

Campo Type Descrição Obrigatório Padrão Exemplo
Nome String Um nome a ser mostrado na interface do usuário do Processador de Dados. Sim - Calculate Aggregate
Descrição Cadeia de caracteres Uma descrição amigável do que o estágio de agregação faz. Não Aggregation over temperature
Janela de tempo A Duração que especifica o período durante o qual a agregação é executada. Sim - 10s
Propriedades > Função Enumeração A função de agregação a ser usada. Sim - Sum
Propriedades > InputPath1 Caminho O Caminho para a propriedade na mensagem de entrada à qual aplicar a função. Sim - .payload.temperature
Propriedades > OutputPath2 Caminho O Caminho para o local na mensagem de saída para colocar o resultado. Sim - .payload.temperature.average

Você pode definir várias configurações de Propriedades em um estágio de agregação. Por exemplo, calcule a soma da temperatura e calcule a média da pressão.

1Caminho de entrada:

  • O tipo de dados do valor da propriedade do caminho de entrada deve ser compatível com o tipo de função definida.
  • Você pode fornecer o mesmo caminho de entrada entre várias configurações de agregação para calcular várias funções na mesma propriedade do caminho de entrada. Verifique se os caminhos de saída são diferentes para evitar a substituição dos resultados.

2Caminho de saída:

  • Os caminhos de saída podem ser iguais ou diferentes do caminho de entrada. Use caminhos de saída diferentes se você estiver calculando várias agregações na mesma propriedade do caminho de entrada.
  • Configure caminhos de saída distintos para evitar a substituição de valores agregados.

Windows

A janela é o intervalo de tempo sobre o qual o estágio acumula mensagens. No final da janela, o estágio aplica a função configurada às propriedades da mensagem. Em seguida, o estágio emite uma única mensagem.

Atualmente, o estágio só dá suporte a janelas em cascata.

As janelas em cascata são uma série de intervalos de tempo de tamanho fixo, não sobrepostos e consecutivos. A janela começa e termina em pontos fixos no tempo:

Diagram that shows 10 second tumbling windows in the aggregate stage.

O tamanho da janela define o intervalo de tempo sobre o qual o estágio acumula as mensagens. Você define o tamanho da janela usando o padrão comum Duração.

Funções

O estágio de agregação dá suporte às seguintes funções para calcular valores agregados sobre a propriedade de mensagem definida no caminho de entrada:

Função Descrição
Somar Calcula a soma dos valores da propriedade nas mensagens de entrada.
Média Calcula a média dos valores da propriedade nas mensagens de entrada.
Contagem Conta o número de vezes que a propriedade aparece na janela.
Min Calcula o valor mínimo dos valores da propriedade nas mensagens de entrada.
Max Calcula o valor máximo dos valores da propriedade nas mensagens de entrada.
Último Retorna o valor mais recente dos valores da propriedade nas mensagens de entrada.
First Retorna o primeiro valor dos valores da propriedade nas mensagens de entrada.
Coletar Retorna todos os valores da propriedade nas mensagens de entrada.

A tabela a seguir lista os tipos de dados de mensagem compatíveis com cada função:

Função Inteiro Flutuante String Datetime Array Objeto Binário
Soma
Média
Contagem
Min
Max
Último
First
Coletar

Exemplo de configuração

O exemplo JSON a seguir mostra uma configuração de estágio de agregação completa:

{ 
    "displayName":"downSample", 
    "description":"Calculate average for production tags", 
    "window": 
    { 
        "type":"tumbling", 
        "size":"10s" 
    }, 
    "properties": 
    [ 
        { 
            "function":"average", 
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_avg" 
        }, 
        {  
            "function":"collect",  
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_all"  
        },  
        {  
            "function":"average",  
            "inputPath":".payload.pressure", 
            "outputPath":".payload.pressure"                  
        },  
        {  
            "function":"last",  
            "inputPath":".systemProperties", 
            "outputPath": ".systemProperties" 
        } 
    ] 
}

A configuração define um estágio de agregação que calcula, em uma janela de dez segundos:

  • Temperatura média
  • Soma da temperatura
  • Soma da pressão

Exemplo

Este exemplo inclui duas mensagens de entrada de exemplo e uma mensagem de saída de exemplo gerada usando a configuração anterior:

Mensagem de entrada 1:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 10, 
        "temperature":250, 
        "pressure":30, 
        "runningState": true 
    } 
} 

Mensagem de entrada 2:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 11, 
        "temperature":235, 
        "pressure":25, 
        "runningState": true 
    } 
} 

Mensagem de saída:

{ 
    "systemProperties":{  
        "partitionKey":"foo",  
        "partitionId":5,  
        "timestamp":"2023-01-11T10:02:07Z"  
    }, 
    "payload":{ 
        "temperature_avg":242.5, 
        "temperature_all":[250,235], 
        "pressure":27.5 
    } 
}