時間戳記修改

本主題的範例示範如何使用運算子來修改事件的時間戳記。您可以藉由變更事件時間戳記,變更事件對於後續作業的影響,例如聯結、視窗的彙總等等。下列 LINQ 擴充方法會顯示這個功能。

事件時間移位

ShiftEventTime() 運算子會根據指定的運算式來變更資料流中每一個事件的開始時間。

下列範例會將資料流中每一個事件的時間移位 15 分鐘到未來。

// shift events by 15 minutes into the future.
var shifted = inputStream.ShiftEventTime(e => TimeSpan.FromMinutes(15)); 

下列範例會將資料流中每一個事件的時間移位 1 小時到過去。

// shift events by 1 hour into the past.
var shifted = inputStream.ShiftEventTime(e => TimeSpan.FromHours(-1));

指定時間移位的運算式可以參考目前的事件開始時間,但不是參考其結束時間或裝載。此移位不會影響事件的存留期間或裝載。

DateTime.MinValue 值會被視為編碼負無限的時間值。如果事件開始時間具有此值,而且在指定的運算式中參考此值 (而非常數),系統就不會評估運算式,而且開始時間將維持 DateTime.MinValue。否則,系統將在執行階段中評估運算式,而這可能仍然會產生溢位例外狀況。

請注意,指定的時間移位也將套用至傳遞給這個運算子的 CTI 事件,因為 ShiftEventTime 會影響資料流中所有事件的開始時間。

變更事件的持續時間

AlterEventDuration() 運算子會變更事件的存留期間。事件存留期間指定了事件有效的時間間隔。這段持續時間會定義為事件的函數,以便可從事件的開始時間、結束時間或裝載來計算得出此時間。

下列範例會將事件持續時間設定為 1 小時。

// set event duration to 1 hour.
var onehour = inputStream.AlterEventDuration(e => TimeSpan.FromHours(1));

下列範例會將事件持續時間設定為其目前存留期間的兩倍。

// double event duration. 
var doubled = inputStream.AlterEventDuration(e => (e.EndTime - e.StartTime) * 2);

DateTime.MaxValue 值會被視為編碼正無限的時間值。如果事件結束時間具有此值,而且在指定的運算式中參考此值,系統就不會評估運算式,而且結束時間將維持 DateTime.MaxValue。

變更事件的移位和持續時間

AlterEventLifetime() 運算子會結合 AlterEventDuration 和 ShiftEventTime 函數來取得最大表現度。

AlterEventLifeTime() 方法的第一個參數指定新的開始時間戳記,而且可以參考目前事件的開始時間。這個參數必須指定成 UTC 時間。第二個參數則指定新的存留期間,而且可以參考目前事件的開始時間、結束時間和裝載欄位。

下列範例會將事件時間移位 1 分鐘到過去,但是在將新的存留期間指定為第二個參數時,事件的結束時間則維持不變 (在原始存留期間中增加額外的 1 分鐘)。

// shift event 1 minute into the past, but leave the end time (event duration) unchanged.
var newStream = inputStream.AlterEventLifetime(e => e.StartTime - TimeSpan.FromMinutes(1),
                                               e => e.EndTime - e.StartTime + TimeSpan.FromMinutes(1));]

請注意,指定的開始時間移位也將套用至傳遞給這個運算子的 CTI 事件。

請參閱本主題前面有關 DateTime.MinValue 和 DateTime.MaxValue 的備註。

將資料流轉換成時間點事件資料流

ToPointEventStream 運算子是一個方便的功能,可將邊緣和間隔事件轉換成時間點事件 (將事件的存留期間更改為超過事件開始時間的單一刻度),如下列範例所示。

var pointStream = inputStream.ToPointEventStream();

當間隔事件轉換成時間點事件時,只會保留事件的開始時間。

裁剪事件的持續時間

ClipEventDuration 運算子會採用兩個資料流做為參數,並且根據第二個資料流中下一個相符事件的開始時間變更第一個資料流中每個事件的存留期間。

到目前為止,我們已經看過可讓您依照固定時間範圍變更事件存留期間的運算子。ClipEventDuration 運算子會提供一種非常彈性的方法,讓您以相對於其他事件的方式調整事件的存留期間。一般而言,這個運算子是針對某個資料流所指定,並且採用另一個資料流做為參數,以及比對條件。此運算子會將第一個資料流中每個事件的存留期間裁剪至其他資料流中滿足比對條件之「下一個」事件的開始時間 (就應用程式時間而言)。

例如,假設有兩個資料流 stream1 和 stream2,而且它們帶有包含裝載欄位 "Id" 的事件。下列陳述式會將 stream1 中的所有事件裁剪至 stream2 中具有相同 "Id" 值的下一個事件:

var clipped = stream1.ClipEventDuration(stream2, (e1, e2) => e1.Id == e2.Id);

系統會將比對條件提供成這兩個輸入裝載的運算式。下列圖表將說明這個陳述式的語意:

ClipEventDuration 的語意

此圖表顯示出如何將 stream1 中具有 Id = A 的第一個事件裁剪至 stream2 中具有 Id = A 的下一個事件。系統不會裁剪 stream1 中的其他事件 (Id = B),因為 stream2 中的下一個相符事件只會出現在 stream1 中的事件結尾後面。

這種裁剪行為會開啟廣泛應用。它可滿足的其中一種常見需求是將點的資料流轉換成連續間隔的資料流,也稱為「訊號」。

點對訊號轉換

在此情況下,您必須先擴充所有時間點事件,讓它們實際到達下一個事件。換言之,您必須套用逾時,以便決定某個事件應該在下一個事件發生之前持續的時間長度。這個逾時可以是有限或無限的時間範圍。讓我們假設逾時為 60 秒:

var extended = input.AlterEventDuration(e => TimeSpan.FromSeconds(60));

做好這項準備之後,我們就可以使用 ClipEventDuration 運算子,並提供資料流本身做為其參數。這樣做會導致系統將每個事件裁剪至相同資料流中的下一個事件,並建立間隔事件的連續序列。因為只有第二個資料流的開始時間會影響裁剪作業,所以我們也可以使用原始的點資料流:

var signal = extended.ClipEventDuration(input, (e1, e2) => true);

此處,比對條件一律評估為 true,不過前提是我們著眼於單一邏輯資料流,亦即,資料流中的所有事件都與單一資料來源相關聯。

下列圖表將說明透過 ClipEventDuration 運算子進行點對訊號轉換的影響:

使用 ClipEventDuration 的點對訊號轉換

您可以將這兩個 LINQ 陳述式結合成單一陳述式:

var signal = input.AlterEventDuration(e => TimeSpan.FromSeconds(60)).ClipEventDuration(input, (e1, e2) => true);

如果資料流包含多個邏輯資料流 (例如,來自多個裝置的度量或多個股票的價值),您就必須在布林運算式中比對個別索引鍵 (裝置識別碼或股票代號)。

var signal = input.AlterEventDuration(e => TimeSpan.FromSeconds(60)).ClipEventDuration(input, (e1, e2) => e1.Symbol == e2.Symbol);

建立工作階段

ClipEventDuration 的另一個使用案例是建立工作階段事件,以便為在這類工作階段期間發生的事件加上註解。讓我們假設下列事件結構描述,其中描述某些使用者互動的事件:

public class EventType
{
    public int UserId;
    public string Type;
    public DateTime Time;
    public byte[] data;
};

在此範例中,裝載欄位 Type 可以是 “start”、“end” 或 “other”,其中分別描述使用者工作階段的開始、工作階段的結尾,或是在工作階段期間發生的使用者事件。欄位 Time 包含互動的時間戳記,而 data 包含進一步資訊。此工作的目的是要使用發生事件期間之工作階段的開始時間,為每個事件加上註解。此外,我們假設每個工作階段都會在 10 分鐘之後逾時。

下列圖表將顯示這種案例中的一系列範例事件:

使用 ClipEventDuration 建立工作階段事件

首先,根據逾時擴充的持續時間會套用至類型為 “start” 的所有事件:

var sessionStarts = from e in input
                    where e.Type == “start”
                    select e;
var sessionStartsExt = sessionStarts.AlterEventDuration(e => TimeSpan.FromMinutes(10));

接著,系統必須針對每個使用者識別碼裁剪這些工作階段事件,直到其個別結束為止:

var sessionEnds = from e in input
                  where e.Type == “end”
                  select e;
var sessions = sessionStartsExt.ClipEventDuration(sessionEnds, (e1, e2) => e1.UserId == e2.UserId);

此圖表將說明這些陳述式:

使用 ClipEventDuration 裁剪工作階段事件

現在,您可以將工作階段事件聯結至其餘事件:

var sessionActivity = from e in input
                      where e.Type == “other”
                      select e;
var annotated = from s1 in sessions
                join s2 in sessionActivity
                on s1.UserId equals s2.UserId
                select new {
                    s2.UserId,
                    s2.Type,
                    s2.Time,
                    s2.Data,
                    SessionStart = s1.Time
                }

在此聯結中,我們可以參考 sessionActivity 事件以及工作階段事件的欄位,以便組合加上註解的 sessionActivity 事件,並將工作階段開始時間納入每個 sessionActivity 事件:

將工作階段事件聯結至其他事件

因為聯結條件為 UserId 是否相等,所以對於 UserId=X 的這個特定工作階段而言,不會將 sessionActivity 中具有 UserId=Y 的事件納入考量。

您可以將這些 LINQ 陳述式壓縮成更精確的集合:

var sessions = input
                 .Where(e => e.Type == “start”)
                 .AlterEventDuration(e => TimeSpan.FromMinutes(10))
                 .ClipEventDuration(input.Where(e => e.Type == “end”), (e1, e2) => e1.UserId == e2.UserId);
var annotated = from s1 in sessions
                join s2 in input.Where(e => e.Type == “other”)
                on s1.UserId equals s2.UserId
                select new {
                    s2.UserId,
                    s2.Type,
                    s2.Time,
                    s2.Data,
                    SessionStart = s1.Time
                }

請參閱

概念

StreamInsight 伺服器概念