Обновление и слияние записей в Базе данных SQL Azure с помощью Функций Azure

В настоящее время Azure Stream Analytics (ASA) поддерживает только вставку (добавление) строк в выходные данные SQL (База данных SQL Azure и Azure Synapse Analytics). В этой статье обсуждаются обходные пути для включения UPDATE, UPSERT или MERGE в базах данных SQL с использованием Функций Azure в качестве промежуточного уровня.

Альтернативы Функциям Azure представлены в конце.

Требование

Запись данных в таблицу обычно осуществляется следующим образом:

Режим Эквивалентная инструкция T-SQL Требования
Добавление INSERT нет
Replace MERGE (UPSERT) Уникальный ключ
Накапливать MERGE (UPSERT) с оператором составного присваивания (+=, -=,..) Уникальный ключ и аккумулятор

Чтобы проиллюстрировать различия, ознакомьтесь с тем, что происходит при приеме следующих двух записей:

Arrival_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

В режиме добавления мы вставим две записи. Эквивалентная инструкция T-SQL:

INSERT INTO [target] VALUES (...);

Результат:

Modified_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

В режиме замены мы получаем только последнее значение по ключу. Здесь мы используем Device_Id в качестве ключа. Эквивалентная инструкция T-SQL:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Результат:

Modified_Time Device_Key Measure_Value
10:05 A 20

Наконец, в режиме накопления мы будем суммировать Value с помощью составного оператора присваивания (+=). Здесь также мы используем Device_Id в качестве ключа:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Результат:

Modified_Time Device_Key Measure_Value
10:05 а 21

Из соображений производительности выходные адаптеры базы данных SQL ASA сейчас изначально поддерживают только режим добавления. В этих адаптерах используется массивная вставка, чтобы максимизировать пропускную способность и ограничить обратное давление.

В этой статье показано, как использовать Функции Azure для реализации режимов замены и накопления для ASA. При использовании функции в качестве промежуточного слоя потенциальная производительность записи не влияет на задание потоковой передачи. В этом отношении использование Функции Azure лучше всего работает с SQL Azure. При использовании Synapse SQL переключение с массового на операторы строк может привести к более высокой производительности.

Выходные данные Функций Azure

В нашем задании мы заменим выходные данные ASA SQL на выходные данные ASA Функции Azure. Возможности UPDATE, UPSERT или MERGE реализуются в функции.

Сейчас есть два варианта доступа к Базе данных SQL в функции. Во-первых, это выходная привязка Azure SQL. Сейчас этот вариант ограничен C# и предлагает только режим замены. Во-вторых, это составление запроса SQL, который будет отправлен через соответствующий драйвер SQL (Microsoft.Data.SqlClient для .NET).

Для обоих следующих примеров предполагается следующая схема таблицы. Вариант с привязкой требует, чтобы первичный ключ был установлен в целевой таблице. Это необязательно, но рекомендуется при использовании драйвера SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Функция должна соответствовать следующим ожиданиям для использования в качестве выходных данных ASA:

  • Azure Stream Analytics ожидает состояние HTTP 200 из приложения-функции для пакетов, которые были успешно обработаны.
  • Когда Azure Stream Analytics получает исключение 413 (сущность запроса HTTP слишком большая) из от Функций Azure, размер пакетов, отправляемых в Функции Azure, уменьшается.
  • Во время тестового подключения Stream Analytics отправляет запрос POST с пустым пакетом в Функции Azure и ждет возврата состояния HTTP 20x для проверки теста.

Вариант 1. Обновление по ключу с помощью привязки SQL Функций Azure.

Этот вариант использует расширение Выходная привязка SQL Функций Azure. Это расширение может заменить объект в таблице без необходимости писать инструкцию SQL. Сейчас составные операторы присваивания (накопления) не поддерживаются.

Этот пример был создан с помощью:

Чтобы лучше понять подход с привязкой, рекомендуется следовать этому руководству.

Сначала создайте приложение функции HttpTrigger по умолчанию, следуя этому руководству. Используются следующие сведения:

  • Язык: C#
  • Среда выполнения: .NET 6 (в функции или среде выполнения версии 4).
  • Шаблон: HTTP trigger.

Установите расширение привязки, выполнив следующую команду в терминале, расположенном в папке проекта:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Добавьте элемент SqlConnectionString в раздел Values файла local.settings.json, заполнив строку подключения целевого сервера:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Замените функцию целиком (CS-файл в проекте) на следующий фрагмент кода. Замените пространство имен, имя класса и имя функции на собственные:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Обновите имя целевой таблицы в разделе привязки:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Обновите раздел класса Device и сопоставления, чтобы он соответствовал вашей схеме:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Теперь можно протестировать проводку между локальной функцией и базой данных, отладив (F5 в Visual Studio Code). База данных SQL должна быть доступна с компьютера. SSMS можно использовать для проверки подключения. Затем можно использовать такой инструмент, как Postman, чтобы выдавать запросы POST к локальной конечной точке. Запрос с пустым телом должен возвращать состояние HTTP 204. Запрос с реальными полезными данными должен быть сохранен в целевой таблице (в режиме замены или обновления). Вот пример полезных данных, соответствующих схеме, используемой в этом примере:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Теперь функцию можно опубликовать в Azure. Для параметра приложения нужно задать SqlConnectionString. Брандмауэр Azure SQL Server должен разрешать службам Azure доступ к активной функции.

Затем эту функцию можно определить как результат задания ASA и использовать для замены записей вместо их вставки.

Вариант 2. Объединение с составным присвоением (накоплением) с помощью пользовательского SQL-запроса.

Примечание.

После перезапуска и восстановления ASA может повторно отправить выходные события, которые уже были отправлены. Это ожидаемое поведение, которое может привести к сбою логики накопления (удвоение отдельных значений). Чтобы избежать этого, рекомендуется выводить те же данные в таблицу через собственный вывод SQL ASA. Затем эту контрольную таблицу можно использовать для обнаружения проблем и повторной синхронизации накопления при необходимости.

Этот вариант использует Microsoft.Data.SqlClient. Эта библиотека позволяет выполнять любые запросы SQL к Базе данных SQL.

Этот пример был создан с помощью:

Сначала создайте приложение функции HttpTrigger по умолчанию, следуя этому руководству. Используются следующие сведения:

  • Язык: C#
  • Среда выполнения: .NET 6 (в функции или среде выполнения версии 4).
  • Шаблон: HTTP trigger.

Установите библиотеку SqlClient, выполнив следующую команду в терминале, расположенном в папке проекта:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Добавьте элемент SqlConnectionString в раздел Values файла local.settings.json, заполнив строку подключения целевого сервера:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Замените функцию целиком (CS-файл в проекте) на следующий фрагмент кода. Замените пространство имен, имя класса и имя функции на собственные:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Обновите раздел построения команд sqltext, чтобы он соответствовал вашей схеме (обратите внимание, как накопление достигается с помощью оператора += при обновлении):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Теперь вы можете проверить связь между локальной функцией и базой данных с помощью отладки (F5 в VS Code). База данных SQL должна быть доступна с компьютера. SSMS можно использовать для проверки подключения. Затем можно использовать такой инструмент, как Postman, чтобы выдавать запросы POST к локальной конечной точке. Запрос с пустым телом должен возвращать состояние HTTP 204. Запрос с актуальными полезными данными должен сохраняться в целевой таблице (в режиме накопления или слияния). Вот пример полезных данных, соответствующих схеме, используемой в этом примере:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

Теперь функцию можно опубликовать в Azure. Для параметра приложения нужно задать SqlConnectionString. Брандмауэр Azure SQL Server должен разрешать службам Azure доступ к активной функции.

Затем эту функцию можно определить как результат задания ASA и использовать для замены записей вместо их вставки.

Альтернативные варианты

Наряду с Функциями Azure есть и другие способы достижения ожидаемого результата. В этом разделе приведены некоторые из них.

Последующая обработка в целевой Базе данных SQL

Фоновая задача выполняется после вставки данных в базу данных с помощью стандартных выходных данных ASA SQL.

Для Azure SQL INSTEAD OFтриггеры DML можно использовать для перехвата команд INSERT, выдаваемых ASA:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Для Synapse SQL ASA может вставлять в промежуточную таблицу. Затем повторяющаяся задача может преобразовать данные по мере необходимости в промежуточную таблицу. Наконец, данные перемещаются в производственную таблицу.

Предварительная обработка в Azure Cosmos DB

Azure Cosmos DB обеспечивает встроенную поддержку UPSERT. Здесь возможно только добавление и замена. Накопление должно управляться клиентом на стороне клиента в Azure Cosmos DB.

Если требования совпадают, можно заменить целевую базу данных SQL экземпляром Azure Cosmos DB. Это требует существенного изменения в общей архитектуре решения.

Для Synapse SQL Azure Cosmos DB можно использовать в качестве промежуточного уровня с помощью Azure Synapse Link для Azure Cosmos DB. Azure Synapse Link можно использовать для создания аналитического хранилища. Затем это хранилище данных можно запросить непосредственно в Synapse SQL.

Сравнение альтернатив

Каждый подход предлагает разные преимущества и возможности:

Тип Вариант Режимы База данных SQL Azure Azure Synapse Analytics
Постобработка
Триггеры Замена, накопление + Н/д, триггеры недоступны в Synapse SQL
Промежуточная Замена, накопление + +
Предварительная обработка
Функции Azure Замена, накопление + - (построчная производительность)
Замена Azure Cosmos DB Replace Неприменимо Неприменимо
Azure Cosmos DB Azure Synapse Link Replace Н/П +

Поддержка

За дополнительной информацией перейдите на страницу вопросов и ответов об Azure Stream Analytics.

Следующие шаги