EventHubConsumerAsyncClient Class

  • java.lang.Object
    • com.azure.messaging.eventhubs.EventHubConsumerAsyncClient

public class EventHubConsumerAsyncClient
implements Closeable

An asynchronous consumer responsible for reading EventData from either a specific Event Hub partition or all partitions in the context of a specific consumer group.

The examples shown in this document use a credential object named DefaultAzureCredential for authentication, which is appropriate for most scenarios, including local development and production environments. Additionally, we recommend using managed identity for authentication in production environments. You can find more information on different ways of authenticating and their corresponding credential types in the Azure Identity documentation.

Sample: Creating an EventHubConsumerAsyncClient

The following code sample demonstrates the creation of the asynchronous client EventHubConsumerAsyncClient. The fullyQualifiedNamespace is the Event Hubs Namespace's host name. It is listed under the "Essentials" panel after navigating to the Event Hubs Namespace in the Azure Portal. The consumerGroup is found by navigating to the Event Hub instance and selecting "Consumer groups" under the "Entities" panel. The consumerGroup(String consumerGroup) is required for creating consumer clients. The credential used is DefaultAzureCredential because it combines commonly used credentials in deployment and development and chooses the credential to use based on its running environment.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .buildAsyncConsumerClient();

Sample: Consuming events from a single partition of an Event Hub

The code sample below demonstrates receiving events from partition "0" of an Event Hub starting from latest(). latest() points to the end of the partition stream. The consumer receives events enqueued after it started subscribing for events.

receiveFromPartition(String partitionId, EventPosition startingPosition) is a non-blocking call. After setting up the operation, its async representation is returned. The Flux must be subscribed to, like the sample below, to start receiving events.

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         new DefaultAzureCredentialBuilder().build())
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .buildAsyncConsumerClient();

 // Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
 String partitionId = "0";
 EventPosition startingPosition = EventPosition.latest();

 // Keep a reference to `subscription`. When the program is finished receiving events, call
 // subscription.dispose(). This will stop fetching events from the Event Hub.
 //
 // NOTE: This is a non-blocking call and will move to the next line of code after setting up the async
 // operation.  If the program ends after this, or the class is immediately disposed, no events will be
 // received.
 Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
     .subscribe(partitionEvent -> {
         PartitionContext partitionContext = partitionEvent.getPartitionContext();
         EventData event = partitionEvent.getData();

         System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
         System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
     }, error -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.err.print("An error occurred:" + error);
     }, () -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.out.print("Stream has ended.");
     });
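
The note in the sample above warns that nothing is received if the program ends, or the subscription is disposed, right after the non-blocking call returns. One common way to handle this is to wait on a latch until a terminal signal arrives or a timeout elapses, and only then dispose. The following is a minimal JDK-only sketch of that pattern. It does not use the Event Hubs client: startReceiving is a hypothetical stand-in for subscribe(...), and the ExecutorService it returns plays the role of the Disposable.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class KeepAliveSketch {
    /** Hypothetical stand-in for subscribe(...): returns immediately and delivers on another thread. */
    static ExecutorService startReceiving(List<String> events, Consumer<String> onEvent,
            Runnable onComplete) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        worker.submit(() -> {
            events.forEach(onEvent);
            onComplete.run();
        });
        return worker;
    }

    /** Starts receiving, waits for completion or the timeout, then releases the subscription. */
    static List<String> receiveAll(List<String> events, long timeoutSeconds) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        ExecutorService subscription = startReceiving(events, received::add, finished::countDown);
        try {
            // Without this wait, the finally block could shut the worker down
            // before it has finished delivering events.
            finished.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Plays the role of subscription.dispose() in the sample above.
            subscription.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receiveAll(List.of("a", "b", "c"), 5));
    }
}
```

With the real client, the same idea would mean counting the latch down in the error and completion callbacks and calling subscription.dispose() after the wait.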

Sample: Including latest partition information in received events

EventData can be decorated with the latest partition information and sent to consumers. Enable this by setting setTrackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties) to true. As events come in, explore the PartitionEvent object. This is useful in scenarios where customers want constant, up-to-date information about their Event Hub. It does incur a performance cost because the extra partition information must be sent over the wire with every event.

receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions) is a non-blocking call. After setting up the operation, its async representation is returned. The Flux must be subscribed to, like the sample below, to start receiving events.

// Set `setTrackLastEnqueuedEventProperties` to true to get the last enqueued information from the partition for
 // each event that is received.
 ReceiveOptions receiveOptions = new ReceiveOptions()
     .setTrackLastEnqueuedEventProperties(true);
 EventPosition startingPosition = EventPosition.earliest();

 // Receives events from partition "0" starting at the beginning of the stream.
 // Keep a reference to `subscription`. When the program is finished receiving events, call
 // subscription.dispose(). This will stop fetching events from the Event Hub.
 Disposable subscription = consumer.receiveFromPartition("0", startingPosition, receiveOptions)
     .subscribe(partitionEvent -> {
         LastEnqueuedEventProperties properties = partitionEvent.getLastEnqueuedEventProperties();
         System.out.printf("Information received at %s. Last enqueued sequence number: %s%n",
             properties.getRetrievalTime(),
             properties.getSequenceNumber());
     });
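
One possible use of the last enqueued sequence number is to estimate how far a consumer is behind the end of its partition. The sketch below is plain arithmetic on two sequence numbers and does not call the Event Hubs client. The helper name eventsBehind is hypothetical, and the estimate assumes sequence numbers within a partition increase by one per event.

```java
class PartitionLagSketch {
    /**
     * Hypothetical helper: estimates how many events sit between the event just read and the
     * last event enqueued in the same partition. Returns -1 when either value is unknown.
     */
    static long eventsBehind(Long lastEnqueuedSequenceNumber, Long currentSequenceNumber) {
        if (lastEnqueuedSequenceNumber == null || currentSequenceNumber == null) {
            return -1;
        }
        // Clamp at zero in case the tracked information is older than the event being processed.
        return Math.max(0, lastEnqueuedSequenceNumber - currentSequenceNumber);
    }

    public static void main(String[] args) {
        System.out.println(eventsBehind(120L, 100L)); // prints 20
    }
}
```

In the sample above, the two arguments would come from properties.getSequenceNumber() and partitionEvent.getData().getSequenceNumber().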

Sample: Rate limiting consumption of events from Event Hub

Event consumers that need to limit the number of events they receive at a given time can use BaseSubscriber#request(long). A custom subscriber gives developers more granular control over the rate at which they receive events.

receiveFromPartition(String partitionId, EventPosition startingPosition) is a non-blocking call. After setting up the operation, its async representation is returned. The Flux must be subscribed to, like the sample below, to start receiving events.

consumer.receiveFromPartition(partitionId, EventPosition.latest()).subscribe(new BaseSubscriber<PartitionEvent>() {
     private static final int NUMBER_OF_EVENTS = 5;
     private final AtomicInteger currentNumberOfEvents = new AtomicInteger();

     @Override
     protected void hookOnSubscribe(Subscription subscription) {
         // Tell the Publisher we only want 5 events at a time.
         request(NUMBER_OF_EVENTS);
     }

     @Override
     protected void hookOnNext(PartitionEvent value) {
         // Process the EventData

         // If the number of events we have currently received is a multiple of 5, that means we have reached the
         // last event the Publisher will provide to us. Invoking request(long) here, tells the Publisher that
         // the subscriber is ready to get more events from upstream.
         if (currentNumberOfEvents.incrementAndGet() % NUMBER_OF_EVENTS == 0) {
             request(NUMBER_OF_EVENTS);
         }
     }
 });
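
The request(long) mechanism used above is the Reactive Streams demand contract: a publisher may emit only as many items as the subscriber has asked for. The sketch below illustrates that contract with the JDK's java.util.concurrent.Flow interfaces. It is not Reactor's BaseSubscriber and not the Event Hubs client. RangeSubscription and BatchingSubscriber are hypothetical names, and the source is a simple synchronous counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BatchedDemandSketch {
    static final int BATCH_SIZE = 5;

    /** Hypothetical synchronous source: emits 0..total-1, but never more than was requested. */
    static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private final int total;
        private int next;
        private long demand;
        private boolean emitting;
        private boolean done;

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int total) {
            this.subscriber = subscriber;
            this.total = total;
        }

        @Override
        public void request(long n) {
            if (done || n <= 0) {
                return;
            }
            demand += n;
            if (emitting) {
                return; // re-entrant call from onNext; the loop below picks up the new demand
            }
            emitting = true;
            while (demand > 0 && next < total && !done) {
                demand--;
                subscriber.onNext(next++);
            }
            emitting = false;
            if (next == total && !done) {
                done = true;
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    /** Requests one batch up front and, if keepRequesting is set, another after each full batch. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        int requestCalls;
        private final boolean keepRequesting;
        private Flow.Subscription subscription;

        BatchingSubscriber(boolean keepRequesting) {
            this.keepRequesting = keepRequesting;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            requestCalls++;
            subscription.request(BATCH_SIZE);
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (keepRequesting && received.size() % BATCH_SIZE == 0) {
                requestCalls++;
                subscription.request(BATCH_SIZE);
            }
        }

        @Override
        public void onError(Throwable error) {
            throw new IllegalStateException(error);
        }

        @Override
        public void onComplete() {
        }
    }

    static BatchingSubscriber run(int total, boolean keepRequesting) {
        BatchingSubscriber subscriber = new BatchingSubscriber(keepRequesting);
        subscriber.onSubscribe(new RangeSubscription(subscriber, total));
        return subscriber;
    }

    public static void main(String[] args) {
        BatchingSubscriber s = run(12, true);
        System.out.println(s.received.size() + " items, " + s.requestCalls + " request calls");
    }
}
```

Calling run(12, false) leaves the subscriber with only the first five items, which shows that the source stops emitting once the requested demand is used up.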

Sample: Receiving from all partitions

The code sample below demonstrates receiving events from all partitions of an Event Hub starting from the beginning of each partition's stream. This is valuable for demo purposes but is not intended for production scenarios. For production scenarios, consider using EventProcessorClient.

receive(boolean startReadingAtEarliestEvent) is a non-blocking call. After setting up the operation, its async representation is returned. The Flux must be subscribed to, like the sample below, to start receiving events.

// Keep a reference to `subscription`. When the program is finished receiving events, call
 // subscription.dispose(). This will stop fetching events from the Event Hub.
 Disposable subscription = consumer.receive(true)
     .subscribe(partitionEvent -> {
         PartitionContext context = partitionEvent.getPartitionContext();
         EventData event = partitionEvent.getData();

         System.out.printf("Event %s is from partition %s.%n", event.getSequenceNumber(),
             context.getPartitionId());
     }, error -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.err.print("An error occurred:" + error);
     }, () -> {
         // This is a terminal signal.  No more events will be received from the same Flux object.
         System.out.print("Stream has ended.");
     });

Method Summary

Modifier and Type Method and Description
void close()

Disposes of the consumer by closing the underlying connection to the service.

String getConsumerGroup()

Gets the consumer group this consumer is reading events as a part of.

String getEventHubName()

Gets the Event Hub name this client interacts with.

Mono<EventHubProperties> getEventHubProperties()

Retrieves information about an Event Hub, including the number of partitions present and their identifiers.

String getFullyQualifiedNamespace()

Gets the fully qualified Event Hubs namespace that the connection is associated with.

String getIdentifier()

Gets the client identifier.

Flux<String> getPartitionIds()

Retrieves the identifiers for the partitions of an Event Hub.

Mono<PartitionProperties> getPartitionProperties(String partitionId)

Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.

Flux<PartitionEvent> receive()

Consumes events from all partitions starting from the beginning of each partition.

Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent)

Consumes events from all partitions.

Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions)

Consumes events from all partitions configured with a set of receiveOptions.

Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition)

Consumes events from a single partition starting at startingPosition.

Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions)

Consumes events from a single partition starting at startingPosition with a set of ReceiveOptions.

Method Details

close

public void close()

Disposes of the consumer by closing the underlying connection to the service.

getConsumerGroup

public String getConsumerGroup()

Gets the consumer group this consumer is reading events as a part of.

Returns:

The consumer group this consumer is reading events as a part of.

getEventHubName

public String getEventHubName()

Gets the Event Hub name this client interacts with.

Returns:

The Event Hub name this client interacts with.

getEventHubProperties

public Mono<EventHubProperties> getEventHubProperties()

Retrieves information about an Event Hub, including the number of partitions present and their identifiers.

Returns:

The set of information for the Event Hub that this client is associated with.

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

Gets the fully qualified Event Hubs namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net.

Returns:

The fully qualified Event Hubs namespace that the connection is associated with.

getIdentifier

public String getIdentifier()

Gets the client identifier.

Returns:

The unique identifier string for the current client.

getPartitionIds

public Flux<String> getPartitionIds()

Retrieves the identifiers for the partitions of an Event Hub.

Returns:

A Flux of identifiers for the partitions of an Event Hub.

getPartitionProperties

public Mono<PartitionProperties> getPartitionProperties(String partitionId)

Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.

Parameters:

partitionId - The unique identifier of a partition associated with the Event Hub.

Returns:

The set of information for the requested partition under the Event Hub this client is associated with.

receive

public Flux<PartitionEvent> receive()

Consumes events from all partitions starting from the beginning of each partition.

This method is not recommended for production use; the EventProcessorClient should be used for reading events from all partitions in a production scenario, as it offers a much more robust experience with higher throughput. It is important to note that this method does not guarantee fairness amongst the partitions. Depending on service communication, there may be a clustering of events per partition and/or there may be a noticeable bias for a given partition or subset of partitions.

Returns:

A stream of events for every partition in the Event Hub starting from the beginning of each partition.

receive

public Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent)

Consumes events from all partitions.

This method is not recommended for production use; the EventProcessorClient should be used for reading events from all partitions in a production scenario, as it offers a much more robust experience with higher throughput. It is important to note that this method does not guarantee fairness amongst the partitions. Depending on service communication, there may be a clustering of events per partition and/or there may be a noticeable bias for a given partition or subset of partitions.

Parameters:

startReadingAtEarliestEvent - true to begin reading at the first events available in each partition; otherwise, reading begins at the end of each partition, so only new events are seen as they are published.

Returns:

A stream of events for every partition in the Event Hub.

receive

public Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions)

Consumes events from all partitions configured with a set of receiveOptions.

This method is not recommended for production use; the EventProcessorClient should be used for reading events from all partitions in a production scenario, as it offers a much more robust experience with higher throughput. It is important to note that this method does not guarantee fairness amongst the partitions. Depending on service communication, there may be a clustering of events per partition and/or there may be a noticeable bias for a given partition or subset of partitions.

  • If receive is invoked where getOwnerLevel() has a value, then the Event Hubs service will guarantee only one active consumer exists per partitionId and consumer group combination. This receive operation is sometimes referred to as an "Epoch Consumer".
  • Multiple consumers per partitionId and consumer group combination can be created by not setting getOwnerLevel() when invoking receive operations. This non-exclusive consumer is sometimes referred to as a "Non-Epoch Consumer".

Parameters:

startReadingAtEarliestEvent - true to begin reading at the first events available in each partition; otherwise, reading begins at the end of each partition, so only new events are seen as they are published.
receiveOptions - Options when receiving events from each Event Hub partition.

Returns:

A stream of events for every partition in the Event Hub.

receiveFromPartition

public Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition)

Consumes events from a single partition starting at startingPosition.

Parameters:

partitionId - Identifier of the partition to read events from.
startingPosition - Position within the Event Hub partition to begin consuming events.

Returns:

A stream of events for this partition starting from startingPosition.

receiveFromPartition

public Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions)

Consumes events from a single partition starting at startingPosition with a set of ReceiveOptions.

  • If receive is invoked where getOwnerLevel() has a value, then the Event Hubs service will guarantee only one active consumer exists per partitionId and consumer group combination. This receive operation is sometimes referred to as an "Epoch Consumer".
  • Multiple consumers per partitionId and consumer group combination can be created by not setting getOwnerLevel() when invoking receive operations. This non-exclusive consumer is sometimes referred to as a "Non-Epoch Consumer".

Parameters:

partitionId - Identifier of the partition to read events from.
startingPosition - Position within the Event Hub partition to begin consuming events.
receiveOptions - Options when receiving events from the partition.

Returns:

A stream of events for this partition. If a stream for the events was opened before, the same position within that partition is returned. Otherwise, events are read starting from startingPosition.
