How to use Spring Cloud Azure Stream Binder for Azure Service Bus
This article demonstrates how to use the Spring Cloud Stream Binder to send messages to and receive messages from Service Bus queues and topics.
Important
Spring Boot 2.0 (or later) is required to complete the steps in this article.
Azure provides an asynchronous messaging platform called Azure Service Bus ("Service Bus") that is based on the Advanced Message Queuing Protocol 1.0 ("AMQP 1.0") standard. Service Bus can be used across the range of supported Azure platforms.
Prerequisites
The following prerequisites are required for this article:
An Azure subscription; if you don't already have an Azure subscription, you can activate your MSDN subscriber benefits or sign up for a free account.
A supported Java Development Kit (JDK), version 8 or later. For more information about the JDKs available for use when developing on Azure, see Java support on Azure and Azure Stack.
Apache Maven, version 3.2 or later.
If you already have a configured Service Bus queue or topic, ensure that the Service Bus namespace meets the following requirements:
- Allows access from all networks
- Is Standard (or higher)
- Has an access policy with read/write access for your queue and topic
If you don't have a configured Service Bus queue or topic, use the Azure portal to create a Service Bus queue or create a Service Bus topic. Ensure that the namespace meets the requirements specified in the previous step. Also, make note of the connection string in the namespace as you need it for this tutorial's test app.
If you don't have a Spring Boot application, create a Maven project with the Spring Initializr. Select Maven Project, select 2.4.6 as the Spring Boot version, select 8 as the Java version, and, under Dependencies, add the Web dependency.
Use the Spring Cloud Stream Binder starter
Locate the pom.xml file in the parent directory of your app; for example:
C:\SpringBoot\servicebus\pom.xml
-or-
/users/example/home/servicebus/pom.xml
Open the pom.xml file in a text editor.
Add the following code block under the <dependencies> element, depending on whether you're using a Service Bus queue or topic:
Service Bus queue
<dependency>
  <groupId>com.azure.spring</groupId>
  <artifactId>azure-spring-cloud-stream-binder-servicebus-queue</artifactId>
  <version>2.13.0</version>
</dependency>
Service Bus topic
<dependency>
  <groupId>com.azure.spring</groupId>
  <artifactId>azure-spring-cloud-stream-binder-servicebus-topic</artifactId>
  <version>2.13.0</version>
</dependency>
Save and close the pom.xml file.
Configure the app for your service bus
You can configure your app using either a connection string or a service principal. This tutorial uses a connection string. For more information about using a service principal, see the Spring Cloud Azure Stream Binder for Service Bus queue Code Sample and Spring Cloud Azure Stream Binder for Service Bus topic Code Sample.
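Because a mistyped connection string is a common reason the binder fails to connect, it can help to sanity-check the value before pasting it into the configuration. The following is a minimal sketch, assuming the usual namespace-level format of semicolon-separated Key=Value pairs (Endpoint, SharedAccessKeyName, SharedAccessKey). The class ConnectionStringCheck and its methods are hypothetical helpers for illustration and are not part of Spring Cloud Azure; the sample value is a placeholder.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Spring Cloud Azure.
final class ConnectionStringCheck {

    // Splits "Key=Value;Key=Value" into an ordered map of its parts.
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String segment : connectionString.split(";")) {
            if (segment.trim().isEmpty()) {
                continue; // tolerate empty segments such as a trailing semicolon
            }
            int eq = segment.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Malformed segment: " + segment);
            }
            // Split on the first '=' only, because Base64 keys often end with '='.
            parts.put(segment.substring(0, eq).trim(), segment.substring(eq + 1).trim());
        }
        return parts;
    }

    // Assumption: a usable namespace-level string carries these three parts.
    static boolean looksComplete(Map<String, String> parts) {
        return parts.getOrDefault("Endpoint", "").startsWith("sb://")
                && parts.containsKey("SharedAccessKeyName")
                && parts.containsKey("SharedAccessKey");
    }

    public static void main(String[] args) {
        // Placeholder value; never commit a real key to source control.
        String sample = "Endpoint=sb://example-namespace.servicebus.windows.net/;"
                + "SharedAccessKeyName=RootManageSharedAccessKey;"
                + "SharedAccessKey=abc123=";
        Map<String, String> parts = parse(sample);
        System.out.println("Endpoint: " + parts.get("Endpoint"));
        System.out.println("Looks complete: " + looksComplete(parts));
    }
}
```

A check like this only validates the shape of the string; whether the key actually grants read/write access is decided by the access policy in the namespace.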
Add an application.yaml in the resources directory of your app; for example:
C:\SpringBoot\servicebus\src\main\resources\application.yaml
-or-
/users/example/home/servicebus/src/main/resources/application.yaml
Open the application.yaml file in a text editor, and then append the appropriate code to the end of the file, depending on whether you're using a Service Bus queue or topic. Use the Field descriptions table to replace the sample values with the appropriate properties for your service bus.
Service Bus queue
spring:
  cloud:
    azure:
      servicebus:
        connection-string: <ServiceBusNamespaceConnectionString>
    stream:
      bindings:
        consume-in-0:
          destination: examplequeue
        supply-out-0:
          destination: examplequeue
      servicebus:
        queue:
          bindings:
            consume-in-0:
              consumer:
                checkpoint-mode: MANUAL
      function:
        definition: consume;supply;
      poller:
        fixed-delay: 1000
        initial-delay: 0
Service Bus topic
spring:
  cloud:
    azure:
      servicebus:
        connection-string: <ServiceBusNamespaceConnectionString>
    stream:
      bindings:
        consume-in-0:
          destination: exampletopic
          group: examplesubscription
        supply-out-0:
          destination: exampletopic
      servicebus:
        topic:
          bindings:
            consume-in-0:
              consumer:
                checkpoint-mode: MANUAL
      function:
        definition: consume;supply;
      poller:
        fixed-delay: 1000
        initial-delay: 0
Field descriptions
Field | Description
spring.cloud.azure.servicebus.connection-string | Specify the connection string you obtained in your Service Bus namespace from the Azure portal.
spring.cloud.stream.bindings.consume-in-0.destination | Specify the Service Bus queue or Service Bus topic you used in this tutorial.
spring.cloud.stream.bindings.consume-in-0.group | If you used a Service Bus topic, specify the topic subscription.
spring.cloud.stream.bindings.supply-out-0.destination | Specify the same value used for the input destination.
spring.cloud.stream.servicebus.queue.bindings.consume-in-0.consumer.checkpoint-mode | Specify MANUAL.
spring.cloud.stream.servicebus.topic.bindings.consume-in-0.consumer.checkpoint-mode | Specify MANUAL.
spring.cloud.stream.function.definition | Specify which functional beans to bind to the external destinations exposed by the bindings.
spring.cloud.stream.poller.fixed-delay | Specify the fixed delay for the default poller, in milliseconds. The default is 1000.
spring.cloud.stream.poller.initial-delay | Specify the initial delay for periodic triggers. The default is 0.
Save and close the application.yaml file.
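The binding names consume-in-0 and supply-out-0 in the configuration are not arbitrary. Spring Cloud Stream derives them from the names of the functional beans listed in the function definition, using the pattern <functionName>-in-<index> for inputs and <functionName>-out-<index> for outputs. The following sketch only illustrates that naming rule for single-input and single-output functions; the class BindingNames and its methods are hypothetical and do not call any Spring API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the binding naming rule; not a Spring API.
final class BindingNames {

    enum Kind { CONSUMER, SUPPLIER }

    // Splits a definition such as "consume;supply;" into bean names.
    static List<String> functionNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String name : definition.split(";")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    // A Consumer has one input (index 0); a Supplier has one output (index 0).
    static String bindingName(String functionName, Kind kind) {
        String direction = (kind == Kind.CONSUMER) ? "in" : "out";
        return functionName + "-" + direction + "-0";
    }

    public static void main(String[] args) {
        System.out.println(functionNames("consume;supply;"));
        System.out.println(bindingName("consume", Kind.CONSUMER));
        System.out.println(bindingName("supply", Kind.SUPPLIER));
    }
}
```

If you rename a bean method in the Java code, the keys under bindings and the function definition in application.yaml need to change to match.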
Implement basic Service Bus functionality
In this section, you create the necessary Java classes for sending messages to your service bus.
Modify the main application class
Locate the main application Java file in the package directory of your app; for example:
C:\SpringBoot\servicebus\src\main\java\com\example\servicebus\ServiceBusApplication.java
-or-
/users/example/home/servicebus/src/main/java/com/example/servicebus/ServiceBusApplication.java
Open the main application Java file in a text editor.
Add the following code to the file:
package com.example.servicebus;

import com.azure.spring.integration.core.api.Checkpointer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;

import java.util.function.Consumer;

import static com.azure.spring.integration.core.AzureHeaders.CHECKPOINTER;

@SpringBootApplication
public class ServiceBusApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(ServiceBusApplication.class, args);
    }

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
            checkpointer.success().handle((r, ex) -> {
                if (ex == null) {
                    LOGGER.info("Message '{}' successfully checkpointed", message.getPayload());
                }
                return null;
            });
        };
    }
}
Save and close the file.
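The consumer above logs only the successful checkpoint and stays silent when the checkpoint fails. The following sketch shows how the same handle callback can report both outcomes. It is an illustration under assumptions: StubCheckpointer is a hypothetical stand-in that mimics only the success() call used in this article, on the assumption that it returns a CompletableFuture<Void>, and the error text "lock expired" is invented for the demo.

```java
import java.util.concurrent.CompletableFuture;

// Illustration only; StubCheckpointer is a hypothetical stand-in, not the Azure type.
final class CheckpointOutcome {

    interface StubCheckpointer {
        CompletableFuture<Void> success();
    }

    // Maps the outcome of the checkpoint to a log line for either branch.
    // join() is used here only so the demo can print the result synchronously;
    // a real consumer would log inside the callback instead of blocking.
    static String describe(StubCheckpointer checkpointer, String payload) {
        return checkpointer.success()
                .handle((result, ex) -> ex == null
                        ? "Message '" + payload + "' successfully checkpointed"
                        : "Checkpoint failed for '" + payload + "': " + ex.getMessage())
                .join();
    }

    public static void main(String[] args) {
        StubCheckpointer ok = () -> CompletableFuture.completedFuture(null);
        StubCheckpointer broken = () -> {
            CompletableFuture<Void> future = new CompletableFuture<>();
            future.completeExceptionally(new IllegalStateException("lock expired"));
            return future;
        };
        System.out.println(describe(ok, "hello"));
        System.out.println(describe(broken, "hello"));
    }
}
```

In the tutorial's consumer, the equivalent change would be an else branch that logs the exception at error level, so that a failed checkpoint shows up in the application log.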
Create a new producer configuration class
Using a text editor, create a Java file named ServiceProducerConfiguration.java in the package directory of your app.
Add the following code to the new file:
package com.example.servicebus;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.function.Supplier;

@Configuration
public class ServiceProducerConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProducerConfiguration.class);

    @Bean
    public Sinks.Many<Message<String>> many() {
        return Sinks.many().unicast().onBackpressureBuffer();
    }

    @Bean
    public Supplier<Flux<Message<String>>> supply(Sinks.Many<Message<String>> many) {
        return () -> many.asFlux()
                         .doOnNext(m -> LOGGER.info("Manually sending message {}", m))
                         .doOnError(t -> LOGGER.error("Error encountered", t));
    }
}
Save and close the ServiceProducerConfiguration.java file.
Create a new controller class
Using a text editor, create a Java file named ServiceProducerController.java in the package directory of your app.
Add the following lines of code to the new file:
package com.example.servicebus;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Sinks;

@RestController
public class ServiceProducerController {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProducerController.class);

    @Autowired
    private Sinks.Many<Message<String>> many;

    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message).build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}
Save and close the ServiceProducerController.java file.
Build and test your application
Open a command prompt.
Change the directory to the location of your pom.xml file; for example:
cd C:\SpringBoot\servicebus
-or-
cd /users/example/home/servicebus
Build your Spring Boot application with Maven and run it:
mvn clean spring-boot:run
Once your application is running, you can use curl to test it. Quote the URL so that the shell doesn't interpret the ? character:
curl -X POST "localhost:8080/messages?message=hello"
You should see "hello" posted to your application's log:
New message received: 'hello'
Message 'hello' successfully checkpointed
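The test message "hello" needs no escaping, but a message that contains spaces or characters such as & must be URL-encoded before it is placed in the query string, or the controller receives a truncated value. The following is a minimal sketch of building such a request URL with the JDK's URLEncoder, which works on Java 8. The class SendMessageUrl is a hypothetical helper; the /messages path and the message parameter come from the controller in this article.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for illustration; builds the URL for the /messages endpoint.
final class SendMessageUrl {

    // Encodes the message as a form-style query parameter value.
    static String build(String baseUrl, String message) {
        try {
            return baseUrl + "/messages?message=" + URLEncoder.encode(message, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is required to be available on every Java platform.
            throw new IllegalStateException("UTF-8 not supported", e);
        }
    }

    public static void main(String[] args) {
        String url = build("http://localhost:8080", "hello world & more");
        // Print a curl command that can be pasted into a terminal.
        System.out.println("curl -X POST \"" + url + "\"");
    }
}
```

URLEncoder writes a space as + and & as %26, which the servlet container decodes back to the original text when it parses the query parameter.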
Clean up resources
When no longer needed, use the Azure portal to delete the resources created in this article to avoid unexpected charges.