libreria client del processore di eventi Hub eventi di Azure per .NET - versione 5.8.1

Hub eventi di Azure è un servizio pubblica-sottoscrizione altamente scalabile che può inserire milioni di eventi al secondo e trasmetterli a più consumer. In questo modo è possibile elaborare e analizzare le grandi quantità di dati prodotti dai dispositivi e dalle applicazioni connesse. Dopo aver raccolto i dati, è possibile recuperare, trasformare e archiviarlo usando qualsiasi provider di analisi in tempo reale o con adattatori di batch/archiviazione. Per altre informazioni su Hub eventi di Azure, è possibile esaminare: Informazioni su Hub eventi.

La libreria client del processore di eventi è una soluzione complementare alla libreria client Hub eventi di Azure, fornendo un client autonomo per l'utilizzo di eventi in modo affidabile, durevole e scalabile adatto per la maggior parte degli scenari di produzione. Un'implementazione opinioneata compilata usando i BLOB di Archiviazione di Azure, è consigliabile che Il processore di eventi sia consigliato per:

  • Lettura ed elaborazione di eventi in tutte le partizioni di un hub eventi su larga scala con resilienza a errori temporanei e problemi di rete intermittenti.

  • Elaborazione di eventi in modo cooperativo, in cui più processori distribuiscono dinamicamente e condividono la responsabilità nel contesto di un gruppo di consumer, gestendo in modo uniforme il carico come processori vengono aggiunti e rimossi dal gruppo.

  • Gestione dei checkpoint e dello stato per l'elaborazione in modo durevole usando i BLOB di Archiviazione di Azure come archivio dati sottostante.

Codice | sorgente Pacchetto (NuGet) | Documentazione di | riferimento sulle API Documentazione | del prodotto Guida alla risoluzione dei problemi

Introduzione

Prerequisiti

  • Sottoscrizione di Azure: Per usare i servizi di Azure, inclusi Hub eventi di Azure, è necessaria una sottoscrizione. Se non si dispone di un account Azure esistente, è possibile iscriversi per una versione di valutazione gratuita o usare i vantaggi della sottoscrizione di Visual Studio quando si crea un account.

  • Spazio dei nomi hub eventi con un hub eventi: Per interagire con Hub eventi di Azure, è anche necessario disporre di uno spazio dei nomi e dell'hub eventi. Se non si ha familiarità con la creazione di risorse di Azure, è possibile seguire la guida dettagliata per la creazione di un hub eventi usando l'portale di Azure. È anche possibile trovare istruzioni dettagliate per l'uso dell'interfaccia della riga di comando di Azure, Azure PowerShell o modelli di Azure Resource Manager (ARM) per creare un hub eventi.

  • Account di archiviazione di Azure con archiviazione BLOB: Per rendere persistenti i checkpoint e la proprietà in Archiviazione di Azure, è necessario disporre di un account di archiviazione di Azure con BLOB disponibili. Se non si ha familiarità con gli account di Archiviazione di Azure, è possibile seguire la guida dettagliata per la creazione di un account di archiviazione usando l'portale di Azure. È anche possibile trovare istruzioni dettagliate per l'uso dell'interfaccia della riga di comando di Azure, Azure PowerShell o modelli di Azure Resource Manager (ARM) per creare account di archiviazione.

  • Contenitore BLOB di Archiviazione di Azure: I dati di checkpoint e proprietà in Archiviazione di Azure verranno scritti in BLOB in un contenitore specifico. Richiede EventProcessorClient un contenitore esistente e non creerà in modo implicito uno per proteggere la configurazione accidentale. Se non si ha familiarità con i contenitori di Archiviazione di Azure, è possibile fare riferimento alla documentazione relativa alla gestione dei contenitori. È possibile trovare istruzioni dettagliate per l'uso di .NET, l'interfaccia della riga di comando di Azure o Azure PowerShell per creare un contenitore.

  • C# 8.0: La libreria client Hub eventi di Azure usa nuove funzionalità introdotte in C# 8.0. Per sfruttare la sintassi C# 8.0, è consigliabile compilare usando .NET Core SDK 3.0 o versione successiva con una versione del linguaggio di latest.

    Gli utenti di Visual Studio che desiderano sfruttare al meglio la sintassi C# 8.0 dovranno usare Visual Studio 2019 o versione successiva. Visual Studio 2019, inclusa l'edizione Community gratuita, può essere scaricato qui. Gli utenti di Visual Studio 2017 possono sfruttare la sintassi C# 8 usando il pacchetto NuGet Microsoft.Net.Compilers e impostando la versione del linguaggio, anche se l'esperienza di modifica potrebbe non essere ideale.

    È comunque possibile usare la libreria con le versioni precedenti del linguaggio C#, ma sarà necessario gestire i membri enumerabili asincroni e asincroni manualmente anziché trarre vantaggio dalla nuova sintassi. È comunque possibile usare qualsiasi versione del framework supportata da .NET Core SDK, incluse le versioni precedenti di .NET Core o .NET Framework. Per altre informazioni, vedere: come specificare i framework di destinazione.

    Nota importante: Per compilare o eseguire gli esempi e gli esempi senza modifiche, è necessario usare C# 11.0. È comunque possibile eseguire gli esempi se si decide di modificarli per altre versioni della lingua.

Per creare rapidamente le risorse necessarie in Azure e per ricevere le stringhe di connessione, è possibile distribuire il modello di esempio facendo clic su:

Distribuisci in Azure

Installare il pacchetto

Installare la libreria client del processore di eventi Hub eventi di Azure per .NET usando NuGet:

dotnet add package Azure.Messaging.EventHubs.Processor

Autenticare il client

Ottenere una stringa di connessione di Hub eventi

Per la libreria client di Hub eventi per interagire con un hub eventi, sarà necessario comprendere come connettersi e autorizzarlo. Il modo più semplice per farlo consiste nell'usare una stringa di connessione, creata automaticamente durante la creazione di uno spazio dei nomi di Hub eventi. Se non si ha familiarità con l'uso delle stringhe di connessione con Hub eventi, è possibile seguire la guida dettagliata per ottenere una stringa di connessione di Hub eventi.

Ottenere una stringa di connessione di Archiviazione di Azure

Per consentire al client del processore eventi di usare i BLOB di Archiviazione di Azure per il checkpoint, sarà necessario comprendere come connettersi a un account di archiviazione e autorizzarlo. Il metodo più semplice di questa operazione consiste nell'usare una stringa di connessione, generata al momento della creazione dell'account di archiviazione. Se non si ha familiarità con l'autorizzazione della stringa di connessione dell'account di archiviazione in Azure, è possibile seguire la guida dettagliata per configurare le stringhe di connessione di Archiviazione di Azure.

Concetti chiave

  • Un processore di eventi è un costrutto destinato a gestire le responsabilità associate alla connessione a un determinato Hub eventi ed elabora gli eventi da ognuna delle relative partizioni, nel contesto di un gruppo di consumer specifico. L'atto di elaborazione degli eventi letti dalla partizione e la gestione di eventuali errori che si verificano vengono delegati dal processore di eventi al codice fornito, consentendo alla logica di concentrarsi sulla distribuzione del valore aziendale mentre il processore gestisce le attività associate alla lettura degli eventi, la gestione delle partizioni e l'inserimento dello stato in modo permanente sotto forma di checkpoint.

  • Il checkpoint è un processo in base al quale i lettori contrassegnano e mantengono la loro posizione per gli eventi elaborati per una partizione. Il checkpoint è la responsabilità del consumer e si verifica in una partizione, in genere nel contesto di un gruppo di consumer specifico. EventProcessorClientPer , ciò significa che, per una combinazione di gruppi di consumer e partizione, il processore deve tenere traccia della posizione corrente nel flusso di eventi. Per altre informazioni, fare riferimento al checkpoint nella documentazione del prodotto Hub eventi.

    Quando un processore di eventi si connette, inizierà a leggere gli eventi nel checkpoint precedentemente persistente dall'ultimo processore di tale partizione nel gruppo consumer, se presente. Poiché un processore di eventi legge e agisce sugli eventi nella partizione, deve creare periodicamente checkpoint per contrassegnare gli eventi come "completi" da applicazioni downstream e per fornire resilienza deve essere un processore di eventi o l'ambiente che lo ospita ha esito negativo. Se necessario, è possibile rielaborare gli eventi contrassegnati in precedenza come "completi" specificando un offset precedente tramite questo processo di checkpoint.

  • Una partizione è una sequenza ordinata di eventi che viene mantenuta in un hub eventi. Le partizioni sono un mezzo di organizzazione dei dati associato al parallelismo richiesto dai consumer di eventi. Hub eventi di Azure fornisce il flusso dei messaggi tramite un modello di consumer partizionato in cui ogni consumer legge solo un subset specifico, o partizione, del flusso di messaggi. Man mano che arrivano, i nuovi eventi vengono aggiunti alla fine di questa sequenza. Il numero di partizioni viene specificato al momento della creazione di un hub eventi e non può essere modificato.

  • Un gruppo di consumer è una visualizzazione di un intero hub eventi. I gruppi di consumer consentono a più applicazioni consumer di avere ognuna una vista separata del flusso di eventi e di leggere il flusso in modo indipendente, in base al proprio ritmo e dalla propria posizione. In una partizione possono esistere al massimo cinque lettori simultanei per gruppo di consumer. È tuttavia consigliabile che sia presente un solo consumer attivo in una specifica partizione e associazione di gruppi di consumer. Ogni lettore attivo riceve tutti gli eventi dalla partizione; se sono presenti più lettori nella stessa partizione, riceveranno eventi duplicati.

Per altri concetti e discussioni più approfondite, vedere: Funzionalità di Hub eventi.

Durata client

L'oggetto EventProcessorClient è sicuro per memorizzare nella cache e usare come singleton per la durata dell'applicazione, che è la procedura consigliata quando gli eventi vengono letti regolarmente. I client sono responsabili della gestione efficiente della rete, della CPU e dell'uso della memoria, lavorando per mantenere l'utilizzo basso durante periodi di inattività. La chiamata StopProcessingAsync o StopProcessing sul processore è necessaria per assicurarsi che le risorse di rete e altri oggetti non gestiti siano puliti correttamente.

Thread safety

Si garantisce che tutti i metodi di istanza client siano thread-safe e indipendenti tra loro (linee guida). Ciò garantisce che la raccomandazione di riutilizzare le istanze client sia sempre sicura, anche tra thread.

I tipi di modello di dati, ad EventData esempio e EventDataBatch non sono thread-safe. Non devono essere condivisi tra thread né usati simultaneamente con i metodi client.

Concetti aggiuntivi

Opzioni | client Gestori eventi | Gestione degli errori | Diagnostica | Simulazione (processore) | Simulazione (tipi di client)

Esempio

Creazione di un client del processore di eventi

Poiché l'oggetto EventProcessorClient ha una dipendenza dai BLOB di Archiviazione di Azure per la persistenza dello stato, sarà necessario specificare un oggetto BlobContainerClient per il processore, configurato per l'account di archiviazione e il contenitore da usare. Il contenitore usato per configurare l'oggetto EventProcessorClient deve esistere.

Poiché non EventProcessorClient ha modo di conoscere la finalità di specificare un contenitore che non esiste, non creerà in modo implicito il contenitore. Ciò funge da protezione da un contenitore non configurato in modo errato che causa un processore non autorizzato in grado di condividere la proprietà e interferire con altri processori nel gruppo di consumer.

// The container specified when creating the BlobContainerClient must exist; it will
// not be implicitly created.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

Configurare gli eventi e i gestori degli errori

Per usare , i gestori per l'elaborazione EventProcessorClientdegli eventi e gli errori devono essere forniti. Questi gestori sono considerati autonomi e gli sviluppatori sono responsabili della garanzia che le eccezioni all'interno del codice del gestore siano considerate autonome.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

async Task processEventHandler(ProcessEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an event.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an error.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheError(eventArgs.Exception);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

Avviare e arrestare l'elaborazione

L'oggetto EventProcessorClient eseguirà l'elaborazione in background dopo aver avviato in modo esplicito e continuare a farlo fino a quando non è stato arrestato in modo esplicito. Anche se ciò consente al codice dell'applicazione di eseguire altre attività, la responsabilità di garantire che il processo non venga terminato durante l'elaborazione se non sono presenti altre attività eseguite.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}

Uso di un'entità Active Directory con il client del processore di eventi

La libreria di identità di Azure offre il supporto di autenticazione di Azure Active Directory che può essere usato per le librerie client di Azure, tra cui Hub eventi e Archiviazione di Azure.

Per usare un'entità Active Directory, viene specificata una delle credenziali disponibili dalla libreria durante la creazione del Azure.Identity client hub eventi. Inoltre, lo spazio dei nomi hub eventi completo e il nome dell'hub eventi desiderato vengono forniti al posto della stringa di connessione di Hub eventi.

Per usare un'entità Active Directory con contenitori BLOB di Archiviazione di Azure, è necessario specificare l'URL completo del contenitore durante la creazione del client di archiviazione. I dettagli sui formati URI validi per l'accesso all'archiviazione BLOB possono essere trovati in Contenitori di denominazione e riferimento a contenitori, BLOB e metadati validi.

var credential = new DefaultAzureCredential();
var blobStorageUrl ="<< FULLY-QUALIFIED CONTAINER URL (like https://myaccount.blob.core.windows.net/mycontainer) >>";

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(new Uri(blobStorageUrl), credential);

var processor = new EventProcessorClient
(
    storageClient,
    consumerGroup,
    fullyQualifiedNamespace,
    eventHubName,
    credential
);

Quando si usa Azure Active Directory con Hub eventi, l'entità deve essere assegnata a un ruolo che consente la lettura da Hub eventi, ad esempio il Azure Event Hubs Data Receiver ruolo. Per altre informazioni sull'uso dell'autorizzazione di Azure Active Directory con Hub eventi, vedere la documentazione associata.

Quando si usa Azure Active Directory con Archiviazione di Azure, l'entità deve essere assegnata a un ruolo che consente la lettura, la scrittura e l'eliminazione dell'accesso ai BLOB, ad esempio il Storage Blob Data Contributor ruolo. Per altre informazioni sull'uso dell'autorizzazione di Active Directory con Archiviazione di Azure, vedere la documentazione associata e l'esempio di autorizzazione di Archiviazione di Azure.

Risoluzione dei problemi

Per informazioni dettagliate sulla risoluzione dei problemi, vedere la Guida alla risoluzione dei problemi di Hub eventi.

Gestione delle eccezioni

Eccezioni client del processore di eventi

Il client del processore di eventi esegue ogni tentativo di essere resiliente rispetto alle eccezioni e eseguirà le azioni necessarie per continuare l'elaborazione, a meno che non sia impossibile farlo. Non è necessaria alcuna azione da parte degli sviluppatori per farlo; fa parte in modo nativo del comportamento del processore.

Per consentire agli sviluppatori di controllare e reagire alle eccezioni che si verificano all'interno delle operazioni client del processore di eventi, vengono visualizzate tramite l'evento ProcessError . Gli argomenti per questo evento offrono informazioni dettagliate sull'eccezione e sul contesto in cui è stato osservato. Gli sviluppatori possono eseguire operazioni normali sul client del processore di eventi dall'interno di questo gestore eventi, ad esempio l'arresto e/o il riavvio in risposta agli errori, ma potrebbero non influire in caso contrario sul comportamento dell'eccezione del processore.

Per un esempio di base dell'implementazione del gestore degli errori, vedere l'esempio: Gestori processore eventi.

Eccezioni nei gestori eventi

Poiché il client del processore di eventi non ha il contesto appropriato per comprendere la gravità delle eccezioni all'interno dei gestori eventi forniti dagli sviluppatori, non può presupporre quali azioni sarebbero una risposta ragionevole a loro. Di conseguenza, gli sviluppatori sono considerati responsabili delle eccezioni che si verificano all'interno dei gestori eventi che forniscono blocchi try/catch e altri costrutti di linguaggio standard.

Il client del processore di eventi non tenterà di rilevare eccezioni nel codice per sviluppatori né di visualizzarle in modo esplicito. Il comportamento risultante dipenderà dall'ambiente di hosting del processore e dal contesto in cui è stato chiamato il gestore eventi. Poiché questo può variare tra diversi scenari, è consigliabile che gli sviluppatori codicino i gestori eventi in modo difensivo e account per potenziali eccezioni.

Registrazione e diagnostica

La libreria client del processore di eventi è completamente instrumentata per la registrazione delle informazioni a vari livelli di dettaglio usando .NET EventSource per generare informazioni. La registrazione viene eseguita per ogni operazione e segue il modello di contrassegnare il punto iniziale dell'operazione, il completamento e eventuali eccezioni rilevate. Altre informazioni che possono offrire informazioni dettagliate vengono registrate anche nel contesto dell'operazione associata.

I log client del processore di eventi sono disponibili per qualsiasi EventListener opzione optando per l'origine denominata "Azure-Messaging-EventHubs-Processor-EventProcessorClient" o optando su tutte le origini con il tratto "AzureEventSource". Per semplificare l'acquisizione dei log dalle librerie client di Azure, la Azure.Core libreria usata da Hub eventi offre un AzureEventSourceListeneroggetto . Altre informazioni sono disponibili in Acquisizione dei log di Hub eventi usando AzureEventSourceListener.

La libreria del processore di eventi viene inoltre instrumentata per la traccia distribuita usando Application Insights o OpenTelemetry. Altre informazioni sono disponibili nell'esempio di Diagnostica di Azure.Core.

Passaggi successivi

Oltre agli scenari descritti, la libreria processore di Hub eventi di Azure offre supporto per scenari aggiuntivi per sfruttare il set completo di funzionalità di EventProcessorClient. Per esplorare alcuni di questi scenari, la libreria client del processore di Hub eventi offre un progetto di esempi da usare come illustrazione per scenari comuni. Per informazioni dettagliate, vedere gli esempi README .

Contributo

In questo progetto sono benvenuti i contributi e i suggerimenti. Per la maggior parte dei contenuti è necessario sottoscrivere un contratto di licenza di collaborazione (CLA, Contributor License Agreement) che stabilisce che l'utente ha il diritto di concedere, e di fatto concede a Microsoft i diritti d'uso del suo contributo. Per informazioni dettagliate, vedere https://cla.microsoft.com.

Quando si invia una richiesta pull, un bot CLA determina automaticamente se è necessario specificare un contratto CLA e completare la richiesta pull in modo appropriato (ad esempio con un'etichetta e un commento). Seguire le istruzioni specificate dal bot. È sufficiente eseguire questa operazione una sola volta per tutti i repository che usano il contratto CLA Microsoft.

Questo progetto ha adottato il Codice di comportamento di Microsoft per l'open source. Per altre informazioni, vedere Code of Conduct FAQ (Domande frequenti sul Codice di comportamento Open Source di Microsoft) oppure contattare opencode@microsoft.com per eventuali altre domande o commenti.

Per altre informazioni, vedere la guida ai contributi .

Impression