Přihlášení k odběru událostí

Tip

Tento obsah je výňatek z eBooku, architektury mikroslužeb .NET pro kontejnerizované aplikace .NET, které jsou k dispozici na .NET Docs nebo jako zdarma ke stažení PDF, které lze číst offline.

.NET Microservices Architecture for Containerized .NET Applications eBook cover thumbnail.

Prvním krokem při použití sběrnice událostí je přihlášení mikroslužeb k odběru událostí, které chtějí přijímat. Tato funkce by měla být provedena v mikroslužbách přijímače.

Následující jednoduchý kód ukazuje, co každá mikroslužba příjemce potřebuje implementovat při spuštění služby (tj. ve Startup třídě), aby se přihlásila k odběru událostí, které potřebuje. V takovém případě basket-api se mikroslužba musí přihlásit k odběru ProductPriceChangedIntegrationEventOrderStartedIntegrationEvent a zpráv.

Například při přihlášení k odběru ProductPriceChangedIntegrationEvent události, která mikroslužbu košíku informuje o jakýchkoli změnách ceny produktu a umožní uživateli upozornit na změnu, pokud je daný produkt v košíku uživatele.

var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

eventBus.Subscribe<ProductPriceChangedIntegrationEvent,
                   ProductPriceChangedIntegrationEventHandler>();

eventBus.Subscribe<OrderStartedIntegrationEvent,
                   OrderStartedIntegrationEventHandler>();

Po spuštění tohoto kódu bude mikroslužba odběratele naslouchat prostřednictvím kanálů RabbitMQ. Když přijde jakákoli zpráva typu ProductPriceChangedIntegrationEvent, kód vyvolá obslužnou rutinu události, která je předána do ní a zpracuje událost.

Publikování událostí přes sběrnici událostí

Odesílatel zprávy (mikroslužba původu) nakonec publikuje události integrace s kódem podobným následujícímu příkladu. (Tento přístup je zjednodušený příklad, který nebere v úvahu atomicitu.) Podobný kód byste implementovali vždy, když se událost musí rozšířit do více mikroslužeb, obvykle hned po potvrzení dat nebo transakcí z mikroslužby původu.

Nejprve by se do konstruktoru kontroleru vložil objekt implementace sběrnice událostí (založený na RabbitMQ nebo na základě sběrnice Service Bus), jak je znázorněno v následujícím kódu:

[Route("api/v1/[controller]")]
public class CatalogController : ControllerBase
{
    private readonly CatalogContext _context;
    private readonly IOptionsSnapshot<Settings> _settings;
    private readonly IEventBus _eventBus;

    public CatalogController(CatalogContext context,
        IOptionsSnapshot<Settings> settings,
        IEventBus eventBus)
    {
        _context = context;
        _settings = settings;
        _eventBus = eventBus;
    }
    // ...
}

Pak ho použijete z metod kontroleru, například v metodě UpdateProduct:

[Route("items")]
[HttpPost]
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem product)
{
    var item = await _context.CatalogItems.SingleOrDefaultAsync(
        i => i.Id == product.Id);
    // ...
    if (item.Price != product.Price)
    {
        var oldPrice = item.Price;
        item.Price = product.Price;
        _context.CatalogItems.Update(item);
        var @event = new ProductPriceChangedIntegrationEvent(item.Id,
            item.Price,
            oldPrice);
        // Commit changes in original transaction
        await _context.SaveChangesAsync();
        // Publish integration event to the event bus
        // (RabbitMQ or a service bus underneath)
        _eventBus.Publish(@event);
        // ...
    }
    // ...
}

V tomto případě, protože původní mikroslužba je jednoduchá mikroslužba CRUD, tento kód se umístí přímo do kontroleru webového rozhraní API.

V pokročilejších mikroslužbách, jako je použití přístupů CQRS, je možné ji implementovat ve CommandHandler třídě v rámci Handle() metody.

Návrh atomicity a odolnosti při publikování do sběrnice událostí

Když publikujete události integrace prostřednictvím distribuovaného systému zasílání zpráv, jako je například sběrnice událostí, máte problém s atomicky aktualizací původní databáze a publikováním události (to znamená, že obě operace jsou dokončené nebo žádné z nich). Například ve zjednodušeném příkladu uvedeném dříve kód potvrdí data do databáze při změně ceny produktu a pak publikuje zprávu ProductPriceChangedIntegrationEvent. Na začátku může vypadat zásadní, že tyto dvě operace se provádějí atomicky. Pokud však používáte distribuovanou transakci zahrnující databázi a zprostředkovatele zpráv, stejně jako ve starších systémech, jako je Microsoft Message Queuing (MSMQ), tento přístup se nedoporučuje z důvodů popsaných teorémem CAP.

V podstatě používáte mikroslužby k vytváření škálovatelných a vysoce dostupných systémů. Teorém CAP poněkud zjednodušuje, že nemůžete sestavit (distribuovanou) databázi (nebo mikroslužbu, která vlastní svůj model), která je neustále dostupná, silně konzistentní a tolerantní k žádnému oddílu. Musíte zvolit dvě z těchto tří vlastností.

V architekturách založených na mikroslužbách byste měli zvolit dostupnost a toleranci a měli byste zdůraznit silnou konzistenci. Proto ve většině moderních aplikací založených na mikroslužbách obvykle nechcete používat distribuované transakce v zasílání zpráv, stejně jako při implementaci distribuovaných transakcí založených na systému Windows Distributed Transaction Coordinator (DTC) s MSMQ.

Vraťme se k počátečnímu problému a jeho příkladu. Pokud se služba po aktualizaci databáze chybově ukončí (v tomto případě hned po řádku kódu s _context.SaveChangesAsync()), ale před publikováním integrační události může být celkový systém nekonzistentní. Tento přístup může být důležitý pro chod firmy v závislosti na konkrétní obchodní operaci, se kterou pracujete.

Jak už bylo zmíněno dříve v části architektura, můžete mít několik přístupů k řešení tohoto problému:

  • Použití úplného vzoru Event Sourcing

  • Použití dolování transakčních protokolů.

  • Použití vzoru Pošta k odeslání Jedná se o transakční tabulku pro ukládání událostí integrace (rozšíření místní transakce).

V tomto scénáři je použití úplného vzoru Event Sourcing (ES) jedním z nejlepších přístupů, pokud ne nejlepších. V mnoha scénářích aplikací ale možná nebudete moct implementovat úplný systém ES. ES znamená ukládání pouze událostí domény do transakční databáze místo ukládání dat o aktuálním stavu. Ukládání pouze událostí domény může mít skvělé výhody, například dostupnost historie systému a možnost určit stav systému v libovolném okamžiku v minulosti. Implementace celého systému ES však vyžaduje, abyste většinu systému upravili a zavedli mnoho dalších složitostí a požadavků. Například byste chtěli použít databázi určenou speciálně pro model Event Sourcing, jako je úložiště událostí, nebo databáze orientované na dokumenty, jako je Azure Cosmos DB, MongoDB, Cassandra, CouchDB nebo RavenDB. ES je pro tento problém skvělý přístup, ale není to nejjednodušší řešení, pokud ještě neznáte model Event Sourcing.

Možnost použít dolování transakčních protokolů zpočátku vypadá transparentně. Pokud ale chcete tento přístup použít, musí být mikroslužba svázána s transakčním protokolem RDBMS, jako je transakční protokol SQL Serveru. Tento přístup pravděpodobně není žádoucí. Další nevýhodou je, že aktualizace nízké úrovně zaznamenané v transakčním protokolu nemusí být na stejné úrovni jako události integrace vysoké úrovně. Pokud ano, může být proces zpětné analýzy těchto operací transakčního protokolu obtížné.

Vyvážený přístup je kombinací tabulky transakční databáze a zjednodušeného vzoru ES. Můžete použít stav, například "připraveno k publikování události", který jste nastavili v původní události při potvrzení do tabulky událostí integrace. Pak se pokusíte událost publikovat do sběrnice událostí. Pokud akce publikování-událost proběhne úspěšně, spustíte ve službě původu další transakci a přesunete stav z "připraveno k publikování události" na událost, která je již publikovaná.

Pokud akce události publikování ve sběrnici událostí selže, data stále nebudou v rámci mikroslužby původu nekonzistentní – stále se označí jako "připravená k publikování události", a pokud jde o zbytek služeb, bude nakonec konzistentní. Vždy můžete mít úlohy na pozadí, které kontrolují stav transakcí nebo událostí integrace. Pokud úloha najde událost ve stavu Připraveno k publikování události, může se pokusit tuto událost znovu publikovat do sběrnice událostí.

Všimněte si, že při tomto přístupu zachováváte pouze události integrace pro každou původní mikroslužbu a pouze události, které chcete komunikovat s jinými mikroslužbami nebo externími systémy. Naproti tomu v plném systému ES ukládáte také všechny události domény.

Tento vyvážený přístup je proto zjednodušený systém ES. Potřebujete seznam integračních událostí s aktuálním stavem (připraveno k publikování a publikování). Tyto stavy ale potřebujete implementovat pouze pro události integrace. V tomto přístupu nemusíte ukládat všechna data domény jako události v transakční databázi, stejně jako byste to udělali v plném systému ES.

Pokud už používáte relační databázi, můžete k ukládání událostí integrace použít transakční tabulku. K dosažení atomicity v aplikaci použijete dvoustupňový proces založený na místních transakcích. V podstatě máte tabulku IntegrationEvent ve stejné databázi, ve které máte entity domény. Tato tabulka funguje jako pojištění pro dosažení atomicity, takže zahrnete trvalé integrační události do stejných transakcí, které potvrdí data vaší domény.

Krok za krokem tento proces vypadá takto:

  1. Aplikace zahájí transakci místní databáze.

  2. Potom aktualizuje stav entit vaší domény a vloží událost do tabulky událostí integrace.

  3. Nakonec potvrdí transakci, takže získáte požadovanou atomicitu a pak

  4. Událost publikujete nějak (příště).

Při implementaci kroků publikování událostí máte tyto možnosti:

  • Událost integrace publikujte hned po potvrzení transakce a pomocí jiné místní transakce označte události v tabulce jako publikované. Pak tabulku použijte stejně jako artefakt ke sledování událostí integrace v případě problémů ve vzdálených mikroslužbách a provádění kompenzačních akcí na základě uložených integračních událostí.

  • Tabulku použijte jako druh fronty. Samostatné vlákno aplikace nebo proces dotazuje tabulku událostí integrace, publikuje události do sběrnice událostí a pak pomocí místní transakce označí události jako publikované.

Obrázek 6–22 znázorňuje architekturu pro první z těchto přístupů.

Diagram of atomicity when publishing without a worker microservice.

Obrázek 6–22 Atomicita při publikování událostí do sběrnice událostí

Na obrázku 6–22 chybí další mikroslužba pracovního procesu, která má na starosti kontrolu a potvrzení úspěšnosti publikovaných integračních událostí. V případě selhání může další mikroslužba kontrolního procesu číst události z tabulky a znovu je publikovat, to znamená opakovat krok 2.

O druhém přístupu: Použijete tabulku EventLog jako frontu a k publikování zpráv vždy použijete pracovní mikroslužbu. V takovém případě se tento proces podobá tomu, co je znázorněno na obrázku 6–23. Zobrazí se další mikroslužba a tabulka je jediným zdrojem při publikování událostí.

Diagram of atomicity when publishing with a worker microservice.

Obrázek 6–23 Atomicita při publikování událostí do sběrnice událostí s mikroslužbou pracovního procesu

Pro zjednodušení používá ukázka eShopOnContainers první přístup (bez dalších procesů nebo kontrolních mikroslužeb) a sběrnici událostí. Ukázka eShopOnContainers ale nezpracovává všechny možné případy selhání. Ve skutečné aplikaci nasazené do cloudu musíte přijmout skutečnost, že k problémům dojde nakonec, a musíte implementovat tuto logiku kontroly a opětovného odeslání. Použití tabulky jako fronty může být efektivnější než první přístup, pokud máte tuto tabulku jako jediný zdroj událostí při jejich publikování (pomocí pracovního procesu) prostřednictvím sběrnice událostí.

Implementace atomicity při publikování událostí integrace přes sběrnici událostí

Následující kód ukazuje, jak můžete vytvořit jednu transakci zahrnující více objektů DbContext – jeden kontext související s původními daty, které se aktualizují, a druhý kontext související s tabulkou IntegrationEventLog.

Transakce v níže uvedeném ukázkovém kódu nebude odolná, pokud připojení k databázi mají v době, kdy je kód spuštěný, problém. K tomu může dojít v cloudových systémech, jako je Azure SQL DB, které můžou přesouvat databáze mezi servery. Informace o implementaci odolných transakcí napříč několika kontexty najdete v části Implementace odolných připojení Entity Framework Core SQL dále v této příručce.

V následujícím příkladu je vidět celý proces v jediné části kódu. Implementace eShopOnContainers je však refaktorována a rozděluje tuto logiku do více tříd, takže je jednodušší udržovat.

// Update Product from the Catalog microservice
//
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem productToUpdate)
{
  var catalogItem =
       await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id ==
                                                               productToUpdate.Id);
  if (catalogItem == null) return NotFound();

  bool raiseProductPriceChangedEvent = false;
  IntegrationEvent priceChangedEvent = null;

  if (catalogItem.Price != productToUpdate.Price)
          raiseProductPriceChangedEvent = true;

  if (raiseProductPriceChangedEvent) // Create event if price has changed
  {
      var oldPrice = catalogItem.Price;
      priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id,
                                                                  productToUpdate.Price,
                                                                  oldPrice);
  }
  // Update current product
  catalogItem = productToUpdate;

  // Just save the updated product if the Product's Price hasn't changed.
  if (!raiseProductPriceChangedEvent)
  {
      await _catalogContext.SaveChangesAsync();
  }
  else  // Publish to event bus only if product price changed
  {
        // Achieving atomicity between original DB and the IntegrationEventLog
        // with a local transaction
        using (var transaction = _catalogContext.Database.BeginTransaction())
        {
           _catalogContext.CatalogItems.Update(catalogItem);
           await _catalogContext.SaveChangesAsync();

           await _integrationEventLogService.SaveEventAsync(priceChangedEvent);

           transaction.Commit();
        }

      // Publish the integration event through the event bus
      _eventBus.Publish(priceChangedEvent);

      _integrationEventLogService.MarkEventAsPublishedAsync(
                                                priceChangedEvent);
  }

  return Ok();
}

Po vytvoření události integrace ProductPriceChangedIntegrationEvent transakce, která ukládá původní operaci domény (aktualizovat položku katalogu), zahrnuje také trvalost události v tabulce EventLog. Díky tomu se jedná o jednu transakci a vždy budete moct zkontrolovat, jestli se odesílaly zprávy událostí.

Tabulka protokolu událostí se aktualizuje atomicky pomocí původní databázové operace pomocí místní transakce se stejnou databází. Pokud některá z operací selže, vyvolá se výjimka a transakce vrátí zpět všechny dokončené operace, čímž se zachová konzistence mezi operacemi domény a zprávami událostí uloženými v tabulce.

Příjem zpráv z odběrů: obslužné rutiny událostí v mikroslužbách příjemce

Kromě logiky odběru událostí musíte implementovat interní kód pro obslužné rutiny událostí integrace (jako je metoda zpětného volání). Obslužná rutina události je místem, kde určíte, kde se budou přijímat a zpracovávat zprávy událostí určitého typu.

Obslužná rutina události nejprve obdrží instanci události ze sběrnice událostí. Potom vyhledá komponentu, která se má zpracovat v souvislosti s danou událostí integrace, šířením a zachováním události jako změny stavu v mikroslužbě příjemce. Pokud například událost ProductPriceChanged pochází z mikroslužby katalogu, zpracovává se v mikroslužbě košíku a mění stav v této mikroslužbě košíku příjemce, jak je znázorněno v následujícím kódu.

namespace Microsoft.eShopOnContainers.Services.Basket.API.IntegrationEvents.EventHandling
{
    public class ProductPriceChangedIntegrationEventHandler :
        IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
    {
        private readonly IBasketRepository _repository;

        public ProductPriceChangedIntegrationEventHandler(
            IBasketRepository repository)
        {
            _repository = repository;
        }

        public async Task Handle(ProductPriceChangedIntegrationEvent @event)
        {
            var userIds = await _repository.GetUsers();
            foreach (var id in userIds)
            {
                var basket = await _repository.GetBasket(id);
                await UpdatePriceInBasketItems(@event.ProductId, @event.NewPrice, basket);
            }
        }

        private async Task UpdatePriceInBasketItems(int productId, decimal newPrice,
            CustomerBasket basket)
        {
            var itemsToUpdate = basket?.Items?.Where(x => int.Parse(x.ProductId) ==
                productId).ToList();
            if (itemsToUpdate != null)
            {
                foreach (var item in itemsToUpdate)
                {
                    if(item.UnitPrice != newPrice)
                    {
                        var originalPrice = item.UnitPrice;
                        item.UnitPrice = newPrice;
                        item.OldUnitPrice = originalPrice;
                    }
                }
                await _repository.UpdateBasket(basket);
            }
        }
    }
}

Obslužná rutina události musí ověřit, zda produkt existuje v některé z instancí košíku. Aktualizuje také cenu položky pro každou související položku řádku košíku. Nakonec vytvoří upozornění, které se uživateli zobrazí o změně ceny, jak je znázorněno na obrázku 6–24.

Screenshot of a browser showing the price change notification on the user cart.

Obrázek 6–24 Zobrazení změny ceny položky v košíku podle událostí integrace

Idempotence v událostech aktualizační zprávy

Důležitým aspektem událostí aktualizačních zpráv je to, že selhání v jakémkoli okamžiku komunikace by mělo způsobit opakování zprávy. Jinak se úloha na pozadí může pokusit publikovat událost, která už byla publikovaná, a vytvořit podmínku časování. Ujistěte se, že aktualizace jsou buď idempotentní, nebo že poskytují dostatek informací, abyste zajistili, že můžete zjistit duplicitní, zahodit a odeslat zpět pouze jednu odpověď.

Jak jsme uvedli dříve, idempotence znamená, že operaci je možné provést několikrát, aniž by došlo ke změně výsledku. V prostředí zasílání zpráv je událost jako při komunikaci událostí idempotentní, pokud ji lze doručit vícekrát, aniž by se změnil výsledek mikroslužby příjemce. To může být nezbytné z důvodu povahy samotné události nebo kvůli způsobu, jakým systém zpracovává událost. Idempotence zpráv je důležitá v každé aplikaci, která používá zasílání zpráv, nejen v aplikacích, které implementují vzor sběrnice událostí.

Příkladem idempotentní operace je příkaz SQL, který vkládá data do tabulky pouze v případě, že tato data ještě nejsou v tabulce. Nezáleží na tom, kolikrát spustíte příkaz SQL. výsledek bude stejný – tabulka bude obsahovat tato data. Idempotence, jako je tato, může být také nezbytná při práci se zprávami, pokud by mohly být zprávy odeslány, a proto zpracovávat více než jednou. Pokud například logika opakování způsobí, že odesílatel odešle přesně stejnou zprávu více než jednou, musíte se ujistit, že je idempotentní.

Je možné navrhnout idempotentní zprávy. Můžete například vytvořit událost s textem "set the product price to $25" místo "add $5 to the product price" (Přidat $5 do ceny produktu). První zprávu můžete bezpečně zpracovat libovolný počet a výsledek bude stejný. To neplatí pro druhou zprávu. Ale i v prvním případě možná nebudete chtít zpracovat první událost, protože systém mohl také odeslat novější událost změny ceny a vy byste přepsali novou cenu.

Dalším příkladem může být událost dokončená objednávky, která se rozšíří na více odběratelů. Aplikace musí zajistit, aby se informace o objednávce aktualizovaly v jiných systémech pouze jednou, a to i v případě, že pro stejnou událost dokončení objednávky došlo k duplicitním událostem zprávy.

Pro každou událost je vhodné mít určitý druh identity, abyste mohli vytvořit logiku, která vynucuje, že každá událost se zpracuje pouze jednou na příjemce.

Zpracování některých zpráv je ze své podstaty idempotentní. Pokud například systém generuje miniatury obrázků, nemusí být důležité, kolikrát se zpráva o vygenerované miniaturě zpracuje; výsledkem je, že se miniatury vygenerují a jsou vždy stejné. Na druhou stranu nemusí být operace, jako je volání platební brány k účtování poplatků za platební kartu, vůbec idempotentní. V těchto případech je potřeba zajistit, aby zpracování zprávy několikrát mělo vliv, který očekáváte.

Další materiály

Odstranění duplicitních dat zpráv událostí integrace

Můžete se ujistit, že se události zpráv odesílají a zpracovávají pouze jednou pro předplatitele na různých úrovních. Jedním ze způsobů je použití funkce odstranění duplicitních dat, kterou nabízí infrastruktura zasílání zpráv, kterou používáte. Další je implementace vlastní logiky v cílové mikroslužbě. Nejlepší volbou je ověření na úrovni přenosu i na úrovni aplikace.

Odstranění duplicitních událostí zpráv na úrovni obslužné rutiny událostí

Jedním ze způsobů, jak zajistit, aby událost byla zpracována pouze jednou jakýmkoli příjemcem, je implementace určité logiky při zpracování událostí událostí v obslužných rutinách událostí. To je například přístup použitý v aplikaci eShopOnContainers, jak můžete vidět ve zdrojovém kódu UserCheckoutAcceptedIntegrationEventHandler třídy , když obdrží UserCheckoutAcceptedIntegrationEvent událost integrace. (V tomto případě CreateOrderCommand je zabalen pomocí IdentifiedCommandeventMsg.RequestId identifikátoru , před odesláním do obslužné rutiny příkazu).

Odstranění duplicit zpráv při použití RabbitMQ

Pokud dojde k přerušovaným selháním sítě, můžou se zprávy duplikovat a příjemce zprávy musí být připravený ke zpracování těchto duplicitních zpráv. Pokud je to možné, příjemci by měli zpracovávat zprávy idempotentním způsobem, což je lepší než jejich explicitní zpracování pomocí odstranění duplicitních dat.

Podle dokumentace RabbitMQ", "Pokud je zpráva doručena spotřebiteli a pak se znovu odešle do fronty (protože nebylo potvrzeno před ukončením připojení příjemce, například) pak RabbitMQ nastaví znovuečervený příznak na něj, když se znovu doručí (ať už stejnému spotřebiteli nebo jinému).

Pokud je nastaven příznak "znovu" musí příjemce vzít v úvahu, protože zpráva již mohla být zpracována. Ale to není zaručeno; zpráva možná nikdy nedosáhla příjemce po tom, co opustila zprostředkovatele zpráv, možná kvůli problémům se sítí. Na druhou stranu platí, že pokud není nastaven příznak "redelivered", je zaručeno, že zpráva nebyla odeslána více než jednou. Příjemce proto musí zprávy odstraněním duplicit nebo zpracovávat zprávy idempotentním způsobem pouze v případě, že je ve zprávě nastaven příznak "redelivered".

Další materiály