本文章是由機器翻譯。

StreamInsight

利用 Microsoft StreamInsight 掌握大型資料流

Rob Pierry

[下載代碼示例](https://archive.msdn.microsoft.com/mag201106streaminsig)

生產線的產量下降後,將容易出現使用者媒體流跳過這些流程,或者您的一個產品成為了“必需產品”的情況。真正的竅門是在這些情況發生時進行識別,或根據以往趨勢對其做出預測。

成功預測這些情況需要使用近乎即時的方法。在對相關資料進行提取、轉換並載入到 SQL Server Analysis Services (SSAS) 等傳統商業智慧 (BI) 解決方案中時,情況早已發生改變。同樣,一些系統依靠請求-回應模式來從事務性資料存儲(如 SQL Server Reporting Services 或 SSRS、報告)中請求已更新的資料,像這樣的系統總是在接近請求-輪詢間隔結束時運行陳舊資料。輪詢間隔通常是固定的,因此即使突然發生有趣的活動,消耗系統也不會知道,直到進入下一個間隔。相反,消耗系統應該在滿足趣味條件時連續收到通知。

在檢測新興趨勢時,時間間隔至關重要 - 在過去的五分鐘內,一個特定專案發生了 100 次購買,顯而易見,這比過去五個月間的持續購買更能指示新興趨勢。SSAS 和 SSRS 等傳統系統需要開發人員通過事務性存儲中多維資料集或時間戳記列中的單獨維度來自行跟蹤資料的及時性。理論上,用於識別新興情況的工具可能具有時間內置的概念,並能提供使用該工具所需的豐富 API。

最後,對未來的準確指示來源於對過去的分析。實際上,這就是傳統 BI 的所有功能 - 對大量的歷史資料進行匯總和分析,從而識別趨勢。遺憾的是,與更多的事務性系統相比,在使用這些系統時需要不同的工具和查詢語言。成功識別新興情況需要實現過去資料和當前資料的無縫關聯。只有當對這兩種資料使用相同的工具和查詢語言時,才可能實現這種緊密集成。

對於生產線監視等特定情況,可通過存在的針對性極強的自訂工具來執行這些功能,但是這些工具通常比較昂貴且用途並不廣泛。

為了防止生產線產量下降或確保您的產品定價合適,關鍵在於具有足夠的回應能力,能夠根據情況的更改而進行識別與調整。若要輕鬆快速地識別這些情況,歷史查詢和即時查詢應使用相同的開發人員友好的工具集和查詢語言,系統應該以近乎即時的方式來處理大量的資料(大約為每秒成百上千個事件),同時引擎應該足夠靈活,能夠處理跨越多個問題域的情況。

幸運的是,存在這樣的工具。它稱為 Microsoft StreamInsight。

StreamInsight 體系結構概述

StreamInsight 是一種複雜事件處理引擎,它每秒能夠處理成百上千的事件,且延遲極低。它可以由任何進程(如 Windows 服務)託管,也可以直接嵌入任何應用程式。StreamInsight 具有簡單的適配器模型,用於輸入和輸出資料,並且即時資料和歷史資料的查詢像任何其他來自任何 Microsoft .NET Framework 語言的程式集一樣使用獲取的相同 LINQ 語法。其作為 SQL Server 2008 R2 的一部分授予許可。

StreamInsight 的高級體系結構非常簡單:通過輸入適配器從各種源收集事件。這些事件均通過查詢進行分析和轉換,並且查詢結果通過輸出適配器分發給其他系統和人。圖 1顯示了這一簡單結構。

圖 1 Microsoft StreamInsight 高級體系結構

就像面向服務的體系結構關注消息,而資料庫系統關注行一樣,StreamInsight 等複雜事件處理系統按照事件進行組織。事件是簡單的資料段以及與該資料相關的時間 - 與一天中特定時間的感測器讀數或股票行情價格相似。事件所攜帶的資料稱為它的負載。

StreamInsight 支援三種類型的事件。點事件是即時且不持續的事件。間隔事件是其負載與特定時間段相關的事件。邊緣事件與間隔事件相似,但當邊緣事件到達時,其持續時間未知。而系統設置了開始時間,且事件實際上具有無限持續時間,直到另一個邊緣事件到達才會為這一事件設置結束時間。例如,速度計讀數可能為點事件,因為它不斷更改,但是超市的牛奶價格可能為邊緣事件,因為其關聯時間較長。當牛奶的零售價格更改時(比如,由於分銷商定價發生更改),新價格的持續時間未知,因此,與間隔事件相比,邊緣事件要更為合適。稍後,當分銷商再次更新其定價時,新的邊緣事件將覆蓋先前定價更改的持續時間,而另一個邊緣事件將設置新的價格以便繼續。

StreamInsight 中的輸入適配器和輸出適配器是適配器設計模式的抽象示例。StreamInsight 引擎在其自有的事件表示上運行,但是這些事件的實際來源可能有較大差異,範圍從專有介面到硬體感測器到由企業的應用程式生成的狀態訊息。輸入適配器將源事件轉換為引擎能夠理解的事件流。

來自 StreamInsight 查詢的結果表示特定商業知識,且能夠高度專業化。將這些結果路由至最合適的地點,這點至關重要。輸出適配器可用於將事件的內部表示轉換為列印到主控台的文本、通過 Windows Communication Foundation (WCF) 發送到另一個系統以供處理的消息,甚至 Windows Presentation Foundation 應用程式中圖表上的點。有關使用文字檔、WCF 和 SQL 等的示例適配器可從streaminsight.codeplex.com獲得。

StreamInsight Queries by Example

乍一看,StreamInsight 查詢似乎與從資料庫中查詢行相似,但是兩者之間存在重大差異。查詢資料庫時,系統會構造並執行查詢,同時返回結果。如果基礎資料發生更改,輸出並不會因為已運行查詢而受影響。資料庫查詢結果表示某一時刻的快照,可以通過請求-回應模式使用。

StreamInsight 查詢為現有查詢。隨著新輸入事件的到達,查詢不斷回應,並且根據需要創建新的輸出事件。

本文中的查詢示例來自可供下載的示例解決方案。這些示例開始較簡單,但隨著查詢語言新功能的引入,功能變得更加強大。所有查詢都使用同一負載類。以下是一個簡單類的定義,該類具有 Region 屬性和 Value 屬性:

public class EventPayload {
  public string Region { get; set; }
  public double Value { get; set; }

  public override string ToString() {
    return string.Format("{0}\t{1:F4}", Region, Value);
  }
}

示例應用程式中的查詢使用一台輸入適配器和一台輸出適配器來進行,輸入適配器可隨機生成資料,輸出適配器只需將各事件寫入主控台。 為清晰起見,對示例應用程式中的適配器進行了簡化。

若要運行每個查詢,請在示例解決方案中取消注釋 Program.cs 檔中的行,該示例解決方案可將查詢分配給稱為“template”的本地變數。

以下是一個基本查詢,它通過 Value 屬性來篩選事件:

var filtered =
  from i in inputStream
  where i.Value > 0.5
  select i;

具有使用 LINQ 經驗的任何開發人員應該非常熟悉此查詢。 因為 StreamInsight 使用 LINQ 作為它的查詢語言,因此此查詢與 LINQ to SQL 查詢類似,訪問資料庫或對 IList 進行記憶體中篩選。 當事件從輸入適配器到達時,其負載將受到檢查,並且如果 Value 屬性的值大於 0.5,事件將被傳遞到輸出適配器,並在此將其列印到主控台。

應用程式運行時,可以看到事件不斷到達輸出中。 這實際上是一個推模型。 當事件到達時,StreamInsight 會計算來自輸入的新輸出事件,這與資料庫等拉模型不同,在拉模型中,應用程式必須定期輪詢資料來源,以查看新資料是否已經到達。 這能與 Microsoft .NET Framework 4 中可用的 IObservable 支援完美結合,我們將在後續章節中對此進行介紹。

使用推模型代替輪詢來處理連續資料是個非常好的主意,但是 StreamInsight 的真正功能體現在查詢時間相關的屬性上。 當事件通過輸入適配器到達時,它們獲得了一個時間戳記。 該時間戳記可能來自資料來源本身(假設事件表示歷史資料,且帶有用於存儲時間的顯示列),或者可以設置為事件到達的時間。 實際上,時間是 StreamInsight 查詢語言中的第一個類。

查詢通常與標準資料庫查詢類似,標準資料庫查詢在尾部粘貼有時間限制符,如“每五秒”或“五秒的時間跨度上每三秒”。例如,以下是一個簡單查詢,它每五秒查詢一次 Value 屬性的平均值:

var aggregated =
  from i in inputStream
    .TumblingWindow(TimeSpan.FromSeconds(5), 
    HoppingWindowOutputPolicy.ClipToWindowEnd)
  select new { Avg = i.Avg(p => p.Value)};

資料視窗

因為時間概念是複雜事件處理系統的基礎必需概念,因此應以簡單的方式來使用系統中查詢邏輯的時間元件,這點非常重要。StreamInsight 使用視窗概念來表示按時間分組。之前的查詢使用翻轉視窗。應用程式運行時,查詢將每五秒生成單個輸出事件(視窗的大小)。輸出事件表示前五秒的平均值。像 LINQ to SQL 或 LINQ to Object 一樣,聚合方法(如 Sum 和 Average)能夠將按時間分組的事件匯總為單個值,或可以使用 Select 將輸出投影成不同格式。

翻轉視窗只是另一種視窗類型的特例:跳躍視窗。跳躍視窗也有大小,但是它們也具有不等於其視窗大小的跳躍大小。這表示跳躍視窗可以互相重疊。

例如,視窗大小為五秒、跳躍大小為三秒的跳躍視窗將每三秒生成輸出(跳躍大小),提供前五秒的平均值(視窗大小)。它一次向前跳躍三秒,且持續五秒。圖 2顯示分組為翻轉視窗和跳躍視窗的事件流。

圖 2 翻轉視窗和跳躍視窗

請注意,翻轉視窗並不重疊,但是對於跳躍視窗,如果跳躍大小小於視窗大小,則可以重疊。如果視窗重疊,事件將可能在多個視窗中結束,如同時存在於視窗 1 和視窗 2 中的第三個事件。邊緣事件(具有持續時間)也可能在視窗邊緣重疊,並在多個視窗中結束,如翻轉視窗中的倒數第二個事件。

另一種常見視窗類型為計數視窗。計數視窗包含特定數量的事件,而不是某一時間點或時間段內的事件。要查詢最後三個到達的事件的平均數,可能需要使用計數視窗。計數視窗當前的一個限制是不支援 Sum 和 Average 等內置聚合方法。您必須創建使用者定義的聚合。下文會對這一簡單流程進行介紹。

最後一種視窗類型為快照視窗。在邊緣事件的環境下,快照視窗最容易理解。每次事件的開始或結束即表示當前視窗的完成和新視窗的開始。圖 3顯示如何將邊緣事件分組為快照視窗。請注意每個事件邊界觸發視窗邊界的方式。E1 開始,w1 也開始。當 E2 開始時,w1 完成,而 w2 開始。下個邊緣是 E1 結束,使得 w2 完成,而 w3 開始。結果為三個視窗: w1 containing E1, w2 containing E1 and E2, and w3 containing E3.事件分組為視窗後,它們會受到拉伸,從而使事件的開始與結束時間與視窗的相同。

圖 3 快照視窗

更多複雜查詢

在提供可用視窗與基本查詢方法(如地點、分組依據和排序依據)的情況下,可以進行多種查詢。以下是一個查詢,其將輸入事件按地區分組,然後使用跳躍視窗來輸出最後一分鐘各個 Region 的負載 Value 的總和:

var payloadByRegion =
  from i in inputStream
  group i by i.Region into byRegion
  from c in byRegion.HoppingWindow(
    TimeSpan.FromMinutes(1),
    TimeSpan.FromSeconds(2), 
    HoppingWindowOutputPolicy.ClipToWindowEnd)
  select new { 
    Region = byRegion.Key, 
    Sum = c.Sum(p => p.Value) };

這些視窗使用兩秒的跳躍大小,因此引擎每兩秒發送輸出事件。

因為查詢運算子是在 IQueryable 介面中定義的,因此可以撰寫查詢。 以下代碼使用上一個查詢,其按地區查找總和,並計算總和最高的地區。 快照視窗允許事件流按總和分類,因此可以使用 Take 方法獲取總和最高的地區:

var highestRegion = 
  // Uses groupBy query 
  (from i in payloadByRegion.SnapshotWindow(
    SnapshotWindowOutputPolicy.Clip)
    from sumByRegion in i
    orderby sumByRegion.Sum descending
    select sumByRegion).Take(1);

一般情況是有關快速移動事件(如感測器中的讀數)到慢速移動或靜態參考資料(如感測器的固定位置)流的查詢。 查詢使用聯接來實現此目的。

StreamInsight 聯接語法與任何其他 LINQ 聯接相同,但有一點需要注意:當事件的持續時間重疊時,它們才會聯接在一起。 如果感測器 1 在時間 t1 報告了一個值,但是有關感測器 1 位置的參考資料僅對時間 t2 到 t3 有效,那麼聯接將不匹配。 持續時間的聯結條件並沒有明確寫入查詢定義中;這是 StreamInsight 引擎的基本屬性。 使用靜態資料時,通常情況下,輸入適配器實際上將資料處理為帶有無限持續時間的邊緣事件。 這樣將能成功完成到快速移動事件流的所有聯接。

通過聯接來關聯多個事件流是一個非常強大的概念。 裝配線、石油生產設施或高容量網站通常不會因為隔離的事件而發生故障。 一個用於觸發溫度警報的設備部件通常不會導致生產線癱瘓;生產線癱瘓可能由於多個原因造成,如溫度在某一持續時間段內過高,同時某一工具使用過多,而操作員正在換班。

如果沒有聯接,隔離事件將不會有這麼多的商業價值。 通過對歷史資料使用聯接和 StreamInsight 查詢,使用者可以將隔離流與非常具體的監控條件相關聯,然後進行即時監控。 現有查詢能夠查找可能導致故障的情況,並自動生成可路由至系統的輸出事件,該系統知道如何使過熱的設備部件離線,而不是等到該部件造成整條生產線停產。

在零售情況中,有關某段時間按專案劃分的銷售量的事件可以輸入到定價系統和客戶訂單歷史記錄中,從而確保每個專案具有最佳的定價,或決定在使用者結帳前向其推薦的專案。 由於查詢易於創建、修改和撰寫,因此您可以從簡單的情況開始,並隨時間的流逝進行優化,從而增加業務價值。

使用者定義的聚合

StreamInsight 附帶最常見的彙總函式,包括 Count、Sum 和 Average。 當這些函數不夠時(或您需要在前文提到的計數視窗進行聚合),StreamInsight 支援使用者定義的彙總函式。

要創建使用者定義的聚合,其流程包括兩個步驟:編寫實際聚合方法,然後通過擴展方法將該方法公佈到 LINQ。

進行第一步時,如果聚合與時間無關,則從 CepAggregate<TInput, TOutput> 繼承,如果聚合與時間有關,則從 CepTimeSensitiveAggregate<TInput,TOutput> 繼承。 這些抽象類別具有單獨的實現方法,稱為 GenerateOutput。 圖 4顯示 EveryOtherSum 聚合的實現,這種聚合將每個其他事件加起來。

圖 4 EveryOtherSum 聚合

public class EveryOtherSum : 
  CepAggregate<double, double> {

  public override double GenerateOutput(
    IEnumerable<double> payloads) {

    var sum = default(double);
    var include = true;
    foreach (var d in payloads) {
      if (include) sum += d;
      include = !include;
    }
    return sum;
  }
}

進行第二步時,需要在 CepWindow<TPayload> 上創建擴展方法,以便可以在查詢中使用您的聚合。 CepUserDefinedAggregateAttribute 適用于擴展方法,以便通知 StreamInsight 在哪裡可以找到聚合的實現(在這種情況下,類是在第一步中創建的)。 在可下載的示例應用程式中,本流程兩個步驟的代碼均可在 EveryOtherSum.cs 檔中找到。

更多適配器資訊

查詢表示對適配器提供的資料進行操作的業務邏輯。 示例應用程式使用一台簡單輸入適配器和一台輸出適配器來進行,輸入適配器可生成亂數據,輸出適配器可將資料寫入主控台。 它們均遵循相似的模式,CodePlex 網站上提供的適配器也遵循這一模式。

StreamInsight 使用 Factory 模式來創建適配器。 給定配置類後,工廠可創建相應適配器的實例。 在示例應用程式中,輸入適配器和輸出適配器的配置類都非常簡單。 輸出適配器配置具有保存格式字串的單個欄位,可在編寫輸出時使用。 輸入適配器配置具有填寫生成隨機事件之間睡眠時間的欄位,也具有另一個稱為 CtiFrequency 的欄位。

CtiFrequency 中的 Cti 代表當前時間增量。 StreamInsight 使用 Cti 事件來説明確保事件以正確的順序傳遞。 預設情況下,StreamInsight 支援不按順序到達的事件。 當通過查詢傳遞事件時,引擎將自動對事件進行相應的排序。 然而,這一重新排序具有一定的限制。

假設事件真的能夠以任意順序到達。 那麼怎麼能夠確定最早的事件已經到達,並因此通過查詢來推送? 這不可能,因為下一個事件的時間可能比您收到最早事件的時間更早。 StreamInsight 使用 Cti 事件來通知引擎比已接收事件更早的事件將不會到達。 Cti 事件實際上提示引擎去處理已經到達的事件,隨後忽略或調整任何帶有早于當前時間的時間戳記的事件。

示例輸入適配器生成排序事件流,因此它在每個生成的事件後自動插入一個 Cti 事件,以便保持流程的進行。 如果您已編寫輸入適配器,而您的程式沒有產生輸出,則請確保您的適配器插入了 Cti,因為如果沒有 Cti,引擎將一直等下去。

StreamInsight 附帶了適配器的各種基本類:特型、泛型、點型、間隔型和邊緣型。 特型適配器總是產生帶有常見負載類型的事件 - 在示例案例中,為 RandomPayload 類。 泛型適配器適用于可產生多種事件類型的事件源,或不能提前得知行佈局和內容的事物,如 CSV 檔。

示例輸入適配器具有常見負載類型,可生成點事件,因此其繼承自 TypedPointInputAdapter<RandomPayload>。 基本類具有兩個必須實現的抽象方法: Start and Resume. 在示例中,Start 方法使得計時器在配置指定的間隔內觸發。 計時器的 Elapsed 事件運行 ProduceEvent 方法,該方法完成適配器的主要工作。 此方法的主體遵循通用模式。

首先,適配器檢查引擎自上次運行後是否已停止而現在仍在運行。 然後,調用基本類中的一種方法來創建點事件的實例,其負載已設置且事件已排列在流中。 在示例中,SetRandomEventPayload 方法可代替任何真實適配器邏輯 - 例如,讀取檔、與感測器對話或查詢資料庫。

輸入適配器工廠也非常簡單。 它實現了介面 ITypedInputAdapterFactory<RandomPayloadConfig>,因為它是特性適配器的工廠。 本工廠的唯一特點在於它也實現了 ITypedDeclareAdvanceTimeProperties<RandomPayloadConfig> 介面。 此介面允許工廠處理前文所述的 Cti 插入操作。

示例應用程式的輸出適配器遵循的模式與輸入適配器基本相同。 包括配置類、工廠與輸出適配器本身。 適配器類與輸入適配器十分相似。 主要區別是適配器從佇列中移除事件,而不是對其進行排隊。 因為 Cti 事件與其他事件相似,它們也到達輸出適配器,並很容易被忽略。

可觀察量

雖然適配器模型十分簡單,但還可以使用以下一種更簡單的方式來將事件輸入和輸出引擎。 如果應用程式使用的是 StreamInsight 的內嵌部署模型,則您可以使用 IEnumerable 和 IObservable 作為引擎的輸入和輸出。 給定一個 IEnumerable 或 IObservable,您可以通過調用所提供的擴展方法(如 ToStream、ToPointStream、ToIntervalStream 或 ToEdgeStream)之一創建輸入流。 這將創建一個看上去與輸入適配器創建的事件流極為相似的事件流。

同樣,給定一個查詢,擴展方法(如 ToObservable/Enumerable、ToPointObservable/Enumerable、ToIntervalObservable/Enumerable 或 ToEdgeObservableEnumerable)會分別將查詢輸出路由至 IObservable 或 IEnumerable。 這些模式特別適用于重播保存在資料庫中的歷史資料。

使用 Entity Framework 或 LINQ to SQL 創建資料庫查詢。 使用 ToStream 擴展方法將資料庫結果轉換為事件流,並定義關於該事件流的 StreamInsight 查詢。 最後,使用 ToEnumerable 將 StreamInsight 結果路由至方便您 foreach 並列印的位置。

Deployment Model and Other Tools

若要使用 Observable 和 Enumerable 支援,必須在您的應用程式中嵌入 StreamInsight。 但是 StreamInsight 不支援獨立模型。 在安裝時,系統會詢問您是否創建 Windows 服務以託管預設實例。 該服務可隨後託管 StreamInsight,允許多個應用程式連接到相同的實例並共用適配器和查詢。

通過共用伺服器而非嵌入的伺服器來進行的通信會使用 Server 類上的一種不同的靜態方法。 不調用具有實例名稱的 Create,而是調用 Connect,其帶有指向共用實例的 EndpointAddress。 此部署策略更適用于企業情況,在此情況下,多個應用程式可能需要使用共用的查詢或適配器。

在兩種情況下,有時需要弄清楚為什麼 StreamInsight 生成的輸出不是應該生成的輸出。 該產品附帶名為 Event Flow Debugger 的工具,以用於此用途。 本文不介紹該工具的使用方法,但總而言之,該工具允許您連接到實例並通過查詢跟蹤輸入和輸出事件。

靈活、反應迅速的工具

靈活的部署選項、熟悉的程式設計模型和可輕鬆創建的適配器使得 StreamInsight 成為各種情況下的好選擇。 從查詢並在一秒內關聯數以千計的感測器輸入的集中式實例到在單個應用程式中監控當前事件和歷史事件的嵌入式實例,StreamInsight 均採用開發人員友好的框架(如 LINQ)來實現高度自訂的解決方案。

易於創建的適配器以及用於在事件流與 IEnumerable 和 IObservable 之間進行轉換的內置支援使得它能夠快速找到解決方案並運行,從而增加封裝了特定商業知識的查詢的創建和完善工作。 在完善過程中,這些查詢提供越來越多的值,使得應用程式和組織能夠在發生有趣情況時進行識別並做出反應,而不錯過處理的機會。

Rob Pierry 是 Captura (capturaonline.com) 的首席顧問,其中 Captura 是一家諮詢公司,提供由可擴展技術支援的創新使用者體驗。您可以通過rpierry+msdn@gmail.com 與他聯繫。

衷心感謝以下技術專家對本文的審閱:Ramkumar Krishnan、Douglas LaudenschlagerRoman Schindlauer