StreamInsight Example: Client B - Creating a Subject

 

This example demonstrates how to create a StreamInsight client that uses a remote server and entities defined in the server. In particular, it demonstrates how to create a subject that is bound to multiple sources and sinks. The subject accepts data as it arrives from either source stream and delivers it to both sinks. For more information about StreamInsight entities, see StreamInsight Concepts.

This example uses the remote server and entities created in the server example in this section. To use the examples in this section together, do the following:

  1. Run the server example StreamInsight Example: Server - Exposing an Embedded Server

  2. Run one or both of the client examples:

Step-By-Step

In general, a typical StreamInsight client follows these basic steps:

  • Create a StreamInsight server instance

  • Create or get a StreamInsight application

  • Define or get a source

  • Compose a query over the source

  • Define or get a sink

  • Bind and run the query and sink

In this example, the client gets an existing application and entities from the server, and also creates an additional source, an additional sink, and a subject.

Connect to the Server

A StreamInsight client program begins by obtaining a StreamInsight server instance. In this example, the client connects to a remote server called “MyStreamInsightServer”.

var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/MyStreamInsightServer"));

For more information on the options available for connecting to a StreamInsight server, see Publishing and Connecting to the StreamInsight Server.

Get the Server Application

In this example, the client uses the StreamInsight application that has been created in the remote server. All the server entities this client will use have been defined within this application, and new entities created will be created within the same application.

var myApp = server.Applications["serverApp"];

Get the Server Source and Define a New Source

Next, get the source that has been defined on the server and define a new source. In this example, the data in this second source is a simple temporal stream of point events generated every second.

var mySource = myApp.GetStreamable<long>("serverSource");
var mySourceB = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);

Compose Queries Over the Sources

Next, compose queries over the two sources. In this example, the first query retrieves every even data value from the server source and returns the value + 2000, and the second query retrieves every odd data value from the second source and returns the value + 3000.

var myQuery = from e in mySource
              where e % 2 == 0
              select e + 2000;
var myQueryB = from e in mySourceB
               where e % 2 == 1
               select e + 3000;
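To make the effect of the two filters concrete, the following sketch applies the same where/select shape to an ordinary in-memory sequence with LINQ to Objects. It is only an illustration: it does not use StreamInsight, the names QuerySketch, EvenPlus2000, and OddPlus3000 are hypothetical, and the sample values assume a counter that starts at 0.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class QuerySketch
{
    // Same shape as myQuery: keep even values and add 2000.
    public static IEnumerable<long> EvenPlus2000(IEnumerable<long> source)
    {
        return from e in source
               where e % 2 == 0
               select e + 2000;
    }

    // Same shape as myQueryB: keep odd values and add 3000.
    public static IEnumerable<long> OddPlus3000(IEnumerable<long> source)
    {
        return from e in source
               where e % 2 == 1
               select e + 3000;
    }

    static void Main()
    {
        // Assumed stand-in for the first six values of a counter starting at 0.
        var sample = new long[] { 0, 1, 2, 3, 4, 5 };

        Console.WriteLine(string.Join(", ", EvenPlus2000(sample))); // 2000, 2002, 2004
        Console.WriteLine(string.Join(", ", OddPlus3000(sample)));  // 3001, 3003, 3005
    }
}
```

Because the two offsets differ, output that reaches a sink can be traced back to the query that produced it.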

Get the Server Sink and Define a New Sink

Next, get the sink that has been defined on the server and define a new sink. In this example, the second sink is a simple function that writes the stream values to the console.

var mySink = myApp.GetObserver<long>("serverSink");
var mySinkB = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Client_B: {0}", x)));

Create a Subject

Next, create a subject. A subject is an object in the server that can be bound to both sources and sinks, consuming data from sources and delivering data to sinks.

var mySubject = myApp.CreateSubject<long,long>("serverSubject_Client_B", () => new Subject<long>());
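The dual role of a subject can be sketched without StreamInsight by using the IObserver<T> and IObservable<T> interfaces from the .NET base class library. The types MiniSubject, ListSink, and SubjectSketch below are hypothetical stand-ins written for this illustration, not part of StreamInsight or Rx, and the sketch ignores threading and temporal semantics.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a subject: sources push into it (IObserver)
// and sinks subscribe to it (IObservable).
sealed class MiniSubject<T> : IObserver<T>, IObservable<T>
{
    private readonly List<IObserver<T>> sinks = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> sink)
    {
        sinks.Add(sink);
        return new Unsubscriber(() => sinks.Remove(sink));
    }

    // Forward every notification to all sinks currently subscribed.
    public void OnNext(T value) { foreach (var s in sinks.ToArray()) s.OnNext(value); }
    public void OnError(Exception error) { foreach (var s in sinks.ToArray()) s.OnError(error); }
    public void OnCompleted() { foreach (var s in sinks.ToArray()) s.OnCompleted(); }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action onDispose;
        public Unsubscriber(Action onDispose) { this.onDispose = onDispose; }
        public void Dispose() { onDispose(); }
    }
}

// Hypothetical sink that records what it receives.
sealed class ListSink : IObserver<long>
{
    public readonly List<long> Received = new List<long>();
    public void OnNext(long value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

static class SubjectSketch
{
    // Pushes values from two "sources" into one subject that has two sinks attached.
    public static ListSink[] Run()
    {
        var subject = new MiniSubject<long>();
        var sinkA = new ListSink();
        var sinkB = new ListSink();
        subject.Subscribe(sinkA);
        subject.Subscribe(sinkB);

        subject.OnNext(2000); // as if produced by the first query
        subject.OnNext(3001); // as if produced by the second query
        return new[] { sinkA, sinkB };
    }

    static void Main()
    {
        foreach (var sink in Run())
            Console.WriteLine(string.Join(", ", sink.Received)); // prints "2000, 3001" once per sink
    }
}
```

Each sink receives the merged values from both sources, which is the fan-in and fan-out behavior that the subject in this example provides inside the server.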

Bind and Run the Queries and Sinks

Now bind the subject to each of the sinks: the sink previously defined in the server, and the sink defined by this client. Each binding runs as a separate process. When the subject is bound to a sink, no data flows yet because no source has been bound. If a source had been bound to the subject before a sink was bound, data would begin to flow to the subject immediately and be lost.

var procB1 = mySubject.Bind(mySink).Run("serverProcess_Client_B_1");
var procB2 = mySubject.Bind(mySinkB).Run("serverProcess_Client_B_2");
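Why the binding order matters can be illustrated with a plain C# event, which behaves like a source that produces values whether or not anything is listening. This is an analogy only: BindingOrderSketch, ValueProduced, and Produce are hypothetical names, and no StreamInsight API is involved.

```csharp
using System;
using System.Collections.Generic;

static class BindingOrderSketch
{
    // Hypothetical source: it raises values whether or not a sink is attached.
    static event Action<long> ValueProduced;

    static void Produce(long value)
    {
        var handler = ValueProduced;
        if (handler != null) handler(value);
    }

    public static List<long> Run()
    {
        var received = new List<long>();
        Action<long> sink = v => received.Add(v);

        Produce(1);            // no sink attached yet: this value is dropped
        ValueProduced += sink; // attach the sink
        Produce(2);
        Produce(3);
        ValueProduced -= sink; // detach so repeated runs stay independent

        return received;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // 2, 3
    }
}
```

The value produced before the sink was attached never arrives, which is why this example binds the sinks to the subject before it binds the queries.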

Next, bind the queries to the subject. When these processes run, data begins to flow from the sources through the queries and the subject to each of the sinks.

var procB3 = myQuery.Bind(mySubject).Run("serverProcess_Client_B_3");
var procB4 = myQueryB.Bind(mySubject).Run("serverProcess_Client_B_4");

This client has now defined a number of entities in the server:

  • serverSubject_Client_B

  • serverProcess_Client_B_1

  • serverProcess_Client_B_2

  • serverProcess_Client_B_3

  • serverProcess_Client_B_4

Complete Example

The following example combines the components described earlier to create a complete application. For simplicity, this example does not check for possible error conditions, but assumes that the server from the example, StreamInsight Example: Server - Exposing an Embedded Server, is running and that the expected entities have been created.

using System;
using System.ServiceModel;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace StreamInsight21_example_Client_B
    /* This example:
     * connects to a remote server
     * gets the app and source defined in the server
     * defines a second source
     * creates simple queries over the 2 sources
     * gets the sink defined in the server
     * defines a second sink
     * binds and runs the subject to both sinks
     * binds and runs the subject to both queries
     * waits for the user to stop the program
     */
{
    class Program
    {
        static void Main(string[] args)
        {
            // Connect to the StreamInsight server
            using (var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/MyStreamInsightServer")))
            {
                /* The following entities are expected to be defined in the server:
                 * serverApp
                 * serverSource
                 * serverSink
                 */
                /* The following entities will be defined in the server by this client:
                 * serverSubject_Client_B
                 * serverProcess_Client_B_1
                 * serverProcess_Client_B_2
                 * serverProcess_Client_B_3
                 * serverProcess_Client_B_4
                 */

                // Get the existing StreamInsight APPLICATION
                var myApp = server.Applications["serverApp"];

                // GET the SOURCE from the server
                var mySource = myApp.GetStreamable<long>("serverSource");

                // DEFINE a second SOURCE (returns a point event every second)
                var mySourceB = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);

                // COMPOSE a QUERY on the server source (return every even value + 2000)
                var myQuery = from e in mySource
                              where e % 2 == 0
                              select e + 2000;

                // COMPOSE a QUERY on the second source (return every odd value + 3000)
                var myQueryB = from e in mySourceB
                               where e % 2 == 1
                               select e + 3000;

                // GET the SINK from the server
                var mySink = myApp.GetObserver<long>("serverSink");

                // DEFINE a second SINK
                var mySinkB = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Client_B: {0}", x)));

                // CREATE a SUBJECT
                var mySubject = myApp.CreateSubject<long,long>("serverSubject_Client_B", () => new Subject<long>());

                // BIND the SINKS to the SUBJECT
                var procB1 = mySubject.Bind(mySink).Run("serverProcess_Client_B_1");
                var procB2 = mySubject.Bind(mySinkB).Run("serverProcess_Client_B_2");

                // BIND the SOURCES to the SUBJECT
                var procB3 = myQuery.Bind(mySubject).Run("serverProcess_Client_B_3");
                var procB4 = myQueryB.Bind(mySubject).Run("serverProcess_Client_B_4");

                // Wait for the user to stop the program
                Console.WriteLine("----------------------------------------------------------------");
                Console.WriteLine("Client B is running, press Enter to exit the client");
                Console.WriteLine("----------------------------------------------------------------");
                Console.WriteLine(" ");
                Console.ReadLine();

                // Remove the entities we created: stop the processes first, then delete the subject
                procB3.Dispose();
                procB4.Dispose();
                procB1.Dispose();
                procB2.Dispose();
                myApp.Entities["serverSubject_Client_B"].Delete();
            }
        }
    }
}

See Also

StreamInsight Example: Server - Exposing an Embedded Server
StreamInsight Examples
StreamInsight Concepts