Получение событий через конечную точку HTTP

В этой статье описано, как проверить конечную точку HTTP, чтобы получать события из подписки событий, а затем получать события и десериализировать их. В этой статье в целях демонстрации используется служба "Функции Azure", однако те же принципы применяются независимо от того, где размещается приложение.

Примечание.

Рекомендуется использовать триггер сетки событий при активации функции Azure с сеткой событий. Это обеспечивает более простую и быструю интеграцию между Сеткой событий и Функциями Azure. Однако обратите внимание, что триггер сетки событий Функции Azure не поддерживает сценарий, в котором размещенный код должен контролировать код состояния HTTP, возвращенный в сетку событий. Учитывая это ограничение, ваш код, работающий в функции Azure, не сможет вернуть ошибку 5XX, например, чтобы инициировать повторную попытку доставки события Сеткой событий.

Необходимые компоненты

Вам потребуется приложение-функция с функцией, активируемой HTTP.

Добавление зависимостей

При разработке в .NET добавьте зависимость в функцию для Azure.Messaging.EventGridпакета NuGet.

Пакеты SDK для других языков доступны по ссылке на общедоступные пакеты SDK. Эти пакеты содержат модели для собственных типов событий, таких как EventGridEvent, StorageBlobCreatedEventData и EventHubCaptureFileCreatedEventData.

Проверка конечной точки

Сначала нужно обработать событие Microsoft.EventGrid.SubscriptionValidationEvent. Каждый раз, когда пользователь подписывается на событие, служба "Сетка событий" отправляет событие проверки в конечную точку с validationCode в полезных данных. Конечная точка необходима для передачи этих данных обратно в тело ответа, чтобы подтвердить, что конечная точка допустима и вы являетесь ее владельцем. При использовании триггера Сетки событий вместо функции, активируемой веб-перехватчиком, проверка конечной точки выполняется автоматически. При использовании службы API сторонних производителей (например, Zapier или IFTTT), возможно, не удастся вывести на экран код проверки программными средствами. Для этих служб можно вручную проверить подписку с помощью URL-адреса проверки, который отправляется в событии проверки подписки. Скопируйте этот URL-адрес в свойство validationUrl и отправьте запрос GET через клиент REST или веб-браузер.

В C# ParseMany() метод используется для десериализации BinaryData экземпляра, содержащего одно или несколько событий в массив EventGridEvent. Если вы знали заранее, что десериализация выполняется только одно событие, вместо этого можно использовать Parse метод.

Чтобы вывести на экран код проверки программными средствами, используйте следующий код:

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System;
using Azure.Messaging.EventGrid;
using Azure.Messaging.EventGrid.SystemEvents;

namespace Function1
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");
            string response = string.Empty;
            BinaryData events = await BinaryData.FromStreamAsync(req.Body);
            log.LogInformation($"Received events: {events}");

            EventGridEvent[] eventGridEvents = EventGridEvent.ParseMany(events);

            foreach (EventGridEvent eventGridEvent in eventGridEvents)
            {
                // Handle system events
                if (eventGridEvent.TryGetSystemEventData(out object eventData))
                {
                    // Handle the subscription validation event
                    if (eventData is SubscriptionValidationEventData subscriptionValidationEventData)
                    {
                        log.LogInformation($"Got SubscriptionValidation event data, validation code: {subscriptionValidationEventData.ValidationCode}, topic: {eventGridEvent.Topic}");
                        // Do any additional validation (as required) and then return back the below response
                        var responseData = new
                        {
                            ValidationResponse = subscriptionValidationEventData.ValidationCode
                        };

                        return new OkObjectResult(responseData);
                    }
                }
            }
            return new OkObjectResult(response);
        }
    }
}
module.exports = function (context, req) {
    context.log('JavaScript HTTP trigger function begun');
    var validationEventType = "Microsoft.EventGrid.SubscriptionValidationEvent";

    for (var events in req.body) {
        var body = req.body[events];
        // Deserialize the event data into the appropriate type based on event type
        if (body.data && body.eventType == validationEventType) {
            context.log("Got SubscriptionValidation event data, validation code: " + body.data.validationCode + " topic: " + body.topic);

            // Do any additional validation (as required) and then return back the below response
            var code = body.data.validationCode;
            context.res = { status: 200, body: { "ValidationResponse": code } };
        }
    }
    context.done();
};

Проверка тестового ответа

Проверьте функцию ответа проверки, вставив в примере событие в поле проверки функции:

[{
  "id": "2d1781af-3a4c-4d7c-bd0c-e34b19da4e66",
  "topic": "/subscriptions/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
  "subject": "",
  "data": {
    "validationCode": "512d38b6-c7b8-40c8-89fe-f46f9e9622b6"
  },
  "eventType": "Microsoft.EventGrid.SubscriptionValidationEvent",
  "eventTime": "2018-01-25T22:12:19.4556811Z",
  "metadataVersion": "1",
  "dataVersion": "1"
}]

При нажатии кнопки "Выполнить" выходные данные должны быть 200 ОК и {"validationResponse":"512d38b6-c7b8-40c8-89fe-f46f9e9622b6"} в тексте:

Validation request

Validation output

Обработка событий хранилища BLOB-объектов

Теперь можно расширить функцию для обработки системного события Microsoft.Storage.BlobCreated:

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System;
using Azure.Messaging.EventGrid;
using Azure.Messaging.EventGrid.SystemEvents;

namespace Function1
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");
            string response = string.Empty;
            BinaryData events = await BinaryData.FromStreamAsync(req.Body);
            log.LogInformation($"Received events: {events}");

            EventGridEvent[] eventGridEvents = EventGridEvent.ParseMany(events);

            foreach (EventGridEvent eventGridEvent in eventGridEvents)
            {
                // Handle system events
                if (eventGridEvent.TryGetSystemEventData(out object eventData))
                {
                    // Handle the subscription validation event
                    if (eventData is SubscriptionValidationEventData subscriptionValidationEventData)
                    {
                        log.LogInformation($"Got SubscriptionValidation event data, validation code: {subscriptionValidationEventData.ValidationCode}, topic: {eventGridEvent.Topic}");
                        // Do any additional validation (as required) and then return back the below response

                        var responseData = new
                        {
                            ValidationResponse = subscriptionValidationEventData.ValidationCode
                        };
                        return new OkObjectResult(responseData);
                    }
                    // Handle the storage blob created event
                    else if (eventData is StorageBlobCreatedEventData storageBlobCreatedEventData)
                    {
                        log.LogInformation($"Got BlobCreated event data, blob URI {storageBlobCreatedEventData.Url}");
                    }
                }
            }
            return new OkObjectResult(response);
        }
    }
}
module.exports = function (context, req) {
    context.log('JavaScript HTTP trigger function begun');
    var validationEventType = "Microsoft.EventGrid.SubscriptionValidationEvent";
    var storageBlobCreatedEvent = "Microsoft.Storage.BlobCreated";

    for (var events in req.body) {
        var body = req.body[events];
        // Deserialize the event data into the appropriate type based on event type  
        if (body.data && body.eventType == validationEventType) {
            context.log("Got SubscriptionValidation event data, validation code: " + body.data.validationCode + " topic: " + body.topic);

            // Do any additional validation (as required) and then return back the below response
            var code = body.data.validationCode;
            context.res = { status: 200, body: { "ValidationResponse": code } };
        }

        else if (body.data && body.eventType == storageBlobCreatedEvent) {
            var blobCreatedEventData = body.data;
            context.log("Relaying received blob created event payload:" + JSON.stringify(blobCreatedEventData));
        }
    }
    context.done();
};

Обработка события создания тестового большого двоичного объекта

Проверьте новую возможность функции, поместив событие хранилища BLOB-объектов в поле тестирования и выполнив следующий код:

[{
  "topic": "/subscriptions/{subscription-id}/resourceGroups/Storage/providers/Microsoft.Storage/storageAccounts/xstoretestaccount",
  "subject": "/blobServices/default/containers/testcontainer/blobs/testfile.txt",
  "eventType": "Microsoft.Storage.BlobCreated",
  "eventTime": "2017-06-26T18:41:00.9584103Z",
  "id": "831e1650-001e-001b-66ab-eeb76e069631",
  "data": {
    "api": "PutBlockList",
    "clientRequestId": "6d79dbfb-0e37-4fc4-981f-442c9ca65760",
    "requestId": "831e1650-001e-001b-66ab-eeb76e000000",
    "eTag": "0x8D4BCC2E4835CD0",
    "contentType": "text/plain",
    "contentLength": 524288,
    "blobType": "BlockBlob",
    "url": "https://example.blob.core.windows.net/testcontainer/testfile.txt",
    "sequencer": "00000000000004420000000000028963",
    "storageDiagnostics": {
      "batchId": "b68529f3-68cd-4744-baa4-3c0498ec19f0"
    }
  },
  "dataVersion": "",
  "metadataVersion": "1"
}]

В журнале функции отобразятся выходные данные URL-адреса большого двоичного объекта:

2022-11-14T22:40:45.978 [Information] Executing 'Function1' (Reason='This function was programmatically called via the host APIs.', Id=8429137d-9245-438c-8206-f9e85ef5dd61)
2022-11-14T22:40:46.012 [Information] C# HTTP trigger function processed a request.
2022-11-14T22:40:46.017 [Information] Received events: [{"topic": "/subscriptions/{subscription-id}/resourceGroups/Storage/providers/Microsoft.Storage/storageAccounts/xstoretestaccount","subject": "/blobServices/default/containers/testcontainer/blobs/testfile.txt","eventType": "Microsoft.Storage.BlobCreated","eventTime": "2017-06-26T18:41:00.9584103Z","id": "831e1650-001e-001b-66ab-eeb76e069631","data": {"api": "PutBlockList","clientRequestId": "6d79dbfb-0e37-4fc4-981f-442c9ca65760","requestId": "831e1650-001e-001b-66ab-eeb76e000000","eTag": "0x8D4BCC2E4835CD0","contentType": "text/plain","contentLength": 524288,"blobType": "BlockBlob","url": "https://example.blob.core.windows.net/testcontainer/testfile.txt","sequencer": "00000000000004420000000000028963","storageDiagnostics": {"batchId": "b68529f3-68cd-4744-baa4-3c0498ec19f0"}},"dataVersion": "","metadataVersion": "1"}]
2022-11-14T22:40:46.335 [Information] Got BlobCreated event data, blob URI https://example.blob.core.windows.net/testcontainer/testfile.txt
2022-11-14T22:40:46.346 [Information] Executed 'Function1' (Succeeded, Id=8429137d-9245-438c-8206-f9e85ef5dd61, Duration=387ms)

Вы также можете протестировать, создав учетную запись хранения BLOB-объектов или учетную запись общего назначения версии 2 служба хранилища, добавив подписку на событие и указав конечную точку URL-адрес функции:

Function URL

Обработка пользовательских событий

Наконец, давайте расширим функцию еще раз, чтобы она могла обрабатывать и пользовательские события.

Установите флажок для события Contoso.Items.ItemReceived. Итоговый код должен выглядеть следующим образом:

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System;
using Azure.Messaging.EventGrid;
using Azure.Messaging.EventGrid.SystemEvents;

namespace Function1
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");
            string response = string.Empty;
            BinaryData events = await BinaryData.FromStreamAsync(req.Body);
            log.LogInformation($"Received events: {events}");

            EventGridEvent[] eventGridEvents = EventGridEvent.ParseMany(events);

            foreach (EventGridEvent eventGridEvent in eventGridEvents)
            {
                // Handle system events
                if (eventGridEvent.TryGetSystemEventData(out object eventData))
                {
                    // Handle the subscription validation event
                    if (eventData is SubscriptionValidationEventData subscriptionValidationEventData)
                    {
                        log.LogInformation($"Got SubscriptionValidation event data, validation code: {subscriptionValidationEventData.ValidationCode}, topic: {eventGridEvent.Topic}");
                        // Do any additional validation (as required) and then return back the below response

                        var responseData = new
                        {
                            ValidationResponse = subscriptionValidationEventData.ValidationCode
                        };
                        return new OkObjectResult(responseData);
                    }
                    // Handle the storage blob created event
                    else if (eventData is StorageBlobCreatedEventData storageBlobCreatedEventData)
                    {
                        log.LogInformation($"Got BlobCreated event data, blob URI {storageBlobCreatedEventData.Url}");
                    }
                }
                // Handle the custom contoso event
                else if (eventGridEvent.EventType == "Contoso.Items.ItemReceived")
                {
                    var contosoEventData = eventGridEvent.Data.ToObjectFromJson<ContosoItemReceivedEventData>();
                    log.LogInformation($"Got ContosoItemReceived event data, item SKU {contosoEventData.ItemSku}");
                }
            }
            return new OkObjectResult(response);
        }
    }
}
module.exports = function (context, req) {
    context.log('JavaScript HTTP trigger function begun');
    var validationEventType = "Microsoft.EventGrid.SubscriptionValidationEvent";
    var storageBlobCreatedEvent = "Microsoft.Storage.BlobCreated";
    var customEventType = "Contoso.Items.ItemReceived";

    for (var events in req.body) {
        var body = req.body[events];
        // Deserialize the event data into the appropriate type based on event type
        if (body.data && body.eventType == validationEventType) {
            context.log("Got SubscriptionValidation event data, validation code: " + body.data.validationCode + " topic: " + body.topic);

            // Do any additional validation (as required) and then return back the below response
            var code = body.data.validationCode;
            context.res = { status: 200, body: { "ValidationResponse": code } };
        }

        else if (body.data && body.eventType == storageBlobCreatedEvent) {
            var blobCreatedEventData = body.data;
            context.log("Relaying received blob created event payload:" + JSON.stringify(blobCreatedEventData));
        }

        else if (body.data && body.eventType == customEventType) {
            var payload = body.data;
            context.log("Relaying received custom payload:" + JSON.stringify(payload));
        }
    }
    context.done();
};

Тестирование обработки пользовательских событий

Наконец, проверьте, может ли функция теперь обрабатывать ваш тип пользовательского события:

[{
    "subject": "Contoso/foo/bar/items",
    "eventType": "Contoso.Items.ItemReceived",
    "eventTime": "2017-08-16T01:57:26.005121Z",
    "id": "602a88ef-0001-00e6-1233-1646070610ea",
    "data": { 
            "itemSku": "Standard"
            },
    "dataVersion": "",
    "metadataVersion": "1"
}]

Кроме того, эту функцию можно протестировать в реальном времени, отправив пользовательское событие с помощью CURL с портала или опубликовав в пользовательском разделе, используя любую службу или приложение, которое может выполнять оператор POST в конечной точке, например Postman. Создайте пользовательский раздел и подписку событий с помощью конечной точки, заданной в качестве URL-адреса функции.

Заголовки сообщений

Ниже приведены свойства, которые предоставляются в заголовках сообщений.

Имя свойства Description
aeg-subscription-name Имя подписки на события.
aeg-delivery-count Число попыток, выполненных для события.
aeg-event-type

Тип события.

Может иметь одно из следующих значений.

  • SubscriptionValidation
  • Notification
  • SubscriptionDeletion
aeg-metadata-version

Версия метаданных события.

Для схемы событий Сетки событий это свойство представляет версию метаданных, а для схемы событий облака — версию спецификации.

aeg-data-version

Версия данных события.

Для схемы событий Сетки событий это свойство представляет версию данных, а для схемы событий облака оно не используется.

aeg-output-event-id Идентификатор события Сетки событий.

Следующие шаги