Mettre à jour ou fusionner des enregistrements dans Azure SQL Database avec Azure Functions

Actuellement, Azure Stream Analytics (ASA) prend en charge seulement l’insertion (l’ajout) de lignes dans les sorties SQL (Bases de données Azure SQL et Azure Synapse Analytics). Cet article présente des solutions de contournement pour activer UPDATE, UPSERT ou MERGE sur les bases de données SQL, avec Azure Functions comme couche intermédiaire.

D’autres options d’Azure Functions sont présentées à la fin.

Condition requise

L’écriture de données dans une table peut généralement être effectuée de la manière suivante :

Mode Instruction T-SQL équivalente Spécifications
Ajouter INSERT Aucun
Remplacer MERGE (UPSERT) Clé unique
Accumuler MERGE (UPSERT) avec opérateur d’assignation composé (+=, -=…) Clé unique et accumulateur

Pour illustrer les différences, regardons ce qui se passe lors de l’ingestion des deux enregistrements suivants :

Arrival_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

Dans le mode ajout, nous insérons deux enregistrements. L’instruction T-SQL équivalente est la suivante :

INSERT INTO [target] VALUES (...);

Ce qui donne :

Modified_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

En mode de remplacement, nous obtenons uniquement la dernière valeur par clé. Nous utilisons ici Device_Id comme clé. L’instruction T-SQL équivalente est la suivante :

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Ce qui donne :

Modified_Time Device_Key Measure_Value
10:05 A 20

Enfin, en mode d’accumulation, nous additionnons Value à un opérateur d’assignation composé (+=). Ici aussi, nous utilisons Device_Id comme clé :

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Ce qui donne :

Modified_Time Device_Key Measure_Value
10:05 A 21

Pour des raisons de performances, les adaptateurs de sortie de base de données SQL d’Azure Stream Analytics ne prennent actuellement en charge que le mode d’ajout en mode natif. Ces adaptateurs utilisent l’insertion en bloc pour maximiser le débit et limiter la sollicitation.

Cet article explique comment utiliser Azure Functions pour implémenter les modes Remplacer et Accumuler pour Azure Stream Analytics. Quand vous utilisez une fonction comme couche intermédiaire, les performances d’écriture potentielles n’auront pas d’impact sur le travail de diffusion en continu. À cet égard, l’utilisation d’Azure Functions fonctionne mieux avec Azure SQL. Avec Synapse SQL, passer d’une instruction en bloc à une instruction ligne par ligne peut créer des problèmes de performance plus importants.

Sortie Azure Functions

Dans notre travail, nous remplaçons la sortie SQL d’Azure Stream Analytics par la sortie Azure Functions d’Azure Stream Analytics. Les capacités UPDATE, UPSERT ou MERGE seront implémentées dans la fonction.

Il existe actuellement deux options pour accéder à une base de données SQL dans une fonction. La première est la liaison de sortie Azure SQL. Elle est actuellement limitée à C# et ne propose que le mode de remplacement. La seconde consiste à composer une requête SQL qui sera soumise via le pilote SQL approprié (Microsoft.Data.SqlClient for .NET).

Pour les deux exemples suivants, nous supposons le schéma de table suivant. L’option de liaison exige qu’une clé primaire soit définie sur la table cible. Ce n’est pas obligatoire, mais recommandé, lorsque vous utilisez un pilote SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Une fonction doit répondre aux attentes suivantes pour être utilisée comme une sortie d’Azure Stream Analytics :

  • Azure Stream Analytics attend l’état HTTP 200 de l’application Functions pour les lots qui ont été traités avec succès.
  • Lorsqu’Azure Stream Analytics reçoit une exception 413 (qui indique que l’entité de requête HTTP est trop volumineuse) de la part d’une fonction Azure, il réduit la taille des lots envoyés à Azure Functions.
  • Pendant le test de connexion, Stream Analytics envoie une requête POST avec un lot vide à Azure Functions et s’attend à recevoir l’état HTTP 20x en retour pour valider le test.

Option 1 : Mettre à jour par clé avec la liaison SQL Azure Functions

Cette option utilise la liaison de sortie SQL Azure Functions. Cette extension peut remplacer un objet dans une table, sans avoir à écrire une instruction SQL. Pour l’instant, elle ne prend pas en charge les opérateurs d’assignation composés (accumulations).

Cet exemple a été construit sur :

Pour mieux comprendre l’approche de liaison, il est recommandé de suivre ce tutoriel.

Tout d’abord, créez une application de fonction HttpTrigger par défaut en suivant ce tutoriel. Les informations suivantes sont utilisées :

  • Langage : C#
  • Runtime : .NET 6 (sous fonction/runtime v4)
  • Modèle : HTTP trigger

Installez l’extension de liaison en exécutant la commande suivante dans un terminal situé dans le dossier du projet :

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Ajoutez l’élément SqlConnectionString dans la section Values de votre fichier local.settings.json, en renseignant la chaîne de connexion du serveur de destination :

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Remplacez l’ensemble de la fonction (fichier .cs dans le projet) par l’extrait de code suivant. Mettez à jour l’espace de noms, le nom de la classe et le nom de la fonction par les vôtres :

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Mettez à jour le nom de la table de destination dans la section de liaison :

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Mettez à jour la classe Device et la section de mappage pour qu’elles correspondent à votre propre schéma :

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Vous pouvez maintenant tester la liaison entre la fonction locale et la base de données en déboguant (F5 dans Visual Studio Code). La base de données SQL doit être accessible depuis votre machine. SSMS peut être utilisé pour vérifier la connectivité. Ensuite, un outil comme Postman peut être utilisé pour émettre des requêtes POST vers le point de terminaison local. Une requête avec un corps vide doit renvoyer http 204. Une requête avec une charge utile réelle doit être rendue persistante dans la table de destination (en mode de remplacement/mise à jour). Voici un exemple de charge utile correspondant au schéma utilisé dans cet exemple :

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

La fonction peut désormais être publiée sur Azure. Un paramètre d’application doit être défini pour SqlConnectionString. Le pare-feu du serveur SQL Azure doit autoriser les services Azure à entrer pour que la fonction en direct puisse l’atteindre.

La fonction peut ensuite être définie en tant que sortie dans le travail Azure Stream Analytics et utilisée pour remplacer des enregistrements au lieu de les insérer.

Option 2 : Fusionner avec un opérateur d’assignation composé (accumuler) à l’aide d’une requête SQL personnalisée

Notes

Lors du redémarrage et de la récupération, Azure Stream Analytics peut renvoyer des événements de sortie qui ont déjà été émis. Il s’agit d’un comportement attendu qui peut provoquer l’échec de la logique d’accumulation (doublement des valeurs individuelles). Pour éviter cela, il est recommandé de générer les mêmes données dans une table par le biais de la sortie SQL native d’Azure Stream Analytics. Cette table de contrôle peut alors être utilisée pour détecter les problèmes et resynchroniser l’accumulation le cas échéant.

Cette option utilise Microsoft.Data.SqlClient. Cette bibliothèque nous permet d’émettre n’importe quelle requête SQL vers une base de données SQL.

Cet exemple a été construit sur :

Tout d’abord, créez une application de fonction HttpTrigger par défaut en suivant ce tutoriel. Les informations suivantes sont utilisées :

  • Langage : C#
  • Runtime : .NET 6 (sous fonction/runtime v4)
  • Modèle : HTTP trigger

Installez la bibliothèque SqlClient en exécutant la commande suivante dans un terminal situé dans le dossier du projet :

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Ajoutez l’élément SqlConnectionString dans la section Values de votre fichier local.settings.json, en renseignant la chaîne de connexion du serveur de destination :

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Remplacez l’ensemble de la fonction (fichier .cs dans le projet) par l’extrait de code suivant. Mettez à jour l’espace de noms, le nom de la classe et le nom de la fonction par les vôtres :

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Mettez à jour la section de liaison de la commande sqltext pour qu’elle corresponde à votre propre schéma (notez la manière dont l’accumulation est obtenue via l’opérateur += lors de la mise à jour) :

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Vous pouvez maintenant tester la liaison entre la fonction locale et la base de données en déboguant (F5 dans VS Code). La base de données SQL doit être accessible depuis votre machine. SSMS peut être utilisé pour vérifier la connectivité. Ensuite, un outil comme Postman peut être utilisé pour émettre des requêtes POST vers le point de terminaison local. Une requête avec un corps vide doit renvoyer http 204. Une requête avec une charge utile réelle doit être rendue persistante dans la table de destination (en mode d’accumulation/de fusion). Voici un exemple de charge utile correspondant au schéma utilisé dans cet exemple :

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

La fonction peut désormais être publiée sur Azure. Un paramètre d’application doit être défini pour SqlConnectionString. Le pare-feu du serveur SQL Azure doit autoriser les services Azure à entrer pour que la fonction en direct puisse l’atteindre.

La fonction peut ensuite être définie en tant que sortie dans le travail Azure Stream Analytics et utilisée pour remplacer des enregistrements au lieu de les insérer.

Autres solutions

En dehors d’Azure Functions, il existe plusieurs façons d’obtenir le résultat escompté. Cette section fournit certaines d’entre elles.

Post-traitement dans la base de données SQL cible

Une tâche en arrière-plan fonctionne une fois que les données seront insérées dans la base de données via les sorties standard d’Azure Stream Analytics.

Pour Azure SQL, les déclencheurs DMLINSTEAD OF peuvent être utilisés pour intercepter les commandes INSERT émises par Azure Stream Analytics :

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Pour Synapse SQL, Azure Stream Analytics peut insérer une table de mise en lots. Une tâche récurrente peut ensuite transformer les données selon les besoins dans une table intermédiaire. Enfin, les données sont déplacées vers la table de production.

Prétraitement dans Azure Cosmos DB

Azure Cosmos DB prend en charge UPSERT en mode natif. Ici, seul l’ajout/le remplacement est possible. Les accumulations doivent être gérées côté client dans Azure Cosmos DB.

Si les spécifications correspondent, une option consiste à remplacer la base de données SQL cible par une instance Azure Cosmos DB. Cela nécessite une modification importante de l’architecture globale de la solution.

Pour Synapse SQL, Azure Cosmos DB peut être utilisé comme couche intermédiaire via Azure Synapse Link pour Azure Cosmos DB. Azure Synapse Link peut être utilisé pour créer un magasin analytique. Ce magasin de données peut ensuite être interrogé directement dans Synapse SQL.

Comparaison des alternatives

Chaque approche offre une proposition de valeur et des capacités différentes :

Type Option Modes Azure SQL Database Azure Synapse Analytics
Post-traitement
Déclencheurs Remplacer, Accumuler + s.o., les déclencheurs ne sont pas disponibles dans Synapse SQL
Staging Remplacer, Accumuler + +
Pré-traitement
Azure Functions Remplacer, Accumuler + - (performances ligne par ligne)
Remplacement Azure Cosmos DB Remplacer N/A N/A
Azure Cosmos DB Azure Synapse Link Replace N/A +

Obtenir de l’aide

Pour obtenir de l’aide supplémentaire, essayez notre page de questions Microsoft Q&A pour Azure Stream Analytics.

Étapes suivantes