Patroon Transactionele Postvak UIT met Azure Cosmos DB

Azure Cosmos DB
Azure Service Bus
Azure Functions

Het implementeren van betrouwbare berichten in gedistribueerde systemen kan lastig zijn. In dit artikel wordt beschreven hoe u het transactionele postvak UIT-patroon gebruikt voor betrouwbare berichten en gegarandeerde levering van gebeurtenissen, een belangrijk onderdeel van het ondersteunen van idempotent berichtverwerking. Hiervoor gebruikt u transactionele batches van Azure Cosmos DB en wijzigingenfeed in combinatie met Azure Service Bus.

Overzicht

Microservicearchitecturen worden steeds populairder en tonen belofte bij het oplossen van problemen zoals schaalbaarheid, onderhoudbaarheid en flexibiliteit, met name in grote toepassingen. Maar dit architectuurpatroon introduceert ook uitdagingen bij het verwerken van gegevens. In gedistribueerde toepassingen onderhoudt elke service onafhankelijk de gegevens die nodig zijn om te werken in een toegewezen gegevensarchief in eigendom van de service. Ter ondersteuning van een dergelijk scenario gebruikt u doorgaans een berichtenoplossing zoals RabbitMQ, Kafka of Azure Service Bus die gegevens (gebeurtenissen) van de ene service distribueert via een berichtenbus naar andere services van de toepassing. Interne of externe consumenten kunnen zich vervolgens abonneren op deze berichten en op de hoogte worden gesteld van wijzigingen zodra gegevens worden gemanipuleerd.

Een bekend voorbeeld in dat gebied is een bestelsysteem: wanneer een gebruiker een order wil maken, ontvangt een Ordering service gegevens van een clienttoepassing via een REST-eindpunt. De nettolading wordt toegewezen aan een interne weergave van een Order object om de gegevens te valideren. Nadat de doorvoer naar de database is geslaagd, wordt er een OrderCreated gebeurtenis naar een berichtenbus gepubliceerd. Elke andere service die geïnteresseerd is in nieuwe orders (bijvoorbeeld een Inventory of Invoicing service), abonneert zich op OrderCreated berichten, verwerkt en opslaat in een eigen database.

De volgende pseudocode laat zien hoe dit proces er doorgaans uitziet vanuit het perspectief van de Ordering service:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Deze methode werkt goed totdat er een fout optreedt tussen het opslaan van het orderobject en het publiceren van de bijbehorende gebeurtenis. Het verzenden van een gebeurtenis kan om verschillende redenen mislukken:

  • Netwerkfouten
  • Berichtservicestoring
  • Hostfout

Wat de fout ook is, het resultaat is dat de OrderCreated gebeurtenis niet kan worden gepubliceerd naar de berichtenbus. Andere services ontvangen geen melding dat er een bestelling is gemaakt. De Ordering service moet nu voor verschillende zaken zorgen die niet betrekking hebben op het werkelijke bedrijfsproces. Het moet gebeurtenissen bijhouden die nog steeds op de berichtenbus moeten worden geplaatst zodra deze weer online is. Zelfs het ergste kan gebeuren: inconsistenties van gegevens in de toepassing vanwege verloren gebeurtenissen.

Diagram that shows event handling without the Transactional Outbox pattern.

Oplossing

Er is een bekend patroon met de naam Transactional Outbox waarmee u deze situaties kunt voorkomen. Het zorgt ervoor dat gebeurtenissen worden opgeslagen in een gegevensarchief (meestal in een Postvak UIT-tabel in uw database) voordat ze uiteindelijk naar een berichtenbroker worden gepusht. Als het bedrijfsobject en de bijbehorende gebeurtenissen worden opgeslagen binnen dezelfde databasetransactie, is het gegarandeerd dat er geen gegevens verloren gaan. Alles wordt doorgevoerd of alles wordt teruggedraaid als er een fout optreedt. Als u de gebeurtenis uiteindelijk wilt publiceren, voert een andere service of werkproces een query uit op de tabel Postvak UIT voor niet-verwerkte vermeldingen, publiceert u de gebeurtenissen en markeert u deze als verwerkt. Dit patroon zorgt ervoor dat gebeurtenissen niet verloren gaan nadat een bedrijfsobject is gemaakt of gewijzigd.

Diagram that shows event handling with the Transactional Outbox pattern and a relay service for publishing events to the message broker.

Een Visio-bestand van deze architectuur downloaden.

In een relationele database is de implementatie van het patroon eenvoudig. Als de service bijvoorbeeld Entity Framework Core gebruikt, wordt er een Entity Framework-context gebruikt om een databasetransactie te maken, het bedrijfsobject en de gebeurtenis op te slaan en de transactie vast te leggen of een terugdraaiactie uit te voeren. Bovendien is de werkrolservice die gebeurtenissen verwerkt eenvoudig te implementeren: er wordt periodiek een query uitgevoerd op de tabel Postvak UIT voor nieuwe vermeldingen, worden nieuwe ingevoegde gebeurtenissen naar de berichtenbus gepubliceerd en worden deze vermeldingen ten slotte gemarkeerd als verwerkt.

In de praktijk zijn dingen niet zo eenvoudig als ze eerst kunnen bekijken. Het belangrijkste is dat u ervoor moet zorgen dat de volgorde van de gebeurtenissen behouden blijft, zodat een OrderUpdated gebeurtenis niet vóór een OrderCreated gebeurtenis wordt gepubliceerd.

Implementatie in Azure Cosmos DB

In deze sectie wordt beschreven hoe u het transactionele postvak UIT-patroon in Azure Cosmos DB implementeert om betrouwbare berichten in volgorde te verzenden tussen verschillende services met behulp van de Azure Cosmos DB-wijzigingenfeed en Service Bus. Het demonstreert een voorbeeldservice waarmee objecten (FirstName, LastName, EmailCompany informatie enzovoort) worden beheerdContact. Het maakt gebruik van het CQRS-patroon (Command and Query Responsibility Segregation) en volgt de basisconcepten van het domeingestuurde ontwerp. U vindt de voorbeeldcode voor de implementatie op GitHub.

Een Contact object in de voorbeeldservice heeft de volgende structuur:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Zodra een Contact is gemaakt of bijgewerkt, worden gebeurtenissen verzonden die informatie over de huidige wijziging bevatten. Domeinevenementen kunnen onder andere het volgende zijn:

  • ContactCreated. Verhoogd wanneer een contactpersoon wordt toegevoegd.
  • ContactNameUpdated. Verhoogd wanneer FirstName of LastName wordt gewijzigd.
  • ContactEmailUpdated. Wordt gegenereerd wanneer het e-mailadres wordt bijgewerkt.
  • ContactCompanyUpdated. Wordt gegenereerd wanneer een van de bedrijfseigenschappen wordt gewijzigd.

Transactionele batches

Als u dit patroon wilt implementeren, moet u ervoor zorgen dat het Contact bedrijfsobject en de bijbehorende gebeurtenissen worden opgeslagen in dezelfde databasetransactie. In Azure Cosmos DB werken transacties anders dan in relationele databasesystemen. Azure Cosmos DB-transacties, transactionele batches genoemd, werken op één logische partitie, zodat ze atomiciteits-, consistentie-, isolatie- en duurzaamheidseigenschappen (ACID) garanderen. U kunt twee documenten niet opslaan in een transactionele batchbewerking in verschillende containers of logische partities. Voor de voorbeeldservice betekent dit dat zowel het bedrijfsobject als de gebeurtenis of gebeurtenissen in dezelfde container en logische partitie worden geplaatst.

Context, opslagplaatsen en UnitOfWork

De kern van de voorbeeld-implementatie is een containercontext waarmee objecten worden bijgehouden die in dezelfde transactionele batch worden opgeslagen. Het onderhoudt een lijst met gemaakte en gewijzigde objecten en werkt op één Azure Cosmos DB-container. De interface voor deze ziet er als volgt uit:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

De lijst in het containercontextonderdeel houdt bij Contact en DomainEvent objecten. Beide worden in dezelfde container geplaatst. Dit betekent dat meerdere typen objecten worden opgeslagen in dezelfde Azure Cosmos DB-container en een Type eigenschap gebruiken om onderscheid te maken tussen een bedrijfsobject en een gebeurtenis.

Voor elk type is er een toegewezen opslagplaats die de gegevenstoegang definieert en implementeert. De interface van de Contact opslagplaats biedt de volgende methoden:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

De Event opslagplaats ziet er ongeveer als volgt uit, behalve dat er slechts één methode is, waarmee nieuwe gebeurtenissen in de store worden gemaakt:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

De implementaties van beide opslagplaatsinterfaces krijgen een verwijzing via afhankelijkheidsinjectie naar één IContainerContext exemplaar om ervoor te zorgen dat beide werken in dezelfde Azure Cosmos DB-context.

Het laatste onderdeel is UnitOfWork, waarmee de wijzigingen in het IContainerContext exemplaar worden doorgevoerd in Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Gebeurtenisafhandeling: maken en publiceren

Telkens wanneer een Contact object wordt gemaakt, gewijzigd of (voorlopig verwijderd), genereert de service een bijbehorende gebeurtenis. De kern van de geboden oplossing is een combinatie van domeingestuurd ontwerp (DDD) en het bemiddelaarpatroon dat door Jimmy Bogard wordt voorgesteld. Hij stelt voor een lijst met gebeurtenissen te onderhouden die zijn opgetreden vanwege wijzigingen van het domeinobject en het publiceren van deze gebeurtenissen voordat u het werkelijke object opslaat in de database.

De lijst met wijzigingen wordt bewaard in het domeinobject zelf, zodat er geen ander onderdeel de keten van gebeurtenissen kan wijzigen. Het gedrag van het onderhouden van gebeurtenissen (IEvent instanties) in het domeinobject wordt gedefinieerd via een interface IEventEmitter<IEvent> en geïmplementeerd in een abstracte DomainEntity klasse:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

Het Contact object genereert domein gebeurtenissen. De Contact entiteit volgt de basisconcepten van DDD en configureert de setters van de domeineigenschappen als privé. Er bestaan geen openbare setters in de klasse. In plaats daarvan biedt het methoden om de interne status te bewerken. In deze methoden kunnen de juiste gebeurtenissen voor een bepaalde wijziging (bijvoorbeeld ContactNameUpdated of ContactEmailUpdated) worden gegenereerd.

Hier volgt een voorbeeld voor updates van de naam van een contactpersoon. (De gebeurtenis wordt aan het einde van de methode gegenereerd.)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

Het bijbehorende ContactNameUpdatedEvent, waarmee de wijzigingen worden bijgehouden, ziet er als volgt uit:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

Tot nu toe worden gebeurtenissen alleen vastgelegd in het domeinobject en wordt er niets opgeslagen in de database of zelfs gepubliceerd naar een berichtenbroker. Na de aanbeveling wordt de lijst met gebeurtenissen direct verwerkt voordat het zakelijke object wordt opgeslagen in het gegevensarchief. In dit geval gebeurt dit in de SaveChangesAsync methode van het IContainerContext exemplaar, dat wordt geïmplementeerd in een privémethode RaiseDomainEvents . (dObjs is de lijst met bijgehouden entiteiten van de containercontext.)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

Op de laatste regel wordt het MediatR-pakket , een implementatie van het bemiddelaarpatroon in C#, gebruikt om een gebeurtenis in de toepassing te publiceren. Dit is mogelijk omdat alle gebeurtenissen, zoals ContactNameUpdatedEvent het implementeren van de INotification interface van het MediatR-pakket, mogelijk zijn.

Deze gebeurtenissen moeten worden verwerkt door een bijbehorende handler. Hier komt de IEventsRepository implementatie in het spel. Hier volgt het voorbeeld van de NameUpdated gebeurtenis-handler:

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

Een IEventRepository exemplaar wordt via de constructor in de handlerklasse geïnjecteerd. Zodra een ContactNameUpdatedEvent in de service wordt gepubliceerd, wordt de Handle methode aangeroepen en wordt de instantie van de gebeurtenisopslagplaats gebruikt om een meldingsobject te maken. Dat meldingsobject wordt op zijn beurt ingevoegd in de lijst met bijgehouden objecten in het IContainerContext object en voegt de objecten toe die in dezelfde transactionele batch zijn opgeslagen in Azure Cosmos DB.

Tot nu toe weet de containercontext welke objecten moeten worden verwerkt. Als u de bijgehouden objecten uiteindelijk wilt behouden in Azure Cosmos DB, maakt de IContainerContext implementatie de transactionele batch, voegt u alle relevante objecten toe en voert u de bewerking uit op de database. Het beschreven proces wordt verwerkt in de SaveInTransactionalBatchAsync methode, die wordt aangeroepen door de SaveChangesAsync methode.

Hier volgen de belangrijke onderdelen van de implementatie die u nodig hebt om de transactionele batch te maken en uit te voeren:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects); 

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Hier volgt een overzicht van hoe het proces tot nu toe werkt (voor het bijwerken van de naam van een contactobject):

  1. Een client wil de naam van een contactpersoon bijwerken. De SetName methode wordt aangeroepen op het contactobject en de eigenschappen worden bijgewerkt.
  2. De ContactNameUpdated gebeurtenis wordt toegevoegd aan de lijst met gebeurtenissen in het domeinobject.
  3. De methode van de opslagplaats voor Update contactpersonen wordt aangeroepen, waarmee het domeinobject wordt toegevoegd aan de containercontext. Het object wordt nu bijgehouden.
  4. CommitAsync wordt aangeroepen op het UnitOfWork exemplaar, dat op zijn beurt de containercontext aanroept SaveChangesAsync .
  5. Binnen SaveChangesAsyncworden alle gebeurtenissen in de lijst met het domeinobject gepubliceerd door een MediatR exemplaar en worden ze via de gebeurtenisopslagplaats toegevoegd aan dezelfde containercontext.
  6. Er SaveChangesAsyncwordt een TransactionalBatch gemaakt. Zowel het contactobject als de gebeurtenis worden opgeslagen.
  7. De TransactionalBatch uitvoeringen en de gegevens worden doorgevoerd in Azure Cosmos DB.
  8. SaveChangesAsync en CommitAsync keert terug.

Persistentie

Zoals u in de voorgaande codefragmenten kunt zien, worden alle objecten die zijn opgeslagen in Azure Cosmos DB, verpakt in een DataObject exemplaar. Dit object biedt algemene eigenschappen:

  • ID.
  • PartitionKey.
  • Type.
  • State. Net als Createdin Updated Azure Cosmos DB wordt dit niet bewaard.
  • Etag. Voor optimistische vergrendeling.
  • TTL. Time To Live-eigenschap voor het automatisch opschonen van oude documenten.
  • Data. Algemeen gegevensobject.

Deze eigenschappen worden gedefinieerd in een algemene interface die wordt aangeroepen IDataObject en wordt gebruikt door de opslagplaatsen en de containercontext:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

Objecten die in een DataObject exemplaar zijn verpakt en in de database worden opgeslagen, zien er dan uit als dit voorbeeld (Contact en ContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

U kunt zien dat de Contact en ContactNameUpdatedEvent (type domainEvent) documenten dezelfde partitiesleutel hebben en dat beide documenten in dezelfde logische partitie worden bewaard.

Verwerking van wijzigingenfeeds

Als u de stroom gebeurtenissen wilt lezen en naar een berichtenbroker wilt verzenden, gebruikt de service de Wijzigingenfeed van Azure Cosmos DB.

De wijzigingenfeed is een permanent logboek met wijzigingen in uw container. Het werkt op de achtergrond en houdt wijzigingen bij. Binnen één logische partitie wordt de volgorde van de wijzigingen gegarandeerd. De handigste manier om de wijzigingenfeed te lezen, is door een Azure-functie te gebruiken met een Azure Cosmos DB-trigger. Een andere optie is om de processorbibliotheek voor wijzigingenfeeds te gebruiken. Hiermee kunt u de verwerking van wijzigingenfeeds in uw web-API integreren als een achtergrondservice (via de IHostedService interface). In het voorbeeld hier wordt een eenvoudige consoletoepassing gebruikt waarmee de abstracte klasse BackgroundService wordt geïmplementeerd voor het hosten van langlopende achtergrondtaken in .NET Core-toepassingen.

Als u de wijzigingen van de Azure Cosmos DB-wijzigingenfeed wilt ontvangen, moet u een ChangeFeedProcessor object instantiëren, een handlermethode registreren voor berichtverwerking en beginnen met het luisteren naar wijzigingen:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

Een handlermethode (HandleChangesAsync hier) verwerkt vervolgens de berichten. In dit voorbeeld worden gebeurtenissen gepubliceerd naar een Service Bus-onderwerp dat is gepartitioneerd voor schaalbaarheid en waarvoor de functie voor duplicatie is ingeschakeld. Elke service die geïnteresseerd is in wijzigingen in Contact objecten, kan zich vervolgens abonneren op dat Service Bus-onderwerp en de wijzigingen ontvangen en verwerken voor zijn eigen context.

De geproduceerde Service Bus-berichten hebben een SessionId eigenschap. Wanneer u sessies in Service Bus gebruikt, garandeert u dat de volgorde van de berichten behouden blijft (FIFO). Het behouden van de volgorde is noodzakelijk voor deze use case.

Dit is het codefragment waarmee berichten uit de wijzigingenfeed worden verwerkt:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Foutafhandeling

Als er een fout optreedt tijdens het verwerken van de wijzigingen, start de bibliotheek met wijzigingenfeeds het lezen van berichten opnieuw op de positie waar de laatste batch is verwerkt. Als de toepassing bijvoorbeeld 10.000 berichten heeft verwerkt, werkt de toepassing nu aan batch 10.001 tot 10.025 en treedt er een fout op, waarna het werk opnieuw kan worden opgestart en opgehaald op positie 10.001. De bibliotheek houdt automatisch bij wat er is verwerkt via gegevens die zijn opgeslagen in een Leases container in Azure Cosmos DB.

Het is mogelijk dat de service al enkele berichten heeft verzonden die opnieuw worden verwerkt naar Service Bus. Normaal gesproken zou dat scenario leiden tot dubbele berichtverwerking. Zoals eerder vermeld, heeft Service Bus een functie voor dubbele berichtdetectie die u voor dit scenario moet inschakelen. De service controleert of er al een bericht is toegevoegd aan een Service Bus-onderwerp (of wachtrij) op basis van de door de toepassing beheerde MessageId eigenschap van het bericht. Deze eigenschap is ingesteld op het ID gebeurtenisdocument. Als hetzelfde bericht opnieuw naar Service Bus wordt verzonden, negeert de service het en zet het neer.

Housekeeping

In een typische transactionele implementatie van Postvak UIT werkt de service de afgehandelde gebeurtenissen bij en stelt een Processed eigenschap truein op , waarmee wordt aangegeven dat een bericht is gepubliceerd. Dit gedrag kan handmatig worden geïmplementeerd in de handlermethode. In het huidige scenario is dit proces niet nodig. Azure Cosmos DB houdt gebeurtenissen bij die zijn verwerkt met behulp van de wijzigingenfeed (in combinatie met de Leases container).

Als laatste stap moet u af en toe de gebeurtenissen uit de container verwijderen, zodat u alleen de meest recente records/documenten bewaart. Als u periodiek een opschoning wilt uitvoeren, past de implementatie een andere functie van Azure Cosmos DB: Time To Live (TTL) toe op documenten. Azure Cosmos DB kan documenten automatisch verwijderen op basis van een TTL eigenschap die kan worden toegevoegd aan een document: een tijdsduur in seconden. De service controleert voortdurend de container op documenten met een TTL eigenschap. Zodra een document verloopt, wordt het uit de database verwijderd door Azure Cosmos DB.

Wanneer alle onderdelen werken zoals verwacht, worden gebeurtenissen snel verwerkt en gepubliceerd: binnen enkele seconden. Als er een fout optreedt in Azure Cosmos DB, worden gebeurtenissen niet verzonden naar de berichtenbus, omdat zowel het bedrijfsobject als de bijbehorende gebeurtenissen niet kunnen worden opgeslagen in de database. Het enige wat u moet overwegen, is het instellen van een geschikte TTL waarde voor de DomainEvent documenten wanneer de achtergrondmedewerker (wijzigingenfeedprocessor) of de servicebus niet beschikbaar is. In een productieomgeving kunt u het beste een periode van meerdere dagen kiezen. Bijvoorbeeld 10 dagen. Alle betrokken onderdelen hebben dan voldoende tijd om wijzigingen binnen de toepassing te verwerken/publiceren.

Samenvatting

Het patroon Transactional Outbox lost het probleem op van betrouwbaar publicerende domeingebeurtenissen in gedistribueerde systemen. Door de status van het bedrijfsobject en de bijbehorende gebeurtenissen in dezelfde transactionele batch vast te leggen en een achtergrondprocessor als een berichtenrelay te gebruiken, zorgt u ervoor dat andere services, intern of extern, uiteindelijk de informatie ontvangen die ze afhankelijk zijn. Dit voorbeeld is geen traditionele implementatie van het patroon Transactional Outbox. Het maakt gebruik van functies zoals de Wijzigingenfeed van Azure Cosmos DB en Time To Live die het eenvoudig en schoon houden.

Hier volgt een samenvatting van de Azure-onderdelen die in dit scenario worden gebruikt:

Diagram that shows the Azure components to implement Transactional Outbox with Azure Cosmos DB and Azure Service Bus.

Een Visio-bestand van deze architectuur downloaden.

De voordelen van deze oplossing zijn:

  • Betrouwbare berichten en gegarandeerde levering van gebeurtenissen.
  • De volgorde van gebeurtenissen en berichtontduplicatie via Service Bus behouden.
  • U hoeft geen extra Processed eigenschap te onderhouden die aangeeft dat een gebeurtenisdocument is verwerkt.
  • Verwijdering van gebeurtenissen uit Azure Cosmos DB via TTL. Het proces verbruikt geen aanvraageenheden die nodig zijn voor het verwerken van gebruikers-/toepassingsaanvragen. In plaats daarvan wordt gebruikgemaakt van 'leftover'-aanvraageenheden in een achtergrondtaak.
  • Fout-proof verwerking van berichten via ChangeFeedProcessor (of een Azure-functie).
  • Optioneel: Meerdere processors voor wijzigingenfeeds, die elk een eigen aanwijzer in de wijzigingenfeed onderhouden.

Overwegingen

De voorbeeldtoepassing die in dit artikel wordt besproken, laat zien hoe u het transactionele postvak UIT-patroon in Azure kunt implementeren met Azure Cosmos DB en Service Bus. Er zijn ook andere methoden die gebruikmaken van NoSQL-databases. Om ervoor te zorgen dat het bedrijfsobject en de gebeurtenissen betrouwbaar worden opgeslagen in de database, kunt u de lijst met gebeurtenissen insluiten in het document met bedrijfsobjecten. Het nadeel van deze aanpak is dat het opschoonproces elk document met gebeurtenissen moet bijwerken. Dat is niet ideaal, met name wat betreft kosten van aanvraageenheden, in vergelijking met het gebruik van TTL.

Houd er rekening mee dat u niet rekening moet houden met de voorbeeldcode die hier is opgegeven voor productieklare code. Het heeft enkele beperkingen met betrekking tot multithreading, met name de manier waarop gebeurtenissen in de DomainEntity klasse worden verwerkt en hoe objecten worden bijgehouden in de CosmosContainerContext implementaties. Gebruik het als uitgangspunt voor uw eigen implementaties. U kunt ook bestaande bibliotheken gebruiken waarvoor deze functionaliteit al is ingebouwd, zoals NServiceBus of MassTransit.

Dit scenario implementeren

U vindt de broncode, implementatiebestanden en instructies voor het testen van dit scenario op GitHub: https://github.com/mspnp/transactional-outbox-pattern.

Bijdragers

Dit artikel wordt onderhouden door Microsoft. De tekst is oorspronkelijk geschreven door de volgende Inzenders.

Hoofdauteur:

Als u niet-openbare LinkedIn-profielen wilt zien, meldt u zich aan bij LinkedIn.

Volgende stappen

Raadpleeg deze artikelen voor meer informatie: