Eseguire operazioni comuni con la libreria client di Hub eventi

Completato

Questa unità contiene esempi di operazioni comuni che è possibile eseguire con la libreria client di Hub eventi (Azure.Messaging.EventHubs) per interagire con un hub eventi.

Esaminare Hub eventi

Molte operazioni di Hub eventi vengono eseguite nell'ambito di una partizione specifica. Poiché le partizioni sono di proprietà di Hub eventi, i nomi vengono assegnati al momento della creazione. Per conoscere le partizioni disponibili, eseguire query su Hhub eventi usando uno dei client di Hub eventi. A scopo illustrativo, in questi esempi viene dimostrato EventHubProducerClient, ma il concetto e la forma sono comuni nei client.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

Pubblicare eventi in Hub eventi

Per pubblicare eventi, è necessario creare un oggetto EventHubProducerClient. I producer pubblicano gli eventi in batch e possono richiedere una partizione specifica o consentire al servizio Hub eventi di decidere in quali eventi di partizione pubblicare. È consigliabile usare il routing automatico quando la pubblicazione degli eventi deve essere a disponibilità elevata o quando i dati degli eventi devono essere distribuiti in modo uniforme tra le partizioni. L'esempio usa il routing automatico.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

Leggere gli eventi da un Hub eventi

Per leggere gli eventi da un Hub eventi, è necessario creare un oggetto EventHubConsumerClient per un determinato gruppo di consumer. Quando viene creato un Hub eventi, include un gruppo di consumer predefinito che può essere usato per iniziare a esplorare Hub eventi. L'esempio è incentrato sulla lettura di tutti gli eventi pubblicati in Hub eventi usando un iteratore.

Nota

È importante notare che questo approccio all'utilizzo è destinato a migliorare l'esperienza di esplorazione della libreria client e della creazione di prototipi dell'hub eventi. È consigliabile non usarlo negli scenari di produzione. Per l'uso in produzione, è consigliabile usare il client processore di eventi, che offre un'esperienza più affidabile e con migliori prestazioni.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Leggere gli eventi da una partizione di Hub eventi

Per eseguire la lettura da una partizione specifica, il consumer dovrà specificare in quale punto del flusso di eventi iniziare a ricevere gli eventi.Questo esempio illustra lettura di tutti gli eventi pubblicati per la prima partizione di Hub eventi.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Elaborare eventi usando un client processore di eventi

Per la maggior parte degli scenari di produzione, è consigliabile usare l'oggetto EventProcessorClient per la lettura e l'elaborazione degli eventi. Poiché l'oggetto EventProcessorClient ha una dipendenza dai BLOB di Archiviazione di Azure per la persistenza dello stato, sarà necessario specificare un oggetto BlobContainerClient per il processore, configurato per l'account di archiviazione e il contenitore da usare.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}