Sdílet prostřednictvím


Klientská knihovna procesoru událostí Azure Event Hubs pro .NET – verze 5.8.1

Azure Event Hubs je vysoce škálovatelná služba publikování a odběru, která může ingestovat miliony událostí za sekundu a streamovat je více příjemcům. To vám umožní zpracovávat a analyzovat obrovské objemy dat produkovaných připojenými zařízeními a aplikacemi. Jakmile služba Event Hubs shromáždí data, můžete je načítat, transformovat a ukládat pomocí libovolného poskytovatele analýz v reálném čase nebo pomocí adaptérů dávkování nebo úložiště. Pokud se chcete o Azure Event Hubs dozvědět více, můžete si projít téma Co je Event Hubs.

Klientská knihovna Event Processor je doplňkem klientské knihovny Azure Event Hubs a poskytuje samostatného klienta pro zpracování událostí robustním, odolným a škálovatelným způsobem, který je vhodný pro většinu produkčních scénářů. Procesor událostí se doporučuje jako kladná implementace vytvořená pomocí objektů blob služby Azure Storage:

  • Čtení a zpracování událostí napříč všemi oddíly centra událostí ve velkém měřítku s odolností vůči přechodným selháním a občasným problémům se sítí.

  • Společné zpracování událostí, kdy více procesorů dynamicky distribuuje a sdílí odpovědnost v kontextu skupiny příjemců, a elegantně spravuje zatížení při přidání a odebrání procesorů ze skupiny.

  • Správa kontrolních bodů a stavu pro zpracování trvalým způsobem s využitím objektů blob služby Azure Storage jako podkladového úložiště dat

Zdrojový kód | Balíček (NuGet) | Referenční dokumentace k | rozhraní API Dokumentace k | produktu Průvodce odstraňováním potíží

Začínáme

Požadavky

  • Předplatné Azure: Pokud chcete používat služby Azure, včetně Azure Event Hubs, budete potřebovat předplatné. Pokud nemáte účet Azure, můžete si při vytváření účtu zaregistrovat bezplatnou zkušební verzi nebo využít výhody předplatného sady Visual Studio.

  • Obor názvů služby Event Hubs s centrem událostí: Pokud chcete pracovat s Azure Event Hubs, musíte mít také k dispozici obor názvů a centrum událostí. Pokud nejste obeznámeni s vytvářením prostředků Azure, možná budete chtít postupovat podle podrobného průvodce pro vytvoření centra událostí pomocí Azure Portal. Najdete tam také podrobné pokyny k vytvoření centra událostí pomocí Azure CLI, Azure PowerShell nebo šablon Azure Resource Manager (ARM).

  • Účet Azure Storage s úložištěm objektů blob: Pokud chcete zachovat kontrolní body a řídit vlastnictví ve službě Azure Storage, musíte mít účet Služby Azure Storage s dostupnými objekty blob. Pokud účty Azure Storage neznáte, možná budete chtít postupovat podle podrobného průvodce vytvořením účtu úložiště pomocí Azure Portal. Najdete tam také podrobné pokyny k vytváření účtů úložiště pomocí Azure CLI, Azure PowerShell nebo šablon Azure Resource Manager (ARM).

  • Kontejner objektů blob služby Azure Storage: Kontrolní body a data vlastnictví ve službě Azure Storage se zapíšou do objektů blob v konkrétním kontejneru. Vyžaduje EventProcessorClient existující kontejner a implicitně ho nevytváří, aby byl chráněn před nechtěnou chybnou konfigurací. Pokud kontejnery Azure Storage neznáte, můžete si projděte dokumentaci ke správě kontejnerů. Tady najdete podrobné pokyny k vytvoření kontejneru pomocí .NET, Azure CLI nebo Azure PowerShell.

  • C# 8.0: Klientská knihovna Azure Event Hubs využívá nové funkce, které byly zavedeny v jazyce C# 8.0. Abyste mohli využít syntaxi jazyka C# 8.0, doporučujeme kompilaci pomocí sady .NET Core SDK 3.0 nebo vyšší s jazykovou verzí nástroje latest.

    Uživatelé sady Visual Studio, kteří chtějí plně využít syntaxi jazyka C# 8.0, budou muset použít sadu Visual Studio 2019 nebo novější. Visual Studio 2019, včetně bezplatné edice Community, si můžete stáhnout tady. Uživatelé sady Visual Studio 2017 můžou využít syntaxi jazyka C# 8 tím, že využijí balíček NuGet Microsoft.Net.Compilers a nastaví jazykovou verzi, i když prostředí pro úpravy nemusí být ideální.

    Knihovnu můžete stále používat s předchozími verzemi jazyka C#, ale budete muset spravovat asynchronní výčtové a asynchronní jednorázové členy ručně a nebudete tak využívat výhod nové syntaxe. Stále můžete cílit na libovolnou verzi architektury, kterou vaše sada .NET Core SDK podporuje, včetně starších verzí .NET Core nebo .NET Framework. Další informace najdete v tématu Určení cílových architektur.

    Důležitá poznámka: Abyste mohli sestavovat nebo spouštět příklady a ukázky bez úprav, je nutné použít C# 11.0. Pokud se rozhodnete ukázky upravit pro jiné jazykové verze, můžete je přesto spustit.

Pokud chcete rychle vytvořit potřebné prostředky v Azure a získat pro ně připojovací řetězce, můžete naši ukázkovou šablonu nasadit kliknutím na:

Nasazení do Azure

Instalace balíčku

Nainstalujte klientskou knihovnu Azure Event Hubs Event Processor pro .NET pomocí NuGetu:

dotnet add package Azure.Messaging.EventHubs.Processor

Ověření klienta

Získání připojovacího řetězce služby Event Hubs

Aby klientská knihovna Event Hubs s centrem událostí komunikovali, musí pochopit, jak se k němu připojit a autorizovat s ním. Nejjednodušším způsobem je použít připojovací řetězec, který se vytvoří automaticky při vytváření oboru názvů služby Event Hubs. Pokud nejste obeznámeni s používáním připojovacích řetězců se službou Event Hubs, možná budete chtít získat připojovací řetězec služby Event Hubs podle podrobného průvodce.

Získání připojovacího řetězce služby Azure Storage

Aby klient procesoru událostí využíval k vytváření kontrolních bodů objekty blob služby Azure Storage, bude muset pochopit, jak se připojit k účtu úložiště a autorizovat ho. Nejjednodušším způsobem je použít připojovací řetězec, který se vygeneruje při vytvoření účtu úložiště. Pokud nejste obeznámeni s autorizací připojovacího řetězce účtu úložiště v Azure, možná budete chtít postupovat podle podrobného průvodce konfigurací připojovacích řetězců služby Azure Storage.

Klíčové koncepty

  • Procesor událostí je konstruktor určený ke správě odpovědností spojených s připojením k danému centru událostí a zpracováním událostí z jednotlivých oddílů v kontextu konkrétní skupiny příjemců. Zpracování událostí načtených z oddílu a zpracování všech chyb, ke kterým dojde, je delegováno procesorem událostí na zadaný kód, což vaší logice umožňuje soustředit se na poskytování obchodní hodnoty, zatímco procesor zpracovává úlohy spojené se čtením událostí, správou oddílů a povolením zachování stavu ve formě kontrolních bodů.

  • Vytváření kontrolních bodů je proces, při kterém čtenáři označí a zachovají svoji pozici pro události, které byly zpracovány pro oddíl. Za vytváření kontrolních bodů zodpovídá příjemce a probíhá u jednotlivých oddílů, obvykle v kontextu konkrétní skupiny příjemců. V případě EventProcessorClientto znamená, že pro kombinaci skupiny příjemců a oddílů musí procesor sledovat svou aktuální pozici ve streamu událostí. Pokud potřebujete další informace, projděte si vytváření kontrolních bodů v dokumentaci k produktu Event Hubs.

    Když se procesor událostí připojí, začne číst události v kontrolním bodě, který byl dříve zachován posledním procesorem daného oddílu v této skupině příjemců, pokud existuje. Vzhledem k tomu, že procesor událostí čte události v oddílu a pracuje s ním, měl by pravidelně vytvářet kontrolní body, které budou podřízenými aplikacemi označit události jako dokončené, a zajistit odolnost v případě selhání procesoru událostí nebo prostředí, které ho hostuje. V případě potřeby je možné znovu zpracovat události, které byly dříve označeny jako "dokončené", zadáním dřívějšího posunu prostřednictvím tohoto procesu vytváření kontrolních bodů.

  • Oddíl je seřazená posloupnost událostí, která se uchovává v centru událostí. Oddíly jsou prostředky organizace dat spojené s paralelismem vyžadovaným příjemci událostí. Azure Event Hubs zajišťuje streamování zpráv prostřednictvím modelu rozděleného příjemce, ve kterém každý příjemce čte pouze určitou podmnožinu nebo oddíl streamu zpráv. Události, které nově přichází, se zařazují na konec této posloupnosti. Počet oddílů se zadává v okamžiku vytvoření centra událostí a není možné ho změnit.

  • Skupina uživatelů je zobrazení celého centra událostí. Skupiny uživatelů umožňují více aplikacím, které využívají, mít samostatné zobrazení streamu událostí a číst datový proud nezávisle vlastním tempem a z vlastní pozice. Na oddílu může být maximálně 5 souběžných čtenářů na skupinu příjemců. Doporučuje se však, aby pro daný oddíl a párování skupin příjemců existoval pouze jeden aktivní příjemce. Každý aktivní čtenář obdrží všechny události ze svého oddílu; Pokud je ve stejném oddílu více čtenářů, obdrží duplicitní události.

Další koncepty a hlubší diskuzi najdete v tématu Funkce služby Event Hubs.

Životnost klienta

Je EventProcessorClient bezpečné ukládat do mezipaměti a používat ji po celou dobu životnosti aplikace, což je osvědčený postup při pravidelném čtení událostí. Klienti zodpovídají za efektivní správu využití sítě, procesoru a paměti a pracují na tom, aby během období nečinnosti udržovali nízké využití. Volání StopProcessingAsync nebo StopProcessing na procesoru je nutné k zajištění správného vyčištění síťových prostředků a jiných nespravovaných objektů.

Bezpečnost vlákna

Zaručujeme, že všechny metody instance klienta jsou bezpečné pro přístup z více vláken a nezávislé na sobě (pokyny). Tím se zajistí, že doporučení opakovaně používat instance klienta je vždy bezpečné, a to i napříč vlákny.

Typy datových modelů, jako EventData jsou a EventDataBatch , nejsou bezpečné pro přístup z více vláken. Neměly by se sdílet mezi vlákny ani používat souběžně s klientskými metodami.

Další koncepty

Možnosti | klienta Obslužné rutiny událostí | Zpracování selhání | Diagnostika | Napodobování (procesor) | Napodobování (typy klientů)

Příklady

Vytvoření klienta Event Processor

EventProcessorClient Vzhledem k tomu, že objekt je z důvodu trvalosti stavu závislý na objektech blob služby Azure Storage, budete muset pro procesor zadat BlobContainerClient hodnotu , která je nakonfigurovaná pro účet úložiště a kontejner, které se mají použít. Kontejner použitý ke konfiguraci EventProcessorClient musí existovat.

Vzhledem k tomu, EventProcessorClient že nemůže zjistit záměr zadat kontejner, který neexistuje, implicitně kontejner nevytvoří. To funguje jako ochrana proti chybně nakonfigurovanému kontejneru, který způsobuje, že podvodný procesor nemůže sdílet vlastnictví a narušuje ostatní procesory ve skupině příjemců.

// The container specified when creating the BlobContainerClient must exist; it will
// not be implicitly created.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

Konfigurace obslužných rutin událostí a chyb

Aby bylo možné použít EventProcessorClient, musí být k dispozici obslužné rutiny pro zpracování událostí a chyb. Tyto obslužné rutiny jsou považovány za samostatné a vývojáři zodpovídají za to, že se budou počítat s výjimkami v kódu obslužné rutiny.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

async Task processEventHandler(ProcessEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an event.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an error.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheError(eventArgs.Exception);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

Spuštění a zastavení zpracování

Jakmile EventProcessorClient bude explicitně spuštěno, provede zpracování na pozadí a bude v tom pokračovat, dokud se explicitně nezastaví. To sice umožňuje kódu aplikace provádět další úlohy, ale zároveň je potřeba zajistit, aby se proces během zpracování neukončil, pokud se neprovádějí žádné jiné úlohy.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}

Použití objektu zabezpečení služby Active Directory s klientem Event Processor

Knihovna identit Azure poskytuje podporu ověřování Azure Active Directory, kterou je možné použít pro klientské knihovny Azure, včetně event hubs a Azure Storage.

K použití objektu zabezpečení služby Active Directory se při vytváření klienta Služby Event Hubs zadá jeden z dostupných přihlašovacích údajů z Azure.Identity knihovny. Kromě toho se místo připojovacího řetězce služby Event Hubs zadává plně kvalifikovaný obor názvů služby Event Hubs a název požadovaného centra událostí.

Pokud chcete používat objekt zabezpečení služby Active Directory s kontejnery objektů blob služby Azure Storage, musí se při vytváření klienta úložiště zadat plně kvalifikovaná adresa URL kontejneru. Podrobnosti o platných formátech identifikátorů URI pro přístup ke službě Blob Storage najdete v tématu Pojmenování kontejnerů, objektů blob a metadat a odkazování na nich.

var credential = new DefaultAzureCredential();
var blobStorageUrl ="<< FULLY-QUALIFIED CONTAINER URL (like https://myaccount.blob.core.windows.net/mycontainer) >>";

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(new Uri(blobStorageUrl), credential);

var processor = new EventProcessorClient
(
    storageClient,
    consumerGroup,
    fullyQualifiedNamespace,
    eventHubName,
    credential
);

Pokud používáte Azure Active Directory se službou Event Hubs, musí mít objekt zabezpečení přiřazenou roli, která umožňuje čtení ze služby Event Hubs, například roli Azure Event Hubs Data Receiver . Další informace o použití autorizace Azure Active Directory se službou Event Hubs najdete v související dokumentaci.

Pokud používáte Azure Active Directory se službou Azure Storage, musí mít objekt zabezpečení přiřazenou roli, která umožňuje přístup ke čtení, zápisu a odstraňování objektů blob, jako Storage Blob Data Contributor je role. Další informace o použití autorizace Active Directory se službou Azure Storage najdete v související dokumentaci a v ukázce autorizace služby Azure Storage.

Řešení potíží

Podrobné informace o řešení potíží najdete v průvodci odstraňováním potíží se službou Event Hubs.

Ošetření výjimek

Výjimky klienta Event Processor

Klient procesoru událostí se snaží být odolný vůči výjimkám a provede potřebné akce, aby pokračoval ve zpracování, pokud to není možné. K tomu není potřeba žádná akce ze strany vývojářů . je nativně součástí chování procesoru.

Aby vývojáři měli možnost kontrolovat a reagovat na výjimky, ke kterým dochází v rámci operací klienta Procesor událostí, objeví se prostřednictvím ProcessError události. Argumenty pro tuto událost nabízejí podrobnosti o výjimce a kontextu, ve kterém byla zjištěna. Vývojáři můžou provádět normální operace s klientem event processoru z této obslužné rutiny události, jako je zastavení nebo restartování v reakci na chyby, ale jinak nemusí ovlivnit chování výjimky procesoru.

Základní příklad implementace obslužné rutiny chyby najdete v ukázce: Obslužné rutiny procesoru událostí.

Výjimky v obslužných rutinách událostí

Vzhledem k tomu, že klient event processoru nemá odpovídající kontext pro pochopení závažnosti výjimek v rámci obslužných rutin událostí, které vývojáři poskytují, nemůže předpokládat, jaké akce by byly přiměřenou reakcí na ně. V důsledku toho jsou vývojáři považováni za zodpovědné za výjimky, ke kterým dochází v rámci obslužných rutin událostí, které poskytují pomocí try/catch bloků a dalších standardních jazykových konstruktorů.

Klient procesoru událostí se nepokusí detekovat výjimky v kódu vývojáře ani je explicitně nezoznačuje. Výsledné chování bude záviset na hostitelském prostředí procesoru a kontextu, ve kterém byla obslužná rutina události volána. Vzhledem k tomu, že se to může v různých scénářích lišit, důrazně doporučujeme, aby vývojáři kódovali obslužné rutiny událostí defenzivně a zohlednili potenciální výjimky.

Protokolování a diagnostika

Klientská knihovna procesoru událostí je plně instrumentovaná pro protokolování informací na různých úrovních podrobností pomocí rozhraní .NET EventSource k vygenerování informací. Protokolování se provádí pro každou operaci a řídí se vzorem označení počátečního bodu operace, její dokončení a případných zjištěných výjimek. Další informace, které můžou nabídnout přehled, se také protokolují v kontextu přidružené operace.

Protokoly klienta procesoru událostí jsou dostupné všem EventListener uživatelům tak, že se přihlásí ke zdroji s názvem Azure-Messaging-EventHubs-Processor-EventProcessorClient nebo se přihlásí ke všem zdrojům, které mají vlastnost AzureEventSource. Pro usnadnění Azure.Core zachytávání protokolů z klientských knihoven Azure nabízí knihovna používaná službou Event Hubs .AzureEventSourceListener Další informace najdete v tématu Zachycení protokolů služby Event Hubs pomocí azureEventSourceListener.

Knihovna Event Processor je také instrumentovaná pro distribuované trasování pomocí Application Insights nebo OpenTelemetry. Další informace najdete v ukázce diagnostiky Azure.Core.

Další kroky

Kromě probíraných scénářů nabízí knihovna procesorů Azure Event Hubs podporu pro další scénáře, které vám pomůžou využít úplnou sadu EventProcessorClientfunkcí nástroje . Klientská knihovna procesoru Event Hubs nabízí projekt ukázek, které slouží jako ilustrace pro běžné scénáře, aby vám pomohla prozkoumat některé z těchto scénářů. Podrobnosti najdete v ukázkách SOUBORU README .

Přispívání

Tento projekt vítá příspěvky a návrhy. Většina příspěvků vyžaduje souhlas s licenční smlouvou s přispěvatelem (CLA), která stanoví, že máte právo udělit nám práva k používání vašeho příspěvku a skutečně tak činíte. Podrobnosti najdete tady: https://cla.microsoft.com

Při odesílání žádosti o přijetí změn robot CLA automaticky určí, jestli je potřeba poskytnout smlouvu CLA, a příslušným způsobem žádost o přijetí změn upraví (např. přidáním jmenovky nebo komentáře). Stačí postupovat podle pokynů robota. Pro všechna úložiště používající naši smlouvu CLA to stačí udělat jenom jednou.

Tento projekt přijal pravidla chování pro Microsoft Open Source. Další informace najdete v nejčastějších dotazech k pravidlům chování nebo se obraťte na opencode@microsoft.com případné další dotazy nebo komentáře.

Další informace najdete v našem průvodci přispívání .

Imprese