How to use Spring Cloud Azure Stream Binder for Azure Service Bus

This article demonstrates how to use the Spring Cloud Stream Binder to send messages to and receive messages from Service Bus queues and topics.

Important

Spring Boot 2.0 (or later) is required to complete the steps in this article.

Azure provides an asynchronous messaging platform called Azure Service Bus ("Service Bus") that is based on the Advanced Message Queueing Protocol 1.0 ("AMQP 1.0") standard. Service Bus can be used across the range of supported Azure platforms.

Prerequisites

The following prerequisites are required for this article:

  1. An Azure subscription; if you don't already have an Azure subscription, you can activate your MSDN subscriber benefits or sign up for a free account.

  2. A supported Java Development Kit (JDK), version 8 or later. For more information about the JDKs available for use when developing on Azure, see Java support on Azure and Azure Stack.

  3. Apache's Maven, version 3.2 or later.

  4. If you already have a configured Service Bus queue or topic, ensure that the Service Bus namespace meets the following requirements:

    1. Allows access from all networks
    2. Is Standard (or higher)
    3. Has an access policy with read/write access for your queue and topic
  5. If you don't have a configured Service Bus queue or topic, use the Azure portal to create a Service Bus queue or create a Service Bus topic. Ensure that the namespace meets the requirements specified in the previous step. Also, make note of the connection string in the namespace as you need it for this tutorial's test app.

  6. If you don't have a Spring Boot application, create a Maven project with the Spring Initializr. Remember to select Maven Project and, under Dependencies, add the Web dependency, under Spring Boot, select 2.4.6, select 8 Java version.

Use the Spring Cloud Stream Binder starter

  1. Locate the pom.xml file in the parent directory of your app; for example:

    C:\SpringBoot\servicebus\pom.xml

    -or-

    /users/example/home/servicebus/pom.xml

  2. Open the pom.xml file in a text editor.

  3. Add the following code block under the <dependencies> element, depending on whether you're using a Service Bus queue or topic:

    Service Bus queue

    <dependency>
        <groupId>com.azure.spring</groupId>
        <artifactId>azure-spring-cloud-stream-binder-servicebus-queue</artifactId>
        <version>2.13.0</version>
    </dependency>
    

    Service Bus topic

    <dependency>
        <groupId>com.azure.spring</groupId>
        <artifactId>azure-spring-cloud-stream-binder-servicebus-topic</artifactId>
        <version>2.13.0</version>
    </dependency>
    
  4. Save and close the pom.xml file.

Configure the app for your service bus

You can configure your app based on either the connection string or service principal. This tutorial uses a connection string. For more information about using service principal, see the Spring Cloud Azure Stream Binder for Service Bus queue Code Sample and Spring Cloud Azure Stream Binder for Service Bus topic Code Sample.

  1. Add an application.yaml in the resources directory of your app; for example:

    C:\SpringBoot\servicebus\src\main\resources\application.yaml

    -or-

    /users/example/home/servicebus/src/main/resources/application.yaml

  2. Open the application.yaml file in a text editor, append the appropriate code to the end of the application.yaml file depending on whether you're using a Service Bus queue or topic. Use the Field descriptions table to replace the sample values with the appropriate properties for your service bus.

    Service Bus queue

    spring:
      cloud:
        azure:
          servicebus:
            connection-string: <ServiceBusNamespaceConnectionString>
        stream:
          bindings:
            consume-in-0:
              destination: examplequeue
            supply-out-0:
              destination: examplequeue
          servicebus:
            queue:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint-mode: MANUAL
          function:
            definition: consume;supply;
          poller:
            fixed-delay: 1000
            initial-delay: 0
    

    Service Bus topic

    spring:
      cloud:
        azure:
          servicebus:
            connection-string: <ServiceBusNamespaceConnectionString>
        stream:
          bindings:
            consume-in-0:
              destination: exampletopic
              group: examplesubscription
            supply-out-0:
              destination: exampletopic
          servicebus:
            topic:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint-mode: MANUAL
          function:
            definition: consume;supply;
          poller:
            fixed-delay: 1000
            initial-delay: 0
    

    Field descriptions

    Field Description
    spring.cloud.azure.function.definition Specify which functional bean to bind to the external destination(s) exposed by the bindings.
    spring.cloud.azure.poller.fixed-delay Specify fixed delay for default poller in milliseconds, default 1000L.
    spring.cloud.azure.poller.initial-delay Specify initial delay for periodic triggers, default 0.
    spring.cloud.azure.servicebus.connection-string Specify the connection string you obtained in your Service Bus namespace from the Azure portal.
    spring.cloud.stream.bindings.consume-in-0.destination Specify the Service Bus queue or Service Bus topic you used in this tutorial.
    spring.cloud.stream.bindings.consume-in-0.group If you used a Service Bus topic, specify the topic subscription.
    spring.cloud.stream.bindings.supply-out-0.destination Specify the same value used for input destination.
    spring.cloud.stream.servicebus.queue.bindings.consume-in-0.consumer.checkpoint-mode Specify MANUAL.
    spring.cloud.stream.servicebus.topic.bindings.consume-in-0.consumer.checkpoint-mode Specify MANUAL.
  3. Save and close the application.yaml file.

Implement basic Service Bus functionality

In this section, you create the necessary Java classes for sending messages to your service bus.

Modify the main application class

  1. Locate the main application Java file in the package directory of your app; for example:

    C:\SpringBoot\servicebus\src\main\java\com\example\servicebus\ServiceBusApplication.java

    -or-

    /users/example/home/servicebus/src/main/java/com/example/servicebus/ServiceBusApplication.java

  2. Open the main application Java file in a text editor.

  3. Add the following code to the file:

    package com.example.servicebus;
    
    import com.azure.spring.integration.core.api.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    
    import java.util.function.Consumer;
    
    import static com.azure.spring.integration.core.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusApplication {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusApplication.class);
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusApplication.class, args);
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message -> {
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success().handle((r, ex) -> {
                    if (ex == null) {
                        LOGGER.info("Message '{}' successfully checkpointed", message.getPayload());
                    }
                    return null;
                });
            };
        }
    }
    
  4. Save and close the file.

Create a new producer configuration class

  1. Using a text editor, create a Java file named ServiceProducerConfiguration.java in the package directory of your app.

  2. Add the following code to the new file:

    package com.example.servicebus;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.Message;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    
    import java.util.function.Supplier;
    
    @Configuration
    public class ServiceProducerConfiguration {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProducerConfiguration.class);
    
        @Bean
        public Sinks.Many<Message<String>> many() {
            return Sinks.many().unicast().onBackpressureBuffer();
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply(Sinks.Many<Message<String>> many) {
            return () -> many.asFlux()
                             .doOnNext(m -> LOGGER.info("Manually sending message {}", m))
                             .doOnError(t -> LOGGER.error("Error encountered", t));
        }
    }
    
  3. Save and close the ServiceProducerConfiguration.java file.

Create a new controller class

  1. Using a text editor, create a Java file named ServiceProducerController.java in the package directory of your app.

  2. Add the following lines of code to the new file:

    package com.example.servicebus;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.ResponseEntity;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.Sinks;
    
    @RestController
    public class ServiceProducerController {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProducerController.class);
    
        @Autowired
        private Sinks.Many<Message<String>> many;
    
        @PostMapping("/messages")
        public ResponseEntity<String> sendMessage(@RequestParam String message) {
            LOGGER.info("Going to add message {} to Sinks.Many.", message);
            many.emitNext(MessageBuilder.withPayload(message).build(), Sinks.EmitFailureHandler.FAIL_FAST);
            return ResponseEntity.ok("Sent!");
        }
    }
    
  3. Save and close the ServiceProducerController.java file.

Build and test your application

  1. Open a command prompt.

  2. Change the directory to the location of your pom.xml file; for example:

    cd C:\SpringBoot\servicebus

    -or-

    cd /users/example/home/servicebus

  3. Build your Spring Boot application with Maven and run it:

    mvn clean spring-boot:run
    
  4. Once your application is running, you can use curl to test your application:

    curl -X POST localhost:8080/messages?message=hello
    

    You should see "hello" posted to your application's log:

    New message received: 'hello'
    Message 'hello' successfully checkpointed
    

Clean up resources

When no longer needed, use the Azure portal to delete the resources created in this article to avoid unexpected charges.

Next steps