EventProcessorHost Class

  • java.lang.Object
    • com.microsoft.azure.eventprocessorhost.EventProcessorHost

public class EventProcessorHost

Constructor Summary

Constructor Description
EventProcessorHost(final String hostName, final String eventHubPath, final String consumerGroupName, final String eventHubConnectionString, final String storageConnectionString, final String storageContainerName)

Create a new host instance to process events from an Event Hub.

Since Event Hubs are generally used for scale-out, high-traffic scenarios, in most scenarios there will be only one host instances per process, and the processes will be run on separate machines. Besides scale, this also provides isolation: one process or machine crashing will not take out multiple host instances. However, it is supported to run multiple host instances on one machine, or even within one process, for development and testing.

The hostName parameter is a name for this event processor host, which must be unique among all event processor host instances receiving from this event hub+consumer group combination: the unique name is used to distinguish which event processor host instance owns the lease for a given partition. An easy way to generate a unique hostName which also includes other information is to call EventProcessorHost.createHostName("mystring").

This overload of the constructor uses the built-in lease and checkpoint managers. The Azure Storage account specified by the storageConnectionString parameter is used by the built-in managers to record leases and checkpoints, in the specified container.

The Event Hub connection string may be conveniently constructed using the ConnectionStringBuilder class from the Java Event Hub client.

EventProcessorHost(final String hostName, final String eventHubPath, final String consumerGroupName, final String eventHubConnectionString, final String storageConnectionString, final String storageContainerName, final ScheduledExecutorService executorService)

Create a new host to process events from an Event Hub.

This overload adds an argument to specify a user-provided thread pool. The number of partitions in the target event hub and the number of host instances should be considered when choosing the size of the thread pool: how many partitions is one instance expected to own under normal circumstances? One thread per partition should provide good performance, while being able to support more partitions adequately if a host instance fails and its partitions must be redistributed.

EventProcessorHost(final String hostName, final String eventHubPath, final String consumerGroupName, final String eventHubConnectionString, final String storageConnectionString, final String storageContainerName, final String storageBlobPrefix)

Create a new host to process events from an Event Hub.

This overload adds an argument to specify a prefix used by the built-in lease manager when naming blobs in Azure Storage.

EventProcessorHost(final String hostName, final String eventHubPath, final String consumerGroupName, final String eventHubConnectionString, final String storageConnectionString, final String storageContainerName, final String storageBlobPrefix, final ScheduledExecutorService executorService)

Create a new host to process events from an Event Hub.

This overload allows the caller to specify both a user-supplied thread pool and a prefix used by the built-in lease manager when naming blobs in Azure Storage.

EventProcessorHost(final String hostName, final String eventHubPath, final String consumerGroupName, final String eventHubConnectionString, ICheckpointManager checkpointManager, ILeaseManager leaseManager)

Create a new host to process events from an Event Hub.

This overload allows the caller to provide their own lease and checkpoint managers to replace the built-in ones based on Azure Storage.

EventProcessorHost(final String hostName, final String eventHubPath, final String consumerGroupName, final String eventHubConnectionString, ICheckpointManager checkpointManager, ILeaseManager leaseManager, ScheduledExecutorService executorService, RetryPolicy retryPolicy)

Create a new host to process events from an Event Hub.

This overload allows the caller to provide their own lease and checkpoint managers to replace the built-in ones based on Azure Storage, and to provide an executor service and a retry policy for communications with the event hub.

Method Summary

Modifier and Type Method and Description
String createHostName(String prefix)

Convenience method for generating unique host names, safe to pass to the EventProcessorHost constructors that take a hostName argument.

If a prefix is supplied, the constructed name begins with that string. If the prefix argument is null or an empty string, the constructed name begins with "javahost". Then a dash '-' and a UUID are appended to create a unique name.

String getHostName()

The processor host name is supplied by the user at constructor time, but being able to get it is useful because it means not having to carry both the host object and the name around. As long as you have the host object, you can get the name back, such as for logging.

PartitionManagerOptions getPartitionManagerOptions()

Returns the existing partition manager options object. Unless you are providing implementations of ILeaseManager and ICheckpointMananger, to change partition manager options, call this method to get the existing object and call setters on it to adjust the values.

<T extends IEventProcessor> CompletableFuture<Void> registerEventProcessor(Class<T> eventProcessorType)

Register class for event processor and start processing.

This overload uses the default event processor factory, which simply creates new instances of the registered event processor class, and uses all the default options.

The returned CompletableFuture completes when host initialization is finished. Initialization failures are reported by completing the future with an exception, so it is important to call get() on the future and handle any exceptions thrown.

class MyEventProcessor implements IEventProcessor { ... }
EventProcessorHost host = new EventProcessorHost(...);
{ CompletableFuture<Void>} foo = host.registerEventProcessor(MyEventProcessor.class);
foo.get();

<T extends IEventProcessor> CompletableFuture<Void> registerEventProcessor(Class<T> eventProcessorType, EventProcessorOptions processorOptions)

Register class for event processor and start processing.

This overload uses the default event processor factory, which simply creates new instances of the registered event processor class, but takes user-specified options.

The returned CompletableFuture completes when host initialization is finished. Initialization failures are reported by completing the future with an exception, so it is important to call get() on the future and handle any exceptions thrown.

CompletableFuture<Void> registerEventProcessorFactory(IEventProcessorFactory<?> factory)

Register a user-supplied event processor factory and start processing.

If creating a new event processor requires more work than just new'ing an objects, the user must create an object that implements IEventProcessorFactory and pass it to this method, instead of calling registerEventProcessor.

This overload uses default options for the processor host and event processor(s).

The returned CompletableFuture completes when host initialization is finished. Initialization failures are reported by completing the future with an exception, so it is important to call get() on the future and handle any exceptions thrown.

CompletableFuture<Void> registerEventProcessorFactory(IEventProcessorFactory<?> factory, EventProcessorOptions processorOptions)

Register user-supplied event processor factory and start processing.

This overload takes user-specified options.

The returned CompletableFuture completes when host initialization is finished. Initialization failures are reported by completing the future with an exception, so it is important to call get() on the future and handle any exceptions thrown.

String safeCreateUUID()

Synchronized string UUID generation convenience method.

We saw null and empty strings returned from UUID.randomUUID().toString() when used from multiple threads and there is no clear answer on the net about whether it is really thread-safe or not.

One of the major users of UUIDs is the built-in lease and checkpoint manager, which can be replaced by user implementations. This UUID generation method is public so user implementations can use it as well and avoid the problems.

void setPartitionManagerOptions(PartitionManagerOptions options)

Set the partition manager options all at once. Normally this method is used only when providing user implementations of ILeaseManager and ICheckpointManager, because it allows passing an object of a class derived from PartitionManagerOptions, which could contain options specific to the user-implemented ILeaseManager or ICheckpointMananger. When using the default, Azure Storage-based implementation, the recommendation is to call getPartitionManangerOptions to return the existing options object, then call setters on that object to adjust the values.

CompletableFuture<Void> unregisterEventProcessor()

Stop processing events and shut down this host instance.

Constructor Details

EventProcessorHost

public EventProcessorHost(final String hostName, final String eventHubPath, final String consumerGroupName, final String eventHubConnectionString, final String storageConnectionString, final String storageContainerName)

Create a new host instance to process events from an Event Hub.

Since Event Hubs are generally used for scale-out, high-traffic scenarios, in most scenarios there will be only one host instances per process, and the processes will be run on separate machines. Besides scale, this also provides isolation: one process or machine crashing will not take out multiple host instances. However, it is supported to run multiple host instances on one machine, or even within one process, for development and testing.

The hostName parameter is a name for this event processor host, which must be unique among all event processor host instances receiving from this event hub+consumer group combination: the unique name is used to distinguish which event processor host instance owns the lease for a given partition. An easy way to generate a unique hostName which also includes other information is to call EventProcessorHost.createHostName("mystring").

This overload of the constructor uses the built-in lease and checkpoint managers. The Azure Storage account specified by the storageConnectionString parameter is used by the built-in managers to record leases and checkpoints, in the specified container.

The Event Hub connection string may be conveniently constructed using the ConnectionStringBuilder class from the Java Event Hub client.

Parameters:

hostName - A name for this event processor host. See method notes.
eventHubPath - Specifies the Event Hub to receive events from.
consumerGroupName - The name of the consumer group to use when receiving from the Event Hub.
eventHubConnectionString - Connection string for the Event Hub to receive from.
storageConnectionString - Connection string for the Azure Storage account to use for persisting leases and checkpoints.
storageContainerName - Azure Storage container name for use by built-in lease and checkpoint manager.

EventProcessorHost

public EventProcessorHost(final String hostName, final String eventHubPath, final String consumerGroupName, final String eventHubConnectionString, final String storageConnectionString, final String storageContainerName, final ScheduledExecutorService executorService)

Create a new host to process events from an Event Hub.

This overload adds an argument to specify a user-provided thread pool. The number of partitions in the target event hub and the number of host instances should be considered when choosing the size of the thread pool: how many partitions is one instance expected to own under normal circumstances? One thread per partition should provide good performance, while being able to support more partitions adequately if a host instance fails and its partitions must be redistributed.

Parameters:

hostName - A name for this event processor host. See method notes.
eventHubPath - Specifies the Event Hub to receive events from.
consumerGroupName - The name of the consumer group to use when receiving from the Event Hub.
eventHubConnectionString - Connection string for the Event Hub to receive from.
storageConnectionString - Connection string for the Azure Storage account to use for persisting leases and checkpoints.
storageContainerName - Azure Storage container name for use by built-in lease and checkpoint manager.
executorService - User-supplied thread executor, or null to use EventProcessorHost-internal executor.

EventProcessorHost

public EventProcessorHost(final String hostName, final String eventHubPath, final String consumerGroupName, final String eventHubConnectionString, final String storageConnectionString, final String storageContainerName, final String storageBlobPrefix)

Create a new host to process events from an Event Hub.

This overload adds an argument to specify a prefix used by the built-in lease manager when naming blobs in Azure Storage.

Parameters:

hostName - A name for this event processor host. See method notes.
eventHubPath - Specifies the Event Hub to receive events from.
consumerGroupName - The name of the consumer group to use when receiving from the Event Hub.
eventHubConnectionString - Connection string for the Event Hub to receive from.
storageConnectionString - Connection string for the Azure Storage account to use for persisting leases and checkpoints.
storageContainerName - Azure Storage container name for use by built-in lease and checkpoint manager.
storageBlobPrefix - Prefix used when naming blobs within the storage container.

EventProcessorHost

public EventProcessorHost(final String hostName, final String eventHubPath, final String consumerGroupName, final String eventHubConnectionString, final String storageConnectionString, final String storageContainerName, final String storageBlobPrefix, final ScheduledExecutorService executorService)

Create a new host to process events from an Event Hub.

This overload allows the caller to specify both a user-supplied thread pool and a prefix used by the built-in lease manager when naming blobs in Azure Storage.

Parameters:

hostName - A name for this event processor host. See method notes.
eventHubPath - Specifies the Event Hub to receive events from.
consumerGroupName - The name of the consumer group to use when receiving from the Event Hub.
eventHubConnectionString - Connection string for the Event Hub to receive from.
storageConnectionString - Connection string for the Azure Storage account to use for persisting leases and checkpoints.
storageContainerName - Azure Storage container name for use by built-in lease and checkpoint manager.
storageBlobPrefix - Prefix used when naming blobs within the storage container.
executorService - User-supplied thread executor, or null to use EventProcessorHost-internal executor.

EventProcessorHost

public EventProcessorHost(final String hostName, final String eventHubPath, final String consumerGroupName, final String eventHubConnectionString, ICheckpointManager checkpointManager, ILeaseManager leaseManager)

Create a new host to process events from an Event Hub.

This overload allows the caller to provide their own lease and checkpoint managers to replace the built-in ones based on Azure Storage.

Parameters:

hostName - A name for this event processor host. See method notes.
eventHubPath - Specifies the Event Hub to receive events from.
consumerGroupName - The name of the consumer group to use when receiving from the Event Hub.
eventHubConnectionString - Connection string for the Event Hub to receive from.
checkpointManager - Implementation of ICheckpointManager, to be replacement checkpoint manager.
leaseManager - Implementation of ILeaseManager, to be replacement lease manager.

EventProcessorHost

public EventProcessorHost(final String hostName, final String eventHubPath, final String consumerGroupName, final String eventHubConnectionString, ICheckpointManager checkpointManager, ILeaseManager leaseManager, ScheduledExecutorService executorService, RetryPolicy retryPolicy)

Create a new host to process events from an Event Hub.

This overload allows the caller to provide their own lease and checkpoint managers to replace the built-in ones based on Azure Storage, and to provide an executor service and a retry policy for communications with the event hub.

Parameters:

hostName - A name for this event processor host. See method notes.
eventHubPath - Specifies the Event Hub to receive events from.
consumerGroupName - The name of the consumer group to use when receiving from the Event Hub.
eventHubConnectionString - Connection string for the Event Hub to receive from.
checkpointManager - Implementation of ICheckpointManager, to be replacement checkpoint manager.
leaseManager - Implementation of ILeaseManager, to be replacement lease manager.
executorService - User-supplied thread executor, or null to use EventProcessorHost-internal executor.
retryPolicy - Retry policy governing communications with the event hub.

Method Details

createHostName

public static String createHostName(String prefix)

Convenience method for generating unique host names, safe to pass to the EventProcessorHost constructors that take a hostName argument.

If a prefix is supplied, the constructed name begins with that string. If the prefix argument is null or an empty string, the constructed name begins with "javahost". Then a dash '-' and a UUID are appended to create a unique name.

Parameters:

prefix - String to use as the beginning of the name. If null or empty, a default is used.

Returns:

A unique host name to pass to EventProcessorHost constructors.

getHostName

public String getHostName()

The processor host name is supplied by the user at constructor time, but being able to get it is useful because it means not having to carry both the host object and the name around. As long as you have the host object, you can get the name back, such as for logging.

Returns:

The processor host name

getPartitionManagerOptions

public PartitionManagerOptions getPartitionManagerOptions()

Returns the existing partition manager options object. Unless you are providing implementations of ILeaseManager and ICheckpointMananger, to change partition manager options, call this method to get the existing object and call setters on it to adjust the values.

Returns:

the internally-created PartitionManangerObjects object or any replacement object set with setPartitionManangerOptions

registerEventProcessor

public CompletableFuture registerEventProcessor(Class eventProcessorType)

Register class for event processor and start processing.

This overload uses the default event processor factory, which simply creates new instances of the registered event processor class, and uses all the default options.

The returned CompletableFuture completes when host initialization is finished. Initialization failures are reported by completing the future with an exception, so it is important to call get() on the future and handle any exceptions thrown.

class MyEventProcessor implements IEventProcessor { ... }
EventProcessorHost host = new EventProcessorHost(...);
{ CompletableFuture<Void>} foo = host.registerEventProcessor(MyEventProcessor.class);
foo.get();

Parameters:

eventProcessorType - Class that implements IEventProcessor.

Returns:

Future that completes when initialization is finished.

registerEventProcessor

public CompletableFuture registerEventProcessor(Class eventProcessorType, EventProcessorOptions processorOptions)

Register class for event processor and start processing.

This overload uses the default event processor factory, which simply creates new instances of the registered event processor class, but takes user-specified options.

The returned CompletableFuture completes when host initialization is finished. Initialization failures are reported by completing the future with an exception, so it is important to call get() on the future and handle any exceptions thrown.

Parameters:

eventProcessorType - Class that implements IEventProcessor.
processorOptions - Options for the processor host and event processor(s).

Returns:

Future that completes when initialization is finished.

registerEventProcessorFactory

public CompletableFuture registerEventProcessorFactory(IEventProcessorFactory factory)

Register a user-supplied event processor factory and start processing.

If creating a new event processor requires more work than just new'ing an objects, the user must create an object that implements IEventProcessorFactory and pass it to this method, instead of calling registerEventProcessor.

This overload uses default options for the processor host and event processor(s).

The returned CompletableFuture completes when host initialization is finished. Initialization failures are reported by completing the future with an exception, so it is important to call get() on the future and handle any exceptions thrown.

Parameters:

factory - User-supplied event processor factory object.

Returns:

Future that completes when initialization is finished.

registerEventProcessorFactory

public CompletableFuture registerEventProcessorFactory(IEventProcessorFactory factory, EventProcessorOptions processorOptions)

Register user-supplied event processor factory and start processing.

This overload takes user-specified options.

The returned CompletableFuture completes when host initialization is finished. Initialization failures are reported by completing the future with an exception, so it is important to call get() on the future and handle any exceptions thrown.

Parameters:

factory - User-supplied event processor factory object.
processorOptions - Options for the processor host and event processor(s).

Returns:

Future that completes when initialization is finished.

safeCreateUUID

public static String safeCreateUUID()

Synchronized string UUID generation convenience method.

We saw null and empty strings returned from UUID.randomUUID().toString() when used from multiple threads and there is no clear answer on the net about whether it is really thread-safe or not.

One of the major users of UUIDs is the built-in lease and checkpoint manager, which can be replaced by user implementations. This UUID generation method is public so user implementations can use it as well and avoid the problems.

Returns:

A string UUID with dashes but no curly brackets.

setPartitionManagerOptions

public void setPartitionManagerOptions(PartitionManagerOptions options)

Set the partition manager options all at once. Normally this method is used only when providing user implementations of ILeaseManager and ICheckpointManager, because it allows passing an object of a class derived from PartitionManagerOptions, which could contain options specific to the user-implemented ILeaseManager or ICheckpointMananger. When using the default, Azure Storage-based implementation, the recommendation is to call getPartitionManangerOptions to return the existing options object, then call setters on that object to adjust the values.

Parameters:

options - - a PartitionManangerOptions object (or derived object) representing the desired options

unregisterEventProcessor

public CompletableFuture unregisterEventProcessor()

Stop processing events and shut down this host instance.

Returns:

A CompletableFuture that completes when shutdown is finished.

Applies to