提前應用程式時間

StreamInsight 開發人員必須透過高度即時的方式,讓可能具有失序資料的資料來源需求與處理事件的需求達到平衡。雖然快速提前應用程式時間有助於減少延遲,這樣卻也會縮減視窗而排擠了晚抵達的資料 (亦即弱化資料未按順序抵達的能力)。StreamInsight 提供各種方法來推理應用程式時間。本主題描述可在配接器層級設定與使用查詢繫結來提前應用程式時間的不同等級和原則。

了解暫時模型

StreamInsight 的暫時模型只會根據應用程式時間,絕對不會根據系統時間。這表示,所有暫時運算子都會參考事件的時間戳記,而不會參考主機的系統時鐘。因此,應用程式必須將其目前的應用程式時間傳達給 StreamInsight 伺服器。給定應用程式的應用程式時間取決於應用程式內容中的許多不同層面。歸根究底,應用程式開發人員有責任提供適當的應用程式時間給 StreamInsight 伺服器。應用程式時間的主要考量如下:

  • 資料來源

    當資料來源傳達暫時資訊時,該資料可用來識別已經收到資料來源中所有事件的時間點。這個時間點會構成與這個資料來源相關的目前應用程式時間。請注意,不同的資料來源可能會以不同的速度來進行。

  • 失序的資料

    使用某些資料來源時,事件不一定會依照其時間戳記的順序抵達。也就是說,資料會失序。StreamInsight 可以容納失序的資料,並確保結果不會取決於事件抵達 StreamInsight 伺服器的順序。StreamInsight 開發人員可以使用一些寬限時間來提前應用程式時間,以便針對具有晚抵達之事件的這些資料來源緩慢移動失序的事件。

  • 結果的有效性

    StreamInsight 查詢會輸出結果,已知這些結果到目前應用程式時間為止都很精確。這表示,當整體應用程式時間的進度完成結果時,將會從 StreamInsight 查詢形成結果。

目前時間累加 (CTI)

在查詢處理期間,應用程式時間是由目前時間增量 (CTI) 事件所驅使。CTI 是一種標點符號事件,它是 StreamInsight 暫時模型的中心元件。CTI 會用來認可事件的順序,並將計算的結果釋放到查詢輸出,其方式是對 StreamInsight 伺服器宣稱,時間表的某些部分將不會再變更。因此,一定要將 CTI 連同事件加入輸入事件資料流的佇列中,這樣才會產生任何結果並排清具有狀態之運算子的狀態。

將 CTI 加入佇列之後,輸入會保證不產生任何影響 CTI 時間戳記之前時段的任何後續事件。這表示,當 CTI 已經加入輸入的佇列之後:

  • 對於圖形為點、間隔或邊緣開頭的事件而言:事件的開始時間必須是 CTI 或 CTI 之後。

  • 對於圖形為邊緣結尾的事件而言:事件的結束時間必須是 CTI 或 CTI 之後。

如果違反了上述規則,我們就稱為 CTI 違規。接著,我們將描述這些違規的處理方式。

有三個方法可以將 CTI 插入輸入資料流。

  1. 透過輸入配接器以程式設計方式將 CTI 加入佇列中,類似於將事件加入佇列中。

  2. 依照給定的頻率來以宣告方式產生 CTI。這可以透過配接器 Factory 中的 AdvanceTimeGenerationSettings 或是當做查詢繫結的一部分來指定。

  3. 將個別的輸入資料流定義為 CTI 來源。這只能在查詢繫結中指定。

每當實作方法 2 和 3 時,也必須實作 CTI 違規的原則。在下列章節中,將會描述 AdvanceTimeGenerationSettings 和違規原則。後續章節將會描述如何使用配接器 Factory 以及查詢繫結中的提前時間設定。

產生 CTI

CTI 的產生 (之前的方法 2 和 3 所述) 具有兩個維度:

  1. 產生頻率 (指定為正整數 N 或是時間範圍 T)。產生頻率原則會在發生事件計數 (N) 或時間範圍 (T) 之後插入 CTI。

  2. 產生之 CTI 的時間戳記 (指定為與最後收到之事件有關的延遲)。

此外,您也可以使用布林值旗標來指示當關閉查詢時,是否應該插入具有正無限值之時間戳記的最終 CTI。這是用來從查詢的運算子中排清所有剩餘的事件。

CTI 產生作業是透過 AdvanceTimeGenerationSettings 類別所定義,此類別的建構函式會採用下列範例所示的頻率、延遲和旗標。

var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(5), true);

此範例會指示引擎在每 10 個來自事件來源的事件之後插入 CTI。CTI 會夾帶上一次事件時間減去 5 秒鐘的時間戳記。這個延遲機制會有效地實作寬限期間,讓事件來源可以將延遲事件加入佇列中,而不會違反 CTI 語意 (前提是事件的延遲時間絕對不會超過 5 秒鐘)。當對應的查詢關閉時,便會將具有無限時間的 CTI 加入佇列中。

請注意,透過 AdvanceTimeSettings 指定 CTI 產生頻率時,不會將結束邊緣納入考量。該項因素在使用持續時間做為頻率的情況下,亦將忽略不計。以邊緣事件而言,頻率和持續時間都只考慮開始邊緣。

CTI 違規原則

如果傳送的事件包含早於插入之 CTI 的時間戳記,事件來源就有可能違反 CTI 語意。提前時間設定允許指定原則來處理這類情況的發生,此原則可以具有下列兩個值:

  • Drop

    違反插入之 CTI 的事件將會捨棄而且不會排入查詢的佇列中。

  • Adjust

    違反插入之 CTI 的事件將會進行修改 (前提是其存留期間與 CTI 時間戳記重疊)。也就是說,事件的開始時間戳記會設定為最新的 CTI 時間戳記,讓這些事件變成有效。如果事件的開始和結束時間落在 CTI 時間戳記之前,就會捨棄該事件。

配接器提前時間設定

提前應用程式時間的設定可以在配接器 Factory 的定義中指定。每當具現化配接器時,便會呼叫此 Factory 的 Create() 方法,也會以同樣的方式呼叫對應的方法來定義配接器執行個體的提前時間設定。若要這樣做,請將 ITypedDeclareAdvanceTimeProperties 介面用於具類型的配接器 (或是將 IDeclareAdvanceTimeProperties 用於不具類型的配接器),如下列範例所示。

public class MyInputAdapterFactory : ITypedInputAdapterFactory<MyInputConfig>,
                                     ITypedDeclareAdvanceTimeProperties<MyInputConfig>

此介面需要將下列方法實作為此 Factory 的一部分。

public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(MyInputConfig configInfo, EventShape eventShape)
{
    var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(0), true);
    var ats = new AdapterAdvanceTimeSettings(atgs, AdvanceTimePolicy.Drop);
    return ats;
}

DeclareAdvanceTimeProperties() 方法會針對每一個新具現化的配接器來呼叫,該配接器在對應的 Create() 方法呼叫中有指定相同的組態結構及事件圖形參數。如此可讓配接器作者從組態資訊衍生正確的 CTI 產生設定,而不用要求查詢寫入者和繫結者留意提前時間設定的細節。

AdapterAdvanceTimeSettings 建構函式需要 AdvanceTimeGenerationSettings 物件及之前所述的違規原則。

查詢繫結中的 CTI 產生

類似於 AdapterAdvanceTimeSettings,發出 CTI 可以在查詢繫結中以宣告方式指定,如下列範例所示。如此可讓繫結查詢的使用者定義 CTI 應用程式時間行為,而與配接器實作無關。

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

AdvanceTimeSettings 建構函式會採用下列三個引數:

  1. AdvanceTimeGenerationSettings 物件

  2. AdvanceTimeImportSettings 物件

  3. 違規原則

請注意,產生設定或匯入設定引數可以設定為 null,但不能兩者都設定為 null。此外,也可以一起指定這兩者。下一節會介紹 AdvanceTimeImportSettings 類別。

上述範例指定隨著每一個事件產生及插入 CTI,且包含事件的時間戳記 (無延遲)。AdvanceTimeSettings 物件可以當做最後一個選擇性參數傳遞給 CepStream.Create() 方法,如下列範例所示。

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

var inputstream = CepStream<MyPayloadType>.Create("inputStream",
                                                  typeof(MyInputAdapterFactory),
                                                  new MyConfiguration(),
                                                  EventShape.Point,
                                                  ats);

它也可以用於查詢繫結器開發模型中:

queryBinder.BindProducer<MyPayloadType>("filterInput",
                                        inputAdapter,
                                        new MyConfiguration(),
                                        EventShape.Point,
                                        ats);

與另一個資料流同步

在查詢繫結期間使用時,除了根據頻率產生 CTI 之外,也可以使用 AdvanceTimeImportSettings 從另一個輸入資料流將其複製到查詢中。這項功能可將兩個資料流同步,如下列範例所示。

var dataStream = CepStream<DataType>.Create("dataStream ",
                                            typeof(DataInputAdapterFactory),
                                            new MyDataAdapterConfiguration(),
                                            EventShape.Point);

var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("dataStream"), AdvanceTimePolicy.Adjust);

var lookupStream = CepStream<LookupType>.Create("lookupStream",
                                                typeof(ReferenceInputAdapterFactory),
                                                new MyReferenceConfiguration(),
                                                EventShape.Edge,
                                                ats);

var joined = from eLeft in dataStream
             join eRight in lookupStream
             where ...

此範例示範一個典型的使用案例,其中「快速」資料流必須與「緩慢」參考資料流聯結在一起。緩慢資料流可能是查閱資料,其變更頻率少於快速資料流許多。為了讓此聯結能夠與它的最快速輸入一樣快速產生輸出,緩慢輸入資料流會藉由匯入其 CTI 來與快速資料流同步。在這個範例中,處理快速資料流的應用程式時間會視為在配接器中發生。

結果的有效性

提前時間產生設定的延遲參數會指定插入之 CTI 的時間戳記。請務必了解 StreamInsight 架構中 CTI 的精確語意,才能達成所要的輸出有效性。CTI 對引擎宣稱,系統會認可時間表上完全在 CTI 時間戳記之前的所有內容。這對於結果的有效性而言,將會有不同的含意。

例如,假設有時間點事件的輸入資料流及頻率為 1 的 CTI 產生設定 (每一個事件) 且延遲為 0。這樣會產生 CTI,且每一個時間點事件的時間戳記都完全相同。但是,這表示最後一個時間點事件將只會隨著下一個 CTI 一起認可,因為它的時間戳記並沒有完全在對應的 CTI 之前。若要在配接器發出每一個時間點事件時都能夠加以認可,CTI 的時間戳記必須緊接在時間點事件之後。這樣會轉譯成一個刻度的延遲負值,如下列範例所示。

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1), true);

CTI 與查詢運算子

如前所述,CTI 是由輸入配接器加入佇列或插入。CTI 會透過查詢進行傳播,而交由特定運算子各以不同的方式處理。例如,聯結運算子釋出的結果為兩側當中較舊的 CTI。聯集運算子會釋出兩側最新的 CTI 當中較舊的結果。整體查詢釋出的結果則僅止於最新的 CTI。

另一方面,某些運算子會對 CTI 時間戳記造成影響。跳動視窗會將視窗中的 CTI 拉回到視窗起始處,因為儘管事件仍落在該視窗中,視窗頂端的運算其結果尚有可能變更。ShiftEventTime() 和 AlterEventLifeTime() 方法都將變更事件的開始時間,並且會對 CTI 套用同樣的轉換。

請參閱

概念

建立輸入和輸出配接器

StreamInsight 伺服器概念

變更記錄

更新的內容

新增<CTI 與查詢運算子>一節。

在<產生 CTI>一節中新增資訊,敘述透過 AdvanceTimeSettings 指定 CTI 頻率時,不會將結束邊緣納入考量。