Editar

Padrão de caixa de saída transacional com o Azure Cosmos DB

Azure Cosmos DB
Azure Service Bus
Azure Functions

Implementar mensagens confiáveis em sistemas distribuídos pode ser um desafio. Este artigo descreve como usar o padrão Caixa de saída transacional para mensagens confiáveis e entrega garantida de eventos, uma parte importante do suporte ao processamento idempotente de mensagens. Para fazer isso, você usará lotes transacionais do Azure Cosmos DB e alterará o feed em combinação com o Barramento de Serviço do Azure.

Descrição geral

As arquiteturas de microsserviços estão se tornando cada vez mais populares e se mostram promissoras na solução de problemas como escalabilidade, manutenibilidade e agilidade, especialmente em grandes aplicações. Mas esse padrão arquitetônico também introduz desafios quando se trata de manipulação de dados. Em aplicativos distribuídos, cada serviço mantém independentemente os dados de que precisa para operar em um armazenamento de dados dedicado de propriedade do serviço. Para dar suporte a esse cenário, você normalmente usa uma solução de mensagens como RabbitMQ, Kafka ou Azure Service Bus que distribui dados (eventos) de um serviço por meio de um barramento de mensagens para outros serviços do aplicativo. Os consumidores internos ou externos podem então subscrever essas mensagens e ser notificados das alterações assim que os dados forem manipulados.

Um exemplo bem conhecido nessa área é um sistema de ordenação: quando um usuário deseja criar um pedido, um serviço recebe dados de um aplicativo cliente por meio de um Ordering ponto de extremidade REST. Ele mapeia a carga para uma representação interna de um Order objeto para validar os dados. Depois de uma confirmação bem-sucedida no banco de dados, ele publica um evento em um OrderCreated barramento de mensagens. Qualquer outro serviço interessado em novas encomendas (por exemplo, um Inventory ou Invoicing serviço) subscreveria OrderCreated mensagens, processá-las-ia e armazená-las-ia na sua própria base de dados.

O pseudocódigo a seguir mostra como esse processo normalmente se parece da perspetiva do Ordering serviço:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Essa abordagem funciona bem até que ocorra um erro entre salvar o objeto order e publicar o evento correspondente. O envio de um evento pode falhar neste momento por vários motivos:

  • Erros de rede
  • Interrupção do serviço de mensagens
  • Falha do anfitrião

Seja qual for o erro, o resultado é que o OrderCreated evento não pode ser publicado no barramento de mensagens. Outros serviços não serão notificados de que um pedido foi criado. O Ordering serviço agora tem que cuidar de várias coisas que não se relacionam com o processo de negócios real. Ele precisa acompanhar os eventos que ainda precisam ser colocados no ônibus de mensagens assim que ele voltar a ficar online. Mesmo o pior caso pode acontecer: inconsistências de dados no aplicativo devido a eventos perdidos.

Diagram that shows event handling without the Transactional Outbox pattern.

Solution

Há um padrão bem conhecido chamado Caixa de saída transacional que pode ajudá-lo a evitar essas situações. Ele garante que os eventos sejam salvos em um armazenamento de dados (normalmente em uma tabela de caixa de saída em seu banco de dados) antes de serem enviados para um agente de mensagens. Se o objeto de negócios e os eventos correspondentes forem salvos na mesma transação de banco de dados, é garantido que nenhum dado será perdido. Tudo será comprometido, ou tudo será revertido se houver um erro. Para eventualmente publicar o evento, um serviço ou processo de trabalho diferente consulta a tabela Caixa de Saída em busca de entradas não tratadas, publica os eventos e os marca como processados. Esse padrão garante que os eventos não serão perdidos depois que um objeto de negócios for criado ou modificado.

Diagram that shows event handling with the Transactional Outbox pattern and a relay service for publishing events to the message broker.

Transfira um ficheiro do Visio desta arquitetura.

Em um banco de dados relacional, a implementação do padrão é simples. Se o serviço usar o Entity Framework Core, por exemplo, ele usará um contexto do Entity Framework para criar uma transação de banco de dados, salvar o objeto de negócios e o evento e confirmar a transação ou fazer uma reversão. Além disso, o serviço de trabalho que está processando eventos é fácil de implementar: ele consulta periodicamente a tabela Caixa de Saída em busca de novas entradas, publica eventos recém-inseridos no barramento de mensagens e, finalmente, marca essas entradas como processadas.

Na prática, as coisas não são tão fáceis como poderiam parecer à primeira vista. Mais importante ainda, você precisa garantir que a ordem dos eventos seja preservada para que um evento não seja publicado antes de um OrderUpdatedOrderCreated evento.

Implementação no Azure Cosmos DB

Esta seção mostra como implementar o padrão de Caixa de Saída Transacional no Azure Cosmos DB para obter mensagens confiáveis e em ordem entre diferentes serviços com a ajuda do feed de alterações do Azure Cosmos DB e do Service Bus. Ele demonstra um serviço de exemplo que gerencia Contact objetos (FirstName, , , LastNameEmailCompany informações e assim por diante). Ele usa o padrão CQRS (Command and Query Responsibility Segregation) e segue conceitos básicos de design orientados por domínio. Você pode encontrar o código de exemplo para a implementação no GitHub.

Um Contact objeto no serviço de exemplo tem a seguinte estrutura:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Assim que um Contact é criado ou atualizado, ele emite eventos que contêm informações sobre a alteração atual. Entre outros, os eventos de domínio podem ser:

  • ContactCreated. Gerado quando um contato é adicionado.
  • ContactNameUpdated. Levantado quando FirstName ou LastName é alterado.
  • ContactEmailUpdated. Gerado quando o endereço de e-mail é atualizado.
  • ContactCompanyUpdated. Levantado quando qualquer uma das propriedades da empresa é alterada.

Lotes transacionais

Para implementar esse padrão, você precisa garantir que o Contact objeto de negócios e os eventos correspondentes sejam salvos na mesma transação de banco de dados. No Azure Cosmos DB, as transações funcionam de forma diferente do que nos sistemas de banco de dados relacional. As transações do Azure Cosmos DB, chamadas lotes transacionais, operam em uma única partição lógica, portanto, garantem propriedades ACID (Atomicidade, Consistência, Isolamento e Durabilidade). Não é possível salvar dois documentos em uma operação de lote transacional em contêineres ou partições lógicas diferentes. Para o serviço de exemplo, isso significa que o objeto de negócios e o evento ou eventos serão colocados no mesmo contêiner e partição lógica.

Contexto, repositórios e UnitOfWork

O núcleo da implementação de exemplo é um contexto de contêiner que controla os objetos salvos no mesmo lote transacional. Ele mantém uma lista de objetos criados e modificados e opera em um único contêiner do Azure Cosmos DB. A interface para ele se parece com isto:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

A lista no componente de contexto do contêiner rastreia Contact e DomainEvent objetos. Ambos serão colocados no mesmo recipiente. Isso significa que vários tipos de objetos são armazenados no mesmo contêiner do Azure Cosmos DB e usam uma Type propriedade para distinguir entre um objeto de negócios e um evento.

Para cada tipo, há um repositório dedicado que define e implementa o acesso aos dados. A Contact interface do repositório fornece estes métodos:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

O Event repositório é semelhante, exceto que há apenas um método, que cria novos eventos na loja:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

As implementações de ambas as interfaces de repositório obtêm uma referência por meio da injeção de dependência em uma única IContainerContext instância para garantir que ambas operem no mesmo contexto do Azure Cosmos DB.

O último componente é UnitOfWork, que confirma as alterações mantidas na IContainerContext instância no Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Tratamento de eventos: Criação e publicação

Toda vez que um objeto é criado, modificado ou (soft-) excluído, o serviço gera um Contact evento correspondente. O núcleo da solução fornecida é uma combinação de design orientado a domínio (DDD) e o padrão mediador proposto por Jimmy Bogard. Ele sugere manter uma lista de eventos que aconteceram devido a modificações do objeto de domínio e publicar esses eventos antes de salvar o objeto real no banco de dados.

A lista de alterações é mantida no próprio objeto de domínio para que nenhum outro componente possa modificar a cadeia de eventos. O comportamento de manutenção de eventos (IEvent instâncias) no objeto de domínio é definido através de uma interface IEventEmitter<IEvent> e implementado em uma classe abstrata DomainEntity :

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

O Contact objeto gera eventos de domínio. A Contact entidade segue conceitos básicos de DDD, configurando os setters das propriedades do domínio como privados. Não existem setters públicos na classe. Em vez disso, oferece métodos para manipular o estado interno. Nesses métodos, eventos apropriados para uma determinada modificação (por exemplo ContactNameUpdated ou ContactEmailUpdated) podem ser levantados.

Eis um exemplo de atualizações ao nome de um contacto. (O evento é gerado no final do método.)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

O correspondente ContactNameUpdatedEvent, que acompanha as alterações, tem esta aparência:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

Até agora, os eventos são apenas registrados no objeto de domínio e nada é salvo no banco de dados ou mesmo publicado em um agente de mensagens. Seguindo a recomendação, a lista de eventos será processada antes que o objeto comercial seja salvo no armazenamento de dados. Neste caso, isso acontece no SaveChangesAsync método da IContainerContext instância, que é implementado em um método privado RaiseDomainEvents . dObjs( é a lista de entidades controladas do contexto de contêiner.)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

Na última linha, o pacote MediatR , uma implementação do padrão mediador em C#, é usado para publicar um evento dentro do aplicativo. Fazer isso é possível porque todos os eventos como ContactNameUpdatedEvent implementar a INotification interface do pacote MediatR.

Esses eventos precisam ser processados por um manipulador correspondente. Aqui, a IEventsRepository implementação entra em jogo. Aqui está o exemplo do manipulador de NameUpdated eventos:

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

Uma IEventRepository instância é injetada na classe handler por meio do construtor. Assim que um é publicado no serviço, o método é invocado Handle e usa a instância do repositório de eventos para criar um ContactNameUpdatedEvent objeto de notificação. Esse objeto de notificação, por sua vez, é inserido na lista de objetos rastreados no IContainerContext objeto e une os objetos salvos no mesmo lote transacional ao Azure Cosmos DB.

Até agora, o contexto do contêiner sabe quais objetos processar. Para eventualmente persistir os objetos rastreados no Azure Cosmos DB, a implementação cria o lote transacional, adiciona todos os objetos relevantes e executa a IContainerContext operação no banco de dados. O processo descrito é tratado no SaveInTransactionalBatchAsync método, que é invocado SaveChangesAsync pelo método.

Aqui estão as partes importantes da implementação que você precisa para criar e executar o lote transacional:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects); 

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Aqui está uma visão geral de como o processo funciona até agora (para atualizar o nome em um objeto de contato):

  1. Um cliente deseja atualizar o nome de um contato. O SetName método é invocado no objeto de contato e as propriedades são atualizadas.
  2. O ContactNameUpdated evento é adicionado à lista de eventos no objeto de domínio.
  3. O método do repositório de contatos é invocado, o que adiciona o objeto de domínio ao contexto do Update contêiner. O objeto agora é rastreado.
  4. CommitAsync é invocado UnitOfWork na instância, que, por sua vez, chama SaveChangesAsync o contexto do contêiner.
  5. No SaveChangesAsync, todos os eventos na lista do objeto de domínio são publicados por uma MediatR instância e adicionados por meio do repositório de eventos ao mesmo contexto de contêiner.
  6. Em SaveChangesAsync, a TransactionalBatch é criado. Ele armazenará o objeto de contato e o evento.
  7. As TransactionalBatch execuções e os dados são confirmados no Azure Cosmos DB.
  8. SaveChangesAsync e CommitAsync retornar com sucesso.

Persistência

Como você pode ver nos trechos de código anteriores, todos os objetos salvos no Azure Cosmos DB são encapsulados em uma DataObject instância. Este objeto fornece propriedades comuns:

  • ID.
  • PartitionKey.
  • Type.
  • State. Como Created, Updated não será persistido no Azure Cosmos DB.
  • Etag. Para um bloqueio otimista.
  • TTL. Propriedade Time To Live para limpeza automática de documentos antigos.
  • Data. Objeto de dados genérico.

Essas propriedades são definidas em uma interface genérica chamada e usada pelos repositórios IDataObject e pelo contexto do contêiner:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

Os objetos encapsulados em uma DataObject instância e salvos no banco de dados terão a aparência deste exemplo (Contact e ContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

Você pode ver que os Contact documentos e (tipodomainEvent) têm a mesma chave de partição e ContactNameUpdatedEvent que ambos os documentos serão mantidos na mesma partição lógica.

Alterar o processamento de feed

Para ler o fluxo de eventos e enviá-los para um agente de mensagens, o serviço usará o feed de alterações do Azure Cosmos DB.

O feed de alterações é um log persistente de alterações em seu contêiner. Ele opera em segundo plano e rastreia modificações. Dentro de uma partição lógica, a ordem das alterações é garantida. A maneira mais conveniente de ler o feed de alterações é usar uma função do Azure com um gatilho do Azure Cosmos DB. Outra opção é usar a biblioteca do processador de feed de alterações. Ele permite que você integre o processamento de feed de alterações em sua API da Web como um serviço em segundo plano (através da IHostedService interface). O exemplo aqui usa um aplicativo de console simples que implementa a classe abstrata BackgroundService para hospedar tarefas em segundo plano de longa execução em aplicativos .NET Core.

Para receber as alterações do feed de alterações do Azure Cosmos DB, você precisa instanciar um objeto, registrar um ChangeFeedProcessor método de manipulador para processamento de mensagens e começar a ouvir as alterações:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

Em seguida, um método manipulador (HandleChangesAsync aqui) processa as mensagens. Neste exemplo, os eventos são publicados em um tópico do Service Bus que é particionado para escalabilidade e tem o recurso de eliminação de duplicação habilitado. Qualquer serviço interessado em alterações em Contact objetos pode se inscrever nesse tópico do Service Bus e receber e processar as alterações para seu próprio contexto.

As mensagens do Service Bus produzidas têm uma SessionId propriedade. Ao usar sessões no Service Bus, você garante que a ordem das mensagens seja preservada (FIFO). Preservar a ordem é necessário para este caso de uso.

Aqui está o trecho que lida com mensagens do feed de alterações:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Processamento de erros

Se houver um erro enquanto as alterações estão sendo processadas, a biblioteca de feed de alterações reiniciará a leitura de mensagens na posição em que processou com êxito o último lote. Por exemplo, se o aplicativo processou com êxito 10.000 mensagens, agora está trabalhando no lote 10.001 a 10.025 e um erro acontece, ele pode reiniciar e retomar seu trabalho na posição 10.001. A biblioteca rastreia automaticamente o que foi processado por meio de informações salvas em um Leases contêiner no Azure Cosmos DB.

É possível que o serviço já tenha enviado algumas das mensagens que são reprocessadas para o Service Bus. Normalmente, esse cenário levaria ao processamento duplicado de mensagens. Como observado anteriormente, o Service Bus tem um recurso para deteção de mensagens duplicadas que você precisa habilitar para esse cenário. O serviço verifica se uma mensagem já foi adicionada a um tópico (ou fila) do Service Bus com base na propriedade controlada MessageId pelo aplicativo da mensagem. Essa propriedade é definida como o ID do documento do evento. Se a mesma mensagem for enviada novamente para o Service Bus, o serviço irá ignorá-la e soltá-la.

Manutenção

Em uma implementação típica de Caixa de Saída Transacional, o serviço atualiza os eventos manipulados e define uma propriedade como true, indicando que uma Processed mensagem foi publicada com êxito. Esse comportamento pode ser implementado manualmente no método manipulador. No cenário atual, não há necessidade de tal processo. O Azure Cosmos DB rastreia eventos que foram processados usando o feed de alterações (em combinação com o Leases contêiner).

Como última etapa, você ocasionalmente precisa excluir os eventos do contêiner para manter apenas os registros/documentos mais recentes. Para fazer uma limpeza periodicamente, a implementação aplica outro recurso do Azure Cosmos DB: Time To Live (TTL) em documentos. O Azure Cosmos DB pode excluir automaticamente documentos com base em uma TTL propriedade que pode ser adicionada a um documento: um período de tempo em segundos. O serviço irá verificar constantemente o contentor em busca de documentos que tenham um TTL imóvel. Assim que um documento expirar, o Azure Cosmos DB o removerá do banco de dados.

Quando todos os componentes funcionam conforme o esperado, os eventos são processados e publicados rapidamente: em segundos. Se houver um erro no Azure Cosmos DB, os eventos não serão enviados para o barramento de mensagens, porque o objeto de negócios e os eventos correspondentes não podem ser salvos no banco de dados. A única coisa a considerar é definir um valor apropriado TTL nos documentos quando o DomainEvent trabalhador em segundo plano (alterar processador de feed) ou o barramento de serviço não estiverem disponíveis. Em um ambiente de produção, é melhor escolher um período de tempo de vários dias. Por exemplo, 10 dias. Todos os componentes envolvidos terão tempo suficiente para processar/publicar alterações dentro do aplicativo.

Resumo

O padrão de caixa de saída transacional resolve o problema de publicar eventos de domínio de forma confiável em sistemas distribuídos. Ao confirmar o estado do objeto de negócios e seus eventos no mesmo lote transacional e usar um processador em segundo plano como retransmissão de mensagens, você garante que outros serviços, internos ou externos, eventualmente receberão as informações das quais dependem. Este exemplo não é uma implementação tradicional do padrão Caixa de Saída Transacional. Ele usa recursos como o feed de alterações do Azure Cosmos DB e o Time To Live que mantêm as coisas simples e limpas.

Aqui está um resumo dos componentes do Azure usados neste cenário:

Diagram that shows the Azure components to implement Transactional Outbox with Azure Cosmos DB and Azure Service Bus.

Transfira um ficheiro do Visio desta arquitetura.

As vantagens desta solução são:

  • Mensagens confiáveis e entrega garantida de eventos.
  • Ordem preservada de eventos e eliminação de duplicação de mensagens via Service Bus.
  • Não há necessidade de manter uma propriedade extra Processed que indique o processamento bem-sucedido de um documento de evento.
  • Exclusão de eventos do Azure Cosmos DB via TTL. O processo não consome unidades de solicitação necessárias para lidar com solicitações de usuário/aplicativo. Em vez disso, ele usa unidades de solicitação "remanescentes" em uma tarefa em segundo plano.
  • Processamento à prova de erros de mensagens através ChangeFeedProcessor (ou de uma função do Azure).
  • Opcional: vários processadores de feed de alteração, cada um mantendo seu próprio ponteiro no feed de alterações.

Considerações

O aplicativo de exemplo discutido neste artigo demonstra como você pode implementar o padrão de Caixa de Saída Transacional no Azure com o Azure Cosmos DB e o Service Bus. Há também outras abordagens que usam bancos de dados NoSQL. Para garantir que o objeto de negócios e os eventos serão salvos de forma confiável no banco de dados, você pode incorporar a lista de eventos no documento do objeto de negócios. A desvantagem dessa abordagem é que o processo de limpeza precisará atualizar cada documento que contém eventos. Isso não é o ideal, especialmente em termos de custo unitário de solicitação, em comparação com o uso de TTL.

Lembre-se de que você não deve considerar o código de exemplo fornecido aqui código pronto para produção. Ele tem algumas limitações em relação ao multithreading, especialmente a maneira como os eventos são manipulados na classe e como os DomainEntityCosmosContainerContext objetos são rastreados nas implementações. Use-o como ponto de partida para suas próprias implementações. Como alternativa, considere o uso de bibliotecas existentes que já têm essa funcionalidade incorporada, como NServiceBus ou MassTransit.

Implementar este cenário

Você pode encontrar o código-fonte, os arquivos de implantação e as instruções para testar esse cenário no GitHub: https://github.com/mspnp/transactional-outbox-pattern.

Contribuidores

Este artigo é mantido pela Microsoft. Foi originalmente escrito pelos seguintes contribuidores.

Autor principal:

Para ver perfis não públicos do LinkedIn, inicie sessão no LinkedIn.

Próximos passos

Reveja estes artigos para saber mais: