June 2011

Volume 26 Number 06

StreamInsight - Master Large Data Streams with Microsoft StreamInsight

By Rob Pierry | June 2011

Discovering that the production line has gone down, that users’ media streams are skipping, or that one of your products has become a “must have” is easy once it has already happened. The real trick is identifying these scenarios as they happen, or even predicting them based on past trends.

Successfully predicting scenarios like these requires a near-real-time approach. By the time relevant data is extracted, transformed and loaded into a traditional business intelligence (BI) solution like SQL Server Analysis Services (SSAS), the situation has long since changed. Similarly, any system that relies on a request-response pattern to request updated data from a transactional data store (such as a SQL Server Reporting Services, or SSRS, report) is always operating on stale data near the end of its request-polling interval. The polling interval is usually fixed, so even if a burst of interesting activity occurs, the consuming system won’t know until the next interval comes around. Instead, the consuming system should be continuously notified the moment interesting criteria are met.

When detecting emerging trends, intervals of time are key—100 purchases of a specific item in the last five minutes is clearly a bigger indicator of an emerging trend than a steady trickle over the last five months. Traditional systems like SSAS and SSRS require the developer to keep track of the timeliness of data on their own, through a separate dimension in a cube or a time-stamp column in a transactional store. Ideally, tools for identifying emerging scenarios would have the concept of time built-in and provide a rich API for working with it.

Finally, a good indicator of the future comes from analyzing the past. In fact, this is what traditional BI is all about—aggregating and analyzing large volumes of historical data to identify trends. Unfortunately, these systems involve different tools and query languages than transactional systems do. Successfully identifying emerging scenarios requires seamless correlation of past and present data, and this kind of tight integration is only possible when the same tools and query language are used for both.

For specific scenarios such as production-line monitoring, highly specific custom tools exist to perform these functions, but these tools are often comparatively expensive and not general purpose.

To prevent the production line from going down or to make sure your products are priced appropriately, the key is to be responsive enough to identify and adjust as the situation changes. To easily and quickly identify these scenarios, historical and real-time queries should use the same developer-friendly toolsets and query languages, the system should handle large volumes of data in near-real time (on the order of hundreds of thousands of events per second), and the engine should be flexible enough to handle scenarios across a wide set of problem domains.

Fortunately, this tool exists. It’s called Microsoft StreamInsight.

StreamInsight Architectural Overview

StreamInsight is a complex event-processing engine capable of handling hundreds of thousands of events per second with extremely low latency. It can be hosted by any process, such as a Windows Service, or embedded directly in an application. StreamInsight has a simple adapter model for getting data in and out, and queries over both real-time and historical data use the same LINQ syntax, accessed like any other assembly from any Microsoft .NET Framework language. It’s licensed as part of SQL Server 2008 R2.

The high-level architecture of StreamInsight is quite simple: events are collected from a variety of sources via input adapters. These events are analyzed and transformed via queries, and the results of the queries are distributed to other systems and people via output adapters. Figure 1 shows this simple structure.

Figure 1 High-Level Architecture of Microsoft StreamInsight

In the same way that service-oriented architectures are concerned with messages and database systems are concerned with rows, complex event-processing systems like StreamInsight are organized around events. An event is a simple piece of data along with the time at which that data is relevant—like a sensor reading at a particular time of day or a stock ticker price. The data carried by the event is called its payload.

StreamInsight supports three types of events. Point events are instantaneous, with no duration. Interval events carry payloads that are relevant for a specific, known period of time. Edge events are similar to interval events, except that the duration isn’t known when the event arrives. Instead, the start time is set and the event effectively has infinite duration until another edge event arrives to set the end time. For example, a speedometer reading might be a point event because it changes constantly, but the price of milk at the supermarket could be an edge event because it’s relevant for a longer period of time. When the retail price of milk changes (say, due to a change in distributor pricing), the duration of the new price isn’t known, so an edge event is more appropriate than an interval one. Later, when the distributor updates pricing again, a new edge event caps the duration of the previous price while another edge event sets the new price going forward.
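
The three shapes can be sketched using the static event factories that StreamInsight exposes for building streams in code. This is an illustrative sketch only: MilkPrice is a hypothetical payload type, and the exact factory overloads may differ slightly from what’s shown.

```csharp
// Sketch: the three event shapes. MilkPrice is an illustrative payload
// type, not part of StreamInsight.
var now = DateTimeOffset.Now;

// Point: an instantaneous speedometer reading.
var reading = PointEvent.CreateInsert(now, new SpeedPayload { Mph = 57.0 });

// Interval: a sale price known in advance to hold for exactly one day.
var sale = IntervalEvent.CreateInsert(
  now, now.AddDays(1), new MilkPrice { Dollars = 2.99 });

// Edge: the new shelf price starts now; its end isn't known yet.
var priceChange = EdgeEvent.CreateStart(now, new MilkPrice { Dollars = 3.49 });

// Later, a second edge event caps the previous price's duration.
var priceEnd = EdgeEvent.CreateEnd(
  now, now.AddDays(12), new MilkPrice { Dollars = 3.49 });
```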

Input and output adapters in StreamInsight are an abstract example of the Adapter design pattern. The StreamInsight engine operates over its own representation of events, but the actual sources of these events can vary wildly, ranging from a proprietary interface to a hardware sensor to status messages generated by applications in the enterprise. Input adapters transform the source events into a stream of events that the engine understands.

The results from StreamInsight queries represent specific business knowledge and can be highly specialized. It’s important that these results be routed to the most appropriate place. Output adapters can be used to turn the internal representation of an event into text printed to the console, a message sent via Windows Communication Foundation (WCF) to another system for processing, or even a point on a chart in a Windows Presentation Foundation application. Sample adapters for working with text files, WCF, SQL and more are available at streaminsight.codeplex.com.

StreamInsight Queries by Example

At first glance, StreamInsight queries appear similar to querying rows from a database, but there are critical distinctions. When querying a database, the query is constructed and executed and the results are returned. If the underlying data changes, the output isn’t affected because the query has already run. Database query results represent a snapshot of a moment in time, available via the request-response paradigm.

StreamInsight queries are standing queries. As new input events arrive, the query continuously reacts and new output events are created, if necessary.

The query examples in this article are drawn from the sample solution available for download. They start simply, but grow more powerful as new features of the query language are introduced. The queries all use the same class for the payload. Here’s the definition of a simple class with properties for Region and Value:

public class EventPayload {
  public string Region { get; set; }
  public double Value { get; set; }

  public override string ToString() {
    return string.Format("{0}\t{1:F4}", Region, Value);
  }
}

The queries in the sample application make use of an input adapter that randomly generates data and an output adapter that simply writes each event to the console. The adapters in the sample application are simplified for clarity.

To run each query, uncomment the line in the Program.cs file in the sample solution that assigns the query to the local variable named “template.”

Here’s a basic query that filters the events by the Value property:

var filtered =
  from i in inputStream
  where i.Value > 0.5
  select i;

This query should look familiar to any developer with experience using LINQ. Because StreamInsight uses LINQ as its query language, this query looks just like a LINQ to SQL query hitting a database or an in-memory filtering of an IList. As events arrive from the input adapter, their payloads are inspected, and if the value of the Value property is greater than 0.5, they’re passed to the output adapter where they’re printed to the console.

When the application runs, notice that events continually arrive in the output. This is effectively a push model. StreamInsight computes new output events from inputs as they arrive, rather than a pull model like a database where the application must periodically poll the data source to see if new data has arrived. This fits nicely with the support of IObservable available in the Microsoft .NET Framework 4, which will be covered later.

Having a push model for continuous data instead of polling is nice, but the real power of StreamInsight becomes apparent when querying over properties relating to time. As events arrive through the input adapter, they’re given a timestamp. This timestamp may come from the data source itself (suppose the events represent historical data with an explicit column storing the time) or can be set to the time the event arrived. Time is, in effect, first class in the querying language of StreamInsight.

Queries often look like standard database queries with a time qualifier stuck on the end, such as “every five seconds” or “every three seconds over a five-second span.” For example, here’s a simple query that finds the average of the Value property every five seconds:

var aggregated =
  from i in inputStream.TumblingWindow(
    TimeSpan.FromSeconds(5),
    HoppingWindowOutputPolicy.ClipToWindowEnd)
  select new { Avg = i.Avg(p => p.Value) };

Windows of Data

Because the concept of time is a fundamental necessity to complex event-processing systems, it’s important to have a simple way to work with the time component of query logic in the system. StreamInsight uses the concept of windows to represent groupings by time. The previous query uses a tumbling window. When the application runs, the query will generate a single output event every five seconds (the size of the window). The output event represents the average over the last five seconds. Just like in LINQ to SQL or LINQ to Objects, aggregation methods like Sum and Average can roll up events grouped by time into single values, or Select can be used to project the output into a different format.

Tumbling windows are actually a special case of another window type: the hopping window. Hopping windows have a window size, too, but they also have a separate hop size that controls how far the window advances each time. When the hop size is smaller than the window size, consecutive hopping windows overlap each other; a tumbling window is simply a hopping window whose hop size equals its window size.

For example, a hopping window with a window size of five seconds and a hop size of three seconds will produce output every three seconds (the hop size), giving you the average over the last five seconds (the window size). It hops forward three seconds at a time and is five seconds long. Figure 2 shows an event stream grouped into tumbling and hopping windows.

Figure 2 Tumbling and Hopping Windows

Notice that the tumbling windows do not overlap, but the hopping windows can when the hop size is smaller than the window size. If the windows overlap, an event may end up in more than one, like the third event, which is in both window 1 and window 2. Events with duration (interval and edge events) may also span window boundaries and end up in more than one window, like the second-to-last event in the tumbling windows.
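
The five-second window with a three-second hop described above can be written directly as a query. This sketch reuses the inputStream from the earlier examples and follows the same pattern as the tumbling-window query:

```csharp
// A hopping window: output every 3 seconds, averaging the last 5 seconds.
var hopping =
  from win in inputStream.HoppingWindow(
    TimeSpan.FromSeconds(5),   // window size
    TimeSpan.FromSeconds(3),   // hop size
    HoppingWindowOutputPolicy.ClipToWindowEnd)
  select new { Avg = win.Avg(e => e.Value) };
```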

Another common window type is the count window. Count windows contain a specific number of events rather than events at a particular point or duration of time. A query to find the average of the last three events that arrived would use a count window. One current limitation of count windows is that the built-in aggregation methods like Sum and Average are not supported. Instead, you must create a user-defined aggregate. This simple process is explained later in the article.
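
A count-window query might look like the following sketch. It assumes the EveryOtherSum user-defined aggregate built later in this article (made available on windows by its extension method); the window method name and output policy are from the StreamInsight API and may vary slightly by version.

```csharp
// Sketch: a window over the last three events. Built-in aggregates like
// Sum aren't supported here, so a user-defined aggregate is used instead.
var lastThree =
  from win in inputStream.CountByStartTimeWindow(
    3, CountWindowOutputPolicy.PointAlignToWindowEnd)
  select new { Sum = win.EveryOtherSum(e => e.Value) };
```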

The final window type is the snapshot window. Snapshot windows are easiest to understand in the context of edge events. Every time an event begins or ends, the current window is completed and a new one starts. Figure 3 shows how edge events are grouped into snapshot windows. Notice how every event boundary triggers a window boundary. E1 begins and so does w1. When E2 begins, w1 is completed and w2 begins. The next edge is E1 ending, which completes w2 and starts w3. The result is three windows: w1 containing E1, w2 containing E1 and E2, and w3 containing E2. Once the events are grouped into the windows, they’re stretched so that it appears that each event begins and ends when its window does.

Figure 3 Snapshot Windows

More Complex Queries

Given these available windows and basic query methods like where, group by and order by, a wide variety of queries is possible. Here’s a query that groups the input events by region and then uses a hopping window to output the sum of the payload Value for each Region for the last minute:

var payloadByRegion =
  from i in inputStream
  group i by i.Region into byRegion
  from c in byRegion.HoppingWindow(
    TimeSpan.FromMinutes(1),
    TimeSpan.FromSeconds(2), 
    HoppingWindowOutputPolicy.ClipToWindowEnd)
  select new { 
    Region = byRegion.Key, 
    Sum = c.Sum(p => p.Value) };

These windows use a hop size of two seconds, so the engine sends output events every two seconds.

Because the query operators are defined on the IQueryable interface, composing queries is possible. The following code uses the previous query that finds sums by region to then calculate the region with the highest sum. A snapshot window allows the event stream to be sorted by sum so that the Take method can grab the region with the highest sum:

var highestRegion = 
  // Uses groupBy query 
  (from i in payloadByRegion.SnapshotWindow(
    SnapshotWindowOutputPolicy.Clip)
    from sumByRegion in i
    orderby sumByRegion.Sum descending
    select sumByRegion).Take(1);

A common scenario is a query that relates a stream of fast-moving events (like the reading from a sensor) to slower-moving or static reference data (like the fixed location of the sensor). Queries use joins to accomplish this goal.

The StreamInsight join syntax is the same as any other LINQ join, with one important caveat: events will only join together if their durations overlap. If sensor 1 reports a value at time t1, but the reference data about sensor 1’s location is only valid for time t2-t3, then the join will not match. The join criteria for duration isn’t explicitly written into the query definition; it’s a fundamental property of the StreamInsight engine. When working with static data, it’s common for the input adapter to effectively treat the data as edge events with infinite duration. This way, all the joins to the fast-moving event streams will succeed.
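
A join of this kind might look like the following sketch. The payload types (SensorReading, SensorLocation) and stream names are illustrative; only the key equality is written in the query, while the requirement that the two events’ durations overlap is enforced implicitly by the engine:

```csharp
// Sketch: correlate a fast-moving sensor stream with slow-moving
// reference data about where each sensor is installed.
var readingsWithLocation =
  from r in readingStream       // stream of SensorReading events
  join l in locationStream      // stream of SensorLocation events
    on r.SensorId equals l.SensorId
  select new { l.Building, r.SensorId, r.Value };
```

Because the reference data is typically modeled as edge events with effectively infinite duration, every reading overlaps its sensor’s location event and the join succeeds.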

Correlating multiple event streams via joins is a powerful concept. Assembly lines, oil production facilities or high-volume Web sites don’t often fail due to isolated events. One piece of equipment triggering a temperature alarm doesn’t usually bring the line down; it’s a combination of circumstances such as the temperature being too high over a sustained period of time while a specific tool is in heavy use and the human operators are changing shifts.

Without joins, the isolated events wouldn’t have as much business value. Using joins and StreamInsight queries over historical data, users can correlate isolated streams into highly specific monitoring criteria that are then monitored in real time. A standing query can look for the situations that would lead to failure and automatically generate an output event that could be routed to a system that knows how to take the overheating piece of equipment offline instead of waiting until it brings down the whole line.

In a retailing scenario, events relating to sales volume by item over time can feed into pricing systems and customer order histories to ensure optimal pricing per item, or to drive which items to suggest to the user before checkout. Because queries are easily created, modified and composed, you can start out with simple scenarios and refine them over time, yielding increasing value to the business.

User-Defined Aggregates

StreamInsight ships with several of the most common aggregate functions including Count, Sum and Average. When these functions aren’t enough (or you need to aggregate over a count window, as mentioned earlier), StreamInsight supports user-defined aggregate functions.

The process of creating a user-defined aggregate involves two steps: writing the actual aggregate method, then exposing the method to LINQ via an extension method.

The first step involves inheriting from either CepAggregate&lt;TInput, TOutput&gt; if the aggregate is not time-dependent, or CepTimeSensitiveAggregate&lt;TInput, TOutput&gt; if it is. These abstract classes have a single method to implement, GenerateOutput. Figure 4 shows the implementation of the EveryOtherSum aggregate, which adds up every other event.

Figure 4 EveryOtherSum Aggregate

public class EveryOtherSum : 
  CepAggregate<double, double> {

  public override double GenerateOutput(
    IEnumerable<double> payloads) {

    var sum = default(double);
    var include = true;
    foreach (var d in payloads) {
      if (include) sum += d;
      include = !include;
    }
    return sum;
  }
}

The second step involves creating an extension method on CepWindow<TPayload> so that your aggregate can be used in queries. The CepUserDefinedAggregateAttribute is applied to the extension method to tell StreamInsight where to find the implementation of the aggregate (in this case, the class created in the first step). The code for both steps of this process is available in the EveryOtherSum.cs file in the downloadable sample application.
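
The extension method for this second step might look like the following sketch (namespaces omitted; it mirrors the pattern in the sample’s EveryOtherSum.cs). Note that the method body is never executed: the engine translates calls to this method into the aggregate, so by convention the body just throws.

```csharp
public static class EveryOtherSumExtensions {
  // Tells StreamInsight which class implements the aggregate.
  [CepUserDefinedAggregate(typeof(EveryOtherSum))]
  public static double EveryOtherSum<T>(
    this CepWindow<T> window,
    Expression<Func<T, double>> map) {
    // Never called directly; the engine evaluates the aggregate instead.
    throw CepUtility.DoNotCall();
  }
}
```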

More About Adapters

Queries represent business logic that acts over data provided by adapters. The sample application uses a simple input adapter that generates random data and an output adapter that writes to the console. Both follow a similar pattern, which is also followed by the adapters available on the CodePlex site.

StreamInsight uses a Factory pattern for creating adapters. Given a configuration class, the factory creates an instance of the appropriate adapter. In the sample application, the configuration classes for the input and output adapters are quite simple. The output adapter configuration has a single field to hold a format string to use when writing the output. The input adapter configuration has a field for the time to sleep between generating random events as well as another field called CtiFrequency.

The Cti in CtiFrequency stands for Current Time Increment. StreamInsight uses Cti events to help ensure that events are delivered in the correct order. By default, StreamInsight supports events arriving out of order. The engine will automatically order them appropriately when passing them through the queries. However, there’s a limit to this reordering.

Suppose events really could arrive in any order. How would it ever be possible to determine that the earliest event had arrived and could thus be pushed through the query? It wouldn’t be, because the next event to arrive might carry a timestamp earlier than any received so far. A Cti event signals the engine that no events with timestamps earlier than the Cti will arrive, cueing it to process the events received so far and to ignore or adjust any later arrivals stamped earlier than the current time.

The sample input adapter generates an ordered event stream, so it automatically inserts a Cti event after every generated event to keep things moving along. If you’re ever writing an input adapter and your program produces no output, make sure your adapter is inserting Ctis, because without them the engine will wait forever.

StreamInsight ships with a variety of base classes for adapters: typed, untyped, point, interval and edge. Typed adapters always produce events with a well-known payload type—in the sample case, the class RandomPayload. Untyped adapters are useful for event sources that may generate multiple types of events or things like CSV files, where the layout and content of the rows isn’t known in advance.

The sample input adapter has a well-known payload type and generates point events, so it inherits from TypedPointInputAdapter<RandomPayload>. The base class has two abstract methods that must be implemented: Start and Resume. In the sample, the Start method enables a timer that fires at the interval specified by the configuration. The timer’s Elapsed event runs the ProduceEvent method, which does the main work of the adapter. The body of this method follows a common pattern.

First, the adapter checks whether the engine has stopped since it last ran, and that it’s still running. Then a method in the base class is called to create an instance of a point event, its payload is set and the event is queued in the stream. In the sample, the SetRandomEventPayload method stands in for any real adapter logic—for example, reading from a file, talking to a sensor or querying a database.
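
The pattern just described might be sketched as follows (simplified from the sample application; the exact state-checking details can differ):

```csharp
// Sketch of the typed point input adapter's main loop body.
private void ProduceEvent() {
  // Stop cleanly if the engine has shut the adapter down.
  if (AdapterState == AdapterState.Stopping) { Stopped(); return; }
  if (AdapterState != AdapterState.Running) return;

  // Ask the base class for an event; null means the engine is busy.
  PointEvent<RandomPayload> evt = CreateInsertEvent();
  if (evt == null) return;

  evt.StartTime = DateTimeOffset.Now;
  SetRandomEventPayload(evt);   // stand-in for real source logic
  Enqueue(ref evt);             // push the event into the stream
}
```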

The input adapter factory is also simple. It implements an interface ITypedInputAdapterFactory<RandomPayloadConfig> because it’s a factory for typed adapters. The only trick to this factory is that it also implements the ITypedDeclareAdvanceTimeProperties<RandomPayloadConfig> interface. This interface allows the factory to handle inserting the Ctis, as explained earlier.

The sample application’s output adapter follows almost exactly the same pattern as the input. There’s a configuration class, a factory and the output adapter itself. The adapter class looks a lot like the input adapter. The main difference is that the adapter removes events from the queue rather than queueing them. Because Cti events are events just like the others, they arrive at the output adapter, too, and are simply ignored.

Observables

Though the adapter model is quite simple, there’s an even easier way to get events into and out of the engine. If your application is using the embedded deployment model for StreamInsight, you can use both IEnumerables and IObservables as inputs and outputs for the engine. Given an IEnumerable or an IObservable, you can create an input stream by calling one of the provided extension methods like ToStream, ToPointStream, ToIntervalStream or ToEdgeStream. This creates an event stream that looks exactly the same as one created by an input adapter.

Likewise, given a query, extension methods like ToObservable/Enumerable, ToPointObservable/Enumerable, ToIntervalObservable/Enumerable or ToEdgeObservable/Enumerable will route the query output to an IObservable or IEnumerable, respectively. These patterns are especially useful for replaying historical data saved in a database.

Using the Entity Framework or LINQ to SQL, create a database query. Use the ToStream extension method to convert the database results into an event stream and define a StreamInsight query over it. Finally, use ToEnumerable to route the StreamInsight results into something that you can easily foreach over and print out.
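
The round trip might look like the following sketch. The database query and its columns (db.Readings, TimeColumn) are hypothetical; ToPointStream stamps each row with its stored time, and ToEnumerable pulls the query results back out for printing:

```csharp
// Sketch: replay historical rows through a StreamInsight query.
var history = db.Readings.ToList();   // any LINQ to SQL / EF query

var replayed = history.ToPointStream(
  application,
  r => PointEvent.CreateInsert(
    r.TimeColumn,
    new EventPayload { Region = r.Region, Value = r.Value }),
  AdvanceTimeSettings.IncreasingStartTime);

var filtered =
  from e in replayed
  where e.Value > 0.5
  select e;

foreach (var result in filtered.ToEnumerable())
  Console.WriteLine(result);
```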

Deployment Model and Other Tools

To use the Observable and Enumerable support, StreamInsight must be embedded in your application. StreamInsight does support a standalone model, though. When installing, you’re asked whether to create a Windows Service to host the default instance. The service can then host StreamInsight, allowing multiple applications to connect to the same instance and share adapters and queries.

Communicating with a shared server instead of an embedded one simply involves a different static method on the Server class. Instead of calling Create with the instance name, call Connect with an EndpointAddress that points to the shared instance. This deployment strategy is more useful for enterprise scenarios where multiple applications would want to consume shared queries or adapters.
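
The two deployment styles might be sketched as follows. The instance name and endpoint address here are illustrative; use the values chosen during installation:

```csharp
// Embedded: create an in-process instance.
using (var server = Server.Create("MyInstance")) {
  var application = server.CreateApplication("MyApp");
  // define queries, bind adapters ...
}

// Shared: connect to the Windows Service-hosted instance instead.
var address = new System.ServiceModel.EndpointAddress(
  "http://localhost/StreamInsight/MyInstance");
using (var remote = Server.Connect(address)) {
  // consume shared queries and adapters ...
}
```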

In both cases, it’s sometimes necessary to figure out why the output generated by StreamInsight isn’t what it should be. The product ships with a tool called the Event Flow Debugger for just this purpose. The use of the tool is beyond the scope of this article, but in general it allows you to connect to instances and trace events from the input through queries and to the output.

A Flexible, Reactive Tool

Flexible deployment options, a familiar programming model and easily creatable adapters make StreamInsight a good choice for a wide variety of scenarios. From a centralized instance querying and correlating thousands of sensor inputs a second, to an embedded instance monitoring current and historical events within a single application, StreamInsight utilizes developer-friendly frameworks like LINQ to enable highly customizable solutions.

Simple-to-create adapters and built-in support for converting between event streams and IEnumerables and IObservables make it easy to quickly get solutions up and running, so the incremental work of creating and refining queries that encapsulate specific business knowledge can begin. As they’re refined, these queries provide more and more value, enabling applications and organizations to identify and react to interesting scenarios as they occur, rather than after the window of opportunity has passed.                                                                    


Rob Pierry is a principal consultant with Captura (capturaonline.com), a consulting company that delivers innovative user experiences backed by scalable technology. He can be reached at rpierry+msdn@gmail.com.

Thanks to the following technical experts for reviewing this article: Ramkumar Krishnan, Douglas Laudenschlager and Roman Schindlauer