Model transakční doručené pošty se službou Azure Cosmos DB

Azure Cosmos DB
Azure Service Bus
Azure Functions

Implementace spolehlivého zasílání zpráv v distribuovaných systémech může být náročná. Tento článek popisuje, jak používat model Transakční pošta k odeslání pro spolehlivé zasílání zpráv a zaručené doručení událostí, důležitou součástí podpůrného zpracování idempotentní zprávy. K tomu použijete transakční dávky služby Azure Cosmos DB a kanál změn v kombinaci se službou Azure Service Bus.

Přehled

Architektury mikroslužeb jsou stále oblíbenější a ukazují slib při řešení problémů, jako je škálovatelnost, udržovatelnost a flexibilita, zejména ve velkých aplikacích. Tento model architektury ale přináší také výzvy, pokud jde o zpracování dat. V distribuovanýchaplikacích Pro podporu takového scénáře obvykle používáte řešení pro zasílání zpráv, jako je RabbitMQ, Kafka nebo Azure Service Bus, které distribuuje data (události) z jedné služby přes sběrnici zasílání zpráv do jiných služeb aplikace. Interní nebo externí spotřebitelé se pak můžou přihlásit k odběru těchto zpráv a dostávat oznámení o změnách hned po manipulaci s daty.

Známým příkladem v této oblasti je systém objednávání: když chce uživatel vytvořit objednávku, Ordering služba přijímá data z klientské aplikace prostřednictvím koncového bodu REST. Namapuje datovou část na interní reprezentaci Order objektu, aby ověřila data. Po úspěšném potvrzení databáze publikuje OrderCreated událost do sběrnice zpráv. Jakákoli jiná služba, která má zájem o nové objednávky (například InventoryInvoicing službu), by se přihlásila k OrderCreated odběru zpráv, zpracovávala je a ukládala do vlastní databáze.

Následující pseudokód ukazuje, jak tento proces obvykle vypadá z Ordering pohledu služby:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Tento přístup funguje dobře, dokud nedojde k chybě mezi uložením objektu objednávky a publikováním odpovídající události. Odesílání události může v tuto chvíli selhat z mnoha důvodů:

  • Chyby sítě
  • Výpadek služby zpráv
  • Selhání hostitele

Ať je chyba jakákoli, výsledkem je, že OrderCreated událost nemůže být publikována do sběrnice zpráv. Ostatní služby nebudou upozorněny na vytvoření objednávky. Služba Ordering se teď musí postarat o různé věci, které nesouvisí se skutečným obchodním procesem. Musí mít přehled o událostech, které je stále potřeba umístit do sběrnice zpráv, jakmile bude zase online. Dokonce i nejhorší případ může nastat: nekonzistence dat v aplikaci kvůli ztraceným událostem.

Diagram that shows event handling without the Transactional Outbox pattern.

Řešení

Existuje dobře známý vzor s názvem Transakční pošta k odeslání, který vám pomůže vyhnout se těmto situacím. Zajišťuje, aby se události ukládaly do úložiště dat (obvykle v tabulce Pošta k odeslání ve vaší databázi), než se nakonec nasdílí do zprostředkovatele zpráv. Pokud jsou obchodní objekt a odpovídající události uloženy ve stejné databázové transakci, je zaručeno, že se neztratí žádná data. Všechno se potvrdí, jinak se všechno vrátí, pokud dojde k chybě. Pokud chcete událost nakonec publikovat, jiná služba nebo pracovní proces dotazuje tabulku Pošta k odeslání na neošetřené položky, publikuje události a označí je jako zpracované. Tento model zajišťuje, že po vytvoření nebo úpravě obchodního objektu nedojde ke ztrátě událostí.

Diagram that shows event handling with the Transactional Outbox pattern and a relay service for publishing events to the message broker.

Stáhněte si soubor aplikace Visio s touto architekturou.

V relační databázi je implementace modelu jednoduchá. Pokud služba například používá Entity Framework Core, použije kontext Entity Framework k vytvoření databázové transakce, uloží obchodní objekt a událost a potvrdí transakci nebo provede vrácení zpět. Pracovní služba, která zpracovává události, se také snadno implementuje: pravidelně dotazuje tabulku Pošta k odeslání pro nové položky, publikuje nově vložené události do sběrnice zpráv a nakonec označí tyto položky jako zpracované.

V praxi nejsou věci tak snadné, jak by se mohly podívat na první pohled. Nejdůležitější je zajistit, aby se pořadí událostí zachovalo, aby OrderUpdated se událost nepublikovala před událostí OrderCreated .

Implementace ve službě Azure Cosmos DB

Tato část ukazuje, jak implementovat model Transakční pošta k odeslání ve službě Azure Cosmos DB, aby bylo možné dosáhnout spolehlivého zasílání zpráv v pořadí mezi různými službami pomocí kanálu změn služby Azure Cosmos DB a služby Service Bus. Ukazuje ukázkovou službu, která spravuje Contact objekty (FirstName, LastName, Email, Company informace atd.). Používá model CQRS (Command and Query Responsibility Segregation) a řídí se základními koncepty návrhu řízeného doménou. Vzorový kód pro implementaci najdete na GitHubu.

Objekt Contact v ukázkové službě má následující strukturu:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Jakmile Contact se vytvoří nebo aktualizuje, vygeneruje události obsahující informace o aktuální změně. Mimo jiné můžou být události domény:

  • ContactCreated. Vyvolá se při přidání kontaktu.
  • ContactNameUpdated. Vyvolání při FirstName změně nebo LastName změně.
  • ContactEmailUpdated. Vyvolá se při aktualizaci e-mailové adresy.
  • ContactCompanyUpdated. Vyvolá se při změně některé z vlastností společnosti.

Transakční dávky

Pokud chcete tento model implementovat, musíte zajistit Contact , aby obchodní objekt a odpovídající události byly uloženy ve stejné databázové transakci. Ve službě Azure Cosmos DB fungují transakce jinak než v relačních databázových systémech. Transakce Azure Cosmos DB, označované jako transakční dávky, pracují s jedním logickým oddílem, takže zaručují atomicitu, konzistenci, izolaci a odolnost (ACID). V transakční dávkové operaci nemůžete uložit dva dokumenty v různých kontejnerech nebo logických oddílech. Pro ukázkovou službu to znamená, že obchodní objekt i událost nebo události budou vloženy do stejného kontejneru a logického oddílu.

Kontext, úložiště a UnitOfWork

Jádrem ukázkové implementace je kontext kontejneru, který sleduje objekty uložené ve stejné transakční dávce. Udržuje seznam vytvořených a upravených objektů a funguje v jednom kontejneru Azure Cosmos DB. Rozhraní pro něj vypadá takto:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

Seznam v komponentě kontextu kontejneru sleduje Contact a DomainEvent objekty. Obě budou vloženy do stejného kontejneru. To znamená, že ve stejném kontejneru Azure Cosmos DB je uloženo více typů objektů a k rozlišení mezi obchodním objektem a událostí použijte Type vlastnost.

Pro každý typ je vyhrazené úložiště, které definuje a implementuje přístup k datům. Rozhraní Contact úložiště poskytuje tyto metody:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

Úložiště Event vypadá podobně, s výjimkou jediné metody, která v úložišti vytváří nové události:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

Implementace obou rozhraní úložiště získávají odkaz prostřednictvím injektáže závislostí do jedné IContainerContext instance, aby se zajistilo, že oba fungují ve stejném kontextu služby Azure Cosmos DB.

Poslední komponenta je UnitOfWork, která potvrdí změny uchovávané v IContainerContext instanci do služby Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Zpracování událostí: Vytvoření a publikování

Při každém vytvoření, změně nebo odstranění objektu Contact (obnovitelné odstranění) služba vyvolá odpovídající událost. Jádrem poskytnutého řešení je kombinace návrhu řízeného doménou (DDD) a mediátora navrženého Jimmy Bogardem. Navrhuje udržování seznamu událostí, ke kterým došlo kvůli změnám objektu domény a publikování těchto událostí před uložením skutečného objektu do databáze.

Seznam změn se uchovává v samotném objektu domény, takže žádná jiná komponenta nemůže měnit řetěz událostí. Chování udržování událostí (IEvent instancí) v objektu domény je definováno prostřednictvím rozhraní IEventEmitter<IEvent> a implementováno v abstraktní DomainEntity třídě:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

Objekt Contact vyvolává události domény. Entita Contact se řídí základními koncepty DDD a konfiguruje settery vlastností domény jako soukromé. Ve třídě neexistují žádné veřejné settery. Místo toho nabízí metody pro manipulaci s interním stavem. V těchto metodách mohou být vyvolány vhodné události pro určitou změnu (například ContactNameUpdatedContactEmailUpdated) .

Tady je příklad aktualizací jména kontaktu. (Událost je vyvolána na konci metody.)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

Odpovídající ContactNameUpdatedEvent, který sleduje změny, vypadá takto:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

Zatím se události právě protokolují do objektu domény a nic se neuloží do databáze nebo dokonce publikuje do zprostředkovatele zpráv. Po doporučení se seznam událostí zpracuje přímo před uložením obchodního objektu do úložiště dat. V tomto případě k tomu dochází v SaveChangesAsync metodě IContainerContext instance, která je implementována v privátní RaiseDomainEvents metodě. (dObjs je seznam sledovaných entit kontextu kontejneru.)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

Na posledním řádku se balíček MediatR , implementace mediátorového vzoru v jazyce C#, používá k publikování události v aplikaci. Je to možné, protože všechny události, jako je ContactNameUpdatedEvent implementace INotification rozhraní balíčku MediatR.

Tyto události je potřeba zpracovat odpovídající obslužnou rutinou. Zde přichází implementace IEventsRepository do hry. Tady je ukázka NameUpdated obslužné rutiny události:

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

Instance IEventRepository se vloží do třídy obslužné rutiny prostřednictvím konstruktoru. Jakmile ContactNameUpdatedEvent se publikuje ve službě, Handle vyvolá se metoda a použije instanci úložiště událostí k vytvoření objektu oznámení. Tento objekt oznámení se zase vloží do seznamu sledovaných objektů v objektu IContainerContext a spojí objekty uložené ve stejné transakční dávce se službou Azure Cosmos DB.

Zatím kontext kontejneru ví, které objekty se mají zpracovat. Aby se nakonec sledované objekty zachovaly ve službě Azure Cosmos DB, IContainerContext implementace vytvoří transakční dávku, přidá všechny relevantní objekty a spustí operaci s databází. Popsaný proces se zpracovává v SaveInTransactionalBatchAsync metodě, která je vyvolána metodou SaveChangesAsync .

Tady jsou důležité části implementace, kterou potřebujete k vytvoření a spuštění transakční dávky:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects); 

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Tady je přehled toho, jak proces zatím funguje (pro aktualizaci jména u objektu kontaktu):

  1. Klient chce aktualizovat jméno kontaktu. Metoda SetName je vyvolána u objektu kontaktu a vlastnosti jsou aktualizovány.
  2. Událost ContactNameUpdated se přidá do seznamu událostí v objektu domény.
  3. Vyvolá se metoda úložiště Update kontaktů, která přidá objekt domény do kontextu kontejneru. Objekt je nyní sledován.
  4. CommitAsync je vyvolána v UnitOfWork instanci, která následně volá SaveChangesAsync kontext kontejneru.
  5. V rámci SaveChangesAsync, všechny události v seznamu objektu domény jsou publikovány MediatR instancí a jsou přidány prostřednictvím úložiště událostí do stejného kontextu kontejneru.
  6. V souboru < a0/0SaveChangesAsync> se vytvoří.TransactionalBatch Bude obsahovat objekt kontaktu i událost.
  7. Spuštění TransactionalBatch a data se zapíšou do služby Azure Cosmos DB.
  8. SaveChangesAsync a CommitAsync úspěšně se vrátí.

Uchování

Jak vidíte v předchozích fragmentech kódu, všechny objekty uložené ve službě DataObject Azure Cosmos DB se zabalí do instance. Tento objekt poskytuje společné vlastnosti:

  • ID.
  • PartitionKey.
  • Type.
  • State. Updated Podobně jako Createdv Azure Cosmos DB se neuchová.
  • Etag. Pro optimistické uzamčení.
  • TTL. Vlastnost Time To Live pro automatické vyčištění starých dokumentů
  • Data. Obecný datový objekt.

Tyto vlastnosti jsou definovány v obecném rozhraní, které se volá IDataObject a používá úložiště a kontext kontejneru:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

Objekty zabalené v DataObject instanci a uložené do databáze pak budou vypadat jako v této ukázce (Contact a ContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

Vidíte, že Contact dokumenty a ContactNameUpdatedEvent (typ domainEvent) mají stejný klíč oddílu a že oba dokumenty budou zachovány ve stejném logickém oddílu.

Zpracování kanálu změn

Ke čtení datového proudu událostí a jejich odesílání do zprostředkovatele zpráv bude služba používat kanál změn služby Azure Cosmos DB.

Kanál změn je trvalý protokol změn v kontejneru. Funguje na pozadí a sleduje úpravy. V rámci jednoho logického oddílu je zaručeno pořadí změn. Nejpohodlnější způsob, jak číst kanál změn, je použít funkci Azure s triggerem služby Azure Cosmos DB. Další možností je použít knihovnu procesoru kanálu změn. Umožňuje integrovat zpracování kanálu změn ve webovém rozhraní API jako službu na pozadí (prostřednictvím IHostedService rozhraní). Ukázka zde používá jednoduchou konzolovou aplikaci, která implementuje abstraktní třídu BackgroundService k hostování dlouhotrvajících úloh na pozadí v aplikacích .NET Core.

Pokud chcete přijímat změny z kanálu změn služby Azure Cosmos DB, musíte vytvořit instanci ChangeFeedProcessor objektu, zaregistrovat metodu obslužné rutiny pro zpracování zpráv a začít naslouchat změnám:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

Metoda obslužné rutiny (HandleChangesAsync zde) pak zpracuje zprávy. V této ukázce se události publikují do tématu služby Service Bus, které je rozdělené na oddíly pro škálovatelnost a má povolenou funkci odstranění duplicit. Každá služba, která má zájem o Contact změny objektů, se pak může přihlásit k odběru daného tématu služby Service Bus a přijímat a zpracovávat změny pro svůj vlastní kontext.

Vytvořené zprávy služby Service Bus mají SessionId vlastnost. Při použití relací ve službě Service Bus zaručujete, že pořadí zpráv zůstane zachováno (FIFO). Pro tento případ použití je nutné zachovat pořadí.

Tady je fragment kódu, který zpracovává zprávy z kanálu změn:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Zpracování chyb

Pokud během zpracování změn dojde k chybě, knihovna kanálu změn restartuje čtení zpráv na místě, kde úspěšně zpracovala poslední dávku. Pokud například aplikace úspěšně zpracovala 10 000 zpráv, pracuje na dávce 10 001 až 10 025 a dojde k chybě, může restartovat a vyzvednout svou práci na pozici 10 001. Knihovna automaticky sleduje, co bylo zpracováno prostřednictvím informací uložených v kontejneru ve službě Leases Azure Cosmos DB.

Je možné, že služba již odeslala některé zprávy, které se znovu zpracovaly do služby Service Bus. Za normálních okolností by tento scénář vedl ke zpracování duplicitních zpráv. Jak už jsme uvedli dříve, service Bus má funkci pro detekci duplicitních zpráv, kterou je potřeba pro tento scénář povolit. Služba zkontroluje, jestli už byla zpráva přidána do tématu služby Service Bus (nebo fronty) na základě vlastnosti zprávy řízené MessageId aplikací. Tato vlastnost je nastavena na ID dokument události. Pokud se stejná zpráva odešle znovu do služby Service Bus, služba ji ignoruje a zahodí.

Údržba

V typické implementaci Transactional Outbox služba aktualizuje zpracovávané události a nastaví Processed vlastnost na true, která indikuje, že zpráva byla úspěšně publikována. Toto chování lze implementovat ručně v metodě obslužné rutiny. V aktuálním scénáři takový proces není potřeba. Azure Cosmos DB sleduje události zpracovávané pomocí kanálu změn (v kombinaci s kontejnerem Leases ).

V posledním kroku někdy potřebujete události z kontejneru odstranit, abyste zachovali pouze nejnovější záznamy a dokumenty. K pravidelnému vyčištění platí pro implementaci další funkce služby Azure Cosmos DB: Time To Live (TTL) na dokumenty. Azure Cosmos DB může automaticky odstraňovat dokumenty na TTL základě vlastnosti, kterou je možné přidat do dokumentu: časové období v sekundách. Služba bude neustále kontrolovat kontejner dokumentů, které mají TTL vlastnost. Jakmile vyprší platnost dokumentu, Azure Cosmos DB ho z databáze odebere.

Pokud všechny komponenty fungují podle očekávání, události se zpracovávají a publikují rychle: během několika sekund. Pokud ve službě Azure Cosmos DB dojde k chybě, události se do sběrnice zpráv neodesílají, protože obchodní objekt i odpovídající události nelze do databáze uložit. Jedinou věcí, kterou je potřeba vzít v úvahu, je nastavit v dokumentech odpovídající TTL hodnotu DomainEvent , když pracovní proces na pozadí (procesor kanálu změn) nebo service bus není k dispozici. V produkčním prostředí je nejlepší vybrat časový rozsah několika dnů. Například 10 dní. Všechny zúčastněné komponenty pak budou mít dostatek času na zpracování a publikování změn v rámci aplikace.

Shrnutí

Model Transakční pošta k odeslání řeší problém spolehlivého publikování událostí domény v distribuovaných systémech. Potvrzením stavu obchodního objektu a jejích událostí ve stejné transakční dávce a použitím procesoru na pozadí jako předávání zpráv zajistíte, že ostatní služby, interní nebo externí, nakonec obdrží informace, na které závisejí. Tato ukázka není tradiční implementací modelu Transakční pošta k odeslání. Používá funkce, jako je kanál změn služby Azure Cosmos DB a funkce Time To Live, které udržují věci jednoduché a čisté.

Tady je souhrn komponent Azure používaných v tomto scénáři:

Diagram that shows the Azure components to implement Transactional Outbox with Azure Cosmos DB and Azure Service Bus.

Stáhněte si soubor aplikace Visio s touto architekturou.

Výhody tohoto řešení jsou:

  • Spolehlivé zasílání zpráv a zaručené doručování událostí
  • Zachování pořadí událostí a odstranění duplicit zpráv přes Service Bus
  • Není nutné udržovat další Processed vlastnost, která indikuje úspěšné zpracování dokumentu události.
  • Odstranění událostí ze služby Azure Cosmos DB prostřednictvím hodnoty TTL Tento proces nevyužívají jednotky žádostí potřebné ke zpracování požadavků uživatelů a aplikací. Místo toho v úloze na pozadí používá jednotky žádostí "leftover".
  • Zpracování zpráv s důkazy o chybách prostřednictvím ChangeFeedProcessor funkce (nebo funkce Azure)
  • Volitelné: Více procesorů kanálu změn, z nichž každá udržuje svůj vlastní ukazatel v kanálu změn.

Důležité informace

Ukázková aplikace probíraná v tomto článku ukazuje, jak v Azure implementovat model Transakční pošta k odeslání se službou Azure Cosmos DB a Service Bus. Existují také další přístupy, které používají databáze NoSQL. Chcete-li zaručit, že obchodní objekt a události budou spolehlivě uloženy v databázi, můžete vložit seznam událostí do dokumentu obchodního objektu. Nevýhodou tohoto přístupu je, že proces čištění bude muset aktualizovat každý dokument, který obsahuje události. To není ideální, zejména pokud jde o náklady na jednotku žádosti ve srovnání s použitím hodnoty TTL.

Mějte na paměti, že byste neměli brát v úvahu vzorový kód, který je zde k dispozici pro produkční prostředí. Má určitá omezení týkající se multithreadingu, zejména způsobu zpracování událostí ve DomainEntity třídě a způsobu sledování objektů v CosmosContainerContext implementacích. Použijte ho jako výchozí bod pro vlastní implementace. Případně zvažte použití existujících knihoven, které už mají tuto funkci integrovanou, jako je NServiceBus nebo MassTransit.

Nasazení tohoto scénáře

Zdrojový kód, soubory nasazení a pokyny k otestování tohoto scénáře najdete na GitHubu: https://github.com/mspnp/transactional-outbox-pattern

Přispěvatelé

Tento článek spravuje Microsoft. Původně byla napsána následujícími přispěvateli.

Hlavní autor:

Pokud chcete zobrazit neveřejné profily LinkedIn, přihlaste se na LinkedIn.

Další kroky

Další informace najdete v těchto článcích: