EventProcessorClientBuilder Class

  • java.lang.Object
    • com.azure.messaging.eventhubs.EventProcessorClientBuilder

Implements

public class EventProcessorClientBuilder
implements TokenCredentialTrait<EventProcessorClientBuilder>, AzureNamedKeyCredentialTrait<EventProcessorClientBuilder>, ConnectionStringTrait<EventProcessorClientBuilder>, AzureSasCredentialTrait<EventProcessorClientBuilder>, AmqpTrait<EventProcessorClientBuilder>, ConfigurationTrait<EventProcessorClientBuilder>

This class provides a fluent builder API to configure and instantiate the EventProcessorClient. Calling buildEventProcessorClient() constructs a new instance of EventProcessorClient.

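To create an instance of EventProcessorClient, the following fields are required:

  • Consumer group name, set with consumerGroup(String consumerGroup).
  • A CheckpointStore, set with checkpointStore(CheckpointStore checkpointStore), that stores checkpoint and partition ownership information.
  • An event callback, set with processEvent or processEventBatch.
  • An error callback, set with processError.
  • Credentials for the Event Hub, set with one of the credential or connectionString overloads.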

The examples shown in this document use a credential object named DefaultAzureCredential for authentication, which is appropriate for most scenarios, including local development and production environments. Additionally, we recommend using managed identity for authentication in production environments. You can find more information on different ways of authenticating and their corresponding credential types in the Azure Identity documentation.

Sample: Construct an EventProcessorClient

The following code sample demonstrates the creation of the processor client. The processor client is recommended for production scenarios because it can load balance between multiple running instances, can perform checkpointing, and reconnects on transient failures such as network outages. The sample below uses an in-memory CheckpointStore, but azure-messaging-eventhubs-checkpointstore-blob provides a checkpoint store backed by Azure Blob Storage.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup("<< CONSUMER GROUP NAME >>")
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .checkpointStore(new SampleCheckpointStore())
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .buildEventProcessorClient();

Field Summary

Modifier and Type Field and Description
static final Duration DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL

Default load balancing update interval.

static final Duration DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL

Default ownership expiration.

Constructor Summary

Constructor Description
EventProcessorClientBuilder()

Creates a new instance of EventProcessorClientBuilder.

Method Summary

Modifier and Type Method and Description
EventProcessorClient buildEventProcessorClient()

This will create a new EventProcessorClient configured with the options set in this builder.

EventProcessorClientBuilder checkpointStore(CheckpointStore checkpointStore)

Sets the CheckpointStore the EventProcessorClient will use for storing partition ownership and checkpoint information.

EventProcessorClientBuilder clientOptions(ClientOptions clientOptions)

Sets the client options for the processor client.

EventProcessorClientBuilder configuration(Configuration configuration)

Sets the configuration store that is used during construction of the service client.

EventProcessorClientBuilder connectionString(String connectionString)

Sets the credential information given a connection string to the Event Hub instance.

EventProcessorClientBuilder connectionString(String connectionString, String eventHubName)

Sets the credential information given a connection string to the Event Hubs namespace and name to a specific Event Hub instance.

EventProcessorClientBuilder consumerGroup(String consumerGroup)

Sets the consumer group name from which the EventProcessorClient should consume events.

EventProcessorClientBuilder credential(AzureNamedKeyCredential credential)

Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.

EventProcessorClientBuilder credential(AzureSasCredential credential)

Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.

EventProcessorClientBuilder credential(TokenCredential credential)

Sets the TokenCredential used to authorize requests sent to the service.

EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureNamedKeyCredential credential)

Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.

EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureSasCredential credential)

Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.

EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential)

Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.

EventProcessorClientBuilder customEndpointAddress(String customEndpointAddress)

Sets a custom endpoint address when connecting to the Event Hubs service.

EventProcessorClientBuilder eventHubName(String eventHubName)

Sets the name of the Event Hub to connect the client to.

EventProcessorClientBuilder fullyQualifiedNamespace(String fullyQualifiedNamespace)

Sets the fully qualified name for the Event Hubs namespace.

EventProcessorClientBuilder initialPartitionEventPosition(Map<String,EventPosition> initialPartitionEventPosition)

Sets the map containing the event position to use for each partition if a checkpoint for the partition does not exist in CheckpointStore.

EventProcessorClientBuilder initialPartitionEventPosition(Function<String,EventPosition> initialEventPositionProvider)

Sets the default starting position for each partition if a checkpoint for that partition does not exist in the CheckpointStore.

EventProcessorClientBuilder loadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy)

The LoadBalancingStrategy the EventProcessorClient will use for claiming partition ownership.

EventProcessorClientBuilder loadBalancingUpdateInterval(Duration loadBalancingUpdateInterval)

The time interval between load balancing update cycles.

EventProcessorClientBuilder partitionOwnershipExpirationInterval(Duration partitionOwnershipExpirationInterval)

The time duration after which the ownership of partition expires if it's not renewed by the owning processor instance.

EventProcessorClientBuilder prefetchCount(int prefetchCount)

Sets the count used by the receivers to control the number of events each consumer will actively receive and queue locally without regard to whether a receive operation is currently active.

EventProcessorClientBuilder processError(Consumer<ErrorContext> processError)

The function that is called when an error occurs while processing events.

EventProcessorClientBuilder processEvent(Consumer<EventContext> processEvent)

The function that is called for each event received by this EventProcessorClient.

EventProcessorClientBuilder processEvent(Consumer<EventContext> processEvent, Duration maxWaitTime)

The function that is called for each event received by this EventProcessorClient.

EventProcessorClientBuilder processEventBatch(Consumer<EventBatchContext> processEventBatch, int maxBatchSize)

The function that is called for each event received by this EventProcessorClient.

EventProcessorClientBuilder processEventBatch(Consumer<EventBatchContext> processEventBatch, int maxBatchSize, Duration maxWaitTime)

The function that is called for each event received by this EventProcessorClient.

EventProcessorClientBuilder processPartitionClose(Consumer<CloseContext> closePartition)

The function that is called when processing for a partition stops.

EventProcessorClientBuilder processPartitionInitialization(Consumer<InitializationContext> initializePartition)

The function that is called before processing starts for a partition.

EventProcessorClientBuilder proxyOptions(ProxyOptions proxyOptions)

Sets the proxy configuration to use for EventHubAsyncClient.

EventProcessorClientBuilder retry(AmqpRetryOptions retryOptions)

Deprecated

Sets the retry policy for EventHubAsyncClient.

EventProcessorClientBuilder retryOptions(AmqpRetryOptions retryOptions)

Sets the retry policy for EventHubAsyncClient.

EventProcessorClientBuilder trackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)

Sets whether or not the event processor should request information on the last enqueued event on its associated partition, and track that information as events are received.

EventProcessorClientBuilder transportType(AmqpTransportType transport)

Sets the transport type by which all the communication with Azure Event Hubs occurs.

Field Details

DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL

public static final Duration DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL

Default load balancing update interval. Balancing interval should account for latency between the client and the storage account.

DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL

public static final Duration DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL

Default ownership expiration.

Constructor Details

EventProcessorClientBuilder

public EventProcessorClientBuilder()

Creates a new instance of EventProcessorClientBuilder.

Method Details

buildEventProcessorClient

public EventProcessorClient buildEventProcessorClient()

This will create a new EventProcessorClient configured with the options set in this builder. Each call to this method will return a new instance of EventProcessorClient.

All partitions processed by this EventProcessorClient will start processing from the earliest() available event in the respective partitions.

Returns:

A new instance of EventProcessorClient.

checkpointStore

public EventProcessorClientBuilder checkpointStore(CheckpointStore checkpointStore)

Sets the CheckpointStore the EventProcessorClient will use for storing partition ownership and checkpoint information.

Users can, optionally, provide their own implementation of CheckpointStore which will store ownership and checkpoint information.

Parameters:

checkpointStore - Implementation of CheckpointStore.

Returns:

The updated EventProcessorClientBuilder instance.

clientOptions

public EventProcessorClientBuilder clientOptions(ClientOptions clientOptions)

Sets the client options for the processor client. The application id set on the client options will be used for tracing. The headers set on ClientOptions are currently not used, but may be added to the AMQP message in later releases.

Parameters:

clientOptions - The client options.

Returns:

The updated EventProcessorClientBuilder object.

configuration

public EventProcessorClientBuilder configuration(Configuration configuration)

Sets the configuration store that is used during construction of the service client. If not specified, the default configuration store is used to configure the EventHubAsyncClient. Use NONE to bypass using configuration settings during construction.

Parameters:

configuration - The configuration store used to configure the EventHubAsyncClient.

Returns:

The updated EventProcessorClientBuilder object.

connectionString

public EventProcessorClientBuilder connectionString(String connectionString)

Sets the credential information given a connection string to the Event Hub instance.

If the connection string is copied from the Event Hubs namespace, it will likely not contain the name of the desired Event Hub, which is needed. In this case, the name can be added manually by appending "EntityPath=EVENT_HUB_NAME" to the end of the connection string. For example, "EntityPath=telemetry-hub".

If you have defined a shared access policy directly on the Event Hub itself, then copying the connection string from that Event Hub will result in a connection string that contains the name.

Parameters:

connectionString - The connection string to use for connecting to the Event Hub instance. It is expected that the Event Hub name and the shared access key properties are contained in this connection string.

Returns:

The updated EventProcessorClientBuilder object.
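
As a rough illustration of the "EntityPath" note above, the following sketch appends the Event Hub name to a namespace-level connection string when it is missing. ConnectionStringSketch and withEntityPath are hypothetical names, not part of the Azure SDK, and the case-insensitive key match is a precaution assumed by this example.

```java
import java.util.Locale;

/** Hypothetical helper for illustration only; it is not part of the Azure SDK. */
final class ConnectionStringSketch {
    private ConnectionStringSketch() { }

    /** Returns the connection string with "EntityPath=<eventHubName>" appended when it is missing. */
    static String withEntityPath(String connectionString, String eventHubName) {
        String trimmed = connectionString.trim();
        for (String segment : trimmed.split(";")) {
            // Assumption: keys are compared case-insensitively as a precaution.
            if (segment.trim().toLowerCase(Locale.ROOT).startsWith("entitypath=")) {
                return trimmed; // the string already names an Event Hub
            }
        }
        // Avoid a doubled separator when the string already ends with ';'.
        String separator = trimmed.endsWith(";") ? "" : ";";
        return trimmed + separator + "EntityPath=" + eventHubName;
    }
}
```

When the namespace-level string is all you have, the connectionString(String connectionString, String eventHubName) overload takes the Event Hub name as a separate argument, so no string manipulation is needed.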

connectionString

public EventProcessorClientBuilder connectionString(String connectionString, String eventHubName)

Sets the credential information given a connection string to the Event Hubs namespace and name to a specific Event Hub instance.

Parameters:

connectionString - The connection string to use for connecting to the Event Hubs namespace; it is expected that the shared access key properties are contained in this connection string, but not the Event Hub name.
eventHubName - The name of the Event Hub to connect the client to.

Returns:

The updated EventProcessorClientBuilder object.

consumerGroup

public EventProcessorClientBuilder consumerGroup(String consumerGroup)

Sets the consumer group name from which the EventProcessorClient should consume events.

Parameters:

consumerGroup - The name of the consumer group this EventProcessorClient should consume events from.

Returns:

The updated EventProcessorClientBuilder instance.

credential

public EventProcessorClientBuilder credential(AzureNamedKeyCredential credential)

Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.

Parameters:

credential - The shared access name and key credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.

Returns:

The updated EventProcessorClientBuilder object.

credential

public EventProcessorClientBuilder credential(AzureSasCredential credential)

Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.

Parameters:

credential - The shared access signature credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.

Returns:

The updated EventProcessorClientBuilder object.

credential

public EventProcessorClientBuilder credential(TokenCredential credential)

Sets the TokenCredential used to authorize requests sent to the service. Refer to the Azure SDK for Java identity and authentication documentation for more details on proper usage of the TokenCredential type.

Parameters:

credential - The token credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.

Returns:

The updated EventProcessorClientBuilder object.

credential

public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureNamedKeyCredential credential)

Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.

Parameters:

fullyQualifiedNamespace - The fully qualified name for the Event Hubs namespace. This is likely to be similar to "{your-namespace}.servicebus.windows.net".
eventHubName - The name of the Event Hub to connect the client to.
credential - The shared access name and key credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.

Returns:

The updated EventProcessorClientBuilder object.

credential

public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureSasCredential credential)

Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.

Parameters:

fullyQualifiedNamespace - The fully qualified name for the Event Hubs namespace. This is likely to be similar to "{your-namespace}.servicebus.windows.net".
eventHubName - The name of the Event Hub to connect the client to.
credential - The shared access signature credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.

Returns:

The updated EventProcessorClientBuilder object.

credential

public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential)

Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.

Parameters:

fullyQualifiedNamespace - The fully qualified name for the Event Hubs namespace. This is likely to be similar to "{your-namespace}.servicebus.windows.net".
eventHubName - The name of the Event Hub to connect the client to.
credential - The token credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.

Returns:

The updated EventProcessorClientBuilder object.

customEndpointAddress

public EventProcessorClientBuilder customEndpointAddress(String customEndpointAddress)

Sets a custom endpoint address when connecting to the Event Hubs service. This can be useful when your network does not allow connecting to the standard Azure Event Hubs endpoint address, but does allow connecting through an intermediary. For example: https://my.custom.endpoint.com:55300.

If no port is specified, the default port for the transportType(AmqpTransportType transport) is used.

Parameters:

customEndpointAddress - The custom endpoint address.

Returns:

The updated EventProcessorClientBuilder object.
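
The port fallback described above can be pictured with a small sketch. EndpointPortSketch is a hypothetical class, and the way the SDK resolves the port internally is an assumption here; the sketch only relies on the well-known ports 5671 (AMQP over TLS) and 443 (AMQP over WebSockets).

```java
import java.net.URI;

/** Hypothetical illustration of the port fallback; not the SDK's implementation. */
final class EndpointPortSketch {
    static final int AMQP_PORT = 5671;             // AMQP over TLS
    static final int AMQP_WEB_SOCKETS_PORT = 443;  // AMQP over WebSockets (HTTPS)

    private EndpointPortSketch() { }

    /** Returns the explicit port of the address, or the transport's default port when none is given. */
    static int resolvePort(String customEndpointAddress, boolean useWebSockets) {
        URI uri = URI.create(customEndpointAddress);
        int port = uri.getPort(); // -1 when the address carries no explicit port
        if (port != -1) {
            return port;
        }
        return useWebSockets ? AMQP_WEB_SOCKETS_PORT : AMQP_PORT;
    }
}
```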

eventHubName

public EventProcessorClientBuilder eventHubName(String eventHubName)

Sets the name of the Event Hub to connect the client to.

Parameters:

eventHubName - The name of the Event Hub to connect the client to.

Returns:

The updated EventProcessorClientBuilder object.

fullyQualifiedNamespace

public EventProcessorClientBuilder fullyQualifiedNamespace(String fullyQualifiedNamespace)

Sets the fully qualified name for the Event Hubs namespace.

Parameters:

fullyQualifiedNamespace - The fully qualified name for the Event Hubs namespace. This is likely to be similar to "{your-namespace}.servicebus.windows.net".

Returns:

The updated EventProcessorClientBuilder object.

initialPartitionEventPosition

public EventProcessorClientBuilder initialPartitionEventPosition(Map<String,EventPosition> initialPartitionEventPosition)

Sets the map containing the event position to use for each partition if a checkpoint for the partition does not exist in CheckpointStore. This map is keyed off of the partition id.

Only one overload of initialPartitionEventPosition should be used when constructing an EventProcessorClient.

Parameters:

initialPartitionEventPosition - Map of initial event positions for partition ids.

Returns:

The updated EventProcessorClientBuilder instance.

initialPartitionEventPosition

public EventProcessorClientBuilder initialPartitionEventPosition(Function<String,EventPosition> initialEventPositionProvider)

Sets the default starting position for each partition if a checkpoint for that partition does not exist in the CheckpointStore.

Only one overload of initialPartitionEventPosition should be used when constructing an EventProcessorClient.

Parameters:

initialEventPositionProvider - Function that maps the given partitionId to an EventPosition.

Returns:

The updated EventProcessorClientBuilder instance.
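
The precedence described by both overloads (a stored checkpoint first, the configured initial position only when no checkpoint exists) might be sketched as follows. StartingPositionSketch is hypothetical, the position type is left generic instead of using EventPosition so the example stays self-contained, and the final fallback value is an assumption supplied by the caller.

```java
import java.util.Map;
import java.util.function.Function;

/** Hypothetical illustration of how a starting position could be chosen per partition. */
final class StartingPositionSketch {
    private StartingPositionSketch() { }

    static <P> P resolve(String partitionId, Map<String, P> checkpoints,
            Function<String, P> initialPositionProvider, P fallback) {
        P checkpoint = checkpoints.get(partitionId);
        if (checkpoint != null) {
            return checkpoint; // a stored checkpoint always wins
        }
        // No checkpoint: consult the configured initial position, if any.
        P initial = initialPositionProvider == null ? null : initialPositionProvider.apply(partitionId);
        return initial != null ? initial : fallback;
    }
}
```

A map of positions can be passed to the same sketch as a method reference (for example, initialPositions::get), which shows why the Map and Function overloads are alternatives and only one of them should be used.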

loadBalancingStrategy

public EventProcessorClientBuilder loadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy)

The LoadBalancingStrategy the EventProcessorClient will use for claiming partition ownership. By default, a BALANCED approach will be used.

Parameters:

loadBalancingStrategy - The LoadBalancingStrategy to use.

Returns:

The updated EventProcessorClientBuilder instance.

loadBalancingUpdateInterval

public EventProcessorClientBuilder loadBalancingUpdateInterval(Duration loadBalancingUpdateInterval)

The time interval between load balancing update cycles. This is also generally the interval at which ownership of partitions is renewed. By default, this interval is set to 10 seconds.

Parameters:

loadBalancingUpdateInterval - The time duration between load balancing update cycles.

Returns:

The updated EventProcessorClientBuilder instance.

partitionOwnershipExpirationInterval

public EventProcessorClientBuilder partitionOwnershipExpirationInterval(Duration partitionOwnershipExpirationInterval)

The time duration after which the ownership of a partition expires if it's not renewed by the owning processor instance. This is the duration that this processor instance will wait before taking over the ownership of partitions previously owned by an inactive processor. By default, this duration is set to one minute.

Parameters:

partitionOwnershipExpirationInterval - The time duration after which the ownership of partition expires.

Returns:

The updated EventProcessorClientBuilder instance.
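
Because ownership is generally renewed once per load balancing cycle, the expiration interval should be comfortably longer than the update interval. The sketch below is a hypothetical sanity check (LoadBalancingIntervalsSketch is not an SDK class, and whether the builder enforces this relationship is not stated here); it counts how many renewal cycles fit inside one expiration window.

```java
import java.time.Duration;

/** Hypothetical sanity check relating the two intervals; not part of the Azure SDK. */
final class LoadBalancingIntervalsSketch {
    private LoadBalancingIntervalsSketch() { }

    /** Returns how many load balancing cycles fit into one ownership expiration window. */
    static long renewalCyclesBeforeExpiry(Duration updateInterval, Duration expirationInterval) {
        if (updateInterval.isZero() || updateInterval.isNegative()) {
            throw new IllegalArgumentException("updateInterval must be positive");
        }
        if (expirationInterval.compareTo(updateInterval) <= 0) {
            // An owner that renews once per cycle would lose ownership between renewals.
            throw new IllegalArgumentException("expirationInterval must exceed updateInterval");
        }
        return expirationInterval.toMillis() / updateInterval.toMillis();
    }
}
```

With the documented defaults of 10 seconds and one minute, an active owner gets six renewal cycles before its ownership would expire.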

prefetchCount

public EventProcessorClientBuilder prefetchCount(int prefetchCount)

Sets the count used by the receivers to control the number of events each consumer will actively receive and queue locally without regard to whether a receive operation is currently active.

Parameters:

prefetchCount - The number of events to queue locally.

Returns:

The updated EventProcessorClientBuilder object.

processError

public EventProcessorClientBuilder processError(Consumer<ErrorContext> processError)

The function that is called when an error occurs while processing events. The input contains the partition information where the error happened.

Parameters:

processError - The callback that's called when an error occurs while processing events.

Returns:

The updated EventProcessorClientBuilder instance.

processEvent

public EventProcessorClientBuilder processEvent(Consumer<EventContext> processEvent)

The function that is called for each event received by this EventProcessorClient. The input contains the partition context and the event data.

Parameters:

processEvent - The callback that's called when an event is received by this EventProcessorClient.

Returns:

The updated EventProcessorClientBuilder instance.

processEvent

public EventProcessorClientBuilder processEvent(Consumer<EventContext> processEvent, Duration maxWaitTime)

The function that is called for each event received by this EventProcessorClient. The input contains the partition context and the event data. If the max wait time is set, the receive will wait for that duration to receive an event, and if no event is received, the consumer will be invoked with null event data.

Parameters:

processEvent - The callback that's called when an event is received by this EventProcessorClient or when the max wait duration has expired.
maxWaitTime - The max time duration to wait to receive an event before invoking this handler.

Returns:

The updated EventProcessorClientBuilder instance.
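
Since the handler can be invoked with null event data once the max wait time elapses, it should check for null before touching the event. The sketch below illustrates that guard with a hypothetical FakeEventContext stand-in, so it compiles without the SDK; in real code the handler would be a Consumer<EventContext> and the check would be on eventContext.getEventData().

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical illustration of a handler that tolerates idle invocations. */
final class NullTolerantHandlerSketch {
    /** Stand-in for the SDK's event context; only the event payload is modeled. */
    static final class FakeEventContext {
        private final String eventData;

        FakeEventContext(String eventData) {
            this.eventData = eventData;
        }

        String getEventData() {
            return eventData;
        }
    }

    final AtomicInteger processed = new AtomicInteger();
    final AtomicInteger idleTicks = new AtomicInteger();

    final Consumer<FakeEventContext> handler = context -> {
        if (context.getEventData() == null) {
            // The wait expired without an event; do periodic work here and return.
            idleTicks.incrementAndGet();
            return;
        }
        processed.incrementAndGet();
    };
}
```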

processEventBatch

public EventProcessorClientBuilder processEventBatch(Consumer<EventBatchContext> processEventBatch, int maxBatchSize)

The function that is called for each batch of events received by this EventProcessorClient. The input contains the partition context and the batch of events, which holds at most maxBatchSize events.

Parameters:

processEventBatch - The callback that's called when a batch of events is received by this EventProcessorClient.
maxBatchSize - The maximum number of events that will be in the list when this callback is invoked.

Returns:

The updated EventProcessorClientBuilder instance.

processEventBatch

public EventProcessorClientBuilder processEventBatch(Consumer<EventBatchContext> processEventBatch, int maxBatchSize, Duration maxWaitTime)

The function that is called for each batch of events received by this EventProcessorClient. The input contains the partition context and the batch of events. If the max wait time is set, the receive will wait for that duration to receive events, and if no event is received, the consumer will be invoked with null event data.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .checkpointStore(new SampleCheckpointStore())
     .processEventBatch(eventBatchContext -> {
         eventBatchContext.getEvents().forEach(eventData -> {
             System.out.printf("Partition id = %s and sequence number of event = %s%n",
                 eventBatchContext.getPartitionContext().getPartitionId(),
                 eventData.getSequenceNumber());
         });
     }, 50, Duration.ofSeconds(30))
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .buildEventProcessorClient();

Parameters:

processEventBatch - The callback that's called when an event is received or when the max wait duration has expired.
maxBatchSize - The maximum number of events that will be in the list when this callback is invoked.
maxWaitTime - The max time duration to wait to receive a batch of events up to the max batch size before invoking this callback.

Returns:

The updated EventProcessorClientBuilder instance.
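
To make the interplay of maxBatchSize and maxWaitTime concrete, here is a minimal sketch of the general technique: collect items until the batch is full or the wait time runs out, whichever comes first. BatchReceiveSketch is hypothetical and reads from a plain BlockingQueue; it is not how the SDK receives events, and returning an empty list on timeout is a choice made by this example.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of size-or-time bounded batching; not the SDK's implementation. */
final class BatchReceiveSketch {
    private BatchReceiveSketch() { }

    static <T> List<T> nextBatch(BlockingQueue<T> source, int maxBatchSize, Duration maxWaitTime) {
        List<T> batch = new ArrayList<>(maxBatchSize);
        long deadline = System.nanoTime() + maxWaitTime.toNanos();
        try {
            while (batch.size() < maxBatchSize) {
                long remaining = deadline - System.nanoTime();
                // Block only while time remains; afterwards take what is already queued.
                T next = remaining > 0 ? source.poll(remaining, TimeUnit.NANOSECONDS) : source.poll();
                if (next == null) {
                    break; // nothing arrived before the deadline
                }
                batch.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return batch;
    }
}
```

With five queued items, a batch size of 3, and a short wait, the first call returns three items immediately, the second returns the remaining two once the wait elapses, and a third returns an empty list.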

processPartitionClose

public EventProcessorClientBuilder processPartitionClose(Consumer<CloseContext> closePartition)

The function that is called when processing for a partition stops. The input contains the partition information along with the reason for stopping the event processing for this partition.

Parameters:

closePartition - The callback that's called after processing for a partition stops.

Returns:

The updated EventProcessorClientBuilder instance.

processPartitionInitialization

public EventProcessorClientBuilder processPartitionInitialization(Consumer<InitializationContext> initializePartition)

The function that is called before processing starts for a partition. The input contains the partition information along with a default starting position for processing events, which is used if no checkpoint is available in the CheckpointStore. Users can update this position if a different starting position is preferred.

Parameters:

initializePartition - The callback that's called before processing starts for a partition.

Returns:

The updated EventProcessorClientBuilder instance.

proxyOptions

public EventProcessorClientBuilder proxyOptions(ProxyOptions proxyOptions)

Sets the proxy configuration to use for EventHubAsyncClient. When a proxy is configured, AMQP_WEB_SOCKETS must be used for the transport type.

Parameters:

proxyOptions - The proxy options to use.

Returns:

The updated EventProcessorClientBuilder object.

retry

@Deprecated
public EventProcessorClientBuilder retry(AmqpRetryOptions retryOptions)

Deprecated

Sets the retry policy for EventHubAsyncClient. If not specified, the default retry options are used.

Parameters:

retryOptions - The retry policy to use.

Returns:

The updated EventProcessorClientBuilder object.

retryOptions

public EventProcessorClientBuilder retryOptions(AmqpRetryOptions retryOptions)

Sets the retry policy for EventHubAsyncClient. If not specified, the default retry options are used.

Parameters:

retryOptions - The retry options to use.

Returns:

The updated EventProcessorClientBuilder object.

trackLastEnqueuedEventProperties

public EventProcessorClientBuilder trackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)

Sets whether or not the event processor should request information on the last enqueued event on its associated partition, and track that information as events are received.

When information about the partition's last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition that it otherwise would not. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client.

Parameters:

trackLastEnqueuedEventProperties - true if the resulting events will keep track of the last enqueued information for that partition; false otherwise.

Returns:

The updated EventProcessorClientBuilder instance.

transportType

public EventProcessorClientBuilder transportType(AmqpTransportType transport)

Sets the transport type by which all the communication with Azure Event Hubs occurs. Default value is AMQP.

Parameters:

transport - The transport type to use.

Returns:

The updated EventProcessorClientBuilder object.
