在執行階段撰寫查詢

在執行階段撰寫 StreamInsight 查詢會提供查詢彈性、重複可用性、資源的有效使用性及維護的方便性。它可讓您:

  • 在相同伺服器上提供一個查詢的查詢結果給其他查詢。

  • 取用其他執行中查詢的輸出,就像取用輸入配接器中的事件一樣。

假設有兩個已撰寫的查詢,送入查詢 2 的查詢 1 在隔離模式中執行。如果查詢 1 失敗,查詢 2 的狀態不會受到影響,反之亦然。查詢 1 和查詢 2 可以單獨啟動和停止。例如,您可以停止查詢 1、使用另一個查詢來取代它,然後再次啟動它。

本主題描述幾個使用案例以及在執行階段動態撰寫查詢的範例。

重複使用現有查詢的輸出

多個查詢的一個常見使用案例是需要設計及部署一個主要查詢來前置處理資料並將資料傳送給輸出配接器,而其他查詢則會取用這個查詢的結果,並將其本身的結果傳送給其他輸出配接器。這個案例顯示於下圖中。

查詢 2 從查詢 1 取用資料

下列範例表示一個查詢,這個查詢建立在 StreamInsight 伺服器上的現有應用程式 myApp 中。

    var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                   typeof(MyInputAdapterFactory),
                                                   new InputAdapterConfig { someFlag = true },
                                                   EventShape.Point);

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    var query = filtered.ToQuery(myApp,
                                 "filterQuery",
                                 "Filter out Values over 95",
                                 typeof(MyOutputAdapterFactory),
                                 new OutputAdapterConfig { someString = "foo" },
                                 EventShape.Point,
                                 StreamEventOrder.FullyOrdered);

    query.Start();

若要將這個查詢的結果以資料流形式傳送給第二個查詢,將會使用 Query.ToStream() 方法。符合主要查詢之輸出裝載的類型會指定為泛型參數,如下列範例所示。

var filteredStream = query.ToStream<MyDataType>();

var validated = from e in filteredStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };

var validationQuery = validated.ToQuery("validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

validationQuery.Start();

在這個範例中會存取主要查詢的輸出資料流,而且會套用投射運算子來引進名為 Status 的新欄位。第二個 ToQuery() 呼叫不再需要應用程式物件,因為它可以從主要查詢來推斷它。

如果必須在該點插入目前時間增量 (CTI),ToStream() 方法會採用選擇性 AdvanceTimeSettings 物件。插入 CTI 有助於增加某些查詢組態的有效性。

請注意,主要查詢物件的建立方式並不重要。上一個模型會示範使用 CepStream.ToQuery() API 的範例。其他可能性是建立查詢:

  • 透過查詢繫結器。例如,myApp.CreateQuery("filterQuery", queryBinder, "description");

  • 從伺服器透過物件模型 API 來擷取它。例如,myApp.Queries["filterQuery"]

解除繫結查詢輸出

上一個範例示範如何重複使用現有查詢的結果 (在此查詢中,已經將它的輸出繫結到輸出配接器)。查詢也可以擁有解除繫結的輸出資料流,如此就不會產生任何輸出 (除非一個或多個其他查詢取用它的結果),這是替代方法。這個案例顯示於下圖中。

查詢 1 有解除繫結查詢資料流

這會藉由使用不需要配接器的 CepStream.ToQuery() 多載來達成:

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             EventShape.Point, 
                             StreamEventOrder.FullyOrdered);

可以啟動這個查詢。第二個查詢之後可以取用它的結果資料流,其方式是依照上一個範例所示的方式針對查詢 validationQuery 來指定它。如果沒有任何取用者,就會捨棄主要查詢的結果。

這個模式也可讓您將查詢結果當做資料流傳送給多個輸出配接器。在最簡單的案例中,可以在解除繫結的查詢上層使用通過查詢來達成這個目的,每一個輸出配接器使用一個通過查詢 (上圖中的查詢 2 和 3)。

發行的資料流

到目前為止,範例會使用實際查詢物件,以便為另一個查詢建立新的輸入資料流。為了擷取用戶端物件,您可以使用發行的資料流 URI 當做一個或多個其他查詢的輸入,如下圖所示。

查詢使用已發行的資料流做為輸入。

每一個查詢都有預設的已發行資料流統一資源識別碼 (URI),這是查詢名稱本身。此外,您也可以透過適當的 CepStream 類別成員,明確地將自訂的已發行資料流名稱指派給查詢。

var query = filtered.ToPublishedStreamQuery(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                             myApp,
                                             "filterQuery",
                                             "Filter out Values over 95",
                                             EventShape.Point,
                                             StreamEventOrder.FullyOrdered);

這樣會建立一個查詢,其中包含解除繫結但是已明確命名的輸出。請注意,已發行的資料流名稱必須遵循以下慣例:"<application_name>/PublishedStream/<stream_name>"。

另一個查詢現在可以參考這個 URI 當做它的輸入資料流,如下列範例所示。

var filterStream = CepStream<MyDataType>.Create(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                                EventShape.Point);
var validated = from e in filterStream
                ...

請注意,已發行之資料流的取用者必須指定輸入事件圖形,這個圖形必須符合參考之查詢的輸出圖形。

透過已發行的資料流名稱連接到主要查詢比起透過查詢物件來連接較不緊密。因此,當定義次要查詢時,必須提供下列項目給應用程式:

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

發行的資料流配接器

當擷取撰寫之查詢的配接器時 (例如透過 Query.InputStreamBindings),您會注意到用來連接它們的特殊內建配接器。透過 CepStream.ToQuery, Query.ToStream() 撰寫查詢等類似功能 (如上面所示) 是這些內建配接器上層的方便介面。也可以使用與一般配接器類似的方式來明確使用這些配接器,並擁有其本身的組態結構 (其中包含已發行的資料流名稱),如下列範例所示:

// primary query, with custom input and output adapters
var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

var filtered = from e in inputstream
               where e.Value > 95
               select e;

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

// secondary query, composed on top of the first one using the
// built-in published stream input adapter and the default published
// stream name of the primary query
var filterStream = CepStream<MyDataType>.Create("filteredStream",
                                                typeof(PublishedStreamAdapterFactory),
                                                new PublishedStreamInputAdapterConfiguration { PublishedStreamName = query.Name },
                                                EventShape.Point);

var validated = from e in filterStream
                ...

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

查詢可以依照相同的方法來使用已發行的資料流輸出配接器,其功能與 CepStream.toPublishedStreamQuery() 相同:

var filterQuery = filtered.ToQuery(myApp,
                                   "filterQuery",
                                   "desc",
                                   typeof(PublishedStreamAdapterFactory),
                                   new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1") },
                                   EventShape.Point,
                                   StreamEventOrder.FullyOrdered);

使用查詢繫結器

查詢繫結器開發模型允許對各種 StreamInsight 中繼資料物件進行完整控制,並將查詢繫結和使用方式清楚地與查詢範本設計階段分隔開來。此模型也允許輸入繫結及輸出繫結端的動態查詢撰寫。如需詳細資訊,請參閱<使用查詢繫結器>。

當做輸入繫結到另一個查詢

就像查詢繫結器可以將查詢範本當做事件產生者繫結到輸入配接器一樣,它也可以繫結到現有的查詢。假設有主要查詢 (包含繫結或解除繫結的輸出) 存在,就像第一個範例一樣。

var query = filtered.ToQuery(myApp, ...);

然後,可依照下列方式使用查詢繫結器,參考適當 BindProducer 多載中的上一個查詢。

var newStream = CepStream<RawData>.Create("validationInput");
var validated = from e in newStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };
QueryTemplate validateQT = myApp.CreateQueryTemplate("validationLogic", "validates the Value field", validated);
QueryBinder queryBinder = new QueryBinder(validateQT);
queryBinder.BindProducer("validationInput", filterQuery);
queryBinder.AddConsumer(...);

另外,查詢繫結器也可以當做事件產生者來參考發行的資料流。

queryBinder.BindProducer("validationInput",
                         new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                         EventShape.Point);

就如同 Query.ToStream() 簽章一樣,可以在 BindProducer() 中指定選擇性 AdvanceTimeSettings 物件。

當做輸出繫結到發行的資料流

在輸出端,查詢繫結器允許以資料流形式傳送到明確定義的已發行資料流。

queryBinder.BindOutputToPublishedStream(new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

一旦啟動根據這個查詢繫結器的查詢之後,其他查詢就可以繫結到發行的資料流 (如同之前的範例所述),並取用其結果事件。

繫結到發行的資料流配接器

發行的資料流配接器也可以用於查詢繫結器模型中。您可以從應用程式物件擷取這些配接器,並將其用於 BindProducer 和 AddConsumer 中,就像一般配接器一樣:

queryBinder.BindProducer("validationInput",
                         myApp.GetPublishedStreamInputAdapter(),
                         new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered") },
                         EventShape.Point);
queryBinder.AddConsumer("validated",
                         myApp.GetPublishedStreamOutputAdapter(),
                         new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/validated") },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

請參閱

概念

StreamInsight 端對端範例

提前應用程式時間