在 Spring 应用程序中使用Azure 服务总线
本文介绍如何在使用 Spring Framework 生成的 Java 应用程序中使用Azure 服务总线。
Azure 提供一个名为 Azure 服务总线 (服务总线) 的异步消息传送平台,该平台基于高级消息队列协议 1.0 (AMQP 1.0) 标准。 可以在受支持的 Azure 平台范围内使用服务总线。
Spring Cloud Azure 提供了各种模块,用于使用 Spring 框架向服务总线队列和主题/订阅发送消息以及接收消息。
可以单独使用以下模块,也可以将它们合并为不同的用例:
Spring Cloud Azure 服务总线 Starter 使你能够使用具有 Spring Boot 功能的 服务总线 Java SDK 客户端库发送和接收消息。
Spring Cloud Azure 服务总线 JMS Starter 使你能够使用 JMS API 发送和接收包含服务总线队列和主题/订阅的消息。
Spring Messaging Azure 服务总线使你能够通过 Spring Messaging API 与服务总线进行交互。
Spring Integration Azure 服务总线使你能够将 Spring Integration Message Channels 与 服务总线 连接。
使用适用于 服务总线 的 Spring Cloud Stream Binder,可以在 Spring Cloud Stream 应用程序中将服务总线用作消息传递中间件。
先决条件
- Azure 订阅 - 免费创建订阅。
- Java 开发工具包 (JDK) 版本 8 或更高版本。
- Apache Maven 版本 3.0 或更高版本。
- Azure 服务总线和队列或主题/订阅。 如果没有队列或主题,请创建服务总线队列或主题。 有关详细信息,请参阅“使用Azure 门户创建服务总线命名空间和队列,或使用Azure 门户创建主题的服务总线主题和订阅。
- 如果没有 Spring Boot 应用程序,请使用 Spring Initializr 创建 Maven 项目。 请务必选择“Maven 项目”,并在“依赖项”下添加“Spring Web”依赖项,然后选择“Java 版本 8 或更高版本”。
注意
若要向帐户授予对服务总线资源的访问权限,请在新创建的Azure 服务总线命名空间中,将Azure 服务总线数据发送方和Azure 服务总线数据接收方角色分配给当前正在使用的 Microsoft Entra 帐户。 有关详细信息,请参阅使用 Azure 门户分配 Azure 角色。
重要
完成本教程中的步骤需要 Spring Boot 2.5 或更高版本。
准备本地环境
在本教程中,配置和代码没有任何身份验证操作。 但是,连接到 Azure 服务需要身份验证。 若要完成身份验证,需要使用 Azure 标识客户端库。 Spring Cloud Azure 使用 DefaultAzureCredential
Azure 标识库提供的帮助获取凭据,而无需进行任何代码更改。
DefaultAzureCredential
支持多种身份验证方法,并确定应在运行时使用哪种方法。 此方法使应用能够在不同环境(如本地或生产环境)中使用不同的身份验证方法,而无需实现特定于环境的代码。 有关详细信息,请参阅 Azure 托管 Java 应用程序的 DefaultAzureCredential 部分。
若要使用 Azure CLI、IntelliJ 或其他方法在本地开发环境中完成身份验证,请参阅 Java 开发环境中的 Azure 身份验证。 若要在 Azure 托管环境中完成身份验证,建议使用托管标识。 有关详细信息,请参阅什么是 Azure 资源的托管标识?
注意
JMS API 的Azure 服务总线目前不支持DefaultAzureCredential
。 如果要将 Spring JMS 与 服务总线 配合使用,请忽略此步骤。
使用 Spring Cloud Azure 服务总线 Starter
Spring Cloud Azure 服务总线 Starter 模块使用 Spring Boot 框架导入 服务总线 Java 客户端库。 可以在非互斥模式中使用 Spring Cloud Azure 和 Azure SDK。 因此,可以在 Spring 应用程序中继续使用 服务总线 Java 客户端 API。
添加服务总线依赖项
若要安装 Spring Cloud Azure 服务总线 Starter 模块,请将以下依赖项添加到pom.xml文件:
Spring Cloud Azure 材料清单(BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.11.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意
如果使用 Spring Boot 2.x,请确保将
spring-cloud-azure-dependencies
版本设置为4.17.0
。 此材料清单(BOM)应在pom.xml文件的部分中进行配置<dependencyManagement>
。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅 我应使用哪个版本的 Spring Cloud Azure。Spring Cloud Azure 服务总线 项目:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-servicebus</artifactId> </dependency>
编写应用程序代码以发送和接收消息
本指南介绍如何在 Spring 应用程序的上下文中使用 服务总线 Java 客户端。 在这里,我们引入了两种替代方法。 建议的方法是使用 Spring Boot 自动配置,并使用 Spring 上下文中的现装客户端。 另一种方法是以编程方式构建客户端。
第一种方法涉及从 Spring IoC 容器自动连接客户端豆类,与第二种方法相比,具有以下优势。 使用 服务总线 客户端进行开发时,这些优势可提供更灵活、更高效的体验。
可以使用 外部化配置 ,以便在不同的环境中使用相同的应用程序代码。
可以将学习生成器模式的过程委托给 Spring Boot 框架,并将此客户端注册到应用程序上下文。 通过此委派,你可以专注于如何将客户端用于自己的业务需求。
可以使用运行状况指示器轻松检查应用程序和内部组件的状态和运行状况。
下面的代码示例演示了如何使用 ServiceBusSenderClient
这 ServiceBusProcessorClient
两种替代方法。
注意
用于服务总线的 Azure Java SDK 提供了多个客户端来与服务总线进行交互。 初学者还为所有服务总线客户端和客户端生成器提供自动配置。 此处我们仅 ServiceBusSenderClient
使用并 ServiceBusProcessorClient
作为示例。
使用 Spring Boot 自动配置
若要向服务总线发送消息并从中接收消息,请使用以下步骤配置应用程序:
配置服务总线命名空间和队列,如以下示例所示:
spring.cloud.azure.servicebus.namespace=<your-servicebus-namespace-name> spring.cloud.azure.servicebus.entity-name=<your-servicebus-queue-name> spring.cloud.azure.servicebus.entity-type=queue
提示
此处我们使用服务总线队列作为示例。 若要使用主题/订阅,需要添加
spring.cloud.azure.servicebus.processor.subscription-name
属性并将值更改为entity-type
topic
。创建一个新的
ServiceBusProcessorClientConfiguration
Java 类,如以下示例所示。 此类用于注册消息和错误处理程序ServiceBusProcessorClient
。@Configuration(proxyBeanMethods = false) public class ServiceBusProcessorClientConfiguration { @Bean ServiceBusRecordMessageListener processMessage() { return context -> { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Id: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(), message.getSequenceNumber(), message.getBody()); }; } @Bean ServiceBusErrorHandler processError() { return context -> { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); }; } }
ServiceBusSenderClient
注入 Spring 应用程序中,并调用相关 API 以发送消息,如以下示例所示:@SpringBootApplication public class ServiceBusQueueApplication implements CommandLineRunner { private final ServiceBusSenderClient senderClient; public ServiceBusQueueApplication(ServiceBusSenderClient senderClient) { this.senderClient = senderClient; } public static void main(String[] args) { SpringApplication.run(ServiceBusQueueApplication.class, args); } @Override public void run(String... args) throws Exception { // send one message to the queue senderClient.sendMessage(new ServiceBusMessage("Hello, World!")); System.out.printf("Sent a message to the queue"); senderClient.close(); // wait the processor client to consume messages TimeUnit.SECONDS.sleep(10); } }
注意
默认情况下,自动连接
ServiceBusProcessorClient
豆的生命周期由 Spring 上下文管理。 当 Spring 应用程序上下文启动时,处理器会自动启动,并在 Spring 应用程序上下文停止时停止。 若要禁用此功能,请配置spring.cloud.azure.servicebus.processor.auto-startup=false
。启动应用程序。 将显示类似于以下示例的日志:
Sent a message to the queue Processing message. Id: 6f405435200047069a3caf80893a80bc, Sequence #: 1. Contents: Hello, World!
以编程方式生成服务总线客户端
你可以自行生成这些客户端豆,但该过程很复杂。 在 Spring Boot 应用程序中,必须管理属性、了解生成器模式,并将客户端注册到 Spring 应用程序上下文。 下面的代码示例演示如何执行此操作:
创建一个新的
ServiceBusClientConfiguration
Java 类,如以下示例所示。 此类用于声明ServiceBusSenderClient
和ServiceBusProcessorClient
豆类。@Configuration(proxyBeanMethods = false) public class ServiceBusClientConfiguration { private static final String SERVICE_BUS_FQDN = "<service-bus-fully-qualified-namespace>"; private static final String QUEUE_NAME = "<service-bus-queue-name>"; @Bean ServiceBusClientBuilder serviceBusClientBuilder() { return new ServiceBusClientBuilder() .fullyQualifiedNamespace(SERVICE_BUS_FQDN) .credential(new DefaultAzureCredentialBuilder().build()); } @Bean ServiceBusSenderClient serviceBusSenderClient(ServiceBusClientBuilder builder) { return builder .sender() .queueName(QUEUE_NAME) .buildClient(); } @Bean ServiceBusProcessorClient serviceBusProcessorClient(ServiceBusClientBuilder builder) { return builder.processor() .queueName(QUEUE_NAME) .processMessage(ServiceBusClientConfiguration::processMessage) .processError(ServiceBusClientConfiguration::processError) .buildProcessorClient(); } private static void processMessage(ServiceBusReceivedMessageContext context) { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Id: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(), message.getSequenceNumber(), message.getBody()); } private static void processError(ServiceBusErrorContext context) { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); } }
注意
请务必将
<service-bus-fully-qualified-namespace>
占位符替换为Azure 门户服务总线主机名。 将<service-bus-queue-name>
占位符替换为在服务总线命名空间中配置自己的队列名称。将客户端豆类注入应用程序,如以下示例所示:
@SpringBootApplication public class ServiceBusQueueApplication implements CommandLineRunner { private final ServiceBusSenderClient senderClient; private final ServiceBusProcessorClient processorClient; public ServiceBusQueueApplication(ServiceBusSenderClient senderClient, ServiceBusProcessorClient processorClient) { this.senderClient = senderClient; this.processorClient = processorClient; } public static void main(String[] args) { SpringApplication.run(ServiceBusQueueApplication.class, args); } @Override public void run(String... args) throws Exception { // send one message to the queue senderClient.sendMessage(new ServiceBusMessage("Hello, World!")); System.out.printf("Sent a message to the queue"); senderClient.close(); System.out.printf("Starting the processor"); processorClient.start(); TimeUnit.SECONDS.sleep(10); System.out.printf("Stopping and closing the processor"); processorClient.close(); } }
启动应用程序。 将显示类似于以下示例的日志:
Sent a message to the queue Starting the processor ... Processing message. Id: 6f405435200047069a3caf80893a80bc, Sequence #: 1. Contents: Hello, World! Stopping and closing the processor
以下列表显示了此代码不灵活或正常的原因:
- 命名空间和队列/主题/订阅名称是硬编码的。
- 如果用于
@Value
从 Spring 环境获取配置,则 application.properties 文件中不能有 IDE 提示。 - 如果有微服务方案,则必须复制每个项目中的代码,并且很容易出错且难以保持一致。
幸运的是,使用 Spring Cloud Azure 不需要自行构建客户端豆类。 相反,你可以直接注入豆类,并使用已熟悉的配置属性来配置服务总线。
Spring Cloud Azure 还为不同方案提供以下全局配置。 有关详细信息,请参阅 Spring Cloud Azure 配置的 Azure 服务 SDK 的 全局配置部分。
- 代理选项。
- 重试选项。
- AMQP 传输客户端选项。
还可以连接到不同的 Azure 云。 有关详细信息,请参阅连接到不同的 Azure 云。
使用 Spring Cloud Azure 服务总线 JMS Starter
Spring Cloud Azure 服务总线 JMS Starter 模块提供 Spring JMS 与 服务总线 的集成。 以下视频介绍如何使用 JMS 2.0 将 Spring JMS 应用程序与 Azure 服务总线集成。
本指南介绍如何使用 Spring Cloud Azure 服务总线 Starter for JMS API 将消息发送到服务总线并从中接收消息。
添加服务总线依赖项
若要安装 Spring Cloud Azure 服务总线 JMS Starter 模块,请将以下依赖项添加到pom.xml文件:
Spring Cloud Azure 材料清单(BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.11.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意
如果使用 Spring Boot 2.x,请确保将
spring-cloud-azure-dependencies
版本设置为4.17.0
。 此材料清单(BOM)应在pom.xml文件的部分中进行配置<dependencyManagement>
。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅 我应使用哪个版本的 Spring Cloud Azure。Spring Cloud Azure 服务总线 JMS 项目:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-servicebus-jms</artifactId> </dependency>
编写应用程序代码以发送和接收消息
为服务总线配置连接字符串和定价层,如以下示例所示:
spring.jms.servicebus.connection-string=<service-bus-namespace-connection-string> spring.jms.servicebus.pricing-tier=<service-bus-pricing-tier>
创建消息接收器。
Spring 提供了将消息发布到任何 POJO(普通旧 Java 对象)的方法。 首先,定义一个用于存储和检索用户名的泛型
User
类,如以下示例所示:public class User implements Serializable { private static final long serialVersionUID = -295422703255886286L; private String name; public User() { } public User(String name) { setName(name); } public String getName() { return name; } public void setName(String name) { this.name = name; } }
提示
实现
Serializable
是为了使用 Spring 框架的JmsTemplate
中的send
方法。 否则,应定义自定义MessageConverter
bean 以文本格式将内容序列化为 JSON。 有关MessageConverter
的详细信息,请参阅官方的 Spring JMS Starter 项目。在此处,可以创建新的
QueueReceiveService
Java 类,如以下示例所示。 此类用于定义消息接收器。@Component public class QueueReceiveService { private static final String QUEUE_NAME = "<service-bus-queue-name>"; @JmsListener(destination = QUEUE_NAME, containerFactory = "jmsListenerContainerFactory") public void receiveMessage(User user) { System.out.printf("Received a message from %s.", user.getName()); } }
注意
请务必将
<service-bus-queue-name>
占位符替换为在服务总线命名空间中配置的自己的队列名称。如果使用主题/订阅,请将
destination
参数更改为主题名称,应containerFactory
为topicJmsListenerContainerFactory
。 此外,请添加参数subscription
来描述订阅名称。使用 Spring 连接发送方和接收方以发送和接收消息,如以下示例所示:
@SpringBootApplication @EnableJms public class ServiceBusJmsStarterApplication { private static final String QUEUE_NAME = "<service-bus-queue-name>"; public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(ServiceBusJMSQueueApplication.class, args); JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class); // Send a message with a POJO - the template reuse the message converter System.out.println("Sending a user message."); jmsTemplate.convertAndSend(QUEUE_NAME, new User("Tom")); } }
注意
请务必将
<service-bus-queue-name>
占位符替换为在服务总线命名空间中配置的自己的队列名称。提示
请务必添加
@EnableIntegration
批注,这会触发使用@JmsListener
注释注释的方法的发现,从而在封面下创建消息侦听器容器。启动应用程序。 将显示类似于以下示例的日志:
Sending a user message. Received a message from Tom.
其他信息
有关详细信息,请参阅如何将 JMS API 与 服务总线 和 AMQP 1.0 配合使用。
使用 Spring Messaging Azure 服务总线
Spring Messaging Azure 服务总线 模块支持具有 服务总线 的 Spring Messaging 框架。
如果使用 Spring Messaging Azure 服务总线,则可以使用以下功能:
ServiceBusTemplate
:以异步和同步方式将消息发送到服务总线队列和主题。@ServiceBusListener
:将方法标记为目标上服务总线消息侦听器的目标。
本指南介绍如何使用 Spring Messaging Azure 服务总线向服务总线发送消息以及从服务总线接收消息。
添加服务总线依赖项
若要安装 Spring Messaging Azure 服务总线 模块,请将以下依赖项添加到pom.xml文件:
Spring Cloud Azure 材料清单(BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.11.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意
如果使用 Spring Boot 2.x,请确保将
spring-cloud-azure-dependencies
版本设置为4.17.0
。 此材料清单(BOM)应在pom.xml文件的部分中进行配置<dependencyManagement>
。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅 我应使用哪个版本的 Spring Cloud Azure。Spring Messaging 服务总线 和 Spring Cloud Azure 初学者项目:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter</artifactId> </dependency> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-messaging-azure-servicebus</artifactId> </dependency>
编写应用程序代码以发送和接收消息
为服务总线配置命名空间和队列类型,如以下示例所示:
spring.cloud.azure.servicebus.namespace=<service-bus-namespace-name> spring.cloud.azure.servicebus.entity-type=queue
注意
如果使用主题/订阅,请将
spring.cloud.azure.servicebus.entity-type
值更改为topic
.创建一个新的
ConsumerService
Java 类,如以下示例所示。 此类用于定义消息接收器。@Service public class ConsumerService { private static final String QUEUE_NAME = "<service-bus-queue-name>"; @ServiceBusListener(destination = QUEUE_NAME) public void handleMessageFromServiceBus(String message) { System.out.printf("Consume message: %s%n", message); } }
注意
如果使用主题/订阅,请更改作为主题名称的批注参数
destination
,并添加subscription
参数以描述订阅名称。使用 Spring 连接发送方和接收方以发送和接收消息,如以下示例所示:
@SpringBootApplication @EnableAzureMessaging public class Application { private static final String QUEUE_NAME = "<service-bus-queue-name>"; public static void main(String[] args) { ConfigurableApplicationContext applicationContext = SpringApplication.run(Application.class); ServiceBusTemplate serviceBusTemplate = applicationContext.getBean(ServiceBusTemplate.class); System.out.println("Sending a message to the queue."); serviceBusTemplate.sendAsync(QUEUE_NAME, MessageBuilder.withPayload("Hello world").build()).subscribe(); } }
提示
请务必添加
@EnableAzureMessaging
批注,这会触发使用@ServiceBusListener
注释注释的方法的发现,从而在封面下创建消息侦听器容器。启动应用程序。 将显示类似于以下示例的日志:
Sending a message to the queue. Consume message: Hello world.
使用 Spring Integration Azure 服务总线
Spring Integration Azure 服务总线 模块支持具有 服务总线 的 Spring Integration 框架。
如果 Spring 应用程序使用 Spring Integration 消息通道,则可以使用通道适配器在消息通道和服务总线之间路由消息。
入站通道适配器将消息从服务总线队列或订阅转发到消息通道。 出站通道适配器将消息从消息通道发布到服务总线队列和主题。
本指南介绍如何使用 Spring Integration Azure 服务总线 向服务总线发送消息并从服务总线接收消息。
添加服务总线依赖项
若要安装 Spring Cloud Azure 服务总线 Integration Starter 模块,请将以下依赖项添加到pom.xml文件:
Spring Cloud Azure 材料清单(BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.11.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意
如果使用 Spring Boot 2.x,请确保将
spring-cloud-azure-dependencies
版本设置为4.17.0
。 此材料清单(BOM)应在pom.xml文件的部分中进行配置<dependencyManagement>
。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅 我应使用哪个版本的 Spring Cloud Azure。Spring Cloud Azure 服务总线集成项目:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId> </dependency>
编写应用程序代码以发送和接收消息
配置服务总线的命名空间,如以下示例所示:
spring.cloud.azure.servicebus.namespace=<your-servicebus-namespace-name>
创建一个新的
QueueReceiveConfiguration
Java 类,如以下示例所示。 此类用于定义消息接收器。@Configuration public class QueueReceiveConfiguration { private static final String INPUT_CHANNEL = "queue.input"; private static final String QUEUE_NAME = "<your-servicebus-queue-name>"; private static final String SERVICE_BUS_MESSAGE_LISTENER_CONTAINER = "queue-listener-container"; /** * This message receiver binding with {@link ServiceBusInboundChannelAdapter} * via {@link MessageChannel} has name {@value INPUT_CHANNEL} */ @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload) { String message = new String(payload); System.out.printf("New message received: '%s'%n", message); } @Bean(SERVICE_BUS_MESSAGE_LISTENER_CONTAINER) public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, @Qualifier(SERVICE_BUS_MESSAGE_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean(name = INPUT_CHANNEL) public MessageChannel input() { return new DirectChannel(); } }
创建一个新的
QueueSendConfiguration
Java 类,如以下示例所示。 此类用于定义消息发件人。@Configuration public class QueueSendConfiguration { private static final String OUTPUT_CHANNEL = "queue.output"; private static final String QUEUE_NAME = "<your-servicebus-queue-name>"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { System.out.println("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { System.out.println("There was an error sending the message."); } }); return handler; } /** * Message gateway binding with {@link MessageHandler} * via {@link MessageChannel} has name {@value OUTPUT_CHANNEL} */ @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
使用 Spring 连接发送方和接收方以发送和接收消息,如以下示例所示:
@SpringBootApplication @EnableIntegration @Configuration(proxyBeanMethods = false) public class ServiceBusIntegrationApplication { public static void main(String[] args) { ConfigurableApplicationContext applicationContext = SpringApplication.run(ServiceBusIntegrationApplication.class, args); QueueSendConfiguration.QueueOutboundGateway outboundGateway = applicationContext.getBean(QueueSendConfiguration.QueueOutboundGateway.class); System.out.println("Sending a message to the queue"); outboundGateway.send("Hello World"); } }
提示
请务必添加
@EnableIntegration
批注,以便启用 Spring Integration 基础结构。启动应用程序。 将显示类似于以下示例的日志:
Message was sent successfully. New message received: 'Hello World'
使用 Spring Cloud Stream 服务总线 Binder
若要在 Spring Cloud Stream 应用程序中调用服务总线 API,请使用 Spring Cloud Azure 服务总线 Stream Binder 模块。
本指南介绍如何使用 Spring Cloud Stream 服务总线 Binder 将消息发送到服务总线并从服务总线接收消息。
添加服务总线依赖项
若要安装 Spring Cloud Azure 服务总线 Stream Binder 模块,请将以下依赖项添加到pom.xml文件:
Spring Cloud Azure 材料清单(BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.11.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意
如果使用 Spring Boot 2.x,请确保将
spring-cloud-azure-dependencies
版本设置为4.17.0
。 此材料清单(BOM)应在pom.xml文件的部分中进行配置<dependencyManagement>
。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅 我应使用哪个版本的 Spring Cloud Azure。Spring Cloud Azure 服务总线集成项目:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
编写应用程序代码以发送和接收消息
配置服务总线的命名空间,如以下示例所示:
spring.cloud.azure.servicebus.namespace=<service-bus-namespace-name>
创建消息接收器。
若要将应用程序用作事件接收器,请通过指定以下信息来配置输入绑定器:
声明定义
Consumer
消息处理逻辑的 bean。 例如,以下Consumer
bean 命名consume
为:@Bean public Consumer<Message<String>> consume() { return message -> { System.out.printf("New message received: '%s'.%n", message.getPayload()); }; }
添加配置以通过替换
<service-bus-queue-name>
占位符来指定queue
使用的名称,如以下示例所示:# name for the `Consumer` bean spring.cloud.function.definition=consume spring.cloud.stream.bindings.consume-in-0.destination=<service-bus-queue-name>
注意
若要从服务总线订阅使用,请务必更改
consume-in-0
绑定属性,如以下示例所示:spring.cloud.stream.bindings.consume-in-0.destination=<service-bus-topic-name> spring.cloud.stream.bindings.consume-in-0.group=<service-bus-subscription-name>
创建邮件发件人。
若要将应用程序用作事件源,请通过指定以下信息来配置输出绑定器:
定义一个
Supplier
bean,用于定义消息来自应用程序中的位置。@Bean return () -> { System.out.println("Sending a message."); return MessageBuilder.withPayload("Hello world").build(); }; }
添加配置以
queue
指定发送的名称,方法是替换<your-servicebus-queue-name>
以下示例中的占位符:# "consume" is added from the previous step spring.cloud.function.definition=consume;supply spring.cloud.stream.bindings.supply-out-0.destination=<your-servicebus-queue-name> spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
注意
若要发送到服务总线主题,请务必将其更改为
entity-type
topic
。
启动应用程序。 你会看到类似于以下示例的日志:
Sending a message. New message received: 'Hello world'.
部署到 Azure Spring Apps
现在,你已在本地运行 Spring Boot 应用程序,现在可以将其移动到生产环境。 使用 Azure Spring Apps 可以轻松地将 Spring Boot 应用程序部署到 Azure,而无需进行任何代码更改。 该服务管理 Spring 应用程序的基础结构,让开发人员可以专注于代码。 Azure Spring Apps 可以通过以下方法提供生命周期管理:综合性监视和诊断、配置管理、服务发现、CI/CD 集成、蓝绿部署等。 若要将应用程序部署到 Azure Spring Apps,请参阅将 第一个应用程序部署到 Azure Spring Apps。
后续步骤
另请参阅
有关适用于 Microsoft Azure 的 Spring Boot 初学者的详细信息,请参阅 什么是 Spring Cloud Azure?
反馈
https://aka.ms/ContentUserFeedback。
即将发布:在整个 2024 年,我们将逐步淘汰作为内容反馈机制的“GitHub 问题”,并将其取代为新的反馈系统。 有关详细信息,请参阅:提交和查看相关反馈