Building your First End to End StreamInsight Application (Going Deep on Adapters)

So you’ve heard about this shiny StreamInsight thing, perhaps from my last post, and want to build something for yourself and understand how all of this stuff fits together.  At this point you should have your Visual Studio environment up and running, along with StreamInsight installed (if you don’t, have a look at this post).  Every StreamInsight application consists of the following core components:


  • Input adapters.  Input adapters provide the interface for collecting data from a variety of sources (web services, databases, data feeds, sensors).  A StreamInsight input adapter is a set of .NET classes implementing a set of interfaces. 
    • They can be typed or untyped.  Typed adapters have a fixed (or known at compile time) input data type, whereas untyped adapters infer their type at runtime based on configuration information.  For example, a SQL Server input adapter would infer a data type based on a SELECT statement, stored procedure, etc, given to it at runtime.
  • Queries.  Standing queries define the questions being asked of the data streaming through.  Queries can be chained together, with a complete end to end query consisting of a set of input adapters, series of queries, and a set of output adapters.
    • Queries are defined in StreamInsight’s flavour of LINQ (lots more later).
  • Output adapters.  Output adapters provide the interface for delivering query results to external systems and applications.  Output adapters are the mirror image of input adapters (i.e. can be typed or untyped, etc).
  • Host.  StreamInsight supports a variety of deployment models to best suit the needs of your application or deployment.  The API (i.e. all of the StreamInsight code) can target an in-process “embedded” StreamInsight server, or a remote server (typically hosted in a Windows Service, such as the StreamInsightHost.exe that ships with StreamInsight).

In this post we’ll go through building a very simple end to end StreamInsight application consisting of the following components:

  • Simulated data input adapter.  This will be an input adapter which can raise typed input events with random values and intervals.
  • Console output adapter.  This will be an output adapter which can deliver output events to the console window.
  • Queries.  We’ll develop a couple of different queries to demonstrate query composition, time windows and aggregates.
  • In-process host.  To keep things simple we’ll use the in-process hosting model.

Readers familiar with StreamInsight may be thinking “those adapters are part of the standard StreamInsight samples on codeplex”.  You’d be absolutely right; I’d like to walk you through how those are built (having been one of the authors :) and talk about some of the design choices.

Writing our First Query

I’ll first go through a simple end to end query example, using some of the components that we’ll walk through developing later in this post.

  • Open up the IntroToStreamInsight project, downloaded here
  • Open up the Main.cs file, with the following code:
    1:  using System;
    2:  using System.Collections.Generic;
    3:  using System.Linq;
    4:  using System.Text;
    5:  using Microsoft.ComplexEventProcessing.Linq;
    6:  using IntroHost.SimulatedInputAdapter;
    7:  using Microsoft.ComplexEventProcessing;
    8:  using StreamInsight.Samples.Adapters.OutputTracer;
   10:  namespace IntroHost
   11:  {
   12:      class Program
   13:      {
   14:          static void Main(string[] args)
   15:          {
   16:              // Initialize the StreamInsight log using the default settings from
   17:              // the app.config file
   18:              StreamInsightLog.Init();
   20:              // Use the StreamInsight "Default" instance
   21:              string instanceName = "Default";
   23:              // Embed the StreamInsight engine
   24:              using (Server cepServer = Server.Create(instanceName))
   25:              {
   26:                  // Create an application in which to host our queries and adapters
   27:                  Application cepApplication = cepServer.CreateApplication("simple");
   29:                  // Create an input data stream using the SimulatedInput adapter,
   30:                  // raising events of the SimpleEventType type, and using the 
   31:                  // SimpleEventTypeFiller class to fill in the payload fields.  We
   32:                  // will raise a Point event every 100 ms (10 every second).
   33:                  var input = CepStream<SimpleEventType>.Create("inputStream",
   34:                      typeof(SimulatedInputFactory), new SimulatedInputAdapterConfig()
   35:                      {
   36:                           CtiFrequency = 1,
   37:                           EventPeriod = 100,
   38:                           EventPeriodRandomOffset = 0,
   39:                           TypeInitializer = typeof(SimpleEventTypeFiller).AssemblyQualifiedName
   40:                      }, 
   41:                      EventShape.Point);
   43:                  // Simple pass through query.  Grab the input values, pass to the 
   44:                  // output adapter
   45:                  //var query = from e in input select e;
   47:                  // Aggregating query.  Average the meter values by meter over a 
   48:                  // 3 second window, along with the min, max and number of events
   49:                  // for that meter
   50:                  var query = from e in input
   51:                                 group e by e.MeterId into meterGroups
   52:                                 from win in meterGroups.HoppingWindow(
   53:                                     TimeSpan.FromSeconds(3),
   54:                                     TimeSpan.FromSeconds(2),
   55:                                     HoppingWindowOutputPolicy.ClipToWindowEnd)
   56:                                 select new
   57:                                 {
   58:                                     meterId = meterGroups.Key,
   59:                                     avg = win.Avg(e => e.Value),
   60:                                     max = win.Max(e => e.Value),
   61:                                     min = win.Min(e => e.Value),
   62:                                     count = win.Count()
   63:                                 };
   66:                  // Bind the query to an output adapter, writing events to the 
   67:                  // console
   68:                  var output = query.ToQuery(cepApplication, "simpleQuery", "A Simple Query",
   69:                      typeof(TracerFactory), new TracerConfig()
   70:                      {
   71:                           DisplayCtiEvents = true,
   72:                           SingleLine = true,
   73:                          TracerKind = TracerKind.Console
   74:                      }, EventShape.Point, StreamEventOrder.FullyOrdered);
   76:                  output.Start();
   78:                  Console.WriteLine("Query active - press <enter> to shut down");
   79:                  Console.ReadLine();
   81:                  output.Stop();
   82:              }
   83:          }
   84:      }
   85:  }

All of the magic is line 50 to 63, which defines our query.  This takes the incoming simulated events, groups them by the meter ID, and creates a window in time that is 3 seconds wide, and advances (creates output) every 2 seconds.  The output of this query is the meter ID, along with the average, min and max values.

Executing this application will result in output similar to the following (since the values are randomly generated the details will differ):

 2010-08-09 17:12:09,314 INFO  - Advance time policy: CTI f 1, CTI offset -00:00:00.0000001, time policy: AdjustQuery active - press <enter> to shut downCTI - 12:12:08.000POINT(12:12:08.000) avg = 57.7530399389036, count = 3, max = 93.6887342453416, meterId = ValveThree, min = 7.61713502351992,POINT(12:12:08.000) avg = 60.2433371172488, count = 2, max = 77.736259381164, meterId = ValveTwo, min = 42.7504148533337,POINT(12:12:08.000) avg = 60.1549154078378, count = 4, max = 84.4917737340982, meterId = ValveOne, min = 13.812115236098,CTI - 12:12:10.000POINT(12:12:10.000) avg = 46.1892059753599, count = 8, max = 93.6887342453416, meterId = ValveThree, min = 7.61713502351992,POINT(12:12:10.000) avg = 50.7219083455338, count = 6, max = 77.736259381164, meterId = ValveTwo, min = 8.38052840362328,POINT(12:12:10.000) avg = 62.5471921749812, count = 13, max = 94.2385576638573,meterId = ValveOne, min = 3.48205389617107,CTI - 12:12:12.000

Let’s change the SingleLine = true to SingleLine = false, and run the application again:

 Query active - press <enter> to shut downCTI - 12:12:54.000POINT(12:12:54.000)        avg:            29.9929076712545        count:          2        max:            37.6554307703187        meterId:                ValveTwo        min:            22.3303845721904POINT(12:12:54.000)        avg:            58.6186501121235        count:          4        max:            91.9514771047754        meterId:                ValveThree        min:            10.0425148895208POINT(12:12:54.000)        avg:            42.7088354447432        count:          4        max:            71.3766915590394        meterId:                ValveOne        min:            23.3763407093363CTI - 12:12:56.000

There we go.. much more intuitive.  We see the result of grouping by the meter ID, then aggregating the simulated data looking for average, min and max values.  To put it in a more tabular sense (through the magic of Excel using the row centric data from the first run):

Date Time Avg Count Max Meter ID Min
8/9/2010 21:54:02 13.58 2 16.65 ValveThree 10.52
8/9/2010 21:54:02 74.14 3 88.43 ValveTwo 64.09
8/9/2010 21:54:02 45.35 7 88.92 ValveOne 3.18

Ok, that wasn’t too crazy – what all went into building this out?  Funny you should ask…in general, the above code is what you'll spend the majority of your time doing in StreamInsight - defining and executing queries.  For some situations you'll need to dig a bit deeper, define your own adapters, etc.  The rest of this post is intended as the from the ground up walkthrough of building out all of the low level components.  This won't be an everyday activity.

Digging Deeper – How to Build the Components

Here are the sections in this post:


Setting up the Visual Studio Project

  1. Create a new C# Console Application in Visual Studio (for the purposes of the downloadable code at the end of this post, I called mine IntroToStreamInsight).  You can use .NET 3.5 or .NET 4.0 (if you’ve installed the June update to StreamInsight v1.0).
  2. Add the following references to your project, available from the C:\Program Files\Microsoft StreamInsight 1.0\Bin folder:
    1. Microsoft.ComplexEventProcessing.dll
    2. Microsoft.ComplexEventProcessing.Adapters.dll
    3. Microsoft.ComplexEventProcessing.ManagementService.dll


If you don’t have this snazzy Add Reference dialog (for VS2010 users) you’ll definitely want to install these amazing add-ons from the Extension Manager (Tools –> Extension Manager):

  1. Click on Tools, then Extension Manager.
  2. From the Extension Manager dialog, click on the Online Gallery.
  3. From the list (sorted by Highest Ranked), select the Productivity Power Tools and PowerCommands for Visual Studio 2010.
  4. Enjoy the awesomeness of a searchable Add Reference dialog and the ability to Copy References and Paste References between projects.


Building a Typed Input Adapter for Generating Simulated Data

Before writing any awesome queries (or even any mildly cool queries) we need some data to stream through our application.  This data can come from many sources; for getting up to speed it’s generally easiest to work with simulated or canned data.  This can be replayed from a canned data set (such as a .CSV file), or dynamically generated.  In this walkthrough let’s go ahead and implement an input adapter that can:

  • Implement different types of events (Point, Interval and Edge events are the three types of events supported by StreamInsight.  I’ll get into the messy details of the differences between these in a future post (for more details see the Event Model topic under StreamInsight Server Concepts, but for now:
    • Point events happen at a specific point in time (such as reading a sensor).
    • Interval events happen between two points in time (such as a stock tick that’s valid between two points in time)
    • Edge events that start at a specific time but have a not-yet-known end time (such as a stock tick which is still valid, but may become invalid at some point in time)
  • Raised typed events that can be defined at compile time (i.e. using generics), with the ability to randomize the payload and event rate interval (i.e. how often the events are raised).
  • Provide configurable logging to assist in diagnostics and debugging (not that we’re going to make mistakes or anything :), but it’ll come in handy in future posts with more complicated adapters).

For more background on developing adapters, check out the Creating Input and Output Adapters topic on MSDN.  Having laid out the baseline requirements, let’s dive in through the process of building this out.

Defining the Configuration File

The first step in creating an adapter will to define the configuration interface (this is used by all of the rest of the classes that make up an adapter).  Our adapter will have three configuration properties:

  • CTI Frequency.  A Current Time Indicator (CTI) is the way that adapter tells the StreamInsight engine what the current time is; that is to make the statement:

I, the adapter, say that I have received all incoming data up to time X. I will not expect you to account for any data that arrives after time X.

The effect of the CTI’s are to inform the StreamInsight engine that it’s OK to go ahead and process incoming events.  Without CTI’s advancing time in the engine, no output will ever be generated (as we haven’t informed the engine that it has enough data in order to do some work).  More CTIs (i.e. a lower frequency - a frequency of 1 will send a CTI with every data event, a frequency of 5 will send a CTI every 5 data events. It would be more accurate to call it the CTI period, but the advance time settings object uses frequency, so that's what we use) mean a more responsive output stream; fewer CTIs will provide for more throughput at the expense of higher latency (as the engine can perform work in chunks, and won’t need to process as many CTIs).

The other configuration variables are related to the rate at which events are raised by the adapter.  The basic formula will be to wait a certain period in milliseconds between raising events as defined by:

EventPeriod + Random(0 –> Event Period Random Offset)

  • Event Period.  The baseline period at which the adapter will generate simulated events in milliseconds (i.e. a value of 500 means that the adapter will raise an event twice a second, every 500 milliseconds).
  • Event Period Random Offset.  The maximum random offset on top of the event period.
  • Type Initializer.  In order to provide random field values, we’ll define a simple interface (ITypeInitializer<TPayload>, defined below), and pass in the Assembly Qualified Type Name in the configuration (as passing in the actual type value will fail on DataContractSerialization).


    1:   public interface ITypeInitializer<TPayload>
    2:      {
    3:          void FillValues(TPayload obj);
    4:      }

Create a solution folder called SimulatedInputAdapter.   Inside that folder create a new class SimulatedInputAdapterConfig with the following content:

    2:  namespace IntroHost.SimulatedInputAdapter
    3:  {
    4:      /// <summary>
    5:      /// This is the configuration type for the SimulatedDataInputFactory.  Use instances
    6:      /// of this class to configure how often the simulated data input adapter raises events.
    7:      /// </summary>
    8:      public struct SimulatedInputAdapterConfig
    9:      {
   10:          /// <summary>
   11:          /// How often to send a CTI event (1 = send a CTI event with every data event)
   12:          /// </summary>
   13:          public uint CtiFrequency { get; set; }
   15:          /// <summary>
   16:          /// The baseline period at which the adapter will generate simulated 
   17:          /// events in milliseconds (i.e. a value of 500 means that the adapter 
   18:          /// will raise an event twice a second, every 500 milliseconds).
   19:          /// </summary>
   20:          public int EventPeriod { get; set; }
   22:          /// <summary>
   23:          /// The maximum random offset on top of the event period.
   24:          /// </summary>
   25:          public int EventPeriodRandomOffset { get; set; } 
   26:      }
   27:  }


Creating the Adapter Factory

Adapters are not directly instantiated in your application, but rather created by the StreamInsight engine in response to a query starting up, based on the binding configuration.  This has a couple of implications for developing adapters:

  • Adapter instances are created via a factory interface.
  • Adapters must be able to instantiate based on a serialized configuration file
    • This is why you can’t hand a class instance reference or delegate directly to an adapter.  Instead, you need to provide some sort of lookup mechanism (typically through a singleton or static class method) based on an entry in the adapter config file.

Adapter factories must be public classes, implementing one of the adapter factory interfaces (depending on the type of adapter):

Interface Implements a
ITypedInputAdapterFactory Typed input adapter that receives strongly typed events.
IInputAdapterFactory Untyped input adapter that infers the event type at run-time (for example, based on a SQL table’s schema).
ITypedOutputAdapterFactory Typed output adapter that emits strongly typed events (that have to align with the output type of the bound query).
IOutputAdapterFactory Untyped output adapter that emits loosely typed events based on the fields in output of the bound query.

Optionally, factories can implement declarative CTI behavior by implementing the ITypedDeclareAdvanceTimeProperties interface.  This is the recommended option for when an adapter does not need to implement fine-grained control of advancing time.

Future posts will delve into more detail on creating various flavors of adapters; for now, we’ll simply create a basic typed input adapter that lets us raise random events (i.e. simulated data).  The responsibility of the adapter factory is to handle any cross-adapter responsibilities, and create an instance of the appropriate adapter type (Point, Interval, Edge) when requested.

    1:   public class SimulatedInputFactory : ITypedInputAdapterFactory<SimulatedInputAdapterConfig>,
    2:          ITypedDeclareAdvanceTimeProperties<SimulatedInputAdapterConfig>
    3:      {
    4:          public InputAdapterBase Create<TPayload>(SimulatedInputAdapterConfig configInfo, 
    5:              EventShape eventShape)
    6:          {
    7:              throw new NotImplementedException();
    8:          }
   10:          public void Dispose()
   11:          {
   12:              throw new NotImplementedException();
   13:          }
   15:          public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(
   16:              SimulatedInputAdapterConfig configInfo, EventShape eventShape)
   17:          {
   18:              throw new NotImplementedException();
   19:          }
   20:      }

We’re implementing two interfaces here – one to declare that we’re an adapter factory, the other to declaratively inject CTI events into the stream based on a policy (as defined in the configuration class). 

Note that throughout this application I make use of a helper class called StreamInsightLog - creating a general purpose logging wrapper for StreamInsight is discussed in this post.

Let’s go ahead and implement each of the interfaces for generating the appropriate adapter instances.

    1:  using System;
    2:  using System.Diagnostics;
    3:  using Microsoft.ComplexEventProcessing;
    4:  using Microsoft.ComplexEventProcessing.Adapters;
    6:  namespace IntroHost.SimulatedInputAdapter
    7:  {
    8:      public class SimulatedInputFactory : ITypedInputAdapterFactory<SimulatedInputAdapterConfig>,
    9:          ITypedDeclareAdvanceTimeProperties<SimulatedInputAdapterConfig>
   10:      {
   11:          private static readonly string ADAPTER_NAME = "SimulatedInput";
   12:          private static readonly StreamInsightLog trace = new StreamInsightLog(ADAPTER_NAME);
   14:          /// <summary>
   15:          /// Based on a configuration and an event type generate an adapter reference
   16:          /// </summary>
   17:          public InputAdapterBase Create<TPayload>(SimulatedInputAdapterConfig configInfo, 
   18:              EventShape eventShape)
   19:          {
   20:              InputAdapterBase ret = default(InputAdapterBase);
   21:              switch (eventShape)
   22:              {
   23:                  case EventShape.Point:
   24:                      ret = new SimulatedInputPointAdapter<TPayload>(configInfo);
   25:                      break;
   27:                  case EventShape.Interval:
   28:                      ret = new SimulatedInputIntervalAdapter<TPayload>(configInfo);
   29:                      break;
   31:                  case EventShape.Edge:
   32:                      ret = new SimulatedInputEdgeAdapter<TPayload>(configInfo);
   33:                      break;
   34:              }
   36:              return ret;
   37:          }
   39:          /// <summary>
   40:          /// No shared resources in the adapter - empty Dispose
   41:          /// </summary>
   42:          public void Dispose()
   43:          { }
   45:          /// <summary>
   46:          /// Declaratively advance application time (i.e. inject CTI's)
   47:          /// </summary>
   48:          /// <returns></returns>
   49:          public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(
   50:              SimulatedInputAdapterConfig configInfo, EventShape eventShape)
   51:          {
   52:              trace.LogMsg(TraceEventType.Information, 
   53:                  "Advance time policy: CTI f {0}, CTI offset {1}, time policy: {2}",
   54:                  configInfo.CtiFrequency, TimeSpan.FromTicks(-1), AdvanceTimePolicy.Adjust);
   56:              var timeGenSettings = new AdvanceTimeGenerationSettings(configInfo.CtiFrequency,
   57:                  TimeSpan.FromTicks(-1), true);
   59:              return new AdapterAdvanceTimeSettings(timeGenSettings, AdvanceTimePolicy.Adjust);
   60:          }
   61:      }
   62:  }

Adapter factories that don’t need to manage shared resources or context tend to be very simple.  This adapter factory only does two core things:

  • Based on an event shape generates the appropriate adapter instance. 
  • Declares a CTI policy that says for every N events (CtiFrequency) advance time in the engine.

Creating the Adapter Instances

From this baseline, let’s go ahead and add the rest of the adapter instances.  Each of these adapters will, on a timer, create, fill and enqueue an event of the appropriate type.  The filling of the event (i.e. filling in the payload fields) will be the responsibility of an external pluggable class.  Ordinarily we’d go ahead and use a lambda expression or two to inject the logic.  To avoid the complication of setting up such an expression that can round trip through the serialization experience, let’s use a interface such as ITypeInitializer<TPayload>, and define a TPayload class, and an ITypeInitializer<TPayload> class:

    1:     public interface ITypeInitializer<TPayload>
    2:      {
    3:          void FillValues(TPayload obj);
    4:      }

Coupled with an event type class that we’ll use in our application:

    1:     public class SimpleEventType
    2:      {
    3:          public string MeterId { get; set; }
    4:          public double Value { get; set; }
    5:          public DateTime Timestamp { get; set; } 
    6:      }
    8:      public class SimpleEventTypeFiller : ITypeInitializer<SimpleEventType>
    9:      {
   10:          private Random rand = new Random();
   12:          private string[] Meters = new string[]
   13:          {
   14:              "ValveOne",
   15:              "ValveTwo",
   16:              "ValveThree"
   17:          };
   19:          public void FillValues(SimpleEventType obj)
   20:          {
   21:              obj.MeterId = Meters[rand.Next(Meters.Length)];
   22:              obj.Value = rand.NextDouble() * 100;
   23:              obj.Timestamp = DateTime.Now;
   24:          }
   25:      }

This is a pretty basic setup that will let us raise SimpleEventType events, and randomly fill their values.  Using this interface to drive simulated input into Point, Interval and Edge adapters will look like this (Point input shown, Interval and Edge in the downloadable project with the differences in explained below):

    1:  using System;
    2:  using System.Collections.Generic;
    3:  using System.Linq;
    4:  using System.Text;
    5:  using Microsoft.ComplexEventProcessing;
    6:  using Microsoft.ComplexEventProcessing.Adapters;
    7:  using System.Diagnostics;
    9:  namespace IntroHost.SimulatedInputAdapter
   10:  {
   11:      /// <summary>
   12:      /// Simulated input adapter for point events.  Will periodically raise 
   13:      /// simulated event data.  If a class implementing the 
   14:      /// ITypeInitializer<TPayload> interface is specified in the configuration
   15:      /// the events will be filled in by an instance of that class.
   16:      /// </summary>
   17:      /// <typeparam name="TPayload"></typeparam>
   18:      public class SimulatedInputPointAdapter<TPayload> : 
   19:          TypedPointInputAdapter<TPayload>
   20:      {
   21:          /// <summary>
   22:          /// Store the simulated input configuration
   23:          /// </summary>
   24:          private SimulatedInputAdapterConfig config;
   26:          /// <summary>
   27:          /// Create a log object using the category name from the factory
   28:          /// </summary>
   29:          private StreamInsightLog trace = new StreamInsightLog(
   30:              SimulatedInputFactory.ADAPTER_NAME);
   32:          /// <summary>
   33:          /// The timer object responsible for periodically raising events
   34:          /// </summary>
   35:          private System.Threading.Timer myTimer;
   37:          /// <summary>
   38:          /// Reference to the type initializer if one is specified
   39:          /// </summary>
   40:          private ITypeInitializer<TPayload> init;
   42:          /// <summary>
   43:          /// Lock object used to synchronize access to raising events.  This is
   44:          /// primarily for use during debugging when stepping through the 
   45:          /// adapter code (as the timer will continue to fire, resulting in 
   46:          /// multiple threads trying to raise an event concurrently).
   47:          /// </summary>
   48:          private object lockObj = new object();
   50:          /// <summary>
   51:          /// Random object used to create the offset to when the next event
   52:          /// should be raised
   53:          /// </summary>
   54:          private Random rand;
   56:          public SimulatedInputPointAdapter(SimulatedInputAdapterConfig config)
   57:          {
   58:              // Hold onto the configuration, and generate a type initializer if
   59:              // one has been configured.
   60:              this.config = config;
   61:              if (this.config.TypeInitializer != null)
   62:              {
   63:                  init = (ITypeInitializer<TPayload>)Activator.CreateInstance(
   64:                      Type.GetType(config.TypeInitializer));
   65:              }
   68:          }
   70:          /// <summary>
   71:          /// All events are raised asynchronously.  If the timer fires while
   72:          /// the adapter is paused the RaiseEvent function fires it will 
   73:          /// immediately exit so we don't need to account for that here
   74:          /// </summary>
   75:          public override void Resume()
   76:          {
   77:          }
   79:          /// <summary>
   80:          /// Create the random time interval generator and the thread timer
   81:          /// used to schedule events being raised.  Start it 500 ms to give
   82:          /// everything time to settle out
   83:          /// </summary>
   84:          public override void Start()
   85:          {
   86:              rand = new Random();
   88:              myTimer = new System.Threading.Timer(
   89:                  new System.Threading.TimerCallback(RaiseEvent),
   90:                  null, 500, config.EventPeriod);
   91:          }
   93:          /// <summary>
   94:          /// When the timer fires, check for the state of the adapter; if running
   95:          /// raise a new simulated event
   96:          /// </summary>
   97:          /// <param name="state"></param>
   98:          private void RaiseEvent(object state)
   99:          {
  100:              // Ensure that the adapter is in the running state.  If we're 
  101:              // shutting down, kill the timer and signal Stopped()
  102:              if (AdapterState.Stopping == AdapterState)
  103:              {
  104:                  myTimer.Dispose();
  105:                  Stopped();
  106:              }
  107:              if (AdapterState.Running != AdapterState)
  108:                  return;
  110:              // Allocate a point event to hold the data for the incoming message.  
  111:              // If the event could not be allocated, exit the function
  112:              lock (lockObj)
  113:              {
  114:                  PointEvent<TPayload> currEvent = CreateInsertEvent();
  115:                  if (currEvent == null)
  116:                      return;
  117:                  currEvent.StartTime = DateTime.Now;
  119:                  // Create a payload object, and fill with values if we have a
  120:                  // an initializer defined
  121:                  currEvent.Payload = (TPayload)Activator.CreateInstance(typeof(TPayload));                
  122:                  if (init != null) 
  123:                      init.FillValues(currEvent.Payload);
  125:                  if (trace.ShouldLog(TraceEventType.Verbose))
  126:                  {
  127:                      trace.LogMsg(TraceEventType.Verbose, "INSERT - {0}",
  128:                          currEvent.FormatEventForDisplay(false));
  129:                  }
  131:                  // If the event cannot be enqueued, release the memory and signal that
  132:                  // the adapter is ready to process more events (via. Ready())
  133:                  if (EnqueueOperationResult.Full == Enqueue(ref currEvent))
  134:                  {                    
  135:                      ReleaseEvent(ref currEvent);
  136:                      Ready();
  137:                  }
  138:              }
  140:              // The next event will be raised at now + event period ms, plus a 
  141:              // random offset
  142:              int nextEventInterval = config.EventPeriod +
  143:                  rand.Next(config.EventPeriodRandomOffset);
  144:              myTimer.Change(nextEventInterval, nextEventInterval);
  145:          }
  146:      }
  147:  }



The key concept in this class is raising events from an asynchronous (push) source.  Many of the StreamInsight examples use a thread that polls a resource.  This is a perfectly suitable pattern for pull-based sources; however, our Timer is a push based source so a lot of the thread/loop code isn’t necessary.  All of the state management code (checking for stop, etc) happens in the asynchronous delegate (i.e. RaiseEvent).

Looking on line 121, you may see a curious (old-school) construct, the use of Activator.CreateInstance to generate an object from a type definition at run-time.  Some amongst you might ask why I didn’t use a generic constraint on the adapter definition such as

TypedPointInputAdapter<TPayload> where TPayload: class, new()

Here’s what happens when I implement this change:

Error 1 'TPayload' must be a non-abstract type with a public parameterless constructor in order to use it as parameter 'TPayload' in the generic type or method 'IntroHost.SimulatedInputAdapter.SimulatedInputPointAdapter<TPayload>'   

Error 2 The type 'TPayload' must be a reference type in order to use it as parameter 'TPayload' in the generic type or method 'IntroHost.SimulatedInputAdapter.SimulatedInputPointAdapter<TPayload>'   

Ok, no big deal – we’ll add the constraint to the factory class as well, by a

   public InputAdapterBase Create<TPayload>(SimulatedInputAdapterConfig configInfo,             EventShape eventShape) where TPayload : class, new()

Declaration.  Now here’s where things get interesting – since the factory object implements a non-generic interface…

The constraints for type parameter 'TPayload' of method 'IntroHost.SimulatedInputAdapter.SimulatedInputFactory.Create<TPayload>(IntroHost.SimulatedInputAdapter.SimulatedInputAdapterConfig, Microsoft.ComplexEventProcessing.EventShape)' must match the constraints for type parameter 'TPayload' of interface method 'Microsoft.ComplexEventProcessing.Adapters.ITypedInputAdapterFactory< IntroHost.SimulatedInputAdapter.SimulatedInputAdapterConfig>.Create<TPayload>(IntroHost.SimulatedInputAdapter.SimulatedInputAdapterConfig, Microsoft.ComplexEventProcessing.EventShape)'. Consider using an explicit interface implementation instead.   

A long way of saying we can’t really impose constraints on a non-generic interface further up the inheritance chain, hence the usage of Activator.

The interval and edge events are almost exactly the same, save for how they define the StartTime and EndTime.  The Interval event adapter defines EndTime based on the random interval (on line 15):

    1:     // Allocate a point event to hold the data for the incoming message.  
    2:              // If the event could not be allocated, exit the function
    3:              lock (lockObj)
    4:              {
    5:                  // The next event will be raised at now + event period ms, plus a 
    6:                  // random offset
    7:                  int nextEventInterval = config.EventPeriod +
    8:                      rand.Next(config.EventPeriodRandomOffset);
    9:                  myTimer.Change(nextEventInterval, nextEventInterval);
   11:                  IntervalEvent<TPayload> currEvent = CreateInsertEvent();
   12:                  if (currEvent == null)
   13:                      return;
   14:                  currEvent.StartTime = DateTime.Now;
   15:                  currEvent.EndTime = currEvent.StartTime.AddMilliseconds(nextEventInterval);
   17:                  // Create a payload object, and fill with values if we have a
   18:                  // an initializer defined
   19:                  currEvent.Payload = (TPayload)Activator.CreateInstance(typeof(TPayload));
   20:                  if (init != null)
   21:                      init.FillValues(currEvent.Payload);
   23:                  if (trace.ShouldLog(TraceEventType.Verbose))
   24:                  {
   25:                      trace.LogMsg(TraceEventType.Verbose, "INSERT - {0}",
   26:                          currEvent.FormatEventForDisplay(false));
   27:                  }
   29:                  // If the event cannot be enqueued, release the memory and signal that
   30:                  // the adapter is ready to process more events (via. Ready())
   31:                  if (EnqueueOperationResult.Full == Enqueue(ref currEvent))
   32:                  {
   33:                      ReleaseEvent(ref currEvent);
   34:                      Ready();
   35:                  }
   36:              }

The edge adapter remembers the previous payload, and also inserts an End event to close out the previous start event.  In this case, every edge event is closed when a new edge shows up.  This is not a realistic scenario, as the more typical case would be that edge events are closed by specific conditions (such as a new reading for a given meter), not by any new event.  I’ll cover techniques for addressing this in a future post on integrating reference data.

    1:    private EdgeEvent<TPayload> lastEvent = null;
    3:          /// <summary>
    4:          /// When the timer fires, check for the state of the adapter; if running
    5:          /// raise a new simulated event
    6:          /// </summary>
    7:          /// <param name="state"></param>
    8:          private void RaiseEvent(object state)
    9:          {
   10:              // Ensure that the adapter is in the running state.  If we're 
   11:              // shutting down, kill the timer and signal Stopped()
   12:              if (AdapterState.Stopping == AdapterState)
   13:              {
   14:                  myTimer.Dispose();
   15:                  Stopped();
   16:              }
   17:              if (AdapterState.Running != AdapterState)
   18:                  return;
   20:              // Allocate a point event to hold the data for the incoming message.  
   21:              // If the event could not be allocated, exit the function
   22:              lock (lockObj)
   23:              {
   24:                  // The next event will be raised at now + event period ms, plus a 
   25:                  // random offset
   26:                  int nextEventInterval = config.EventPeriod +
   27:                      rand.Next(config.EventPeriodRandomOffset);
   28:                  myTimer.Change(nextEventInterval, nextEventInterval);
   30:                  if (lastEvent != null)
   31:                  {
   32:                      EdgeEvent<TPayload> closeEvent = CreateInsertEvent(EdgeType.End);
   33:                      closeEvent.StartTime = lastEvent.StartTime;
   34:                      closeEvent.EndTime = DateTimeOffset.Now;
   35:                      closeEvent.Payload = lastEvent.Payload;
   37:                      // If the event cannot be enqueued, release the memory and signal that
   38:                      // the adapter is ready to process more events (via. Ready())
   39:                      if (EnqueueOperationResult.Full == Enqueue(ref closeEvent))
   40:                      {
   41:                          ReleaseEvent(ref closeEvent);
   42:                          Ready();
   43:                      }
   45:                      lastEvent = null;
   46:                  }
   48:                  EdgeEvent<TPayload> currEvent = CreateInsertEvent(EdgeType.Start);
   49:                  if (currEvent == null)
   50:                      return;
   51:                  currEvent.StartTime = DateTime.Now;
   52:                  currEvent.EndTime = currEvent.StartTime.AddMilliseconds(nextEventInterval);
   54:                  // Create a payload object, and fill with values if we have a
   55:                  // an initializer defined
   56:                  currEvent.Payload = (TPayload)Activator.CreateInstance(typeof(TPayload));
   57:                  if (init != null)
   58:                      init.FillValues(currEvent.Payload);
   60:                  if (trace.ShouldLog(TraceEventType.Verbose))
   61:                  {
   62:                      trace.LogMsg(TraceEventType.Verbose, "INSERT - {0}",
   63:                          currEvent.FormatEventForDisplay(false));
   64:                  }
   66:                  // If the event cannot be enqueued, release the memory and signal that
   67:                  // the adapter is ready to process more events (via. Ready())
   68:                  if (EnqueueOperationResult.Full == Enqueue(ref currEvent))
   69:                  {
   70:                      ReleaseEvent(ref currEvent);
   71:                      Ready();
   72:                  }
   74:                  // Remember the start edge
   75:                  lastEvent = currEvent;
   76:              }
   77:          }


That’s it – your first input adapter raising simulated data.  Seems like a lot of work just to raise a few simple events, but this scaffolding can be reused to create very complex, rich adapters.  It gets easier from here :)

Formatting Events for Display

Hey – where did that magical FormatEventForDisplay function come from?  If you look in the sample project, you’ll see a StreamInsightUtils.cs file that contains a number of helper functions:

 public static string FormatEventForDisplay<TPayload>(this TypedEvent<TPayload> evt,
            bool verbose)

public static void AddPayloadDetailsList<TPayload>(StringBuilder sb,
            TypedEvent<TPayload> evt)

public static void AddPayloadDetailsRow<TPayload>(StringBuilder sb,
            TypedEvent<TPayload> evt)

public static void AddHeaderRow<TPayload>(StringBuilder sb)

Along with their UntypedEvent equivalents.  Won’t dive into the details here; each of these functions iterates over the contents of the payload object (using reflection for TypedEvents and the CepEventType for UntypedEvents and pumps out the details.  This handles the differences between Point, Interval and Edge events automatically.

Building an Untyped output adapter for writing to console

Now that we’ve built an input adapter, it’s time to build the corresponding output adapter.  This particular adapter will take untyped events coming from a query and write them to the windows or debug console.  The steps are very much the same as for building an input adapter, with the exception of not having to manage CTI’s (as the output adapter will be receiving CTI’s from the engine, as opposed to being responsible for creating them).

Defining the Configuration File

In our configuration file we’ll define three key properties:

  • DisplayCtiEvents.  Whether or not to include CTI events in the output.  This is helpful for understanding when and how the StreamInsight engine is advancing time and processing events (and in identifying window boundaries – each time a window is processed a CTI will be emitted).
  • SingleLine.  Whether or not to output data in a row-centric (single line) format, or in a detailed list.
  • Target.  The trace target (debug or console), using the TraceTarget enum.
    1:   /// <summary>
    2:      /// Possible trace types.
    3:      /// </summary>
    4:      public enum TraceTarget
    5:      {
    6:          /// <summary>
    7:          /// Write messages to the debug output.
    8:          /// </summary>
    9:          Debug,
   11:          /// <summary>
   12:          /// Write messages to the console.
   13:          /// </summary>
   14:          Console,
   15:      }
   17:      /// <summary>
   18:      /// Configuration structure for tracer output adapters.
   19:      /// </summary>
   20:      public class ConsoleAdapterConfig
   21:      {
   22:          /// <summary>
   23:          /// Gets or sets a value indicating whether or not to display CTI events in the output stream.
   24:          /// </summary>
   25:          public bool DisplayCtiEvents { get; set; }
   27:          /// <summary>
   28:          /// Gets or sets a value indicating which output stream to use.
   29:          /// </summary>
   30:          public TraceTarget Target { get; set; }
   32:          /// <summary>
   33:          /// Gets or sets a value indicating whether to output each event in
   34:          /// a single line or use a more verbose output format.
   35:          /// </summary>
   36:          public bool SingleLine { get; set; }
   37:      }

Note – this is a stripped down version from the sample on codeplex, and doesn’t include writing to files or to the .NET Trace facility.

Creating the Adapter Factory

The adapter factory is very similar to the input adapter factory; note the differences for using an Untyped set of adapters.  The event type is passed into the Create method via a CepEventType class (which has the key/value pairs and data types) rather than being defined through a generic.

    1:    public class ConsoleAdapterFactory : IOutputAdapterFactory<ConsoleAdapterConfig>
    2:      {
    3:          internal static readonly string APP_NAME = "ConsoleOutput";
    5:          /// <summary>
    6:          /// Create an instance of a console output adapter that dumps received
    7:          /// events to the .NET Debug or Console window.
    8:          /// </summary>
    9:          public OutputAdapterBase Create(ConsoleAdapterConfig configInfo, 
   10:              EventShape eventShape, CepEventType cepEventType)
   11:          {
   12:              OutputAdapterBase ret = default(OutputAdapterBase);
   13:              switch(eventShape)
   14:              {
   15:                  case EventShape.Point:
   16:                      ret = new ConsolePointOutputAdapter(configInfo, cepEventType);
   17:                      break;
   19:                  case EventShape.Interval:
   20:                      ret = new ConsoleIntervalOutputAdapter(configInfo, cepEventType);
   21:                      break;
   23:                  case EventShape.Edge:
   24:                      ret = new ConsoleEdgeOutputAdapter(configInfo, cepEventType);
   25:                      break;
   26:              }
   27:              return ret;
   28:          }
   30:          public void Dispose()
   31:          {
   33:          }
   34:      }


Creating the Adapter Instances



To implement each of the untyped adapter instances we need to extend the Point/Interval/Edge AdapterBase class, and handle the various Start/Resume aspects of pulling data out of a queue then posting it to the destination data sink (in this case the Console or the Debug streams).

    1:  internal sealed class ConsolePointOutputAdapter : PointOutputAdapter
    2:      {
    3:          private StreamInsightLog trace;
    4:          private ConsoleAdapterConfig config;
    5:          private CepEventType eventType;
    7:          public ConsolePointOutputAdapter(ConsoleAdapterConfig config, 
    8:              CepEventType type)
    9:          {
   10:              trace = new StreamInsightLog(ConsoleAdapterFactory.APP_NAME);
   11:              this.config = config;
   12:              this.eventType = type;
   13:          }
   15:          /// <summary>
   16:          /// Start() is called when the engine wants to let the adapter start producing events.
   17:          /// This method is called on a threadpool thread, which should be released as soon as possible.
   18:          /// </summary>
   19:          public override void Start()
   20:          {
   21:              new Thread(this.ConsumeEvents).Start();
   22:          }
   24:          /// <summary>
   25:          /// Resume() is called when the engine is able to produce further events after having been emptied
   26:          /// by Dequeue() calls. Resume() will only be called after the adapter called Ready().
   27:          /// This method is called on a threadpool thread, which should be released as soon as possible.
   28:          /// </summary>
   29:          public override void Resume()
   30:          {
   31:              new Thread(this.ConsumeEvents).Start();
   32:          }
   34:          /// <summary>
   35:          /// Main worker thread function responsible for dequeueing events and 
   36:          /// posting them to the output stream.
   37:          /// </summary>
   38:          private void ConsumeEvents()
   39:          {
   40:              PointEvent currentEvent = default(PointEvent);
   42:              try
   43:              {
   44:                  while (true)
   45:                  {
   46:                      if (AdapterState.Stopping == AdapterState)
   47:                      {
   48:                          Stopped();
   49:                          return;
   50:                      }
   52:                      // Dequeue the event. If the dequeue fails, then the adapter state is suspended
   53:                      // or stopping. Assume the former and call Ready() to indicate
   54:                      // readiness to be resumed, and exit the thread.
   55:                      if (DequeueOperationResult.Empty == Dequeue(out currentEvent))
   56:                      {
   57:                          Ready();
   58:                          return;
   59:                      }
   61:                      string writeMsg = String.Empty;
   63:                      if (currentEvent.EventKind == EventKind.Insert)
   64:                      {
   65:                          writeMsg = currentEvent.FormatEventForDisplay(eventType, 
   66:                              !config.SingleLine);
   67:                      }
   68:                      else if (currentEvent.EventKind == EventKind.Cti)
   69:                      {
   70:                          writeMsg = String.Format("CTI - {0}",
   71:                              currentEvent.StartTime.ToString("hh:mm:ss.fff"));
   72:                      }
   74:                      if (config.Target == TraceTarget.Console)
   75:                          Console.WriteLine(writeMsg);
   76:                      else if (config.Target == TraceTarget.Debug)
   77:                          Debug.Write(writeMsg);
   79:                      // Every received event needs to be released.
   80:                      ReleaseEvent(ref currentEvent);
   81:                  }
   82:              }
   83:              catch (Exception e)
   84:              {
   85:                  trace.LogException(e, "Error in console adapter dequeue");
   86:              }
   87:          }
   88:      }

Very similar to what you’ve seen before in the input adapter, leveraging a few more of the formatting display helper classes to clean up the message for output.  The edge and interval events are identical to this (save for the different base class) as each leverages the helper classes to abstract away the differences.

Hey, wait a minute – both the Start() and Resume() methods are identical – what’s up with that? In the case of this adapter, the output resource (the Debug and Console streams) don’t need to be explicitly initialized.  In the case of something like a SQL output adapter, additional initialization and connection management code would be required in the Start() method.

In Conclusion

Wow.. little bit of code thrown around there.  Ok, deep breath.  This is not the normal, run of the mill StreamInsight experience of connecting and configuring adapters, then focusing on writing queries.  This is a from the ground up, end to end project.  For many of your applications, adapters can be pulled (or adapted, pun intended) from the codeplex samples.  Having been through this exercise, all of the code therein should make a lot more sense.

Key Takeaways:

  • You don’t have to go through this exercise every time to build out very powerful StreamInsight applications :)

  • Input and output adapters can be typed or untyped, and can operate in both push (asynchronous) and pull (polling) modes depending on their implementation.

  • Current Time Indicators (CTI’s) advance application time – they let the StreamInsight engine know that it has received all of the data up to time X, and can process events.

    • If your query does not output any results, it is typically related to CTIs not being properly injected into the incoming data streams.
    • CTIs can be injected into streams automatically by implementing ITypedDeclareAdvanceTimeProperties in the factory.
  • With adapters in place, query development and binding is pretty smooth.