Aggregate data in an Azure IoT Data Processor Preview pipeline
Important
Azure IoT Operations Preview – enabled by Azure Arc is currently in PREVIEW. You shouldn't use this preview software in production environments.
See the Supplemental Terms of Use for Microsoft Azure Previews for legal terms that apply to Azure features that are in beta, preview, or otherwise not yet released into general availability.
The aggregate stage is an optional, configurable, intermediate pipeline stage that lets you run down-sampling and batching operations on streaming sensor data over user-defined time windows.
Use an aggregate stage to accumulate messages over a defined window and calculate aggregation values from properties in the messages. The stage emits the aggregated values as properties in a single message at the end of each time window.
- Each pipeline partition carries out aggregation independently of the other partitions.
- The output of the stage is a single message that contains all the defined aggregate properties.
- The stage drops all other properties. However, you can use the Last, First, or Collect functions to preserve properties that would otherwise be dropped by the stage during aggregation.
- For the aggregate stage to work, the data source stage in the pipeline should deserialize the incoming message.
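To illustrate the point about partitions, the following Python sketch groups the messages of one window by partition and aggregates each group separately. It isn't Data Processor code: the helper name `average_per_partition` and the trimmed message shape are assumptions made for this illustration, loosely based on the `systemProperties.partitionId` field shown in the sample messages later in this article.

```python
from collections import defaultdict
from statistics import fmean


def average_per_partition(messages, key="temperature"):
    """Group messages by partition ID and average one payload value per group.

    Hypothetical helper: it only mimics the idea that partitions aggregate
    independently, it is not how the stage is implemented.
    """
    groups = defaultdict(list)
    for message in messages:
        partition = message["systemProperties"]["partitionId"]
        groups[partition].append(message["payload"][key])
    # One aggregated result per partition; partitions never share values.
    return {partition: fmean(values) for partition, values in groups.items()}


# Three messages that arrive in the same window on two partitions.
window = [
    {"systemProperties": {"partitionId": 5}, "payload": {"temperature": 250}},
    {"systemProperties": {"partitionId": 5}, "payload": {"temperature": 235}},
    {"systemProperties": {"partitionId": 6}, "payload": {"temperature": 100}},
]
per_partition = average_per_partition(window)
print(per_partition)
```

In this sketch the window produces two results, one for partition 5 and one for partition 6, rather than a single average over all three messages.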
Prerequisites
To configure and use an aggregate pipeline stage, you need a deployed instance of Azure IoT Data Processor Preview.
Configure the stage
The aggregate stage JSON configuration defines the details of the stage. To author the stage, you can either interact with the form-based UI, or provide the JSON configuration on the Advanced tab:
Field | Type | Description | Required | Default | Example |
---|---|---|---|---|---|
Name | String | A name to show in the Data Processor UI. | Yes | - | Calculate Aggregate |
Description | String | A user-friendly description of what the aggregate stage does. | No | - | Aggregation over temperature |
Time window | Duration | Duration that specifies the period over which the aggregation runs. | Yes | - | 10s |
Properties > Function | Enum | The aggregate function to use. | Yes | - | Sum |
Properties > Input path¹ | Path | The path to the property in the incoming message to apply the function to. | Yes | - | .payload.temperature |
Properties > Output path² | Path | The path to the location in the outgoing message to place the result. | Yes | - | .payload.temperature.average |
You can define multiple Properties configurations in one aggregate stage. For example, you can calculate the sum of temperature and the average of pressure in the same stage.
¹ Input path:
- The data type of the value at the input path must be compatible with the configured function.
- You can use the same input path in multiple Properties configurations to calculate several functions over the same property. Make sure the output paths are different to avoid overwriting the results.
² Output path:
- The output path can be the same as or different from the input path.
- Configure a distinct output path for each aggregation, especially when you calculate multiple aggregations on the same input path. Otherwise, one aggregate value overwrites another.
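As a quick way to catch overwritten results before you deploy a stage, you could check a configuration for repeated output paths. The following Python sketch assumes the `properties`, `inputPath`, and `outputPath` keys used in the sample configuration later in this article. The helper name `duplicate_output_paths` and the `.payload.result` path are hypothetical and not part of the product.

```python
from collections import Counter


def duplicate_output_paths(stage_config):
    """Return the output paths that more than one aggregation writes to."""
    counts = Counter(
        prop["outputPath"] for prop in stage_config.get("properties", [])
    )
    return sorted(path for path, uses in counts.items() if uses > 1)


# Two aggregations on the same input path that share one output path.
config = {
    "properties": [
        {"function": "sum", "inputPath": ".payload.temperature",
         "outputPath": ".payload.result"},
        {"function": "average", "inputPath": ".payload.temperature",
         "outputPath": ".payload.result"},
        {"function": "average", "inputPath": ".payload.pressure",
         "outputPath": ".payload.pressure"},
    ]
}
clashes = duplicate_output_paths(config)
print(clashes)
```

The check only compares paths as exact strings. It doesn't detect a path that is nested under another output path.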
Windows
The window is the time interval over which the stage accumulates messages. At the end of the window, the stage applies the configured function to the message properties. The stage then emits a single message.
Currently, the stage only supports tumbling windows.
Tumbling windows are a series of fixed-size, nonoverlapping, and consecutive time intervals. Each window starts and ends at fixed points in time.
The size of the window defines the time interval over which the stage accumulates the messages. You define the window size by using the Duration common pattern.
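The following Python sketch shows one way to think about tumbling windows: every timestamp falls into exactly one fixed-size interval. This article doesn't say how the stage aligns its windows, so aligning them to the Unix epoch is an assumption made for this example, and `tumbling_window` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

# Assumption: windows are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(timestamp, size):
    """Return the (start, end) of the tumbling window that contains timestamp.

    The start is inclusive and the end is exclusive, so windows never overlap.
    """
    index = (timestamp - EPOCH) // size  # whole windows elapsed since EPOCH
    start = EPOCH + index * size
    return start, start + size


size = timedelta(seconds=10)
first = datetime(2023, 1, 11, 10, 2, 7, tzinfo=timezone.utc)
second = datetime(2023, 1, 11, 10, 2, 12, tzinfo=timezone.utc)

print(tumbling_window(first, size))
print(tumbling_window(second, size))
```

With a 10-second window, the two timestamps land in consecutive windows: the first in the interval that starts at 10:02:00, and the second in the interval that starts at 10:02:10.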
Functions
The aggregate stage supports the following functions to calculate aggregate values over the message property defined in the input path:
Function | Description |
---|---|
Sum | Calculates the sum of the values of the property in the input messages. |
Average | Calculates the average of the values of the property in the input messages. |
Count | Counts the number of times the property appears in the window. |
Min | Returns the minimum value of the property in the input messages. |
Max | Returns the maximum value of the property in the input messages. |
Last | Returns the latest value of the property in the input messages. |
First | Returns the first value of the property in the input messages. |
Collect | Returns all the values of the property in the input messages. |
The following table lists the message data types supported by each function:
Function | Integer | Float | String | Datetime | Array | Object | Binary |
---|---|---|---|---|---|---|---|
Sum | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
Average | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
Count | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Min | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ |
Max | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ |
Last | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
First | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Collect | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
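If you generate stage configurations programmatically, you could encode the preceding table as a lookup and check a function against a data type before you use it. The following Python sketch transcribes the table. The names `SUPPORTED_TYPES` and `supports`, and the lowercase spelling of the type names, are assumptions made for this example.

```python
# Data types from the table, written in lowercase for this sketch.
NUMERIC = {"integer", "float"}
ORDERED = NUMERIC | {"string", "datetime", "array"}
ALL_TYPES = ORDERED | {"object", "binary"}

# Transcription of the supported data types table.
SUPPORTED_TYPES = {
    "sum": NUMERIC,
    "average": NUMERIC,
    "count": ALL_TYPES,
    "min": ORDERED,
    "max": ORDERED,
    "last": ALL_TYPES,
    "first": ALL_TYPES,
    "collect": ALL_TYPES,
}


def supports(function, data_type):
    """Return True if the table marks data_type as supported for function."""
    return data_type.lower() in SUPPORTED_TYPES[function.lower()]


print(supports("Sum", "String"))
print(supports("Min", "Array"))
print(supports("Collect", "Binary"))
```

The first call reports that Sum doesn't accept strings. The other two calls report supported combinations.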
Sample configuration
The following JSON example shows a complete aggregate stage configuration:
{
    "displayName": "downSample",
    "description": "Calculate average for production tags",
    "window": {
        "type": "tumbling",
        "size": "10s"
    },
    "properties": [
        {
            "function": "average",
            "inputPath": ".payload.temperature",
            "outputPath": ".payload.temperature_avg"
        },
        {
            "function": "collect",
            "inputPath": ".payload.temperature",
            "outputPath": ".payload.temperature_all"
        },
        {
            "function": "average",
            "inputPath": ".payload.pressure",
            "outputPath": ".payload.pressure"
        },
        {
            "function": "last",
            "inputPath": ".systemProperties",
            "outputPath": ".systemProperties"
        }
    ]
}
The configuration defines an aggregate stage that, over a ten-second tumbling window:
- Calculates the average temperature.
- Collects all the temperature values.
- Calculates the average pressure.
- Keeps the last systemProperties value.
Example
This example includes two sample input messages and a sample output message generated by using the previous configuration:
Input message 1:
{
    "systemProperties": {
        "partitionKey": "foo",
        "partitionId": 5,
        "timestamp": "2023-01-11T10:02:07Z"
    },
    "qos": 1,
    "topic": "/assets/foo/tags/bar",
    "properties": {
        "responseTopic": "outputs/foo/tags/bar",
        "contentType": "application/json"
    },
    "payload": {
        "humidity": 10,
        "temperature": 250,
        "pressure": 30,
        "runningState": true
    }
}
Input message 2:
{
    "systemProperties": {
        "partitionKey": "foo",
        "partitionId": 5,
        "timestamp": "2023-01-11T10:02:07Z"
    },
    "qos": 1,
    "topic": "/assets/foo/tags/bar",
    "properties": {
        "responseTopic": "outputs/foo/tags/bar",
        "contentType": "application/json"
    },
    "payload": {
        "humidity": 11,
        "temperature": 235,
        "pressure": 25,
        "runningState": true
    }
}
Output message:
{
    "systemProperties": {
        "partitionKey": "foo",
        "partitionId": 5,
        "timestamp": "2023-01-11T10:02:07Z"
    },
    "payload": {
        "temperature_avg": 242.5,
        "temperature_all": [250, 235],
        "pressure": 27.5
    }
}
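To see how the output message follows from the configuration, you can reproduce the calculation outside the pipeline. The following Python sketch is a rough approximation and not the stage's implementation. The helpers `get_path`, `set_path`, and `aggregate` are hypothetical, the dotted-path handling is simplified, only the three functions used in the sample are covered, and the input messages are trimmed to the fields the configuration reads.

```python
from statistics import fmean


def get_path(message, path):
    """Return (found, value) for a dotted path such as '.payload.temperature'."""
    node = message
    for key in path.strip(".").split("."):
        if not isinstance(node, dict) or key not in node:
            return False, None
        node = node[key]
    return True, node


def set_path(message, path, value):
    """Write value at a dotted path, creating parent objects as needed."""
    keys = path.strip(".").split(".")
    node = message
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = value


# Only the functions that the sample configuration uses.
FUNCTIONS = {
    "average": fmean,
    "collect": list,
    "last": lambda values: values[-1],
}


def aggregate(config, messages):
    """Build one output message that holds only the configured aggregates."""
    output = {}
    for prop in config["properties"]:
        values = []
        for message in messages:
            found, value = get_path(message, prop["inputPath"])
            if found:
                values.append(value)
        if values:
            result = FUNCTIONS[prop["function"]](values)
            set_path(output, prop["outputPath"], result)
    return output


CONFIG = {"properties": [
    {"function": "average", "inputPath": ".payload.temperature",
     "outputPath": ".payload.temperature_avg"},
    {"function": "collect", "inputPath": ".payload.temperature",
     "outputPath": ".payload.temperature_all"},
    {"function": "average", "inputPath": ".payload.pressure",
     "outputPath": ".payload.pressure"},
    {"function": "last", "inputPath": ".systemProperties",
     "outputPath": ".systemProperties"},
]}

SYSTEM = {"partitionKey": "foo", "partitionId": 5,
          "timestamp": "2023-01-11T10:02:07Z"}
MESSAGES = [
    {"systemProperties": SYSTEM, "qos": 1,
     "payload": {"humidity": 10, "temperature": 250, "pressure": 30}},
    {"systemProperties": SYSTEM, "qos": 1,
     "payload": {"humidity": 11, "temperature": 235, "pressure": 25}},
]

result = aggregate(CONFIG, MESSAGES)
print(result)
```

In this sketch, properties that no aggregation names, such as `humidity` and `qos`, don't appear in the result. That mirrors how the stage drops properties that aren't preserved with Last, First, or Collect.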