Stream join aggregates using SQL Server StreamInsight

Lately I have been working with StreamInsight and thought it was about time to share some of my observations and key points.

This is loosely based upon a requirement to aggregate financial risk vectors and deliver changes to clients in an event-based manner. When working through this I was primarily interested in the following factors:

  • Joining the two streams in a timely manner, with the results being ‘lively’ (that is released as quickly as possible)
  • What sort of performance could be achieved (without much effort)
  • How I could inject queries into the engine at runtime to enable a ‘service based model’ for clients.

Think of risk vectors simply as curves - a series of values plotted over a set of time points (arrays of doubles over distinct points in time). For example, the following 'curve' shows the Bank of England base rates as I captured them at a specific point in time.


Normally, longer-term rates are higher than shorter-term rates.

Essentially I have two streams of data which I want to correlate. One stream is of risk vector arrays - which have been calculated based upon a number of inputs upstream, usually by some form of compute grid.

The second stream is the attribution of the vectors - the things we may want to analyse or aggregate by - Trade, Portfolio etc. with a join key which relates the Trade to the Risk. Once combined I multiply a nominal amount by the corresponding rate and that is my risk. The basic idea for each Trade is:

  • Join to the array of doubles;
  • Multiply a nominal amount by each double;
  • Dequeue the result
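The three steps above can be sketched outside the engine. This is a minimal plain-Python illustration, not StreamInsight code; the Trade and RiskVector field names and the sample values are assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass
class Trade:
    trade_id: int
    counterparty: str
    nominal: float
    risk_id: int  # join key relating the trade to a risk vector (assumed name)


@dataclass
class RiskVector:
    risk_id: int
    rates: list  # one double per time point on the curve


def join_and_scale(trades, vectors):
    """Join each trade to its vector and multiply the nominal by each rate."""
    by_id = {v.risk_id: v for v in vectors}
    results = []
    for trade in trades:
        vector = by_id.get(trade.risk_id)
        if vector is None:
            continue  # no matching vector yet, so nothing to release
        results.append((trade.trade_id, [trade.nominal * r for r in vector.rates]))
    return results


# Hypothetical sample data: two trades sharing one three-point vector.
trades = [Trade(1, "CP-A", 1000.0, 7), Trade(2, "CP-B", 500.0, 7)]
vectors = [RiskVector(7, [0.5, 1.0, 2.0])]
scaled = join_and_scale(trades, vectors)
```

In the real engine the join is temporal as well as key-based, which is why the event shapes discussed later matter.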

This is summarized in the diagram below.


The data sources are as shown below.



Basic Plumbing

The first thing to do is to build out the 'infrastructure' using the StreamInsight programming model. 

This means creating my adapters (Risk, Trade and Output). I won't describe the details of the programming model. You can review the code sample and the StreamInsight documentation for this. However, for this scenario there are a couple of key points:

  • Both Trade and Risk must be either Interval or Edge event types. This is because I am joining streams across time and the cardinality of the relationship is many to many. A vector will apply to many Trades, and a Trade can apply to many vectors. A join on two point events would not support this (once the Point event is released it is gone).
  • For aggregate queries I want to release the events whenever there is an update (new source event which contributes to it). This means using the SnapshotWindow – using Hopping or Tumbling will not support this.

Joining Streams

It is worth briefly describing the management of application time, as it is a fundamental concept in StreamInsight. Current Time Increment (CTI) events are the basis for advancing application time. A CTI is effectively an assertion that no more events will arrive with a timestamp earlier than the CTI - and this tells the engine that it is ok to release the events before it. For example, if I enqueue 3 Point events with times of T1, T2 and T3, they will be dequeued when I assert a CTI of T4.
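As a rough illustration of that behaviour, the sketch below models a buffer that holds Point events until a CTI is asserted. It is a simplified plain-Python model written for this article, not the StreamInsight API; the class and method names are hypothetical.

```python
class PointEventBuffer:
    """Toy model: events wait in the buffer until a CTI passes their timestamp."""

    def __init__(self):
        self._pending = []     # (timestamp, payload) pairs not yet released
        self._last_cti = None  # most recent CTI asserted, if any

    def enqueue(self, timestamp, payload):
        # An event earlier than an already asserted CTI breaks the CTI's promise.
        if self._last_cti is not None and timestamp < self._last_cti:
            raise ValueError("CTI violation: event is earlier than the last CTI")
        self._pending.append((timestamp, payload))

    def advance_time(self, cti):
        """Assert a CTI and return the events that can now be released."""
        released = sorted(
            (e for e in self._pending if e[0] < cti), key=lambda e: e[0]
        )
        self._pending = [e for e in self._pending if e[0] >= cti]
        self._last_cti = cti
        return released


buffer = PointEventBuffer()
for t in (1, 2, 3):            # T1, T2, T3
    buffer.enqueue(t, f"event-{t}")
released = buffer.advance_time(4)  # CTI of T4 releases all three events
```

Until `advance_time` is called nothing leaves the buffer, which is the reason result liveliness depends so heavily on how often CTIs are produced.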

There are a few ways of managing application time in StreamInsight:

  • Declaratively - using AdvanceTimeSettings
  • Using a heartbeat mechanism - such as a timer
  • Declaratively - using AdvanceTimeImportSettings

When joining streams what is important to realize is that a ‘join event’ is only released to the output after a CTI has been inserted into each of the source streams.

In my example I used the following input files:

  • Trades – 5000 trades, each with a join to a single array
  • Risk – 40 vectors, each an array of 20 values

This means that we will produce 100,000 events (5000 x 20).

Firstly, I looked at using AdvanceTimeSettings, defining the same time policy for each stream. The problem here is that - unless your streams have the same number of events, enqueued with the same occurrence of CTIs (very unlikely!) - at a certain point in time the ‘slower’ stream will have enqueued all of its events. That means there are no more events to enqueue on that stream and, as we are using a declarative approach to time management, no further CTIs will be produced for it. So we could enqueue all of the Risk, more Trades could arrive which have a matching join already enqueued, but they will not be released – because there is no CTI inserted to the Risk stream.
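The stall can be shown with a small model. The sketch below assumes a simplified rule: the join can only release output up to the earlier of the latest CTIs seen on its two inputs, and each stream produces one CTI per event. The function name and the tiny event counts are made up for the example.

```python
def simulate_join_time(trade_event_times, risk_event_times):
    """Return the join's output time after each step of a lock-step replay.

    Assumed declarative policy: every event is followed by a CTI stamped at
    that event's time, so a stream with no more events produces no more CTIs.
    """
    latest_trade_cti = 0
    latest_risk_cti = 0
    output_times = []
    steps = max(len(trade_event_times), len(risk_event_times))
    for i in range(steps):
        if i < len(trade_event_times):
            latest_trade_cti = trade_event_times[i]
        if i < len(risk_event_times):
            latest_risk_cti = risk_event_times[i]
        # The join cannot move past the input that is furthest behind.
        output_times.append(min(latest_trade_cti, latest_risk_cti))
    return output_times


trade_times = list(range(1, 11))  # 10 trades keep arriving
risk_times = list(range(1, 5))    # 4 risk events, then the stream goes quiet
times = simulate_join_time(trade_times, risk_times)
```

Once the Risk stream goes quiet the output time stops moving, however many Trades arrive afterwards.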

The second option is to manage things manually using a ‘heartbeat’ CTI injection approach – essentially each adapter injecting CTIs using a Timer. This is an approach which does work but you have to be careful to make sure your streams are in sync and you do not end up with CTI violations – whereby you try to enqueue an event with a timestamp earlier than an already enqueued CTI. I took a simple approach: a singleton holds the current application time, and both adapters call a method on it to advance time.
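One possible shape for such a shared clock is sketched below in plain Python. It is an assumption about how a singleton like this could look, not the code used in this project: events are stamped with the current tick, and each heartbeat moves the tick forward and hands it back as the CTI, so a later event can never be stamped earlier than a CTI that has already been issued.

```python
import threading


class ApplicationClock:
    """Shared, monotonic application time for several adapters (hypothetical)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._tick = 0

    def stamp_event(self):
        """Timestamp for a new event: the current tick."""
        with self._lock:
            return self._tick

    def heartbeat(self):
        """Advance time by one tick and return it as the next CTI."""
        with self._lock:
            self._tick += 1
            return self._tick


clock = ApplicationClock()          # one instance shared by both adapters
first_event = clock.stamp_event()   # stamped at tick 0
trade_cti = clock.heartbeat()       # Trade adapter's timer fires: CTI at tick 1
risk_cti = clock.heartbeat()        # Risk adapter's timer fires: CTI at tick 2
next_event = clock.stamp_event()    # stamped at tick 2, not before either CTI
```

The lock matters because each adapter's timer would typically fire on its own thread.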

The third option is to use AdvanceTimeImportSettings. This allows us to synchronize two streams – the ‘slower’ stream importing CTIs from the ‘faster’ stream. An example is shown below.

    #region Time Settings

    // Generate a CTI after every event (frequency 1) with a delay of one tick
    var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(1), true);
    var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

    // These settings import CTIs from the Trade stream
    var timeImportSettings = new AdvanceTimeSettings(
        null,
        new AdvanceTimeImportSettings("TradeStream"),
        AdvanceTimePolicy.Adjust);

    #endregion

    #region Streams

    // Create the input streams.
    // The Trade stream generates its own CTIs.
    var tStream = CepStream<Trade>.Create(
        "TradeStream",
        typeof(TradeAdapteFactory),
        tconfig,
        EventShape.Interval,
        ats);

    // The Risk stream imports its CTIs from the Trade stream
    var rStream = CepStream<Risk>.Create(
        "RiskStream",
        typeof(RiskAdapteFactory),
        rconfig,
        EventShape.Interval,
        timeImportSettings);

    #endregion


In my example there is a subtlety here – which stream is the ‘slow’ one? From a logical perspective I should expect Trade events to arrive before Risk (Risk events are produced elsewhere using Trade as an input). If I consider the cardinality and volume of my data I have 5000 Trades and 800 Risk points. This means that Trade is the fast stream because I will have finished enqueuing Risk before I have finished enqueuing all my Trade events. Thus, I generate CTIs in the Risk stream for any new Trades imported, thereby releasing any ‘join events’.

Whereas this is fine, and works completely as expected, it forced me to think about the timeliness of the arrival of data in the source streams in real life. The two streams are from different sources and completely independent. Even though Trade is the ‘fast’ stream here, what happens if there is a gap in the enqueuing of Trades, perhaps due to a system failure? New Risk events may arrive which have an existing Trade match, but the join events will not be released. If this is not desirable, and you do not want gaps or delays in the fast stream to delay the release of valid events, then a manual approach to time management is probably preferable.


Performance (and ‘null’ adapters)

One of my goals was to see what sort of ‘raw’ performance I could get with StreamInsight. As I was interested in the raw performance of the engine rather than how well (or badly) I could write input and output adapters, I used simple CSV source files and accessed them using a StreamReader. For the output adapter I came up with the idea of a ‘NULL adapter’ whereby I dequeue events but then do not do anything further with them (outputting them to console for example, which is what I started with, is just far too slow). I could use the Event Flow Debugger and server.GetDiagnosticView to validate that all of the events had been enqueued and released.
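The idea behind the ‘null adapter’ is independent of the engine. As a plain-Python sketch (the class and function names are hypothetical, and this is not a StreamInsight output adapter), a sink that only counts what it receives gives a ceiling against which a real sink can be compared:

```python
import time


class NullSink:
    """Consumes events and discards them, keeping only a count."""

    def __init__(self):
        self.count = 0

    def consume(self, event):
        self.count += 1  # no console, database or network work at all


def drain(events, sink):
    """Push every event into the sink and return (events consumed, seconds)."""
    start = time.perf_counter()
    for event in events:
        sink.consume(event)
    elapsed = time.perf_counter() - start
    return sink.count, elapsed


sink = NullSink()
consumed, seconds = drain(range(100_000), sink)
```

Swapping `NullSink` for a sink that writes somewhere real shows how much of the end-to-end time belongs to the sink rather than to the pipeline feeding it.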

So with two StreamReader Input Adapters, and one ‘Null Adapter’ what sort of performance could I get on my laptop?

Without much (ok, any!) effort I was able to process and release all events within 2.5 seconds. That is 40,000 events a second or, from a ‘released vectors’ perspective, 2000 a second! This was way in excess of my expected target (400). This hardly represents a soak test with a steady state but there is an important point I took away from this. The StreamInsight engine is very performant – it is up to you as the adapter writer to try to keep up! I think the ‘null adapter’ approach is a useful way to benchmark the kind of output performance you should be trying to get near to. The reality will always be at least a little slower depending on your event sink (Database, Service, Client etc.).
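For reference, the arithmetic behind those two rates, using only the figures quoted in this post (the variable names are just for the example):

```python
trades = 5000             # trades in the input file
values_per_vector = 20    # values in each risk vector
elapsed_seconds = 2.5     # time taken to release everything

output_events = trades * values_per_vector            # one event per trade per value
events_per_second = output_events / elapsed_seconds
vectors_per_second = trades / elapsed_seconds          # one released vector per trade
```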


Injecting Queries

StreamInsight has LINQ support for writing queries.

An aggregate query based upon my join query is shown below. As noted above, it must use SnapshotWindow in order to produce the ‘event based result’ I needed.

    // SUM by CounterParty over a snapshot to get some aggregates
    var CPartyQuery = from c in joinedrisk
                      group c by c.CounterParty into CpGroup
                      from window in CpGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                      select new { Sum = window.Sum(e => e.Amount), CounterParty = CpGroup.Key };
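To see why a snapshot window gives an ‘event based result’, it helps to work through what it computes. The sketch below is a simplified plain-Python model, not the engine's implementation: it assumes interval events with an exclusive end time, and treats every distinct start or end time within a group as a window boundary. The sample events are invented.

```python
from collections import defaultdict


def snapshot_sums(events):
    """events: iterable of (start, end, counterparty, amount), end exclusive.

    Returns (counterparty, window_start, window_end, sum) for each snapshot.
    """
    by_group = defaultdict(list)
    for start, end, counterparty, amount in events:
        by_group[counterparty].append((start, end, amount))

    results = []
    for counterparty, items in by_group.items():
        # Each event start or end opens a new snapshot for the group.
        edges = sorted({t for s, e, _ in items for t in (s, e)})
        for lo, hi in zip(edges, edges[1:]):
            active = [a for s, e, a in items if s <= lo and e >= hi]
            if active:
                results.append((counterparty, lo, hi, sum(active)))
    return results


events = [
    (0, 10, "CP-A", 100.0),
    (4, 10, "CP-A", 50.0),   # arrives later: produces a new CP-A snapshot
    (2, 10, "CP-B", 30.0),
]
sums = snapshot_sums(events)
```

The second CP-A event produces a fresh aggregate starting at the moment it begins, rather than waiting for a fixed window boundary as a Hopping or Tumbling window would.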


This is great, but it quickly led me to think about how you can bind queries to the engine – to support a scenario of having CEP ‘as a service’ which clients can bind to, for example. After some poking around I found that there is some support for this. There is a property of a QueryTemplate (Definition) which returns an XML definition of the query. An example query is shown below.

    <QueryTemplate Name="cep:/Server/Application/RiskAggregateService/QueryTemplate/filterLogic" Description="filtered" xmlns="">
      <Import Name="filterInput" Type="cep:/Server/Application/RiskAggregateService/EventType/RealTimeRiskCEPApplication.Events.Risk%2C%20RealTimeRiskCEPApplication%2C%20Version%3D1.0.0.0%2C%20Culture%3Dneutral%2C%20PublicKeyToken%3Dnull">
        <OutputStream Name="Import.2.0" />
      </Import>
      <Export Name="OutputAdapter">
        <InputStream Name="Select.1.1" />
      </Export>
      <Select Name="Select.1.1">
        <InputStream Name="Import.2.0" />
        <OutputStream Name="Select.1.1" />
        <FilterExpression>
          <Equal>
            <InputField Name="RiskId" StreamName="Import.2.0" />
            <Constant Type="System.Int32" Nullable="false" Value="1" />
          </Equal>
        </FilterExpression>
      </Select>
    </QueryTemplate>


Once you have this, you can bind queries from XML, as per the below.


    // Create the query template, and bind it
    var dummyTemplate1 = application.CreateQueryTemplate("dummyTemplate1", "Description...", CepStream<TradeDetail>.Create("input1"));
    var dummyTemplate2 = application.CreateQueryTemplate("dummyTemplate2", "Description...", CepStream<Risk>.Create("input2"));

    XmlReader riskFilter = XmlReader.Create(fileDir + "RealTimeRiskCEPApplication\\RiskJoin.xml");
    QueryTemplate filterQT = application.CreateQueryTemplate(riskFilter);
    QueryBinder queryBinder = new QueryBinder(filterQT);

    // Bind to the input adapters
    queryBinder.BindProducer<Risk>("RiskStream", RiskInputAdapter, rconfig, EventShape.Interval, timeImportSettings);
    queryBinder.BindProducer<TradeDetail>("TradeStream", TradeInputAdapter, tconfig, EventShape.Interval);

    // Bind to the output
    queryBinder.AddConsumer("ConsoleOutput", ConsoleOutAdapter, conbaseriskconfig, EventShape.Point, StreamEventOrder.FullyOrdered);

    Query basequery = application.CreateQuery("BaseRisk", "Base risk join query", queryBinder);


One wrinkle I found is that you have to have a couple of dummy query templates with the same event type as you use in BindProducer. Otherwise it does not work.

It is also worth noting that there are some limitations to this query binding technique – lambda expressions in the query being a notable one. In addition, none of this (including the XML structure for queries) was documented, even though it is in the API.

In Summary

In summary, SQL Server StreamInsight is a highly performant CEP engine. The adapter model is relatively straightforward. Generally speaking the engine will be as performant as the adapters you write for it. The ‘null adapter’ approach is worth considering just to get a baseline on the hardware you are running. The semantics of time, and understanding the effects of CTI injection on result liveliness, are probably the most important concepts to get to grips with. And I found the Event Flow Debugger essential in understanding what was going on in the engine when results were not as I was expecting.

It is possible to inject queries at runtime, though this was not that obvious. Nonetheless the support is there for developing a service layer which allows clients to bind the queries which they are interested in. The results of these could then be published to clients using a message-based technology.

Written by Terry Room