Agrégats JavaScript définis par l’utilisateur Azure Stream AnalyticsAzure Stream Analytics JavaScript user-defined aggregates

Azure Stream Analytics prend en charge les agrégats définis par l’utilisateur (UDA) écrits en JavaScript, ce qui vous permet d’implémenter une logique métier avec état complexe.Azure Stream Analytics supports user-defined aggregates (UDA) written in JavaScript, it enables you to implement complex stateful business logic. Au sein de l’agrégat défini par l’utilisateur, vous avez un contrôle total sur la structure de données avec état, le cumul d’états, le non-cumul d’états et le calcul des résultats d’agrégation.Within UDA you have full control of the state data structure, state accumulation, state decumulation, and aggregate result computation. Cet article présente les deux interfaces UDA JavaScript, les étapes pour créer un agrégat défini par l’utilisateur et l’emploi d’un agrégat avec des opérations basées sur une fenêtre dans une requête Stream Analytics.The article introduces the two different JavaScript UDA interfaces, steps to create a UDA, and how to use UDA with window-based operations in Stream Analytics query.

Agrégats JavaScript définis par l’utilisateurJavaScript user-defined aggregates

Un agrégat défini par l’utilisateur est employé en plus d’une spécification de fenêtre de temps pour agréger les événements de cette fenêtre et produire une valeur à résultat unique.A user-defined aggregate is used on top of a time window specification to aggregate over the events in that window and produce a single result value. Il existe deux types d’interfaces UDA actuellement pris en charge par Stream Analytics : AccumulateOnly et AccumulateDeaccumulate.There are two types of UDA interfaces that Stream Analytics supports today, AccumulateOnly and AccumulateDeaccumulate. Les deux types d’agrégats définis par l’utilisateur peuvent être utilisés par la fenêtre bascule, la fenêtre récurrente, la fenêtre glissante et la fenêtre de session.Both types of UDA can be used by Tumbling, Hopping, Sliding and Session Window. L’agrégat défini par l’utilisateur AccumulateDeaccumulate est plus performant que l’agrégat défini par l’utilisateur AccumulateOnly lorsqu’il est utilisé avec la fenêtre récurrente, la fenêtre glissante et la fenêtre de session.AccumulateDeaccumulate UDA performs better than AccumulateOnly UDA when used together with Hopping, Sliding and Session Window. Vous choisissez un des deux types en fonction de l’algorithme que vous utilisez.You choose one of the two types based on the algorithm you use.

Agrégats AccumulateOnlyAccumulateOnly aggregates

Les agrégats AccumulateOnly ne peuvent accumuler que les nouveaux événements à leur état ; l’algorithme n’autorise pas désaccumulation de valeurs.AccumulateOnly aggregates can only accumulate new events to its state, the algorithm does not allow deaccumulation of values. Choisissez ce type d’agrégation lorsque la désaccumulation des informations d’événement à partir de la valeur d’état est impossible à mettre en œuvre.Choose this aggregate type when deaccumulate an event information from the state value is impossible to implement. Voici le modèle JavaScript pour les agrégats AccumulateOnly :Following is the JavaScript template for AccumulatOnly aggregates:

// Sample UDA which state can only be accumulated.
function main() {
    this.init = function () {
        this.state = 0;
    }

    this.accumulate = function (value, timestamp) {
        this.state += value;
    }

    this.computeResult = function () {
        return this.state;
    }
}

Agrégats AccumulateDeaccumulateAccumulateDeaccumulate aggregates

Les agrégats AccumulateDeaccumulate permettent la désaccumulation d’une valeur cumulée précédemment à partir de l’état (par exemple, supprimer une paire clé-valeur dans la liste des valeurs d’événement ou soustraire une valeur d’un état de somme d’agrégation).AccumulateDeaccumulate aggregates allow deaccumulation of a previous accumulated value from the state, for example, remove a key-value pair from a list of event values, or subtract a value from a state of sum aggregate. Voici le modèle JavaScript pour les agrégats AccumulateDeaccumulate :Following is the JavaScript template for AccumulateDeaccumulate aggregates:

// Sample UDA which state can be accumulated and deaccumulated.
function main() {
    this.init = function () {
        this.state = 0;
    }

    this.accumulate = function (value, timestamp) {
        this.state += value;
    }

    this.deaccumulate = function (value, timestamp) {
        this.state -= value;
    }

    this.deaccumulateState = function (otherState){
        this.state -= otherState.state;
    }

    this.computeResult = function () {
        return this.state;
    }
}

UDA - Déclaration de fonction JavaScriptUDA - JavaScript function declaration

Chaque UDA JavaScript est défini par une déclaration d’objet de fonction.Each JavaScript UDA is defined by a Function object declaration. Voici les principaux éléments d’une définition d’UDA.Following are the major elements in a UDA definition.

Alias de fonctionFunction alias

L’alias de fonction est l’identificateur de l’UDA.Function alias is the UDA identifier. En cas d’appel dans une requête Stream Analytics, utilisez toujours des alias d’UDA avec le préfixe « uda.».When called in Stream Analytics query, always use UDA alias together with a "uda." .prefix.

Type de fonctionFunction type

Pour un UDA, le type de fonction doit être UDA Javascript.For UDA, the function type should be Javascript UDA.

Type de sortieOutput type

Type particulier pris en charge par le travail Stream Analytics, ou « Tout » si vous souhaitez gérer le type dans votre requête.A specific type that Stream Analytics job supported, or "Any" if you want to handle the type in your query.

Nom de la fonctionFunction name

Nom de cet objet de fonction.The name of this Function object. Le nom de la fonction doit correspondre à l’alias de l’agrégat défini par l’utilisateur.The function name should match the UDA alias.

Méthode - init()Method - init()

La méthode init() initialise l’état de l’agrégat.The init() method initializes state of the aggregate. Cette méthode est appelée au démarrage de la fenêtre.This method is called when window starts.

Méthode – accumulate()Method – accumulate()

La méthode accumulate() calcule l’état de l’UDA en fonction de l’état précédent et des valeurs d’événement en cours.The accumulate() method calculates the UDA state based on the previous state and the current event values. Cette méthode est appelée lorsqu’un événement entre dans une fenêtre de temps (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW ou SESSIONWINDOW).This method is called when an event enters a time window (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW or SESSIONWINDOW).

Méthode – deaccumulate()Method – deaccumulate()

La méthode deaccumulate() recalcule l’état en fonction de l’état précédent et des valeurs d’événement en cours.The deaccumulate() method recalculates state based on the previous state and the current event values. Cette méthode est appelée lorsqu’un événement quitte une fenêtre SLIDINGWINDOW ou une fenêtre SESSIONWINDOW.This method is called when an event leaves a SLIDINGWINDOW or SESSIONWINDOW.

Méthode – deaccumulateState()Method – deaccumulateState()

La méthode deaccumulateState() recalcule l’état en fonction de l’état précédent et de l’état d’un tronçon.The deaccumulateState() method recalculates state based on the previous state and the state of a hop. Cette méthode est appelée lorsqu’un ensemble d’événements quitte une fenêtre récurrente HOPPINGWINDOW.This method is called when a set of events leave a HOPPINGWINDOW.

Méthode – computeResult()Method – computeResult()

La méthode computeResult() renvoie le résultat de l’agrégat en fonction de l’état actuel.The computeResult() method returns aggregate result based on the current state. Cette méthode est appelée à la fin d’une fenêtre de temps (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW ou SESSIONWINDOW).This method is called at the end of a time window (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW or SESSIONWINDOW).

Types de données d’entrée et de sortie pris en charge pour les UDA JavaScriptJavaScript UDA supported input and output data types

Pour les types de données des UDA JavaScript, reportez-vous à la section Conversion de type Stream Analytics et JavaScript de la rubrique Intégration d’UDF JavaScript.For JavaScript UDA data types, refer to section Stream Analytics and JavaScript type conversion of Integrate JavaScript UDFs.

Ajout d’un UDA JavaScript à partir du portail AzureAdding a JavaScript UDA from the Azure portal

Vous trouverez ci-dessous la procédure de création d’un UDA à partir du portail.Below we walk through the process of creating a UDA from Portal. L’exemple utilisé ici concerne le calcul de moyennes pondérées de durée.The example we use here is computing time weighted averages.

Maintenant, nous allons créer un UDA JavaScript sous un travail ASA existant en suivant les étapes.Now let's create a JavaScript UDA under an existing ASA job by following steps.

  1. Connectez-vous au portail Azure et recherchez le travail Stream Analytics existant.Log on to Azure portal and locate your existing Stream Analytics job.

  2. Cliquez ensuite sur le lien de fonctions sous Topologie de la tâche.Then click on functions link under JOB TOPOLOGY.

  3. Cliquez sur l’icône Ajouter pour ajouter une nouvelle fonction.Click on the Add icon to add a new function.

  4. Dans la vue Nouvelle fonction, sélectionnez le type de fonction UDA JavaScript. Vous verrez alors un modèle d’UDA par défaut apparaître dans l’éditeur.On the New Function view, select JavaScript UDA as the Function Type, then you see a default UDA template show up in the editor.

  5. Indiquez l’alias d’UDA « TWA » et modifiez l’implémentation de la fonction de la façon suivante :Fill in "TWA" as the UDA alias and change the function implementation as the following:

    // Sample UDA which calculate Time-Weighted Average of incoming values.
    function main() {
        this.init = function () {
            this.totalValue = 0.0;
            this.totalWeight = 0.0;
        }
    
        this.accumulate = function (value, timestamp) {
            this.totalValue += value.level * value.weight;
            this.totalWeight += value.weight;
    
        }
    
        // Uncomment below for AccumulateDeaccumulate implementation
        /*
        this.deaccumulate = function (value, timestamp) {
            this.totalValue -= value.level * value.weight;
            this.totalWeight -= value.weight;
        }
    
        this.deaccumulateState = function (otherState){
            this.state -= otherState.state;
            this.totalValue -= otherState.totalValue;
            this.totalWeight -= otherState.totalWeight;
        }
        */
    
        this.computeResult = function () {
            if(this.totalValue == 0) {
                result = 0;
            }
            else {
                result = this.totalValue/this.totalWeight;
            }
            return result;
        }
    }
    
  6. Une fois que vous avez cliqué sur le bouton « Enregistrer », votre UDA s’affiche dans la liste des fonctions.Once you click the "Save" button, your UDA shows up on the function list.

  7. Cliquez sur la nouvelle fonction « TWA » pour lire sa définition.Click on the new function "TWA", you can check the function definition.

Appel d’un UDA JavaScript dans une requête ASACalling JavaScript UDA in ASA query

Dans le portail Azure, ouvrez votre projet, modifiez la requête, puis appelez la fonction TWA() avec un préfixe de mandat « uda. ».In Azure portal and open your job, edit the query and call TWA() function with a mandate prefix "uda.". Par exemple :For example:

WITH value AS
(
    SELECT
    NoiseLevelDB as level,
    DurationSecond as weight
FROM
    [YourInputAlias] TIMESTAMP BY EntryTime
)
SELECT
    System.Timestamp as ts,
    uda.TWA(value) as NoseDoseTWA
FROM value
GROUP BY TumblingWindow(minute, 5)

Test de requête avec UDATesting query with UDA

Créez un fichier JSON local avec le contenu ci-dessous, téléchargez le fichier dans le travail Stream Analytics et testez la requête ci-dessus.Create a local JSON file with below content, upload the file to Stream Analytics job, and test above query.

[
  {"EntryTime": "2017-06-10T05:01:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 22.0},
  {"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 81, "DurationSecond": 37.8},
  {"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 85, "DurationSecond": 26.3},
  {"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 95, "DurationSecond": 13.7},
  {"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 88, "DurationSecond": 10.3},
  {"EntryTime": "2017-06-10T05:05:00-07:00", "NoiseLevelDB": 103, "DurationSecond": 5.5},
  {"EntryTime": "2017-06-10T05:06:00-07:00", "NoiseLevelDB": 99, "DurationSecond": 23.0},
  {"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 1.76},
  {"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 79, "DurationSecond": 17.9},
  {"EntryTime": "2017-06-10T05:08:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 27.1},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 91, "DurationSecond": 17.1},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 115, "DurationSecond": 7.9},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 28.3},
  {"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 55, "DurationSecond": 18.2},
  {"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 25.8},
  {"EntryTime": "2017-06-10T05:11:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 11.4},
  {"EntryTime": "2017-06-10T05:12:00-07:00", "NoiseLevelDB": 89, "DurationSecond": 7.9},
  {"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 112, "DurationSecond": 3.7},
  {"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 9.7},
  {"EntryTime": "2017-06-10T05:18:00-07:00", "NoiseLevelDB": 96, "DurationSecond": 3.7},
  {"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 0.99},
  {"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 113, "DurationSecond": 25.1},
  {"EntryTime": "2017-06-10T05:22:00-07:00", "NoiseLevelDB": 110, "DurationSecond": 5.3}
]

Obtenir de l’aideGet help

Pour obtenir de l’aide, consultez notre page de questions Microsoft Q&A sur Azure Stream Analytics.For additional help, try our Microsoft Q&A question page for Azure Stream Analytics.

Étapes suivantesNext steps