time_window_rolling_avg_fl()

The function time_window_rolling_avg_fl() calculates the rolling average of a metric over a time window of constant duration.

For a regular time series (one with constant intervals between samples), a rolling average over a constant time window can be calculated with series_fir(), because the time window converts to a fixed-width filter of equal coefficients. For an irregular time series the calculation is more complex, because the number of samples in the window varies. It can still be done using the scan operator.
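As a minimal sketch of the regular case, the query below builds a hypothetical series with one sample per minute and averages it with five equal coefficients. The column names, dates, and values are illustrative assumptions, not part of the function described here.

```kusto
// Hypothetical regular series: ten samples, one per minute.
range i from 0 to 9 step 1
| extend ts = datetime(2021-11-29 08:00) + i * 1m, val = toreal(i + 1)
| order by ts asc
| summarize ts = make_list(ts), val = make_list(val)
// Five equal coefficients average the current sample and the four before it.
// normalize = true divides by the coefficient sum; center = false keeps the filter backward-looking.
| extend rolling_avg = series_fir(val, repeat(1, 5), true, false)
```

Because the filter width is fixed in samples rather than in time, this approach only matches a time window when the sampling interval is constant. The first few outputs are also computed against implicit leading zeros, so they understate the true average.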

This type of rolling window calculation is required when metric values are emitted only when they change, rather than at constant intervals. An example is IoT, where edge devices send metrics to the cloud only upon changes in order to save communication bandwidth.

Note

This function is a UDF (user-defined function). For more information, see usage.

Syntax

T | invoke time_window_rolling_avg_fl(t_col, y_col, key_col, dt [, direction ])

Arguments

  • t_col: The name of the column containing the time stamp of the records.
  • y_col: The name of the column containing the metric value of the records.
  • key_col: The name of the column containing the partition key of the records.
  • dt: Duration of the rolling window.
  • direction: Aggregation direction (optional). Set to +1 for a window extending forward from the current time, or -1 for a window extending backward. The default is -1, because a backward window is the only option in streaming scenarios, where future samples are not yet available.
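As a brief illustration of the direction argument, the call below requests a forward window by passing int(1) as the fifth argument. It is a fragment rather than a standalone query: it assumes the function is already defined as described under Usage, and that tbl is the sample table from that section (columns ts, val, key).

```kusto
// Forward window: each row's average covers samples from ts up to ts + 10m for the same key.
// Assumes time_window_rolling_avg_fl and tbl are already defined (see Usage).
tbl
| invoke time_window_rolling_avg_fl('ts', 'val', 'key', 10m, int(1))
```

With the sample data, the Device2 row at 8:05 should then average the samples at 8:05 and 8:09 (values 10 and 20, giving 15), because the 8:09 sample falls within the following 10 minutes.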

Usage

time_window_rolling_avg_fl() is a user-defined function. You can either embed its code in your query (ad hoc usage) or install it in your database (persistent usage).

For ad hoc usage, embed its code using a let statement. No permission is required.

let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
    let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
    tbl_ex 
    | partition hint.strategy=shuffle by key 
    (
        extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
        | mv-expand timestamp to typeof(datetime), delta to typeof(long)
        | sort by timestamp asc, delta desc    
        | scan declare (cum_sum:double=0.0, cum_count:long=0) with 
        (
            step s: true => cum_count = s.cum_count + delta, 
                            cum_sum = s.cum_sum + delta * value; 
        )
        | extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
        | where delta == -direction 
        | project timestamp, value, avg_value, key
    )
}
;
let tbl = datatable(ts:datetime,  val:real, key:string) [
    datetime(8:00), 1, 'Device1',
    datetime(8:01), 2, 'Device1',
    datetime(8:05), 3, 'Device1',
    datetime(8:05), 10, 'Device2',
    datetime(8:09), 20, 'Device2',
    datetime(8:40), 4, 'Device1',
    datetime(9:00), 5, 'Device1',
    datetime(9:01), 6, 'Device1',
    datetime(9:05), 30, 'Device2',
    datetime(9:50), 7, 'Device1'
];
tbl
| invoke time_window_rolling_avg_fl('ts', 'val', 'key', 10m)

timestamp                      value   avg_value   key
2021-11-29 08:05:00.0000000    10      10          Device2
2021-11-29 08:09:00.0000000    20      15          Device2
2021-11-29 09:05:00.0000000    30      30          Device2
2021-11-29 08:00:00.0000000    1       1           Device1
2021-11-29 08:01:00.0000000    2       1.5         Device1
2021-11-29 08:05:00.0000000    3       2           Device1
2021-11-29 08:40:00.0000000    4       4           Device1
2021-11-29 09:00:00.0000000    5       5           Device1
2021-11-29 09:01:00.0000000    6       5.5         Device1
2021-11-29 09:50:00.0000000    7       7           Device1

The first avg_value (10), at 8:05 for Device2, is based on a single sample, because no other Device2 sample falls in the 10-minute backward window. The second value (15) is the average of the two samples at 8:09 and 8:05, and so on.
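The persistent option mentioned under Usage can be sketched as follows. A stored function is typically created once with the .create-or-alter function management command, which, unlike the ad hoc form, requires appropriate permissions on the database. The folder and docstring values below are illustrative assumptions; the body repeats the ad hoc definition.

```kusto
.create-or-alter function with (folder = "Packages\\Series", docstring = "Time based rolling average of a metric")
time_window_rolling_avg_fl(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
    // Body identical to the ad hoc definition.
    let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
    tbl_ex
    | partition hint.strategy=shuffle by key
    (
        extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
        | mv-expand timestamp to typeof(datetime), delta to typeof(long)
        | sort by timestamp asc, delta desc
        | scan declare (cum_sum:double=0.0, cum_count:long=0) with
        (
            step s: true => cum_count = s.cum_count + delta,
                            cum_sum = s.cum_sum + delta * value;
        )
        | extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
        | where delta == -direction
        | project timestamp, value, avg_value, key
    )
}
```

Once the function is installed, a query can invoke it directly without the let statement, using the same call as in the ad hoc example.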