使用者定義彙總和運算子

本主題的範例將示範如何使用使用者定義彙總 (UDA) 和使用者定義運算子 (UDO),擴充 StreamInsight LINQ 運算子中的一組視窗作業。這些擴充會透過事件視窗定義,並傳回零個或多個結果事件。使用者定義彙總或使用者定義運算子必須編譯成組件,而且此組件可由 StreamInsight 伺服器利用在執行階段中提供和使用配接器的相同方式存取。

StreamInsight 也會提供使用者定義資料流運算子,做為其他擴充性機制。使用者定義資料流運算子是透過事件資料流 (而非事件視窗) 直接定義。

使用者定義彙總

使用者定義彙總會使用於視窗規格之上,以便彙總該視窗中的事件並產生單一結果值。UDA 會採用包含一組 CEP 事件的 CEP 視窗 (也就是跳動、快照集視窗或計數視窗運算子的結果) 當做其輸入,並且輸出單一傳回值 (對應至其中一個 StreamInsight 基本類型的 CLR 類型)。如需有關視窗的詳細資訊,請參閱<使用事件視窗>。

您可以實作功能比簡單彙總更為複雜的 UDA,類似於 StreamInsight 提供的 count、sum 和 average。之後的章節將討論這類範例的其中一個,也就是計算時間加權平均值。

使用者定義運算子

使用者定義運算子會使用於視窗規格之上,以便處理該視窗中的事件,並產生一組或多組結果事件。UDO 會採用包含一組 CEP 事件的 CEP 視窗 (也就是跳動、快照集視窗或計數視窗運算子的結果) 當做其輸入,並且輸出一組 CEP 事件或一組 CEP 裝載。

當您需要每一個視窗的計算來產生或影響完整事件 (包括其時間戳記) 時,就可以使用 UDO。除了計算彙總以外,事件之狀態欄位的設定就是一個例子,其中狀態會取決於彙總結果和另一個參數。例如,UDO 可能會針對每個包含具有彙總結果之裝載欄位以及指出彙總結果是否違反某些條件約束之狀態欄位的視窗產生單一事件。

UDA 和 UDO 中的時間緊迫性

您可以根據您選擇來實作這些運算子的基底類別,將 UDA 和 UDO 定義為時間緊迫或不緊迫。

系統不會預期將完整事件傳遞給時間不緊迫的 UDA 和 UDO,包括其時間戳記。它們只會考量已定義之視窗中事件的一組或多組裝載欄位。此外,目前視窗的開始和結束時間也不會傳遞給它們。

時間緊迫的 UDA 和 UDO 會針對每個視窗收到一組事件,包括其時間戳記以及視窗開始和結束時間。UDA 或 UDO 是否時間緊迫是由 UDA 或 UDO 作者從中衍生實作的個別基底類別所決定。

實作使用者定義彙總

UDA 的作者具有下列責任:

  • 提供 UDA 的實際實作。

  • 提供擴充方法給 LINQ,讓查詢寫入者能夠使用 UDA。

若要實作 UDA,使用者會從適當的基底類別衍生:CepAggregate (適用於時間不緊迫的 UDA) 或 CepTimeSensitiveAggregate (適用於時間緊迫的 UDA)。

此類別衍生需要輸入和輸出類型參數的具現化。輸入類型表示完整裝載 (如果 UDA 需要能夠在其運算過程中查看整組裝載欄位) 或是對應至 StreamInsight 類型系統中之相對應基本類型的 CLR 類型 (單一欄位為 UDA 輸入的案例)。這兩種案例的輸出類型都必須是對應至對應基本類型的 CLR 類型。

除了事件資料以外,查詢開始時間的選擇性組態結構可以傳遞給 UDA 類別的建構函式 (如果 UDA 作者預期這點的話)。如果這類建構函式是由 UDA 作者所提供,引擎將會隨之在執行階段呼叫它,其組態是由 LINQ 中 UDA 的呼叫端所提供。

時間緊迫和不緊迫的 UDA 都會收到未排序之集合形式的裝載。如果是時間緊迫的 UDA,系統會額外地將事件的時間戳記與每一個裝載產生關聯。此外,定義視窗開始和結束時間的視窗描述項也會傳遞給 UDA。

使用者定義彙總範例

下列範例會實作時間不緊迫的 UDA,此範例預期有一組整數事件欄位。這個範例實作並未指定選擇性組態結構,因此,此類別不需要特定的建構函式。

public class Median : CepAggregate<int, int>
{
    public override int GenerateOutput(IEnumerable<int> eventData)
    {
        var sortedData = eventData.OrderBy(e => e.Payload);
        int medianIndex = (int)sortedData.Count() / 2;
        return sortedData.Count() % 2 == 0 ?
            (sortedData.ElementAt(medianIndex).Payload + sortedData.ElementAt(medianIndex - 1).Payload) / 2 :
            sortedData.ElementAt(medianIndex).Payload;
    }
}

除了實作 UDA 以外,您也必須提供擴充方法給 LINQ,讓查詢寫入者能夠使用 UDA。此擴充方法是讓查詢作者能夠使用彙總和編譯查詢的簽章。StreamInsight LINQ 提供者可以透過屬性參考包含 UDA 實作的實際類別,如下列範例所示。

public static class MyUDAExtensionMethods
{
    [CepUserDefinedAggregate(typeof(Median))]
    public static int Med<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, int>> map)
    {
           throw CepUtility.DoNotCall();
    }
}

此處,Median 類別中必須存在 UDA 的實作,此類別會實作可在 int 類型的單一欄位上運作的 UDA,並傳回 int 類型的值。函數簽章中的運算式代表從輸入資料流的事件類型到單一整數值的對應。請注意,擴充方法永遠不會執行,因此 CepUtility.DoNotCall() 位於方法主體中。根據這項規格,UDA 可用於 LINQ 中,如下列範例所示。

from w in s. TumblingWindow(TimeSpan.FromMinutes(10))
select new { f1 = w.Med(e => e.val) }

Lambda 運算式引數會將事件裝載對應到整數值,這個值將會是 UDA 的輸入。在此案例中,將會針對每一個視窗計算事件欄位 val 之值的中間值。

接下來,假設有一個*「時間不緊迫」*之 UDA 的範例,此 UDA 具有組態資訊。它會預期完整裝載 Trade 類型當做輸入,並傳回 double 類型的值。此範例也包含對應的擴充方法:

    public class Trade
    {
        public double Volume { get; set; }
        public double Price { get; set; }
    }

    public class Vwap : CepAggregate<Trade, double>
    {
        double weight;

        /// <summary>
        /// Constructor for parameterized UDA
        /// </summary>
        public Vwap(double w)
        {
            weight = w;
        }

        public override double GenerateOutput(IEnumerable<Trade> events)
        {
            double vwap = events.Sum(e => e.Price * e.Volume) / events.Sum(e => e.Volume);

            return vwap * weight;
        }
    }

    static public partial class UDAExtensionMethods
    {
        [CepUserDefinedAggregate(typeof(Vwap))]
        public static double vwap(this CepWindow<Trade> window, double w)
        {
            throw CepUtility.DoNotCall();
        }
    }

由於整個裝載都是輸入,因此擴充方法不會指定任何 Lambda 運算式。UDA 的唯一參數是組態的值 (在此處屬於 double):

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
             select new { f1 = w.vwap(2.5) }

接下來,假設有一個*「時間緊迫」*之 UDA 的範例,此 UDA 具有組態資訊。UDA 是一種時間加權的平均值,其間隔事件會解譯為步階函數 (也就是說,每個間隔在下一個間隔之前都是有效的)。與上一個範例類似,這個範例不會預期完整裝載當做輸入,而是只有 double 類型的值。

請注意,即使事件裝載縮減為 double 值,輸入集仍然定義為一組間隔事件,而與時間不緊迫的 UDA 為一組裝載的情況不同。因為 UDA 指定為時間緊迫,以致需要這個定義來包含時間戳記。此外,視窗本身是以 WindowDescription 物件的形式指定,其具有開始時間和結束時間屬性。這些時間戳記是指定成 UTC 時間。此外請注意,UdaConfig 是必須可透過 DataContractSerializer 序列化的類別或結構。

public class TimeWeightedAverage : CepTimeSensitiveAggregate<double, double>
{
    UdaConfig _udaConfig;
    public TimeWeightedAverage(UdaConfig udaConfig)
    {
        _udaConfig = udaConfig;
    }

    public override Output GenerateOutput(IEnumerable<IntervalEvent<double>> events,
                                          WindowDescriptor windowDescriptor)
    {
        double avg = 0;
        foreach (IntervalEvent<double> intervalEvent in events)
        {
            avg += intervalEvent.Payload * (intervalEvent.EndTime - 
                                            intervalEvent.StartTime).Ticks;
        }
        avg = avg / (windowDescriptor.EndTime - 
                     windowDescriptor.StartTime).Ticks;
        return avg * udaConfig.Multiplier;
    }
}

其中 UDAConfig 為

public class UDAConfig
{
    public double Multiplier { get; set; }
}

此擴充方法現在也包含下列組態結構:

[CepUserDefinedAggregate(typeof(TimeWeightedAverage))]
public static double twa<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, double>> map, UdaConfig config)
{
    throw CepUtility.DoNotCall();
}

組態變成擴充方法中的另一個參數:

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
         select new w.TimeWeightedAverage (e => e.dval,
                            new UdaConfig(){ Multiplier = 5 }); 

目前為止的範例考量了具型別之事件的案例。也就是說,實作 UDA 時已經知道裝載類型。下列範例會實作具有一般輸入類型的 UDA,而輸入類型只會在執行階段傳遞給 UDA。

public class GenericInputUda<TInput> : CepAggregate<TInput, bool>
{
    public GenericInputUda(SampleUdaConfig config)
    {
        // ...
    }

    public override bool GenerateOutput(IEnumerable<TInput> payloads)
    {
        // ...
    }
}

實作使用者定義運算子

UDO 的作者具有下列責任:

  • 提供 UDO 的實際實作。

  • 提供擴充方法給 LINQ,讓查詢寫入者能夠使用 UDO。

若要實作 UDO,使用者會從適當的基底類別中衍生:CepOperator (適用於時間不緊迫的 UDO) 或 CepTimeSensitiveOperator。此類別衍生需要輸入和輸出類型參數的具現化。輸入類型一律代表完整裝載。輸出類型是一組裝載或一組事件 (根據選取的基底類別而定)。

除了事件資料以外,您可以在查詢開始時間將選擇性組態結構傳遞給 UDO 類別的建構函式 (如果 UDO 作者預期這點的話)。如果這類建構函式是由 UDO 作者所提供,引擎將會隨之在執行階段呼叫它,其組態是由 LINQ 中 UDO 的呼叫端所提供。

時間緊迫和不緊迫的 UDO 都會收到未排序之集合形式的裝載。如果是時間緊迫的 UDO,系統會額外地將事件的時間戳記與每一個裝載產生關聯。此外,定義視窗開始和結束時間的視窗描述項也會傳遞給 UDO。

使用者定義運算子中的 CTI 行為

UDO 會以下列方式變更目前時間增量 (CTI):當某個視窗仍然處於「開啟」狀態 (也就是說,在視窗結束時間之後尚未收到任何包含時間戳記的 CTI) 時,所有落於該視窗內的 CTI 都會變更為視窗開始時間。這樣可確保只要視窗仍然開啟,UDO 的輸出 (可能包含使用者定義時間戳記) 就可以變更。

使用者定義運算子實作範例

下列範例會實作沒有組態資訊且時間不緊迫的 UDO。

public class SampleUDO : CepOperator<Input, Output>
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        Output output = new Output();
        output.total = 0;
        output.status = "good";

        foreach (Input payload in payloads)
        {
            output.total += payload.Value;
            if (payload.Flag == 4)
            {
                output.status = "bad";
                break;
            }
        }
        List<Output> outputCollection = new List<Output>();
        outputCollection.Add(output);
        return outputCollection;
    }
}

下列範例示範如何將簽章變更為可接受組態資訊且時間緊迫的 UDO。

public class GenericOutputUdo: CepTimeSensitiveOperator<InputEventType, TOutput>
{
    public GenericOutputUdo(SampleUdoConfig config)
    {
        ...
    }

    public override IEnumerable<IntervalEvent<TOutput>> GenerateOutput(
                             IEnumerable<IntervalEvent<InputEventType>> payloads,
                             WindowDescriptor windowDescriptor)
    {
        ...
    }
}

使用者定義運算子的擴充方法範例

除了實作 UDO 以外,UDO 作者必須提供擴充方法給 LINQ,讓查詢寫入者能夠使用 UDO。此擴充方法是讓查詢作者能夠使用運算子並編譯查詢的簽章。LINQ 提供者可以透過屬性參考包含 UDO 實作的實際類別,如下列範例所示。

[CepUserDefinedOperator(typeof(SampleUDO))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window)
{
    throw CepUtility.DoNotCall();
}

這個 UDO 現在可以用下列方式使用:

var newstream = from w in inputStream.Snapshot()
                select w.MyUDO();

下列範例會示範具有組態結構之 UDO 的擴充方法和用法,並參考包含在名為 SampleUDOwithConfig 之類別中的實作。

[CepUserDefinedOperator(typeof(SampleUDOwithConfig))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window, UDOConfig config)
{
    throw CepUtility.DoNotCall();
}

var newstream = from w in inputStream.SnapshotWindow()
                select w.MyUDO(new UDOConfig());

文化特性特有的事件欄位屬性

類似 UDO、UDA 和 UDF 等延伸模組可以視為 CEP 網域與其類型系統和 .Net CLR 之間的介面。對於某些應用程式而言,必須能夠透過這個介面來傳遞文化特性資訊。對於 UDA 和 UDO 而言,延伸模組作者可以實作額外的介面 IDeclareEventProperties,此介面可允許檢查或設定事件欄位的文化特性屬性。若要實作這個介面,您必須提供 DeclareEventProperties 函數,這個函數會傳回 CepEventType 的物件,該物件可以針對其欄位夾帶文化特性資訊,如下列範例所示:

public class SampleUDO : CepOperator<Input, Output>, IDeclareEventProperties
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        // assuming string field 'loc' in type Input
        // assuming string fields 'firstName' and 'location' in type Output
        outputEventType.Fields["firstName"].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        outputEventType.Fields["location"].CultureInfo = base.InputEventType.Fields["loc"].CultureInfo;
        return outputEventType;
    }
}

這個範例 UDO 會採用輸入類型的事件,並產生輸出類型的事件。輸出類型具有字串欄位,而 UDO 作者想要明確地使用某些文化特性資訊來為此欄位加上註解。名為 zh-CN 的文化特性會套用到 firstName 輸出欄位,而 location 輸出欄位會使用與 UDO 輸入事件類型中 loc 欄位相關聯的相同文化特性來加上註解。對於 UDO 在執行階段產生的每一個事件而言,在事件插入 UDO 的輸出資料流之前,這些文化特性都會套用到它的欄位。

使用者定義彙總也有相同的介面存在。因為彙總只有單一傳回值,所以為了將文化特性特有的資訊套用到這類欄位,IDeclareEventProperties 介面會使用單一欄位來將傳回值包裝到 CepEventType 中,以便提供一個方式來以 CEP 特有的事件屬性為此欄位加上註解。

public class MyUDA : CepAggregate<Input, string>, IDeclareEventProperties
{
    public override string GenerateOutput(IEnumerable<Input> events)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        outputEventType.FieldsByOrdinal[0].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        return outputEventType;
    }
}

在這裡,表示彙總結果的字串會包裝到 CepEventType 中,好讓 UDA 作者可以在該欄位上設定 CultureInfo 屬性。此文化特性資訊將會傳播給實際的事件欄位,該欄位會在使用 UDA 的 LINQ 查詢中接收彙總結果。

請參閱

概念

使用事件視窗

其他資源

以 LINQ 撰寫查詢範本