ServiceBusProcessorClient Class

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusProcessorClient

public final class ServiceBusProcessorClient
implements AutoCloseable

The processor client for processing Service Bus messages. ServiceBusProcessorClient provides a push-based mechanism that invokes the message-processing callback when a message is received, and the error handler when an error occurs while receiving messages. A ServiceBusProcessorClient can be created to process messages from either a session-enabled or a non-session-enabled Service Bus entity. By default, it settles messages automatically.

Sample code to instantiate a processor client and receive in PeekLock mode

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     // Randomly complete or abandon each message to keep the sample short. In real code, call
     // context.complete() once the business logic has handled the message and Service Bus does not
     // need to redeliver it; otherwise call context.abandon().
     final boolean success = Math.random() < 0.5;
     if (success) {
         try {
             context.complete();
         } catch (RuntimeException error) {
             System.out.printf("Completion of the message %s failed.%nError: %s%n",
                 message.getMessageId(), error);
         }
     } else {
         try {
             context.abandon();
         } catch (RuntimeException error) {
             System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
                 message.getMessageId(), error);
         }
     }
 };

 // Function that gets called whenever an error occurs.
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()  // Explicitly opt in to manual settlement (e.g. complete, abandon).
     .processMessage(processMessage)
     .processError(processError)
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop the processor and dispose of it when done processing messages.
 processorClient.stop();
 processorClient.close();

Sample code to instantiate a processor client and receive in ReceiveAndDelete mode

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 // Function that gets called whenever an error occurs.
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
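 // In RECEIVE_AND_DELETE mode the service deletes each message as soon as it is delivered, so there is
 // nothing to settle; 'disableAutoComplete()' only turns off the processor's automatic completion.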
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop the processor and dispose of it when done processing messages.
 processorClient.stop();
 processorClient.close();

Create and run a session-enabled processor

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
     ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 Consumer<ServiceBusErrorContext> onError = context -> {
     System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
         context.getFullyQualifiedNamespace(), context.getEntityPath());

     if (context.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) context.getException();

         System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", context.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .sessionProcessor()
     .queueName(sessionEnabledQueueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()
     .maxConcurrentSessions(2)
     .processMessage(onMessage)
     .processError(onError)
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 sessionProcessor.start();

 // Stop the processor and dispose of it when done processing messages.
 sessionProcessor.stop();
 sessionProcessor.close();

Method Summary

Modifier and Type Method and Description
synchronized void close()

Stops message processing and closes the processor.

synchronized String getIdentifier()

Gets the identifier of the instance of ServiceBusProcessorClient.

String getQueueName()

Returns the queue name associated with this instance of ServiceBusProcessorClient.

String getSubscriptionName()

Returns the subscription name associated with this instance of ServiceBusProcessorClient.

String getTopicName()

Returns the topic name associated with this instance of ServiceBusProcessorClient.

synchronized boolean isRunning()

Returns true if the processor is running.

synchronized void start()

Starts the processor in the background.

synchronized void stop()

Stops the message processing for this processor.

Method Details

close

public synchronized void close()

Stops message processing and closes the processor. The receiving links and sessions are closed, and calling start() afterwards creates a new processing cycle with new links and new sessions.
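
Because the class implements AutoCloseable, a processor can also be scoped with try-with-resources so that close() runs when the block exits. The following is a minimal sketch of that pattern only. ClosableProcessorSketch and runOnce are hypothetical stand-ins introduced for illustration; the stand-in models nothing but a running flag and does not talk to Service Bus.

```java
// Hypothetical stand-in for a processor. It only tracks a running flag;
// it is NOT the SDK class and performs no messaging.
final class ClosableProcessorSketch implements AutoCloseable {
    private boolean running;

    synchronized void start() {
        running = true;
    }

    synchronized boolean isRunning() {
        return running;
    }

    @Override
    public synchronized void close() {
        running = false;
    }

    // Starts the stand-in inside a try-with-resources block and returns it
    // after the block has exited, so the caller can inspect its final state.
    static ClosableProcessorSketch runOnce() {
        ClosableProcessorSketch processor = new ClosableProcessorSketch();
        try (ClosableProcessorSketch managed = processor) {
            managed.start();
            System.out.println("running inside block: " + managed.isRunning());
        } // close() is invoked here, even if the block throws
        return processor;
    }

    public static void main(String[] args) {
        System.out.println("running after block: " + runOnce().isRunning());
    }
}
```

With a real ServiceBusProcessorClient, the body of the block would need to keep the calling thread alive for as long as messages should be processed, since start() returns immediately.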

getIdentifier

public synchronized String getIdentifier()

Gets the identifier of the instance of ServiceBusProcessorClient.

Returns:

The identifier that can identify the instance of ServiceBusProcessorClient.

getQueueName

public String getQueueName()

Returns the queue name associated with this instance of ServiceBusProcessorClient.

Returns:

The queue name associated with this instance of ServiceBusProcessorClient, or null if the processor instance is for a topic and subscription.

getSubscriptionName

public String getSubscriptionName()

Returns the subscription name associated with this instance of ServiceBusProcessorClient.

Returns:

The subscription name associated with this instance of ServiceBusProcessorClient, or null if the processor instance is for a queue.

getTopicName

public String getTopicName()

Returns the topic name associated with this instance of ServiceBusProcessorClient.

Returns:

The topic name associated with this instance of ServiceBusProcessorClient, or null if the processor instance is for a queue.
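
Since getQueueName(), getTopicName(), and getSubscriptionName() each return null when they do not apply, code that logs the entity has to check which values are present. A minimal sketch of one way to do that follows. EntityLabelSketch and describe are hypothetical names and are not part of the SDK; the helper takes plain strings so it can run without a Service Bus connection.

```java
// Hypothetical helper, not part of the Azure SDK: builds one label from the
// three getter results, which are null when they do not apply.
final class EntityLabelSketch {

    // For a queue processor, topicName and subscriptionName are expected to be null.
    // For a topic/subscription processor, queueName is expected to be null.
    static String describe(String queueName, String topicName, String subscriptionName) {
        if (queueName != null) {
            return "queue '" + queueName + "'";
        }
        if (topicName != null && subscriptionName != null) {
            return "topic '" + topicName + "', subscription '" + subscriptionName + "'";
        }
        // Defensive fallback for inputs the reference does not describe.
        return "unknown entity";
    }

    public static void main(String[] args) {
        System.out.println(describe("orders", null, null));
        System.out.println(describe(null, "events", "audit"));
    }
}
```

With a real client, the call would look like EntityLabelSketch.describe(processor.getQueueName(), processor.getTopicName(), processor.getSubscriptionName()).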

isRunning

public synchronized boolean isRunning()

Returns true if the processor is running. If the processor is stopped or closed, this method returns false.

Returns:

true if the processor is running; false otherwise.

start

public synchronized void start()

Starts the processor in the background. When this method is called, the processor initiates a message receiver that invokes the message handler when new messages are available. This method is idempotent (i.e., calling start() again after the processor is already running is a no-op).

Calling start() after calling stop() will resume processing messages using the same underlying connection.

Calling start() after calling close() will start the processor with a new connection.

stop

public synchronized void stop()

Stops the message processing for this processor. The receiving links and sessions are kept active, and the processor can resume processing messages when start() is called again.
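
The interplay of start(), stop(), and close() described in this section can be summarized as a small state model. The sketch below is a toy model of the documented behavior only and is not the SDK's implementation. ProcessorLifecycleModel and connectionsOpened are hypothetical names, and "connection" here is just a counter standing in for the underlying links and sessions.

```java
// Toy model of the lifecycle described in this reference.
// It is NOT the SDK's implementation and performs no messaging.
final class ProcessorLifecycleModel {
    private boolean running;
    private boolean connected;
    private int connectionsOpened;

    synchronized void start() {
        if (running) {
            return; // start() on a running processor is a no-op
        }
        if (!connected) {
            // First start, or start after close(): a new connection is needed.
            connected = true;
            connectionsOpened++;
        }
        running = true;
    }

    synchronized void stop() {
        running = false; // the connection is kept so start() can resume
    }

    synchronized void close() {
        running = false;
        connected = false; // the next start() opens a new connection
    }

    synchronized boolean isRunning() {
        return running;
    }

    synchronized int connectionsOpened() {
        return connectionsOpened;
    }

    public static void main(String[] args) {
        ProcessorLifecycleModel model = new ProcessorLifecycleModel();
        model.start();
        model.stop();
        model.start(); // resumes on the same modeled connection
        model.close();
        model.start(); // opens a second modeled connection
        System.out.println("connections opened: " + model.connectionsOpened());
    }
}
```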
