New in V1.2: LINQ Macros

StreamInsight LINQ is powerful and expressive. At the same time, the learning curve for a new user can be steep, because the query operators work on a semantically basic level. This means that complex use cases need to be build bottom up, composing the built-in operators to create higher-level query modules, etc. Many such modules are used over and over again, like the conversion of a stream to a signal, or the left-outer join. In StreamInsight V1.1 it was possible to some extent to write your own extension methods over CepStreams, but due to the mechanics and capabilities of the StreamInsight LINQ provider, for many such methods this was often difficult if not impossible. In V1.2 We overhauled the LINQ infrastructure significantly to make the definition of LINQ macros much easier. This enables you to express common building blocks as extension methods just once, and then use and re-use them to compose more sophisticated computations. The user of these macros doesn’t need to think in terms of AlterLifeTime or Left-Anti-Semi-Join anymore.

Let’s look at the first example, creating an event stream into a series of continuous, non-overlapping interval events. This is a typical preparation to implement a correlation (join) of two streams. In StreamInsight we can easily do this by extending the lifetime of each event and then clipping it right at the beginning of the next event (this is also described in the section “Point to Signal Conversion” on our MSDN pages):

var result = source
                .AlterEventDuration(e => TimeSpan.MaxValue)
                .ClipEventDuration(source, (s, e) => true);

Being a typical query pattern, I can easily convert this into an extension method:

public static CepStream<T> ToSignal<T>(this CepStream<T> inputstream)
{
    return inputstream
        .AlterEventDuration(e => TimeSpan.MaxValue)
        .ClipEventDuration(inputstream, (e1, e2) => true);
}

And use it as follows:

var result = source.ToSignal();

Now assume that my input is a multiplexed stream, containing events from different sources, identified by some payload field. Obviously, I want to convert each source into a separate signal, so the matching condition in ClipEventDuration() needs to change:

var result = source
                .AlterEventDuration(e => TimeSpan.MaxValue)
                .ClipEventDuration(source, (s, e) => s.Id == e.Id);

Ideally, I have an according extension method that allows me to pass the matching expression as a parameter:

var result = source.ToSignal((s, e) => s.Id == e.Id);

Here is how I can write such an extension method:

public static CepStream<T> ToSignal<T>(this CepStream<T> inputstream, Expression<Func<T, T, bool>> matchExpression)
{
    return inputstream
        .AlterEventDuration(e => TimeSpan.MaxValue)
        .ClipEventDuration(inputstream, (e1, e2) => matchExpression.Compile()(e1, e2));
}

The ability to compile expressions into delegates is new in V1.2. It also allows to create the equality predicate inside the macro:

public static CepStream<T> ToSignal<T, K>(this CepStream<T> inputstream, Expression<Func<T, K>> keySelector)
{
    return inputstream
        .AlterEventDuration(e => TimeSpan.MaxValue)
        .ClipEventDuration(inputstream, (e1, e2) => (keySelector.Compile()(e1)).Equals(keySelector.Compile()(e2)));
}

Now I can use the macro by just referring to the field that distinguishes my substreams:

var result = source.ToSignal(s => s.Id);

Our favorite example is the left outer join, which is not a native StreamInsight operator. In this operation, we want to join left and right streams, but also keep every event on the left side that does not join with the right side. We can model this as a plain inner join, a left-anti-semi-join (everything left without right), and the union thereof. Since Colin has already blogged about this pattern, I’ll not repeat it here and just refer to his posting.

Here is another really nice one: De-duplication. The following extension method helps you to get rid of duplicate events (same time, same payload):

public static CepStream<T> Distinct<T>(this CepStream<T> source)
{
    return from x in source
            group x by x into g
            from win in g.SnapshotWindow()
            select new { g.Key, IgnoreMe = win.Count() } into x
            select x.Key;
}

The usage now becomes very simple:

var result = input.Distinct();

Macros make it possible to build a domain-specific set of subqueries on top of the StreamInsight LINQ language, so that defining computations and analytics becomes more accessible and straightforward. The new V1.2 LINQPad samples include several other useful macros. Note that LINQPad does not allow true extension methods in the same file – they would have to be compiled and the resulting DLL referenced. This is why in our LINQPad samples the macros are defined as regular old methods.

Regards,
The StreamInsight Team