Megosztás a következőn keresztül:


Rekordok frissítése vagy egyesítése az Azure SQL Database-ben az Azure Functions használatával

Az Azure Stream Analytics (ASA) jelenleg csak az SQL-kimenetek (Azure SQL Databases és Azure Synapse Analytics) sorainak beszúrását (hozzáfűzését) támogatja. Ez a cikk az UPDATE, UP Standard kiadás RT vagy MERGE sql-adatbázisokon való engedélyezésének megkerülő megoldásait ismerteti, amelyek köztes rétegként az Azure Functionst használja.

Az Azure Functions alternatív lehetőségei a végén jelennek meg.

Követelmény

Az adatok táblázatba írása általában a következő módon végezhető el:

Mód Egyenértékű T-SQL utasítás Követelmények
Hozzáfűzés INSERT Egyik sem
Replace EGYESÍTÉS (FELFELÉ Standard kiadás RT) Egyedi kulcs
Felhalmozódnak MERGE (UP Standard kiadás RT) összetett hozzárendelési operátorral (+=, -=...) Egyedi kulcs és akkumulátor

A különbségek szemléltetéséhez tekintse meg, mi történik a következő két rekord betöltésekor:

Arrival_Time Device_Id Measure_Value
10:00 A 0
10:05 A 20

Hozzáfűzési módban két rekordot szúrunk be. Az egyenértékű T-SQL utasítás a következő:

INSERT INTO [target] VALUES (...);

Ennek eredménye:

Modified_Time Device_Id Measure_Value
10:00 A 0
10:05 A 20

Csere módban csak az utolsó értéket kapjuk meg kulcs szerint. Itt Device_Id használjuk kulcsként. Az egyenértékű T-SQL utasítás a következő:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Ennek eredménye:

Modified_Time Device_Key Measure_Value
10:05 A 20

Végül halmozásimódban egy összetett hozzárendelési operátorral (+=) összegzünkValue. Itt is a Device_Id használjuk kulcsként:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Ennek eredménye:

Modified_Time Device_Key Measure_Value
10:05 A 21

A teljesítmény szempontjából az ASA SQL-adatbázis kimeneti adapterei jelenleg csak natív módon támogatják a hozzáfűzési módot. Ezek az adapterek tömeges beszúrással maximalizálják az átviteli sebességet, és korlátozzák a visszanyomást.

Ez a cikk bemutatja, hogyan valósíthat meg csere- és halmozási módokat az Azure Functions használatával az ASA-hoz. Ha egy függvényt közvetítő rétegként használ, a lehetséges írási teljesítmény nem befolyásolja a streamelési feladatot. Ebben a tekintetben az Azure Functions használata a legjobban az Azure SQL-sel működik. A Synapse SQL esetén a tömegesről sorról sorra történő váltás nagyobb teljesítményproblémákat eredményezhet.

Azure Functions-kimenet

Feladatunkban az ASA SQL-kimenetet az ASA Azure Functions-kimenetre cseréljük. Az UPDATE, UP Standard kiadás RT vagy MERGE képességek a függvényben vannak implementálva.

Jelenleg két lehetőség áll rendelkezésre az SQL Database-hez való hozzáférésre egy függvényben. Az első az Azure SQL kimeneti kötése. Jelenleg C#-ra van korlátozva, és csak csere módot kínál. A második egy SQL-lekérdezés összeállítása, amelyet a megfelelő SQL-illesztőprogramon keresztül kell elküldeni (Microsoft.Data.SqlClient for .NET).

A következő minták esetében a következő táblázatsémát feltételezzük. A kötési beállításhoz egy elsődleges kulcsot kell beállítani a céltáblán. SQL-illesztő használata esetén nem szükséges, de ajánlott.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

A függvénynek meg kell felelnie a következő elvárásoknak, amelyeket az ASA kimeneteként kell használnia:

  • Az Azure Stream Analytics 200-os HTTP-állapotot vár a Functions alkalmazástól a sikeresen feldolgozott kötegekhez
  • Amikor az Azure Stream Analytics 413 ("http Request Entity Too Large") kivételt kap egy Azure-függvénytől, csökkenti az Azure-függvénynek küldött kötegek méretét
  • A tesztelési kapcsolat során a Stream Analytics egy üres köteggel rendelkező POST-kérelmet küld az Azure Functionsnek, és 20-szor újabb HTTP-állapotot vár a teszt ellenőrzéséhez

1. lehetőség: Frissítés kulccsal az Azure-függvény SQL-kötésével

Ez a beállítás az Azure Function SQL Kimeneti kötést használja. Ez a bővítmény lecserélhet egy objektumot egy táblában, anélkül, hogy SQL-utasítást kellene írnia. Jelenleg nem támogatja az összetett hozzárendelési operátorokat (akkumulációkat).

Ez a minta a következőre épült:

A kötési megközelítés jobb megértése érdekében javasoljuk, hogy kövesse ezt az oktatóanyagot.

Először hozzon létre egy alapértelmezett HttpTrigger-függvényalkalmazást az oktatóanyag követésével. A rendszer a következő információkat használja:

  • Nyelv: C#
  • Futtatókörnyezet: .NET 6 (a function/runtime v4 alatt)
  • Sablon: HTTP trigger

Telepítse a kötésbővítményt a következő parancs futtatásával a projektmappában található terminálon:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Adja hozzá az SqlConnectionString elemet a Values célkiszolgáló kapcsolati sztring kitöltése szakaszáhozlocal.settings.json:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Cserélje le a teljes függvényt (.cs fájlt a projektben) a következő kódrészletre. Frissítse saját névterét, osztálynevét és függvénynevét:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Frissítse a céltábla nevét a kötési szakaszban:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Frissítse az Device osztály- és leképezési szakaszt a saját sémájának megfelelően:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Mostantól hibakereséssel tesztelheti a helyi függvény és az adatbázis közötti vezetékeket (F5 a Visual Studio Code-ban). Az SQL-adatbázisnak elérhetőnek kell lennie a gépről. Az SSMS használható a kapcsolat ellenőrzésére. Ezután egy olyan eszköz, mint a Postman , post-kérelmeket adhat ki a helyi végpontnak. Egy üres törzsű kérésnek http 204-et kell visszaadnia. A tényleges hasznos adattal rendelkező kéréseket a céltáblában kell őrizni (csere/frissítés módban). Íme egy minta hasznos adat, amely megfelel az ebben a mintában használt sémának:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

A függvény mostantól közzétehető az Azure-ban. Egy alkalmazásbeállítást kell beállítani a következőhöz SqlConnectionString: . Az Azure SQL Server tűzfalának engedélyeznie kell , hogy az Azure-szolgáltatások elérjék az élő függvényt.

A függvény ezután kimenetként definiálható az ASA-feladatban, és a rekordok beszúrása helyett rekordok cseréjére használható.

2. lehetőség: Egyesítés összetett hozzárendeléssel (halmozódás) egyéni SQL-lekérdezésen keresztül

Feljegyzés

Újraindításkor és helyreállításkor az ASA újra küldheti a már kibocsátott kimeneti eseményeket. Ez egy olyan elvárt viselkedés, amely a felhalmozási logika meghiúsulását okozhatja (az egyes értékek megduplázása). Ennek megakadályozása érdekében javasoljuk, hogy ugyanazokat az adatokat adja ki egy táblában a natív ASA SQL-kimeneten keresztül. Ez a vezérlőtábla ezután a problémák észlelésére és szükség esetén a felhalmozás újraszinkronizálására használható.

Ez a beállítás a Microsoft.Data.SqlClientet használja. Ez a kódtár lehetővé teszi, hogy sql-lekérdezéseket adjunk ki egy SQL Database-nek.

Ez a minta a következőre épült:

  • Az Azure Functions 4-es futtatókörnyezete
  • .NET 6.0
  • Microsoft.Data.SqlClient 4.0.0

Először hozzon létre egy alapértelmezett HttpTrigger-függvényalkalmazást az oktatóanyag követésével. A rendszer a következő információkat használja:

  • Nyelv: C#
  • Futtatókörnyezet: .NET 6 (a function/runtime v4 alatt)
  • Sablon: HTTP trigger

Telepítse az SqlClient-kódtárat a következő parancs futtatásával a projektmappában található terminálon:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Adja hozzá az SqlConnectionString elemet a Values célkiszolgáló kapcsolati sztring kitöltése szakaszáhozlocal.settings.json:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Cserélje le a teljes függvényt (.cs fájlt a projektben) a következő kódrészletre. Frissítse saját névterét, osztálynevét és függvénynevét:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Frissítse a sqltext parancsépítési szakaszt a saját sémájának megfelelően (figyelje meg, hogyan érhető el a felhalmozás az operátoron keresztül a += frissítés során):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Mostantól hibakereséssel tesztelheti a helyi függvény és az adatbázis közötti vezetékeket (F5 a VS Code-ban). Az SQL-adatbázisnak elérhetőnek kell lennie a gépről. Az SSMS használható a kapcsolat ellenőrzésére. Ezután egy olyan eszköz, mint a Postman , post-kérelmeket adhat ki a helyi végpontnak. Egy üres törzsű kérésnek http 204-et kell visszaadnia. A tényleges hasznos adattal rendelkező kéréseket a céltáblában kell őrizni (halmozási/egyesítési módban). Íme egy minta hasznos adat, amely megfelel az ebben a mintában használt sémának:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

A függvény mostantól közzétehető az Azure-ban. Egy alkalmazásbeállítást kell beállítani a következőhöz SqlConnectionString: . Az Azure SQL Server tűzfalának engedélyeznie kell , hogy az Azure-szolgáltatások elérjék az élő függvényt.

A függvény ezután kimenetként definiálható az ASA-feladatban, és a rekordok beszúrása helyett rekordok cseréjére használható.

Alternatívák

Az Azure Functionsen kívül többféleképpen is el lehet érni a várt eredményt. Ez a szakasz néhányat tartalmaz.

Utófeldolgozás a cél SQL Database-ben

A háttérfeladat akkor működik, ha az adatok a standard ASA SQL-kimeneteken keresztül kerülnek be az adatbázisba.

Az Azure SQL INSTEAD OFesetében DML-eseményindítók használhatók az ASA által kiadott IN Standard kiadás RT-parancsok elfogására:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

A Synapse SQL esetében az ASA beszúrhat egy előkészítési táblába. Az ismétlődő tevékenységek ezután szükség szerint átalakíthatják az adatokat egy köztes táblává. Végül az adatok átkerülnek az éles táblába.

Előfeldolgozás az Azure Cosmos DB-ben

Az Azure Cosmos DB natív módon támogatja az UP Standard kiadás RT-t. Itt csak hozzáfűzés/csere lehetséges. A felhalmozásokat ügyféloldali felügyelet mellett kell kezelni az Azure Cosmos DB-ben.

Ha a követelmények egyeznek, a cél SQL-adatbázist egy Azure Cosmos DB-példányra kell cserélni. Ehhez fontos változásra van szükség az általános megoldásarchitektúra terén.

A Synapse SQL esetében az Azure Cosmos DB az Azure Cosmos DB-hez készült Azure Synapse Linken keresztül használható közvetítő rétegként. Az Azure Synapse Link használható elemzési tár létrehozásához. Ez az adattár ezután közvetlenül a Synapse SQL-ben kérdezhető le.

Az alternatívák összehasonlítása

Minden megközelítés különböző értékajánlatokat és képességeket kínál:

Típus Lehetőség Módok Azure SQL Database Azure Synapse Analytics
Feldolgozás utáni
Triggerek Csere, halmozódás + N/A, az eseményindítók nem érhetők el a Synapse SQL-ben
Előkészítés Csere, halmozódás + +
Előzetes feldolgozás
Azure Functions Csere, halmozódás + - (sorról sorra teljesítmény)
Az Azure Cosmos DB cseréje Replace N.A. N.A.
Azure Cosmos DB Azure Synapse Link Replace n/a +

Támogatás kérése

További segítségért próbálja ki a Microsoft Q&A kérdésoldalát az Azure Stream Analyticshez.

Következő lépések