Användardefinierade aggregeringar för Azure Stream Analytics JavaScript

Azure Stream Analytics stöder användardefinierade aggregeringar (UDA) som skrivits i JavaScript. Det gör att du kan implementera komplex tillståndskänslig affärslogik. Inom UDA har du fullständig kontroll över tillståndsdatastrukturen, tillståndsansamling, tillståndsdecumulation och sammanställd resultatberäkning. Artikeln beskriver de två olika JavaScript UDA-gränssnitten, steg för att skapa en UDA och hur du använder UDA med fönsterbaserade åtgärder i Stream Analytics-frågan.

Användardefinierade JavaScript-aggregeringar

En användardefinierad aggregering används ovanpå en tidsfönsterspecifikation för att aggregera över händelserna i det fönstret och generera ett enda resultatvärde. Det finns två typer av UDA-gränssnitt som Stream Analytics stöder idag, AccumulateOnly och AccumulateDeaccumulate. Båda typerna av UDA kan användas av rullande, hoppande, glidande och sessionsfönster. AccumulateDeaccumulate UDA presterar bättre än AccumulateOnly UDA när det används tillsammans med Hopping, Sliding och Session Window. Du väljer en av de två typerna baserat på den algoritm som du använder.

AccumulateOnly-aggregeringar

AccumulateOnly-aggregeringar kan bara ackumulera nya händelser till dess tillstånd, algoritmen tillåter inte deaccumulation av värden. Välj den här aggregeringstypen när det inte går att implementera en händelseinformation från tillståndsvärdet. Följande är JavaScript-mallen för AccumulatOnly-aggregat:

// Sample UDA which state can only be accumulated.
function main() {
    this.init = function () {
        this.state = 0;
    }

    this.accumulate = function (value, timestamp) {
        this.state += value;
    }

    this.computeResult = function () {
        return this.state;
    }
}

AccumulateDeaccumulate-aggregeringar

AccumulateDeaccumulate-aggregeringar tillåter deaccumulation av ett tidigare ackumulerat värde från tillståndet, till exempel ta bort ett nyckel/värde-par från en lista med händelsevärden eller subtrahera ett värde från ett summaaggregat. Följande är JavaScript-mallen för AccumulateDeaccumulate-aggregeringar:

// Sample UDA which state can be accumulated and deaccumulated.
function main() {
    this.init = function () {
        this.state = 0;
    }

    this.accumulate = function (value, timestamp) {
        this.state += value;
    }

    this.deaccumulate = function (value, timestamp) {
        this.state -= value;
    }

    this.deaccumulateState = function (otherState){
        this.state -= otherState.state;
    }

    this.computeResult = function () {
        return this.state;
    }
}

UDA – JavaScript-funktionsdeklaration

Varje JavaScript UDA definieras av en funktionsobjektdeklaration. Följande är de viktigaste elementen i en UDA-definition.

Funktionsalias

Funktionsalias är UDA-identifieraren. När du anropas i Stream Analytics-frågan använder du alltid UDA-alias tillsammans med prefixet "uda".

Funktionstyp

För UDA ska funktionstypen vara JavaScript UDA.

Utdatatyp

En specifik typ av Stream Analytics-jobb som stöds eller "Alla" om du vill hantera typen i din fråga.

Funktionsnamn

Namnet på det här funktionsobjektet. Funktionsnamnet ska matcha UDA-aliaset.

Metod – init()

Metoden init() initierar aggregeringstillståndet. Den här metoden anropas när fönstret startas.

Metod – accumulate()

Metoden accumulate() beräknar UDA-tillståndet baserat på det tidigare tillståndet och de aktuella händelsevärdena. Den här metoden anropas när en händelse anger ett tidsfönster (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW eller SESSIONWINDOW).

Metod – deaccumulate()

Metoden deaccumulate() beräknar om tillståndet baserat på det tidigare tillståndet och de aktuella händelsevärdena. Den här metoden anropas när en händelse lämnar EN SLIDINGWINDOW eller SESSIONWINDOW.

Metod – deaccumulateState()

Metoden deaccumulateState() beräknar om tillståndet baserat på det tidigare tillståndet och tillståndet för ett hopp. Den här metoden anropas när en uppsättning händelser lämnar en HOPPINGWINDOW.

Metod – computeResult()

Metoden computeResult() returnerar aggregerat resultat baserat på det aktuella tillståndet. Den här metoden anropas i slutet av ett tidsfönster (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW eller SESSIONWINDOW).

Indata- och utdatatyper som stöds av JavaScript UDA

Information om JavaScript UDA-datatyper finns i avsnittet Stream Analytics och JavaScript-typkonvertering av Integrera JavaScript-UDF:er.

Lägga till en JavaScript UDA från Azure-portalen

Nedan går vi igenom processen med att skapa en UDA från portalen. Exemplet vi använder här är tidsviktade medelvärden för databehandling.

Nu ska vi skapa en JavaScript UDA under ett befintligt ASA-jobb genom att följa stegen nedan.

  1. Logga in på Azure-portalen och leta reda på ditt befintliga Stream Analytics-jobb.

  2. Välj sedan funktionslänken under JOBBTOPOLOGI.

  3. Välj Lägg till för att lägga till en ny funktion.

  4. I vyn Ny funktion väljer du JavaScript UDA som funktionstyp. Sedan visas en standardmall för UDA i redigeraren.

  5. Fyll i "TWA" som UDA-alias och ändra funktionsimplementeringen enligt följande:

    // Sample UDA which calculate Time-Weighted Average of incoming values.
    function main() {
        this.init = function () {
            this.totalValue = 0.0;
            this.totalWeight = 0.0;
        }
    
        this.accumulate = function (value, timestamp) {
            this.totalValue += value.level * value.weight;
            this.totalWeight += value.weight;
    
        }
    
        // Uncomment below for AccumulateDeaccumulate implementation
        /*
        this.deaccumulate = function (value, timestamp) {
            this.totalValue -= value.level * value.weight;
            this.totalWeight -= value.weight;
        }
    
        this.deaccumulateState = function (otherState){
            this.state -= otherState.state;
            this.totalValue -= otherState.totalValue;
            this.totalWeight -= otherState.totalWeight;
        }
        */
    
        this.computeResult = function () {
            if(this.totalValue == 0) {
                result = 0;
            }
            else {
                result = this.totalValue/this.totalWeight;
            }
            return result;
        }
    }
    
  6. När du har valt knappen Spara visas UDA i funktionslistan.

  7. Välj den nya funktionen "TWA", du kan kontrollera funktionsdefinitionen.

Anropa JavaScript UDA i ASA-fråga

I Azure-portalen och öppna jobbet redigerar du frågan och anropar funktionen TWA() med ett mandatprefix "uda". Till exempel:

WITH value AS
(
    SELECT
    NoiseLevelDB as level,
    DurationSecond as weight
FROM
    [YourInputAlias] TIMESTAMP BY EntryTime
)
SELECT
    System.Timestamp as ts,
    uda.TWA(value) as NoseDoseTWA
FROM value
GROUP BY TumblingWindow(minute, 5)

Testa fråga med UDA

Skapa en lokal JSON-fil med innehållet nedan, ladda upp filen till Stream Analytics-jobbet och testa frågan ovan.

[
  {"EntryTime": "2017-06-10T05:01:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 22.0},
  {"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 81, "DurationSecond": 37.8},
  {"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 85, "DurationSecond": 26.3},
  {"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 95, "DurationSecond": 13.7},
  {"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 88, "DurationSecond": 10.3},
  {"EntryTime": "2017-06-10T05:05:00-07:00", "NoiseLevelDB": 103, "DurationSecond": 5.5},
  {"EntryTime": "2017-06-10T05:06:00-07:00", "NoiseLevelDB": 99, "DurationSecond": 23.0},
  {"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 1.76},
  {"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 79, "DurationSecond": 17.9},
  {"EntryTime": "2017-06-10T05:08:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 27.1},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 91, "DurationSecond": 17.1},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 115, "DurationSecond": 7.9},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 28.3},
  {"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 55, "DurationSecond": 18.2},
  {"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 25.8},
  {"EntryTime": "2017-06-10T05:11:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 11.4},
  {"EntryTime": "2017-06-10T05:12:00-07:00", "NoiseLevelDB": 89, "DurationSecond": 7.9},
  {"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 112, "DurationSecond": 3.7},
  {"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 9.7},
  {"EntryTime": "2017-06-10T05:18:00-07:00", "NoiseLevelDB": 96, "DurationSecond": 3.7},
  {"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 0.99},
  {"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 113, "DurationSecond": 25.1},
  {"EntryTime": "2017-06-10T05:22:00-07:00", "NoiseLevelDB": 110, "DurationSecond": 5.3}
]

Få hjälp

Om du vill ha mer hjälp kan du prova vår frågesida för Microsoft Q&A för Azure Stream Analytics.

Nästa steg