Panoramica dell'API .NET Standard di Hub eventiEvent Hubs .NET Standard API overview

In questo articolo vengono riepilogate alcune delle principali API client .NET Standard di Hub eventi.This article summarizes some of the key Event Hubs .NET Standard client APIs. Esistono attualmente due librerie client .NET Standard:There are currently two .NET Standard client libraries:

Client di Hub eventiEvent Hubs client

EventHubClient è l'oggetto principale da usare per inviare eventi, creare ricevitori e ottenere informazioni di runtime.EventHubClient is the primary object you use to send events, create receivers, and to get run-time information. Questo client è collegato a un determinato hub eventi e crea una nuova connessione all'endpoint di Hub eventi.This client is linked to a particular event hub, and creates a new connection to the Event Hubs endpoint.

Creare un client di Hub eventiCreate an Event Hubs client

Un oggetto EventHubClient viene creato da una stringa di connessione.An EventHubClient object is created from a connection string. Nell'esempio seguente viene illustrato il modo più semplice per creare un'istanza di un nuovo client:The simplest way to instantiate a new client is shown in the following example:

var eventHubClient = EventHubClient.CreateFromConnectionString("{Event Hubs connection string}");

Per modificare la stringa di connessione a livello di codice, è possibile usare la classe EventHubsConnectionStringBuilder e passare la stringa di connessione come parametro a EventHubClient.CreateFromConnectionString.To programmatically edit the connection string, you can use the EventHubsConnectionStringBuilder class, and pass the connection string as a parameter to EventHubClient.CreateFromConnectionString.

var connectionStringBuilder = new EventHubsConnectionStringBuilder("{Event Hubs connection string}")
{
    EntityPath = EhEntityPath
};

var eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());

Inviare eventiSend events

Per inviare eventi a un hub eventi, usare la classe EventData.To send events to an event hub, use the EventData class. Il corpo deve essere un array byte o un segmento di array byte.The body must be a byte array, or a byte array segment.

// Create a new EventData object by encoding a string as a byte array
var data = new EventData(Encoding.UTF8.GetBytes("This is my message..."));
// Set user properties if needed
data.Properties.Add("Type", "Informational");
// Send single message async
await eventHubClient.SendAsync(data);

Ricevere eventiReceive events

Il metodo consigliato per ricevere eventi da Hub eventi è tramite EventProcessorHost, che offre funzionalità che permettono di tenere automaticamente traccia dell'offset e delle informazioni di partizione.The recommended way to receive events from Event Hubs is using the Event Processor Host, which provides functionality to automatically keep track of offset, and partition information. Tuttavia, in alcune situazioni per ricevere eventi è preferibile usare la flessibilità della libreria di Hub eventi di base.However, there are certain situations in which you may want to use the flexibility of the core Event Hubs library to receive events.

Creare un ricevitoreCreate a receiver

I ricevitori sono legati a partizioni specifiche, pertanto, per ricevere tutti gli eventi in un hub eventi, è necessario creare più istanze.Receivers are tied to specific partitions, so in order to receive all events in an event hub, you will need to create multiple instances. In generale, è buona norma ottenere le informazioni di partizione a livello di programmazione, anziché impostare come hardcoded gli ID di partizione.Generally speaking, it is a good practice to get the partition information programatically, rather than hard-coding the partition ids. A tale scopo, è possibile usare il metodo GetRuntimeInformationAsync.In order to do so, you can use the GetRuntimeInformationAsync method.

// Create a list to keep track of the receivers
var receivers = new List<PartitionReceiver>();
// Use the eventHubClient created above to get the runtime information
var runTimeInformation = await eventHubClient.GetRuntimeInformationAsync();
// Loop over the resulting partition ids
foreach (var partitionId in runTimeInformation.PartitionIds)
{
    // Create the receiver
    var receiver = eventHubClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, partitionId, PartitionReceiver.EndOfStream);
    // Add the receiver to the list
    receivers.Add(receiver);
}

Dato che gli eventi non vengono mai rimossi da un hub eventi e possono solo scadere, è necessario specificare il punto di partenza appropriato.Since events are never removed from an event hub (and only expire), you need to specify the proper starting point. L'esempio seguente mostra le combinazioni possibili.The following example shows possible combinations.

// partitionId is assumed to come from GetRuntimeInformationAsync()

// Using the constant PartitionReceiver.EndOfStream only receives all messages from this point forward.
var receiver = eventHubClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, partitionId, PartitionReceiver.EndOfStream);

// All messages available
var receiver = eventHubClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, partitionId, "-1");

// From one day ago
var receiver = eventHubClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, partitionId, DateTime.Now.AddDays(-1));

Usare un eventoConsume an event

// Receive a maximum of 100 messages in this call to ReceiveAsync
var ehEvents = await receiver.ReceiveAsync(100);
// ReceiveAsync can return null if there are no messages
if (ehEvents != null)
{
    // Since ReceiveAsync can return more than a single event you will need a loop to process
    foreach (var ehEvent in ehEvents)
    {
        // Decode the byte array segment
        var message = UnicodeEncoding.UTF8.GetString(ehEvent.Body.Array);
        // Load the custom property that we set in the send example
        var customType = ehEvent.Properties["Type"];
        // Implement processing logic here
    }
}       

API host processore di eventiEvent Processor Host APIs

Queste API garantiscono resilienza ai processi di lavoro che possono diventare non disponibili, distribuendo tuttavia partizioni tra i ruoli di lavoro disponibili.These APIs provide resiliency to worker processes that may become unavailable, by distributing partitions across available workers.

// Checkpointing is done within the SimpleEventProcessor and on a per-consumerGroup per-partition basis, workers resume from where they last left off.

// Read these connection strings from a secure location
var ehConnectionString = "{Event Hubs connection string}";
var ehEntityPath = "{event hub path/name}";
var storageConnectionString = "{Storage connection string}";
var storageContainerName = "{Storage account container name}";

var eventProcessorHost = new EventProcessorHost(
    ehEntityPath,
    PartitionReceiver.DefaultConsumerGroupName,
    ehConnectionString,
    storageConnectionString,
    storageContainerName);

// Start/register an EventProcessorHost
await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();

// Disposes of the Event Processor Host
await eventProcessorHost.UnregisterEventProcessorAsync();

Di seguito è riportato un esempio di implementazione di IEventProcessor.The following is a sample implementation of the IEventProcessor.

public class SimpleEventProcessor : IEventProcessor
{
    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
        return Task.CompletedTask;
    }

    public Task OpenAsync(PartitionContext context)
    {
        Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
        return Task.CompletedTask;
    }

    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
        return Task.CompletedTask;
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
        }

        return context.CheckpointAsync();
    }
}

Passaggi successiviNext steps

Per altre informazioni sugli scenari di Hub eventi, visitare i collegamenti seguenti:To learn more about Event Hubs scenarios, visit these links:

I riferimenti API .NET sono qui:The .NET API references are here: