Aggregate data in an Azure IoT Data Processor Preview pipeline

Important

Azure IoT Operations Preview – enabled by Azure Arc is currently in PREVIEW. You shouldn't use this preview software in production environments.

See the Supplemental Terms of Use for Microsoft Azure Previews for legal terms that apply to Azure features that are in beta, preview, or otherwise not yet released into general availability.

The aggregate stage is an optional, configurable, intermediate pipeline stage that lets you run down-sampling and batching operations on streaming sensor data over user-defined time windows.

Use an aggregate stage to accumulate messages over a defined window and calculate aggregation values from properties in the messages. The stage emits the aggregated values as properties in a single message at the end of each time window.

  • Each pipeline partition carries out aggregation independently of the others.
  • The output of the stage is a single message that contains all the defined aggregate properties.
  • The stage drops all other properties. However, you can use the Last, First, or Collect functions to preserve properties that would otherwise be dropped by the stage during aggregation.
  • For the aggregate stage to work, the data source stage in the pipeline should deserialize the incoming message.

Prerequisites

To configure and use an aggregate pipeline stage, you need a deployed instance of Azure IoT Data Processor Preview.

Configure the stage

The aggregate stage JSON configuration defines the details of the stage. To author the stage, you can either interact with the form-based UI, or provide the JSON configuration on the Advanced tab:

Field | Type | Description | Required | Default | Example
Name | String | A name to show in the Data Processor UI. | Yes | - | Calculate Aggregate
Description | String | A user-friendly description of what the aggregate stage does. | No | | Aggregation over temperature
Time window | Duration | The period over which the aggregation runs. | Yes | - | 10s
Properties > Function | Enum | The aggregate function to use. | Yes | - | Sum
Properties > InputPath ¹ | Path | The Path to the property in the incoming message to apply the function to. | Yes | - | .payload.temperature
Properties > OutputPath ² | Path | The Path to the location in the outgoing message to place the result. | Yes | - | .payload.temperature.average

You can define multiple Properties configurations in one aggregate stage. For example, calculate the sum of temperature and calculate the average of pressure.

¹ Input path:

  • The data type of the value of the input path property must be compatible with the type of function defined.
  • You can provide the same input path across multiple aggregation configurations to calculate multiple functions over the same input path property. Make sure the output paths are different to avoid overwriting the results.

² Output path:

  • Output paths can be the same as or different from the input path. If you calculate multiple aggregations on the same input path property, configure a distinct output path for each one to avoid overwriting aggregate values.
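
As a rough illustration of the output path guidance, the following Python sketch checks a stage configuration for output paths that are used more than once. The helper name find_duplicate_output_paths is hypothetical and isn't part of Data Processor. The sketch only assumes a configuration shaped like the properties list in the stage JSON.

```python
from collections import Counter


def find_duplicate_output_paths(stage_config):
    """Return the output paths that appear more than once in the properties list."""
    counts = Counter(prop["outputPath"] for prop in stage_config["properties"])
    return sorted(path for path, n in counts.items() if n > 1)


# Hypothetical configuration: two aggregations write to the same output path,
# so one result would overwrite the other.
config = {
    "properties": [
        {"function": "sum", "inputPath": ".payload.temperature", "outputPath": ".payload.temperature"},
        {"function": "average", "inputPath": ".payload.temperature", "outputPath": ".payload.temperature"},
        {"function": "average", "inputPath": ".payload.pressure", "outputPath": ".payload.pressure_avg"},
    ]
}

duplicates = find_duplicate_output_paths(config)
print(duplicates)
```

A check like this can run before you paste the JSON into the Advanced tab. An empty list means every aggregation writes to its own location.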

Windows

The window is the time interval over which the stage accumulates messages. At the end of the window, the stage applies the configured function to the message properties. The stage then emits a single message.

Currently, the stage only supports tumbling windows.

Tumbling windows are a series of fixed-size, nonoverlapping, and consecutive time intervals. The window starts and ends at fixed points in time:

Diagram that shows 10 second tumbling windows in the aggregate stage.

The size of the window defines the time interval over which the stage accumulates the messages. You define the window size by using the Duration common pattern.
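
To make the idea of fixed, nonoverlapping windows concrete, the following Python sketch computes which tumbling window a timestamp falls into. It assumes that windows are aligned to the Unix epoch and that the window size is given in whole seconds. This article doesn't state how the stage aligns its windows, so treat the alignment as an assumption made for illustration. The function name window_bounds is hypothetical.

```python
from datetime import datetime, timezone


def window_bounds(timestamp, size_seconds):
    """Return (start, end) of the tumbling window that contains the timestamp.

    The start is inclusive and the end is exclusive.
    Assumption: windows are aligned to the Unix epoch.
    """
    epoch_seconds = int(timestamp.timestamp())
    start = epoch_seconds - (epoch_seconds % size_seconds)
    return (
        datetime.fromtimestamp(start, tz=timezone.utc),
        datetime.fromtimestamp(start + size_seconds, tz=timezone.utc),
    )


ts = datetime(2023, 1, 11, 10, 2, 7, tzinfo=timezone.utc)
start, end = window_bounds(ts, 10)
print(start.isoformat(), end.isoformat())
```

Because the end of one window is the start of the next, every timestamp belongs to exactly one window.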

Functions

The aggregate stage supports the following functions to calculate aggregate values over the message property defined in the input path:

Function | Description
Sum | Calculates the sum of the property's values in the input messages.
Average | Calculates the average of the property's values in the input messages.
Count | Counts the number of times the property appears in the window.
Min | Returns the minimum of the property's values in the input messages.
Max | Returns the maximum of the property's values in the input messages.
Last | Returns the latest value of the property in the input messages.
First | Returns the first value of the property in the input messages.
Collect | Returns all the values of the property in the input messages.
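
The following Python sketch shows one way to read these functions, applied to a list of values gathered during a single window. It's an illustration only, not the stage's implementation. It assumes that the values are in arrival order, so that Last and First mean the final and initial items in the list, and it uses lowercase function names like those in the sample configuration.

```python
# Each function receives the list of values seen for one property in one window.
# Assumption: the list is in arrival order.
AGGREGATE_FUNCTIONS = {
    "sum": sum,
    "average": lambda values: sum(values) / len(values),
    "count": len,
    "min": min,
    "max": max,
    "last": lambda values: values[-1],
    "first": lambda values: values[0],
    "collect": list,
}

temperatures = [250, 235]
results = {name: fn(temperatures) for name, fn in AGGREGATE_FUNCTIONS.items()}
print(results)
```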

The following table lists the message data types supported by each function:

Function Integer Float String Datetime Array Object Binary
Sum
Average
Count
Min
Max
Last
First
Collect

Sample configuration

The following JSON example shows a complete aggregate stage configuration:

{
    "displayName": "downSample",
    "description": "Calculate average for production tags",
    "window": {
        "type": "tumbling",
        "size": "10s"
    },
    "properties": [
        {
            "function": "average",
            "inputPath": ".payload.temperature",
            "outputPath": ".payload.temperature_avg"
        },
        {
            "function": "collect",
            "inputPath": ".payload.temperature",
            "outputPath": ".payload.temperature_all"
        },
        {
            "function": "average",
            "inputPath": ".payload.pressure",
            "outputPath": ".payload.pressure"
        },
        {
            "function": "last",
            "inputPath": ".systemProperties",
            "outputPath": ".systemProperties"
        }
    ]
}

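The configuration defines an aggregate stage that calculates, over a ten-second window:

  • Average temperature
  • All temperature values, collected into an array
  • Average pressure
  • Last value of the system properties, which preserves them in the output message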

Example

This example includes two sample input messages and a sample output message generated by using the previous configuration:

Input message 1:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 10, 
        "temperature":250, 
        "pressure":30, 
        "runningState": true 
    } 
} 

Input message 2:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 11, 
        "temperature":235, 
        "pressure":25, 
        "runningState": true 
    } 
} 

Output message:

{ 
    "systemProperties":{  
        "partitionKey":"foo",  
        "partitionId":5,  
        "timestamp":"2023-01-11T10:02:07Z"  
    }, 
    "payload":{ 
        "temperature_avg":242.5, 
        "temperature_all":[250,235], 
        "pressure":27.5 
    } 
}
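
The following Python sketch reproduces this example to show how the output message is assembled. It's a simplified model rather than the stage's actual implementation. The helpers get_path, set_path, and aggregate are hypothetical, the path handling only covers plain dotted paths such as .payload.temperature, and the sketch assumes that every message in the window contains each input property. The input messages are trimmed to the properties the configuration reads.

```python
def get_path(message, path):
    """Read the value at a dotted path such as '.payload.temperature'."""
    value = message
    for key in path.strip(".").split("."):
        value = value[key]
    return value


def set_path(message, path, value):
    """Write a value at a dotted path, creating intermediate objects as needed."""
    keys = path.strip(".").split(".")
    target = message
    for key in keys[:-1]:
        target = target.setdefault(key, {})
    target[keys[-1]] = value


# Only the functions that the sample configuration uses.
FUNCTIONS = {
    "average": lambda values: sum(values) / len(values),
    "collect": list,
    "last": lambda values: values[-1],
}


def aggregate(messages, properties):
    """Build one output message from all messages in a window.

    The output starts empty, so any property that isn't written by an
    aggregation (humidity, topic, qos, and so on) is dropped.
    """
    output = {}
    for prop in properties:
        values = [get_path(m, prop["inputPath"]) for m in messages]
        set_path(output, prop["outputPath"], FUNCTIONS[prop["function"]](values))
    return output


system_properties = {"partitionKey": "foo", "partitionId": 5, "timestamp": "2023-01-11T10:02:07Z"}
messages = [
    {"systemProperties": dict(system_properties), "qos": 1,
     "payload": {"humidity": 10, "temperature": 250, "pressure": 30, "runningState": True}},
    {"systemProperties": dict(system_properties), "qos": 1,
     "payload": {"humidity": 11, "temperature": 235, "pressure": 25, "runningState": True}},
]
properties = [
    {"function": "average", "inputPath": ".payload.temperature", "outputPath": ".payload.temperature_avg"},
    {"function": "collect", "inputPath": ".payload.temperature", "outputPath": ".payload.temperature_all"},
    {"function": "average", "inputPath": ".payload.pressure", "outputPath": ".payload.pressure"},
    {"function": "last", "inputPath": ".systemProperties", "outputPath": ".systemProperties"},
]

output = aggregate(messages, properties)
print(output)
```

The result contains the same properties as the output message shown in this example. The humidity, runningState, qos, and topic values don't appear because no aggregation writes them.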