StreamInsight Example: Server - Exposing an Embedded Server

 

This example demonstrates the process of creating an embedded StreamInsight server and exposing it for client programs to use as a remote server. In addition to creating the server and making it available, this example functions as a client itself, creating a source and query, binding to a sink, and running the binding as a process. For more information about StreamInsight entities, see StreamInsight Concepts.

The entities created in this example are designed to be used by the other examples in this section. To use the examples in this section together, do the following:

  1. Run this server example

  2. Run one or both of the client examples

Step-By-Step

A typical StreamInsight client follows these basic steps:

  • Create a StreamInsight server instance

  • Create or get a StreamInsight application

  • Define or get a source

  • Compose a query over the source

  • Define or get a sink

  • Bind and run the query and sink

In this example, the program creates all the entities it needs and deploys them to the server for other clients to use.

Create the Server Instance

The process of creating a StreamInsight program begins with the instantiation of a StreamInsight server instance. In this example, the server is embedded within the program.

var server = Server.Create("Default");  

A server must be created using an instance name that has been registered on the machine through the StreamInsight setup process (in this example, Default). For more information, see Installation (StreamInsight).

Next, an endpoint for the embedded server is exposed so that client StreamInsight programs can connect and use this as a remote StreamInsight server.

  
var host = new ServiceHost(server.CreateManagementService());  
host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "http://localhost/MyStreamInsightServer");  
host.Open();  
  

Client programs can also use the StreamInsight Host Windows service created when you installed StreamInsight. For more information on the options available for connecting to a StreamInsight server, see Publishing and Connecting to the StreamInsight Server.
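
As a rough illustration of the client side, the following sketch connects to the endpoint exposed above and looks up an application by name. It assumes the server example is already running, that the address matches the one passed to AddServiceEndpoint, and that the application is named serverApp (it is created in the next step). The class name ClientSketch is hypothetical.

```csharp
using System;
using System.ServiceModel;
using Microsoft.ComplexEventProcessing;

class ClientSketch
{
    static void Main()
    {
        // Assumed address: use whatever address the server actually exposes.
        var address = new EndpointAddress("http://localhost/MyStreamInsightServer");

        // Connect to the embedded server as a remote StreamInsight server.
        using (var server = Server.Connect(address))
        {
            // Look up the application deployed by the server example.
            var myApp = server.Applications["serverApp"];
            Console.WriteLine("Connected; found application serverApp.");
        }
    }
}
```

Because the client only holds a connection, disposing the server object here ends the connection but leaves the embedded server and its entities running.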

Create the Application

An application represents a scoping unit in the server. All other entities are created in the application.

var myApp = server.CreateApplication("serverApp");  

Define and Deploy a Source

Next, an input source is defined and deployed to the server with a name so that it can be used by other StreamInsight clients. In this example, the data is a simple temporal stream of point events generated every second.

  
var mySource = myApp  
    .DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1)))  
    .ToPointStreamable(  
        x => PointEvent.CreateInsert(DateTimeOffset.Now, x),  
        AdvanceTimeSettings.StrictlyIncreasingStartTime);  
mySource.Deploy("serverSource");  
  

Compose a Query Over the Source

Next, compose a query over the input source. The query uses LINQ as the query specification language. In this example, the query returns the value of every even-numbered event.

  
var myQuery = from e in mySource  
              where e % 2 == 0  
              select e;  
  

Technically, this definition translates to a filter operator that drops all events from the sequence that do not fulfill the filter predicate (where e % 2 == 0) and returns the event value. For more information about LINQ query operators, see Using StreamInsight LINQ.
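
To see what the filter predicate keeps, the same LINQ expression can be applied to an ordinary in-memory sequence. This is only an analogy: it uses LINQ to Objects rather than StreamInsight, and the class and method names (FilterSketch, EvenValues) are hypothetical. The input stands in for the first ten payloads of the interval source, which counts up from 0.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class FilterSketch
{
    // Same predicate as the StreamInsight query, applied to an in-memory sequence.
    public static IEnumerable<long> EvenValues(IEnumerable<long> source)
    {
        return from e in source
               where e % 2 == 0
               select e;
    }

    static void Main()
    {
        // Stand-in for the first ten payloads of the interval source: 0..9
        var payloads = Enumerable.Range(0, 10).Select(i => (long)i);

        // Odd values are dropped; even values pass through unchanged.
        Console.WriteLine(string.Join(", ", EvenValues(payloads)));
        // prints "0, 2, 4, 6, 8"
    }
}
```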

Define and Deploy a Sink

Next, an output sink is defined that can be bound to the query to process the resulting sequence. In this example, the sink is a simple observer that writes the stream values to the console.

var mySink = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Server..: {0}", x)));  

The sink is then deployed to the server with a name.

mySink.Deploy("serverSink");  

Bind and Run the Query and Sink

At this point, the observable query can be bound to the observer output sink and then run in a process in the server.

var proc = myQuery.Bind(mySink).Run("serverProcess");  

In the complete example below, this process continues to run until the user stops it by pressing Enter in the console.

Complete Example

The following example combines the components described earlier to create a complete application. For simplicity, this example does not check for possible error conditions.

  
using System;  
using System.ServiceModel;  
using Microsoft.ComplexEventProcessing;  
using Microsoft.ComplexEventProcessing.Linq;  
using Microsoft.ComplexEventProcessing.ManagementService;  
using System.Reactive;  
using System.Reactive.Linq;  
  
/* This example:  
 * creates an embedded server instance and makes it available to other clients  
 * defines, deploys, binds, and runs a simple source, query, and sink  
 * waits for the user to stop the server  
 */  
namespace StreamInsight21_example_Server  
{  
    class Program  
    {  
        static void Main(string[] args)  
        {  
            // Create an embedded StreamInsight server  
            using (var server = Server.Create("Default"))  
            {  
                // Create a local endpoint for the server embedded in this program  
                var host = new ServiceHost(server.CreateManagementService());  
                host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "http://localhost/MyStreamInsightServer");  
                host.Open();  
  
                /* The following entities will be defined and available in the server for other clients:  
                 * serverApp  
                 * serverSource  
                 * serverSink  
                 * serverProcess  
                 */  
  
                // CREATE a StreamInsight APPLICATION in the server  
                var myApp = server.CreateApplication("serverApp");  
  
                // DEFINE a simple SOURCE (returns a point event every second)  
                var mySource = myApp  
                    .DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1)))  
                    .ToPointStreamable(  
                        x => PointEvent.CreateInsert(DateTimeOffset.Now, x),  
                        AdvanceTimeSettings.StrictlyIncreasingStartTime);  
  
                // DEPLOY the source to the server for clients to use  
                mySource.Deploy("serverSource");  
  
                // Compose a QUERY over the source (return every even-numbered event)  
                var myQuery = from e in mySource  
                              where e % 2 == 0  
                              select e;  
  
                // DEFINE a simple observer SINK (writes the value to the server console)  
                var mySink = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Server..: {0}", x)));  
  
                // DEPLOY the sink to the server for clients to use  
                mySink.Deploy("serverSink");  
  
                // BIND the query to the sink and RUN it  
                using (var proc = myQuery.Bind(mySink).Run("serverProcess"))  
                {  
                    // Wait for the user to stop the server  
                    Console.WriteLine("----------------------------------------------------------------");  
                    Console.WriteLine("MyStreamInsightServer is running, press Enter to stop the server");  
                    Console.WriteLine("----------------------------------------------------------------");  
                    Console.WriteLine(" ");  
                    Console.ReadLine();  
                }  
                host.Close();  
            }  
        }  
    }  
}  
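
The complete example omits error handling. One possible way to make the shutdown of the service host more robust is the common WCF close-or-abort pattern sketched here. The helper name HostCleanup.CloseOrAbort is hypothetical and is not part of StreamInsight; it relies only on the standard ICommunicationObject interface that ServiceHost implements.

```csharp
using System;
using System.ServiceModel;

static class HostCleanup
{
    // Closes a WCF communication object gracefully; aborts it if it is
    // already faulted or if the graceful close fails.
    public static void CloseOrAbort(ICommunicationObject host)
    {
        try
        {
            if (host.State == CommunicationState.Faulted)
            {
                host.Abort();
            }
            else
            {
                host.Close();
            }
        }
        catch (CommunicationException)
        {
            host.Abort();
        }
        catch (TimeoutException)
        {
            host.Abort();
        }
    }
}
```

In the program, the call to host.Close() could then be replaced by HostCleanup.CloseOrAbort(host) inside a finally block, so that the endpoint is released even if an exception occurs while the process is running.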
  

See Also

StreamInsight Examples
StreamInsight Concepts