Реализация шаблона транзакционной папки исходящих сообщений с Azure Cosmos DB

Azure Cosmos DB
Служебная шина Azure
Функции Azure

Реализация надежного обмена сообщениями в распределенных системах может быть непростой задачей. В этой статье описывается, как использовать шаблон папки "Исходящие транзакции" для надежного обмена сообщениями и гарантированной доставки событий, важной частью поддержки идемпотентной обработки сообщений. Для этого будут применяться транзакционные пакеты и канал изменений Azure Cosmos DB в сочетании со Служебной шиной Azure.

Обзор

Архитектуры микрослужб становятся все популярнее и являются перспективным способом решения таких проблем, как обеспечение масштабируемости, удобства поддержки и гибкости, особенно больших приложений. Однако этот архитектурный шаблон также создает ряд проблем, связанных с обработкой данных. В распределенных приложениях каждая служба хранит свои данные, необходимые для ее работы, в выделенном хранилище данных, принадлежащем службе. Для поддержки такого сценария обычно используется решение для обмена сообщениями, например RabbitMQ, Kafka или Служебная шина Azure, которое передает данные (события) из одной службы через шину обмена сообщениями в другие службы приложения. Внутренние или внешние получатели могут подписываться на эти сообщения и получать уведомления об изменениях сразу после обработки данных.

Хорошим примером может служить система размещения заказов: когда пользователь хочет создать заказ, служба Ordering получает данные из клиентского приложения через конечную точку REST. Затем она сопоставляет полезные данные с внутренним представлением объекта Order для проверки данных. После успешной фиксации в базе данных она публикует событие OrderCreated в шине сообщений. Любая другая служба, которой нужно получать новые заказы (например, служба Inventory или Invoicing), подписывается на сообщения OrderCreated, обрабатывает их и сохраняет в собственной базе данных.

Следующий псевдокод показывает, как этот процесс обычно выглядит с точки зрения службы Ordering:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Этот подход отлично работает, пока не происходит ошибка в период между сохранением объекта заказа и публикацией соответствующего события. Отправка события может завершиться сбоем на этом этапе по многим причинам:

  • Ошибки сети.
  • недоступность службы сообщений;
  • сбой узла.

Какой бы ни была причина, в результате событие OrderCreated невозможно опубликовать в шине сообщений. Другие службы не будут уведомлены о создании заказа. Поэтому службе Ordering приходится решать дополнительные задачи, которые не связаны напрямую с бизнес-процессом. Ей необходимо отслеживать события, которые нужно будет разместить в канале сообщений, когда он снова станет доступен. Может случиться и самое худшее: несоответствие данных в приложении из-за потерянных событий.

Diagram that shows event handling without the Transactional Outbox pattern.

Решение

Существует хорошо известный шаблон, который может помочь избежать подобных ситуаций: транзакционная папка исходящих сообщений. Он обеспечивает сохранение событий в хранилище данных (как правило, в таблице исходящих сообщений в базе данных), прежде чем они будут в конечном итоге переданы в брокер сообщений. Если бизнес-объект и соответствующие события сохраняются в рамках одной транзакции базы данных, это гарантирует, что данные не будут потеряны. Либо будет зафиксировано все, либо при возникновении ошибки произойдет полный откат. Чтобы в конечном итоге опубликовать событие, другая служба или другой рабочий процесс запрашивает необработанные записи из таблицы исходящих сообщений, публикует события и помечает их как обработанные. Этот шаблон гарантирует, что события не будут потеряны после создания или изменения бизнес-объекта.

Diagram that shows event handling with the Transactional Outbox pattern and a relay service for publishing events to the message broker.

Скачайте файл Visio для этой архитектуры.

В реляционной базе данных такой шаблон реализуется очень просто. Например, если служба применяет Entity Framework Core, она будет использовать контекст Entity Framework для создания транзакции базы данных, сохранения бизнес-объекта и события и последующей фиксации транзакции (или ее отката). Кроме того, легко реализовать рабочую службу, обрабатывающую события: она периодически запрашивает новые записи из таблицы исходящих сообщений, публикует новые добавленные события в шине сообщений и, наконец, помечает эти записи как обработанные.

На практике все не так просто, как может показаться на первый взгляд. В первую очередь необходимо следить за тем, чтобы сохранялась очередность событий: чтобы событие OrderUpdated не публиковалось перед событием OrderCreated.

Реализация в Azure Cosmos DB

В этом разделе показано, как реализовать шаблон транзакционной папки исходящих сообщений в Azure Cosmos DB для обеспечения надежного, упорядоченного обмена сообщениями между различными службами с помощью канала изменений Azure Cosmos DB и Служебной шины. В нем демонстрируется пример службы, которая управляет объектами Contact (значениями FirstName, LastName, Email, Company и т. д.). В примере применяется шаблон разделения команд и запросов (CQRS), и он соответствует основным концепциям предметно-ориентированного проектирования. Пример кода для реализации можно найти на GitHub.

Объект Contact в примере службы имеет следующую структуру:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Как только объект Contact создается или изменяется, создаются события со сведениями о текущем изменении. В частности, возможны перечисленные ниже события предметной области.

  • ContactCreated. Создается при добавлении контакта.
  • ContactNameUpdated. Создается при изменении FirstName или LastName.
  • ContactEmailUpdated. Создается при изменении адреса электронной почты.
  • ContactCompanyUpdated. Создается при изменении любого из свойств компании.

Транзакционные пакеты

Чтобы реализовать этот шаблон, необходимо сделать так, чтобы бизнес-объект Contact и соответствующие события сохранялись в рамках одной и той же транзакции базы данных. В Azure Cosmos DB транзакции работают иначе, чем в системах реляционных баз данных. Транзакции Azure Cosmos DB, называемые транзакционными пакетами, работают с одной логической секцией, поэтому они гарантируют атомарность, согласованность, изоляцию и устойчивость (ACID). В рамках транзакционной пакетной операции нельзя сохранить два документа в разных контейнерах или логических секциях. Для нашего примера службы это означает, что и бизнес-объект, и событие (или события) будут помещаться в один и тот же контейнер и логическую секцию.

Контекст, репозитории и UnitOfWork

Основой примера реализации является контекст контейнера, который отслеживает объекты, сохраняемые в одном транзакционном пакете. Он ведет список созданных и измененных объектов и работает с одним контейнером Azure Cosmos DB. Его интерфейс выглядит следующим образом:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

Список в компоненте контекста контейнера отслеживает объекты Contact и DomainEvent. Оба объекта будут помещаться в одном контейнере. Это означает, что в одном контейнере Azure Cosmos DB хранятся объекты разных типов, а для различения бизнес-объекта и события используется свойство Type.

Для каждого типа существует выделенный репозиторий, который определяет и реализует доступ к данным. Интерфейс репозитория Contact предоставляет следующие методы:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

Репозиторий Event выглядит аналогично, за исключением того, что он имеет только один метод, который создает события в хранилище:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

Реализации интерфейсов обоих репозиториев посредством внедрения зависимостей получают ссылку на один экземпляр IContainerContext, что обеспечивает их работу в одном и том же контексте Azure Cosmos DB.

Последний компонент — UnitOfWork. Он фиксирует изменения, содержащиеся в экземпляре IContainerContext, в Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Обработка событий: создание и публикация

При каждом создании, изменении или удалении (обратимом) объекта Contact служба создает соответствующее событие. В основе представленного решения лежит сочетание предметно-ориентированного проектирования (DDD) и шаблона посредника, предложенного Джимми Богардом (Jimmy Bogard). Он предлагает вести список событий, произошедших из-за изменений объекта предметной области, и публиковать эти события перед сохранением самого объекта в базе данных.

Список изменений хранится в самом объекте предметной области, поэтому другие компоненты не могут изменять цепочку событий. Порядок обслуживания событий (экземпляров IEvent) в объекте предметной области определяется через интерфейс IEventEmitter<IEvent> и реализуется в абстрактном классе DomainEntity:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

Объект Contact создает события предметной области. Сущность Contact соответствует основным принципам DDD: методы задания свойств предметной области настраиваются как закрытые. В классе нет открытых методов задания. Вместо этого он предоставляет методы для управления внутренним состоянием. В этих методах могут создаваться соответствующие события для определенного изменения (например, ContactNameUpdated или ContactEmailUpdated).

Ниже приведен пример изменения имени контакта. (Событие создается в конце метода.)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

Соответствующий класс ContactNameUpdatedEvent, который отслеживает изменения, выглядит следующим образом:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

До сих пор события просто записывались в объект предметной области, и ничего не сохранялось в базе данных и даже не публиковалось в брокере сообщений. Согласно рекомендациям список событий будет обработан непосредственно перед сохранением бизнес-объекта в хранилище данных. В данном случае это происходит в методе SaveChangesAsync экземпляра IContainerContext, который реализован в закрытом методе RaiseDomainEvents. (dObjs представляет собой список сущностей, отслеживаемых контекстом контейнера.)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

В последней строке пакет MediatR (реализация шаблона посредника в C#) используется для публикации события в приложении. Это возможно по той причине, что все события, например ContactNameUpdatedEvent, реализуют интерфейс INotification пакета MediatR.

События должны обрабатываться соответствующим обработчиком. И здесь задействуется реализация IEventsRepository. Вот пример обработчика событий NameUpdated:

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

Экземпляр IEventRepository внедряется в класс обработчика посредством конструктора. Как только событие ContactNameUpdatedEvent публикуется в службе, вызывается метод Handle, который использует экземпляр репозитория событий для создания объекта уведомления. Этот объект уведомления в свою очередь добавляется в список отслеживаемых объектов в объекте IContainerContext и присоединяется к объектам, которые сохраняются в Azure Cosmos DB в том же транзакционном пакете.

До сих пор контекст контейнера отслеживал то, какие объекты следует обрабатывать. Чтобы в конечном итоге сохранить отслеживаемые объекты в Azure Cosmos DB, реализация IContainerContext создает транзакционный пакет, добавляет все необходимые объекты и выполняет операцию в базе данных. Описанный процесс выполняется в методе SaveInTransactionalBatchAsync, который вызывается методом SaveChangesAsync.

Вот важные части реализации, необходимые для создания и выполнения транзакционного пакета:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects); 

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Ниже приведен обзор процесса (для изменения имени в объекте контакта) до данного момента.

  1. Клиенту нужно изменить имя контакта. Для объекта контакта вызывается метод SetName, и свойства изменяются.
  2. Событие ContactNameUpdated добавляется в список событий в объекте предметной области.
  3. Вызывается метод Update репозитория контакта, который добавляет объект предметной области в контекст контейнера. Этот объект теперь отслеживается.
  4. Вызывается метод CommitAsync экземпляра UnitOfWork и в свою очередь вызывает метод SaveChangesAsync контекста контейнера.
  5. В методе SaveChangesAsync все события из списка объекта предметной области публикуются экземпляром MediatR и добавляются посредством репозитория событий в тот же контекст контейнера.
  6. В методе SaveChangesAsync создается объект TransactionalBatch. Он будет содержать как объект контакта, так и событие.
  7. TransactionalBatch выполняется и данные фиксируются в Azure Cosmos DB.
  8. Методы SaveChangesAsync и CommitAsync успешно возвращают управление.

Сохраняемость

Как видно из приведенных выше фрагментов кода, все объекты, сохраняемые в Azure Cosmos DB, инкапсулируются в экземпляре DataObject. Этот объект предоставляет общие свойства:

  • ID.
  • PartitionKey.
  • Type.
  • State. Как и Created, Updated не сохраняется в Azure Cosmos DB.
  • Etag. Для оптимистической блокировки.
  • TTL. Свойство срока жизни для автоматической очистки старых документов.
  • Data. Универсальный объект данных.

Эти свойства определяются в универсальном интерфейсе, который называется IDataObject и используется репозиториями и контекстом контейнера:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

Объекты, инкапсулированные в экземпляре DataObject и сохраненные в базе данных, будут выглядеть, как в следующем примере (Contact и ContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

Как видно в этом примере, документы Contact и ContactNameUpdatedEvent (тип domainEvent) имеют один и тот же ключ секции и будут сохранены в одной логической секции.

Обработка канала изменений

Для считывания потока событий и их отправки в брокер сообщений служба будет использовать канал изменений Azure Cosmos DB.

Канал изменений — это постоянный журнал изменений в контейнере. Он работает в фоновом режиме и отслеживает изменения. В пределах одной логической секции гарантируется очередность изменений. Самый удобный способ считывания канала изменений — использование функции Azure с триггером Azure Cosmos DB. Другой вариант — использовать библиотеку обработчика канала изменений. Она позволяет интегрировать обработку канала изменений в веб-интерфейсе API в качестве фоновой службы (через интерфейс IHostedService). В этом примере используется простое консольное приложение, реализующее абстрактный класс BackgroundService для размещения длительных фоновых задач в приложениях .NET Core.

Чтобы получить изменения из канала изменений Azure Cosmos DB, необходимо создать экземпляр объекта ChangeFeedProcessor, зарегистрировать метод обработчика для обработки сообщений и начать прослушивание изменений:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

Затем метод обработчика (в данном случае HandleChangesAsync) обрабатывает сообщения. В этом примере события публикуются в разделе Служебной шины, который разбит на секции для обеспечения масштабируемости и имеет функцию дедупликации. Любая служба, которой нужно получать сведения об изменениях в объектах Contact, может затем подписаться на этот раздел Служебной шины и обрабатывать изменения в собственном контексте.

Создаваемые сообщения Служебной шины имеют свойство SessionId. При использовании сеансов в Служебной шине гарантируется сохранение очередности сообщений (FIFO). В данном сценарии необходимо сохранять заказ.

Вот фрагмент кода, который обрабатывает сообщения из канала изменений:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Обработка ошибок

Если во время обработки изменений возникает ошибка, то библиотека канала изменений перезапускает чтение сообщений с того места, где был успешно обработан последний пакет. Например, если приложение успешно обработало 10 000 сообщений, сейчас работает с пакетом 10 001–10 025 и возникает ошибка, приложение может возобновить работу с позиции 10 001. Библиотека автоматически отслеживает обработанные сообщения с помощью сведений, сохраненных в контейнере Leases в Azure Cosmos DB.

Может случиться так, что служба уже отправила некоторые сообщения и они обрабатываются в Служебной шине повторно. Как правило, такая ситуация приводит к повторяющейся обработке сообщений. Как отмечалось ранее, Служебная шина имеет функцию обнаружения повторяющихся сообщений, которую необходимо включить, чтобы избежать такой ситуации. Служба проверяет, было ли сообщение добавлено ранее в раздел (или очередь) Служебной шины на основе контролируемого приложением свойства MessageId сообщения. Этому свойству присваивается ID документа события. Если сообщение отправляется в Служебную шину повторно, служба игнорирует и удаляет его.

Действия по обслуживанию

В типичной реализации транзакционной папки исходящих сообщений служба обновляет обработанные события и присваивает свойству Processed значение true, указывающее, что сообщение успешно опубликовано. Такую схему работы можно реализовать вручную в методе обработчика. В текущем сценарии такой процесс не требуется. Azure Cosmos DB отслеживает обработанные события с помощью канала изменений (в сочетании с контейнером Leases).

Иногда в самом конце требуется удалить события из контейнера, чтобы оставались только самые последние записи или документы. Для реализации периодической очистки к документам применяется еще одна функция Azure Cosmos DB: срок жизни (TTL). Azure Cosmos DB может автоматически удалять документы в соответствии со свойством TTL, которое можно добавить к документу. Его значением является интервал времени в секундах. Служба будет постоянно проверять контейнер на наличие документов со свойством TTL. Как только срок действия документа истечет, Azure Cosmos DB удалит его из базы данных.

Когда все компоненты работают должным образом, события обрабатываются и публикуются быстро, в течение нескольких секунд. Если в Azure Cosmos DB возникнет ошибка, события не будут отправляться в шину сообщений, так как бизнес-объект и соответствующие события невозможно сохранить в базе данных. Единственное, что можно сделать, — задать соответствующее значение TTL для документов DomainEvent, если фоновая рабочая роль (обработчик канала изменений) или служебная шина недоступны. В рабочей среде лучше выбрать временной интервал в несколько дней, например 10. Тогда у всех вовлеченных в процесс компонентов будет достаточно времени для обработки или публикации изменений в приложении.

Итоги

Шаблон транзакционной папки исходящих сообщений решает проблему надежной публикации событий предметной области в распределенных системах. Фиксируя состояние бизнес-объекта и его события в одном транзакционном пакете и используя фоновый обработчик в качестве ретранслятора сообщений, вы гарантируете получение нужной информации другими службами, как внутренними, так и внешними. Этот пример не является традиционной реализацией шаблона транзакционной папки исходящих сообщений. В нем используются такие функции, как канал изменений Azure Cosmos DB и срок жизни, которые упрощают работу.

Ниже приведена сводка компонентов Azure, применяемых в этом сценарии.

Diagram that shows the Azure components to implement Transactional Outbox with Azure Cosmos DB and Azure Service Bus.

Скачайте файл Visio для этой архитектуры.

Преимущества этого решения:

  • надежный обмен сообщениями и гарантированная доставка событий;
  • сохранение очередности событий и дедупликация сообщений с помощью Служебной шины;
  • отсутствие необходимости в дополнительном свойстве Processed, которое указывает на успешную обработку документа события;
  • удаление событий из Azure Cosmos DB при истечении срока жизни; этот процесс не расходует единицы запросов, необходимые для обработки запросов пользователей и приложений; вместо этого используются "оставшиеся" единицы запросов в фоновой задаче;
  • безошибочная обработка сообщений с помощью ChangeFeedProcessor (или функции Azure);
  • (необязательно) несколько обработчиков канала изменений, у каждого из которых собственный указатель в канале.

Рекомендации

Пример приложения, рассмотренный в этой статье, демонстрирует, как можно реализовать шаблон транзакционной папки исходящих сообщений в Azure с помощью Azure Cosmos DB и Служебной шины. Существуют и другие подходы с применением баз данных NoSQL. Чтобы гарантировать надежное сохранение бизнес-объекта и событий в базе данных, можно внедрить список событий в документ бизнес-объекта. Недостаток этого подхода в том, что в процессе очистки потребуется обновлять каждый документ, содержащий события. Это не идеальный вариант по сравнению с использованием срока жизни, особенно с точки зрения затрачиваемых единиц запросов.

Имейте в виду, что приведенный здесь пример кода не следует рассматривать как рабочий. Он имеет ряд ограничений в плане многопоточности, особенно что касается способа обработки событий в классе DomainEntity и способа отслеживания объектов в реализациях CosmosContainerContext. Используйте его в качестве отправной точки для собственных реализаций. Кроме того, рекомендуется использовать существующие библиотеки, которые уже имеют эту функцию, встроенную в них, например NServiceBus или MassTransit.

Развертывание этого сценария

Исходный код, файлы для развертывания и инструкции по тестированию этого сценария можно найти на GitHub: https://github.com/mspnp/transactional-outbox-pattern.

Соавторы

Эта статья поддерживается корпорацией Майкрософт. Первоначально он был написан следующими участник.

Автор субъекта:

Чтобы просмотреть недоступные профили LinkedIn, войдите в LinkedIn.

Следующие шаги

Дополнительные сведения см. в следующих статьях: