EventProcessorHost

Poznámka

Tento článek se týká staré verze sady Azure Event Hubs SDK. Aktuální verzi sady SDK najdete v tématu Vyrovnávání zatížení oddílů napříč několika instancemi vaší aplikace. Informace o migraci kódu na novější verzi sady SDK najdete v těchto průvodcích migrací.

Azure Event Hubs je výkonná služba pro příjem telemetrie, kterou je možné použít ke streamování milionů událostí s nízkými náklady. Tento článek popisuje, jak využívat ingestované události pomocí třídy Event Processor Host (EPH). Inteligentní agent pro spotřebitele, který zjednodušuje správu kontrolních bodů, pronájmu a paralelních čtenářů událostí.

Klíčem ke škálování pro Event Hubs je myšlenka rozdělených konzumentů. Na rozdíl od modelu konkurenčních spotřebitelů umožňuje model rozdělených konzumentů velké škálování tím, že odstraňuje kritický bod náporu a usnadňuje koncový paralelismus.

Scénář zabezpečení domovského systému

Jako příklad scénáře si představte domácí bezpečnostní společnost, která monitoruje 100 000 domácností. Každou minutu získává data z různých senzorů, jako je detektor pohybu, senzor otevřených dveří/oken, detektoru prolomení oken atd., které se instaluje do každého domu. Společnost poskytuje obyvatelům web, na který mohou sledovat aktivitu jejich domova v reálném čase.

Každý senzor předá data do centra událostí. Centrum událostí má nakonfigurovaných 16 oddílů. Na straně uživatele potřebujete mechanismus, který dokáže tyto události číst, konsolidovat (filtrovat, agregovat atd.) a vyčíst agregaci do objektu blob úložiště, který se pak promítá na uživatelsky přívětivou webovou stránku.

Napsání aplikace příjemce

Při navrhování příjemce v distribuovaném prostředí musí scénář splňovat následující požadavky:

  1. Škálování: Vytvořte více konzumentů, přičemž každý příjemce převezme vlastnictví čtení z několika Event Hubs oddílů.
  2. Vyrovnávání zatížení: Dynamicky zvyšte nebo snižte počet konzumentů. Když se například do každého domova přidá nový typ senzoru (například detektor uhlíkových monanů), počet událostí se zvýší. V takovém případě operátor (člověk) zvýší počet instancí spotřebitele. Fond konzumentů pak může znovu vyvážení počtu oddílů, které vlastní, a sdílet tak zatížení s nově přidaní spotřebiteli.
  3. Bezproblémové obnovení při selhání: Pokud příjemce (příjemce A) selže (například virtuální počítač, který je hostitelem konzumenta, náhle dojde k chybě), musí být ostatní spotřebiteli schopni vybrat oddíly vlastněné příjemcem A a pokračovat. Bod pokračování nazývaný kontrolní bod nebo posun by měl být také přesně v okamžiku, kdy příjemce A selhal, nebo mírně před tím.
  4. Používání událostí: Předchozí tři body se sice zabývají s právy pro správu příjemce, ale musí být kód, který události spotřebuje a bude s ním dělat něco užitečného. Můžete ji například agregovat a nahrát do úložiště objektů blob.

Místo vytváření vlastního řešení pro Event Hubs poskytuje tuto funkci prostřednictvím rozhraní IEventProcessor a třídy EventProcessorHost.

IEventProcessor – rozhraní

Zaprvé, využívání aplikací implementuje rozhraní IEventProcessor, které má čtyři metody: OpenAsync, CloseAsync, ProcessErrorAsync a ProcessEventsAsync. Toto rozhraní obsahuje skutečný kód pro používání událostí, které Event Hubs odesílá. Následující kód ukazuje jednoduchou implementaci:

public class SimpleEventProcessor : IEventProcessor
{
    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
       Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
       return Task.CompletedTask;
    }

    public Task OpenAsync(PartitionContext context)
    {
       Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
       return Task.CompletedTask;
     }

    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
       Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
       return Task.CompletedTask;
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
       foreach (var eventData in messages)
       {
          var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
             Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
       }
       return context.CheckpointAsync();
    }
}

Dále vytvořte instanci třídy EventProcessorHost. V závislosti na přetížení se při vytváření instance třídy EventProcessorHost v konstruktoru používají následující parametry:

  • hostName: Název každé instance příjemce. Každá instance třídy EventProcessorHost musí mít jedinečnou hodnotu pro tuto proměnnou ve skupině uživatelů, proto tuto hodnotu nezadáte pevným kódem.
  • eventHubPath: Název centra událostí.
  • consumerGroupName: Event Hubs používá $Default jako název výchozí skupiny uživatelů, ale je vhodné vytvořit skupinu uživatelů pro váš konkrétní aspekt zpracování.
  • eventHubConnectionString: Připojovací řetězec k centru událostí, který lze načíst z Azure Portal. Tento připojovací řetězec by měl mít v centru událostí oprávnění Naslouchat.
  • storageConnectionString: Účet úložiště, který se používá pro interní správu prostředků.

Důležité

  • V účtu úložiště, který se používá jako úložiště kontrolních bodů, nepo povolte funkci softwarového odstranění.
  • Nepoužívejte hierarchické úložiště (Azure Data Lake Storage Gen 2) jako úložiště kontrolních bodů.

Nakonec uživatelé zaregistrují instanci třídy EventProcessorHost do Event Hubs služby. Registrace třídy procesoru událostí v instanci třídy EventProcessorHost zahájí zpracování událostí. Registrace instruuje službu Event Hubs, aby očekávala, že aplikace příjemce bude využívat události z některých svých oddílů, a volat implementační kód IEventProcessor vždy, když předá události, které se mají využívat.

Poznámka

ConsumerGroupName rozlišuje velká a malá písmena. Změny consumerGroupName mohou mít za následek čtení všech oddílů od začátku datového proudu.

Příklad

Představte si například, že na každém virtuálním počítači je 5 virtuálních počítačů vyhrazených pro využívání událostí a jednoduchá konzolová aplikace, která skutečně spotřebovává prostředky. Každá konzolová aplikace pak vytvoří jednu instanci třídy EventProcessorHost a zaregistruje ji Event Hubs službě.

V tomto příkladu scénáře řekněme, že 5 instancím třídy EventProcessorHost je přiděleno 16 oddílů. Některé instance třídy EventProcessorHost mohou vlastnit několik dalších oddílů než jiné. Pro každý oddíl, který instance třídy EventProcessorHost vlastní, vytvoří instanci SimpleEventProcessor třídy . Proto je k dispozici celkem 16 instancí s jednou SimpleEventProcessor přiřazenou k jednotlivým oddílům.

Následující seznam shrnuje tento příklad:

  • 16 Event Hubs oddílů.
  • 5 virtuálních počítače, 1 spotřebitelská aplikace (například Consumer.exe) na každém virtuálním počítače.
  • 5 registrovaných instancí EPH, 1 na každém virtuálním Consumer.exe.
  • 16 SimpleEventProcessor objektů vytvořených 5 instancemi EPH.
  • 1 Consumer.exe aplikace může obsahovat 4 SimpleEventProcessor objekty, protože 1 instance EPH může vlastnit 4 oddíly.

Sledování vlastnictví oddílů

Vlastnictví oddílu k instanci EPH (neboli spotřebiteli) se sleduje prostřednictvím účtu Azure Storage který je k dispozici pro sledování. Sledování můžete vizualizovat jako jednoduchou tabulku následujícím způsobem. Skutečnou implementaci můžete zobrazit prozkoumáním objektů blob v rámci poskytnutého Storage účtu:

Název skupiny uživatelů ID oddílu Název hostitele (vlastník) Doba získání zapůjčení (nebo vlastnictví) Posun v oddílu (kontrolní bod)
$Default 0 Consumer _ VM3 2018-04-15T01:23:45 156
$Default 1 Consumer _ VM4 2018-04-15T01:22:13 734
$Default 2 Consumer _ VM0 2018-04-15T01:22:56 122
:
:
$Default 15 Consumer _ VM3 2018-04-15T01:22:56 976

Tady každý hostitel získá vlastnictví oddílu po určitou dobu (dobu trvání zapůjčení). Pokud dojde k selhání hostitele (virtuální počítač se vypne), platnost zapůjčení vyprší. Jiní hostitelé se pokusí získat vlastnictví oddílu a jeden z hostitelů bude úspěšný. Tento proces resetuje zapůjčení oddílu s novým vlastníkem. Tímto způsobem může z libovolného oddílu v rámci skupiny uživatelů číst jenom jeden čtenář najednou.

Příjem zpráv

Každé volání metody ProcessEventsAsync doručuje kolekci událostí. Za zpracování těchto událostí zodpovídáte vy. Pokud chcete zajistit, aby hostitel procesoru zpracuje alespoň jednou každou zprávu, musíte napsat vlastní kód, který bude dál opakovat. Ale buďte opatrní ohledně chybných zpráv.

Doporučuje se, abyste to dělat poměrně rychle. To znamená, že se co nejvíce zpracovává. Místo toho použijte skupiny uživatelů. Pokud potřebujete zapisovat do úložiště a nějakým způsobem směrovat, je lepší použít dvě skupiny uživatelů a mít dvě implementace třídy IEventProcessor, které se spouštějí samostatně.

V nějakém okamžiku během zpracování můžete chtít sledovat, co jste si přečetli a dokončili. Sledování je důležité, pokud je nutné čtení restartovat, takže se nevrátíte na začátek datového proudu. EventProcessorHost zjednodušuje toto sledování pomocí kontrolních bodů. Kontrolní bod je umístění neboli posun pro daný oddíl v rámci dané skupiny uživatelů, kdy jste spokojeni s tím, že jste zprávy zpracují. Označení kontrolního bodu v třídy EventProcessorHost se provádí voláním metody CheckpointAsync u objektu PartitionContext. Tato operace se provádí v rámci metody ProcessEventsAsync, ale je možné ji provést také v metodě CloseAsync.

Vytváření kontrolních bodů

Metoda CheckpointAsync má dvě přetížení: první, bez parametrů, kontrolní body k nejvyššímu posunu události v kolekci vrácené processEventsAsync. Tento posun je značkou "vysoké vody". Předpokládá, že jste při volání zpracují všechny nedávné události. Pokud tuto metodu použijete tímto způsobem, uvědomte si, že se očekává, že ji budete volat po vrácení kódu pro zpracování dalších událostí. Druhé přetížení umožňuje určit instanci EventData pro kontrolní bod. Tato metoda umožňuje použít pro kontrolní bod jiný typ vodoznaku. Pomocí této vodoznaku můžete implementovat značku "nízké hodnoty vody": byla zpracována nejnižší sekvenční událost, o které jste si jistí. Toto přetížení je k dispozici pro umožnění flexibility správy posunu.

Po provedení kontrolního bodu se soubor JSON s informacemi specifickými pro oddíly (konkrétně posun) zapisovat do účtu úložiště dodaného v konstruktoru do třídy EventProcessorHost. Tento soubor se průběžně aktualizuje. Je velmi důležité uvažovat o kontrolních bodech v kontextu – není důležité kontrolovat každou zprávu. Účet úložiště, který se používá k kontrolních bodům, pravděpodobně tuto zátěž nezvládá, ale důležitější je, že kontrolní body každé události naznačují vzor zasílání zpráv ve frontě, pro který může být fronta služby Service Bus lepší možností než centrum událostí. Myšlenka za Event Hubs je, že získáte doručení "aspoň jednou" ve velkém měřítku. Díky tomu, že podřízené systémy jsou idempotentní, je snadné se zotavit ze selhání nebo restartování, což má za následek vícenásobné přijetí stejných událostí.

Bezpečnostní a procesorové instance vláken

Třída EventProcessorHost je standardně bezpečná pro přístup z více vláken a chová se synchronním způsobem s ohledem na instanci třídy IEventProcessor. Při doručení událostí pro oddíl se v instanci IEventProcessor pro tento oddíl zavolá ProcessEventsAsync a zablokuje další volání ProcessEventsAsync pro oddíl. Následné zprávy a volání metody ProcessEventsAsync se zařadit na pozadí, protože pumpa zpráv dál běží na pozadí v jiných vláknech. Tato bezpečnost vlákna eliminuje potřebu kolekcí bezpečných pro práci s vlákny a výrazně zvyšuje výkon.

Řádně se vypněte.

EventProcessorHost.UnregisterEventProcessorAsync umožňuje čisté vypnutí všech čtenářů oddílů a při vypínání instance třídy EventProcessorHostby se mělo vždy volat . Pokud to neučiníte, může dojít ke zpoždění při spouštění jiných instancí třídy EventProcessorHost z důvodu vypršení platnosti zapůjčení a konfliktů epochy. Správa epochy je podrobně popsána v části Epoch článku.

Správa zapůjčení

Registrace třídy procesoru událostí v instanci třídy EventProcessorHost zahájí zpracování událostí. Instance hostitele získá zapůjčení pro některé oddíly centra událostí, případně je z jiných hostitelských instancí uchopí způsobem, který konverguje na rovnoměrně rozdělené oddíly napříč všemi instancemi hostitele. Pro každý pronajatý oddíl vytvoří instance hostitele instanci poskytnuté třídy procesoru událostí, pak přijímá události z tohoto oddílu a předává je do instance procesoru událostí. S tím, jak se přidávají další instance a je k dispozici další zapůjčení, EventProcessorHost nakonec vyvažuje zatížení mezi všemi spotřebiteli.

Jak jsme vysvětlili dříve, sledovací tabulka výrazně zjednodušuje povahu automatického škálování třídy EventProcessorHost.UnregisterEventProcessorAsync. Při spuštění instance třídy EventProcessorHost získá co nejvíce zapůjčení a začne číst události. Při nejbližším vypršení platnosti zapůjčení se eventProcessorHost pokusí o jejich obnovení umístěním rezervace. Pokud je zapůjčení k dispozici pro prodloužení, procesor pokračuje ve čtení, ale pokud ne, čtečka se zavře a volá se CloseAsync. CloseAsync je vhodné provést jakékoli konečné vyčištění pro tento oddíl.

EventProcessorHost obsahuje vlastnost PartitionManagerOptions. Tato vlastnost umožňuje řídit správu zapůjčení. Před registrací implementace třídy IEventProcessor nastavte tyto možnosti.

Řízení možností hostitele procesoru událostí

Kromě toho jedno přetížení metody RegisterEventProcessorAsync přebírá objekt EventProcessorOptions jako parametr. Tento parametr slouží k řízení chování samotné třídy EventProcessorHost.UnregisterEventProcessorAsync. EventProcessorOptions definuje čtyři vlastnosti a jednu událost:

  • MaxBatchSize:Maximální velikost kolekce, kterou chcete přijmout při vyvolání metody ProcessEventsAsync. Tato velikost není minimální, ale jenom maximální. Pokud je k dispozici méně zpráv, které mají být přijaty, processEventsAsync provede s tolika, kolik bylo k dispozici.
  • PrefetchCount:Hodnota používaná podkladovým kanálem AMQP k určení horního limitu počtu zpráv, které má klient obdržet. Tato hodnota by měla být větší nebo rovna hodnotě MaxBatchSize.
  • InvokeProcessorAfterReceiveTimeout:Pokud je tento parametr pravdivý, volá se processEventsAsync, když dojde k časového limitu základního volání pro příjem událostí v oddílu. Tato metoda je užitečná pro provedení časových akcí během období nečinnosti v oddílu.
  • InitialOffsetProvider:Umožňuje nastavit ukazatel funkce nebo výraz lambda, který se volá, aby poskytoval počáteční posun, když čtenář začne číst oddíl. Bez zadání tohoto posunu začíná čtenář nejstarší událostí, pokud soubor JSON s posunem již nebyl uložen v účtu úložiště poskytnutém konstruktoru EventProcessorHost. Tato metoda je užitečná, když chcete změnit chování spuštění čtečky. Při vyvolání této metody obsahuje parametr objektu ID oddílu, pro který se čtenář spustí.
  • ExceptionReceivedEventArgs:Umožňuje dostávat oznámení o všech základních výjimce, ke kterým dochází v třídy EventProcessorHost. Pokud vše nefunguje podle očekávání, je tato událost dobrým místem, kde začít hledat.

Epocha

Tady je způsob, jak funguje epocha příjmu:

S epochou

Epocha je jedinečný identifikátor (hodnota epochy), který služba používá k vynucení vlastnictví oddílu nebo zapůjčení. Přijímač založený na epochě vytvoříte pomocí metody CreateEpochReceiver. Tato metoda vytvoří přijímač založený na epochě. Příjemce se vytvoří pro konkrétní oddíl centra událostí ze zadané skupiny příjemců.

Funkce epochy poskytuje uživatelům možnost zajistit, aby v libovolném okamžiku byl ve skupině příjemců jen jeden příjemce s následujícími pravidly:

  • Pokud ve skupině příjemců není žádný příjemce, uživatel může vytvořit příjemce s libovolnou epochovou hodnotou.
  • Pokud je příjemce s epochovou hodnotou e1 a vytvoří se nový příjemce s epochovou hodnotou e2, kde e1 <= e2, přijímač s e1 se automaticky odpojí a příjemce s e2 se úspěšně vytvoří.
  • Pokud existuje příjemce s epochovou hodnotou e1 a vytvoří se nový příjemce s epochovou hodnotou e2, kde e1 > e2, vytvoření e2 s chybou s chybou: Příjemce s epochou e1 už existuje.

Žádná epocha

Příjemce bez epochy vytvoříte pomocí metody CreateReceiver.

Při zpracování datových proudů existuje několik scénářů, ve kterých by uživatelé chtěli vytvořit více příjemců v jedné skupině příjemců. Pro podporu takových scénářů máme možnost vytvořit příjemce bez epochy a v tomto případě povolíme až 5 souběžných příjemců ve skupině příjemců.

Smíšený režim

Nedoporučujeme používat aplikaci, když vytvoříte příjemce s epochou a pak ve stejné skupině příjemců přepnete na ne epochu nebo naopak. Pokud ale k tomuto chování dojde, služba ho zpracuje pomocí následujících pravidel:

  • Pokud je příjemce již vytvořen s epochou e1 a aktivně přijímá události a je vytvořen nový příjemce bez epochy, vytvoření nového příjemce selže. Epochové přijímače mají v systému vždy přednost.
  • Pokud byl příjemce již vytvořen s epochou e1 a odpojil se a nový příjemce se vytvoří bez epochy nové třídy MessagingFactory, vytvoření nového příjemce bude úspěšné. Existuje tu upozornění, že náš systém detekuje "odpojení příjemce" po 10 minutách.
  • Pokud je vytvořený jeden nebo více příjemců bez epochy a vytvoří se nový příjemce s epochou e1, odpojí se všichni původní příjemci.

Poznámka

Doporučujeme používat různé skupiny konzumentů pro aplikace, které používají epochy, a pro aplikace, které používají epochy, aby nedocházelo k chybám.

Další kroky

Teď, když jste se seznámili s event processor hostem, si přečtěte následující články, ve které najdete další informace o Event Hubs: