Provádění běžných operací s klientskou knihovnou služby Event Hubs

Dokončeno

Tato lekce obsahuje příklady běžných operací, které můžete provádět s klientskou knihovnou služby Event Hubs (Azure.Messaging.EventHubs) pro interakci se službou Event Hubs.

Kontrola služby Event Hubs

Mnoho operací služby Event Hubs probíhá v rámci konkrétního oddílu. Vzhledem k tomu, že služba Event Hubs vlastní oddíly, jejich názvy se přiřazují při vytváření. Pokud chcete zjistit, jaké oddíly jsou k dispozici, dotazujete službu Event Hubs pomocí jednoho z klientů služby Event Hubs. Na obrázku je znázorněno EventHubProducerClient v těchto příkladech, ale koncept a forma jsou společné pro klienty.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

Publikování událostí do služby Event Hubs

Chcete-li publikovat události, musíte vytvořit .EventHubProducerClient Producenti publikují události v dávkách a mohou požadovat konkrétní oddíl nebo povolit službě Event Hubs rozhodnout, do kterých událostí oddílů se mají publikovat. Doporučujeme použít automatické směrování, když publikování událostí musí být vysoce dostupné nebo kdy se data událostí mají rovnoměrně distribuovat mezi oddíly. Náš příklad využívá automatické směrování.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

Čtení událostí ze služby Event Hubs

Pokud chcete číst události ze služby Event Hubs, musíte vytvořit pro danou EventHubConsumerClient skupinu příjemců. Když je služba Event Hubs vytvořená, poskytuje výchozí skupinu příjemců, kterou můžete použít k zahájení zkoumání služby Event Hubs. V našem příkladu se zaměříme na čtení všech událostí publikovaných ve službě Event Hubs pomocí iterátoru.

Poznámka:

Je důležité si uvědomit, že tento přístup k využívání je určený ke zlepšení prostředí pro zkoumání klientské knihovny a vytváření prototypů služby Event Hubs. Doporučuje se, aby se nepoužíval v produkčních scénářích. Pro produkční použití doporučujeme použít klienta procesoru událostí, protože poskytuje robustnější a výkonnější prostředí.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Čtení událostí z oddílu Event Hubs

Pokud chcete číst z konkrétního oddílu, musí příjemce určit, kde v datovém proudu událostí začít přijímat události; v našem příkladu se zaměříme na čtení všech publikovaných událostí pro první oddíl služby Event Hubs.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Zpracování událostí pomocí klienta procesoru událostí

U většiny produkčních scénářů se doporučuje EventProcessorClient použít ke čtení a zpracování událostí. EventProcessorClient Vzhledem k tomu, že má závislost na objektech blob služby Azure Storage pro trvalost jeho stavu, musíte poskytnout BlobContainerClient procesor, který je nakonfigurovaný pro účet úložiště a kontejner, který by se měl použít.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}