在 Spring 应用程序中使用Azure 服务总线

本文介绍如何在使用 Spring Framework 生成的 Java 应用程序中使用Azure 服务总线。

Azure 提供一个名为 Azure 服务总线 (服务总线) 的异步消息传送平台,该平台基于高级消息队列协议 1.0 (AMQP 1.0) 标准。 可以在受支持的 Azure 平台范围内使用服务总线。

Spring Cloud Azure 提供了各种模块,用于使用 Spring 框架向服务总线队列和主题/订阅发送消息以及接收消息。

可以单独使用以下模块,也可以将它们合并为不同的用例:

先决条件

注意

若要向帐户授予对服务总线资源的访问权限,请在新创建的Azure 服务总线命名空间中,将Azure 服务总线数据发送方Azure 服务总线数据接收方角色分配给当前正在使用的 Microsoft Entra 帐户。 有关详细信息,请参阅使用 Azure 门户分配 Azure 角色

重要

完成本教程中的步骤需要 Spring Boot 2.5 或更高版本。

准备本地环境

在本教程中,配置和代码没有任何身份验证操作。 但是,连接到 Azure 服务需要身份验证。 若要完成身份验证,需要使用 Azure 标识客户端库。 Spring Cloud Azure 使用 DefaultAzureCredentialAzure 标识库提供的帮助获取凭据,而无需进行任何代码更改。

DefaultAzureCredential 支持多种身份验证方法,并确定应在运行时使用哪种方法。 此方法使应用能够在不同环境(如本地或生产环境)中使用不同的身份验证方法,而无需实现特定于环境的代码。 有关详细信息,请参阅 Azure 托管 Java 应用程序的 DefaultAzureCredential 部分。

若要使用 Azure CLI、IntelliJ 或其他方法在本地开发环境中完成身份验证,请参阅 Java 开发环境中的 Azure 身份验证。 若要在 Azure 托管环境中完成身份验证,建议使用托管标识。 有关详细信息,请参阅什么是 Azure 资源的托管标识?

注意

JMS API 的Azure 服务总线目前不支持DefaultAzureCredential。 如果要将 Spring JMS 与 服务总线 配合使用,请忽略此步骤。

使用 Spring Cloud Azure 服务总线 Starter

Spring Cloud Azure 服务总线 Starter 模块使用 Spring Boot 框架导入 服务总线 Java 客户端库。 可以在非互斥模式中使用 Spring Cloud Azure 和 Azure SDK。 因此,可以在 Spring 应用程序中继续使用 服务总线 Java 客户端 API。

添加服务总线依赖项

若要安装 Spring Cloud Azure 服务总线 Starter 模块,请将以下依赖项添加到pom.xml文件:

  • Spring Cloud Azure 材料清单(BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.11.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.17.0。 此材料清单(BOM)应在pom.xml文件的部分中进行配置<dependencyManagement>。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅 我应使用哪个版本的 Spring Cloud Azure。

  • Spring Cloud Azure 服务总线 项目:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter-servicebus</artifactId>
    </dependency>
    

编写应用程序代码以发送和接收消息

本指南介绍如何在 Spring 应用程序的上下文中使用 服务总线 Java 客户端。 在这里,我们引入了两种替代方法。 建议的方法是使用 Spring Boot 自动配置,并使用 Spring 上下文中的现装客户端。 另一种方法是以编程方式构建客户端。

第一种方法涉及从 Spring IoC 容器自动连接客户端豆类,与第二种方法相比,具有以下优势。 使用 服务总线 客户端进行开发时,这些优势可提供更灵活、更高效的体验。

  • 可以使用 外部化配置 ,以便在不同的环境中使用相同的应用程序代码。

  • 可以将学习生成器模式的过程委托给 Spring Boot 框架,并将此客户端注册到应用程序上下文。 通过此委派,你可以专注于如何将客户端用于自己的业务需求。

  • 可以使用运行状况指示器轻松检查应用程序和内部组件的状态和运行状况。

下面的代码示例演示了如何使用 ServiceBusSenderClientServiceBusProcessorClient 两种替代方法。

注意

用于服务总线的 Azure Java SDK 提供了多个客户端来与服务总线进行交互。 初学者还为所有服务总线客户端和客户端生成器提供自动配置。 此处我们仅 ServiceBusSenderClient 使用并 ServiceBusProcessorClient 作为示例。

使用 Spring Boot 自动配置

若要向服务总线发送消息并从中接收消息,请使用以下步骤配置应用程序:

  1. 配置服务总线命名空间和队列,如以下示例所示:

    spring.cloud.azure.servicebus.namespace=<your-servicebus-namespace-name>
    spring.cloud.azure.servicebus.entity-name=<your-servicebus-queue-name>
    spring.cloud.azure.servicebus.entity-type=queue
    

    提示

    此处我们使用服务总线队列作为示例。 若要使用主题/订阅,需要添加 spring.cloud.azure.servicebus.processor.subscription-name 属性并将值更改为 entity-typetopic

  2. 创建一个新的 ServiceBusProcessorClientConfiguration Java 类,如以下示例所示。 此类用于注册消息和错误处理程序 ServiceBusProcessorClient

    @Configuration(proxyBeanMethods = false)
    public class ServiceBusProcessorClientConfiguration {
    
        @Bean
        ServiceBusRecordMessageListener processMessage() {
            return context -> {
                ServiceBusReceivedMessage message = context.getMessage();
                System.out.printf("Processing message. Id: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
                        message.getSequenceNumber(), message.getBody());
            };
        }
    
        @Bean
        ServiceBusErrorHandler processError() {
            return context -> {
                System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
                        context.getFullyQualifiedNamespace(), context.getEntityPath());
            };
        }
    }
    
  3. ServiceBusSenderClient注入 Spring 应用程序中,并调用相关 API 以发送消息,如以下示例所示:

    @SpringBootApplication
    public class ServiceBusQueueApplication implements CommandLineRunner {
    
        private final ServiceBusSenderClient senderClient;
    
        public ServiceBusQueueApplication(ServiceBusSenderClient senderClient) {
            this.senderClient = senderClient;
        }
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueApplication.class, args);
        }
    
        @Override
        public void run(String... args) throws Exception {
            // send one message to the queue
            senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
            System.out.printf("Sent a message to the queue");
            senderClient.close();
    
            // wait the processor client to consume messages
            TimeUnit.SECONDS.sleep(10);
        }
    
    }
    

    注意

    默认情况下,自动连接 ServiceBusProcessorClient 豆的生命周期由 Spring 上下文管理。 当 Spring 应用程序上下文启动时,处理器会自动启动,并在 Spring 应用程序上下文停止时停止。 若要禁用此功能,请配置 spring.cloud.azure.servicebus.processor.auto-startup=false

  4. 启动应用程序。 将显示类似于以下示例的日志:

    Sent a message to the queue
    Processing message. Id: 6f405435200047069a3caf80893a80bc, Sequence #: 1. Contents: Hello, World!
    

以编程方式生成服务总线客户端

你可以自行生成这些客户端豆,但该过程很复杂。 在 Spring Boot 应用程序中,必须管理属性、了解生成器模式,并将客户端注册到 Spring 应用程序上下文。 下面的代码示例演示如何执行此操作:

  1. 创建一个新的 ServiceBusClientConfiguration Java 类,如以下示例所示。 此类用于声明 ServiceBusSenderClientServiceBusProcessorClient 豆类。

    @Configuration(proxyBeanMethods = false)
    public class ServiceBusClientConfiguration {
    
        private static final String SERVICE_BUS_FQDN = "<service-bus-fully-qualified-namespace>";
        private static final String QUEUE_NAME = "<service-bus-queue-name>";
    
        @Bean
        ServiceBusClientBuilder serviceBusClientBuilder() {
            return new ServiceBusClientBuilder()
                       .fullyQualifiedNamespace(SERVICE_BUS_FQDN)
                       .credential(new DefaultAzureCredentialBuilder().build());
        }
    
        @Bean
        ServiceBusSenderClient serviceBusSenderClient(ServiceBusClientBuilder builder) {
            return builder
                   .sender()
                   .queueName(QUEUE_NAME)
                   .buildClient();
        }
    
        @Bean
        ServiceBusProcessorClient serviceBusProcessorClient(ServiceBusClientBuilder builder) {
            return builder.processor()
                          .queueName(QUEUE_NAME)
                          .processMessage(ServiceBusClientConfiguration::processMessage)
                          .processError(ServiceBusClientConfiguration::processError)
                          .buildProcessorClient();
        }
    
        private static void processMessage(ServiceBusReceivedMessageContext context) {
            ServiceBusReceivedMessage message = context.getMessage();
            System.out.printf("Processing message. Id: %s, Sequence #: %s. Contents: %s%n",
                message.getMessageId(), message.getSequenceNumber(), message.getBody());
        }
    
        private static void processError(ServiceBusErrorContext context) {
            System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
                    context.getFullyQualifiedNamespace(), context.getEntityPath());
        }
    }
    

    注意

    请务必将<service-bus-fully-qualified-namespace>占位符替换为Azure 门户服务总线主机名。 将<service-bus-queue-name>占位符替换为在服务总线命名空间中配置自己的队列名称。

  2. 将客户端豆类注入应用程序,如以下示例所示:

    @SpringBootApplication
    public class ServiceBusQueueApplication implements CommandLineRunner {
    
        private final ServiceBusSenderClient senderClient;
    
        private final ServiceBusProcessorClient processorClient;
    
        public ServiceBusQueueApplication(ServiceBusSenderClient senderClient, ServiceBusProcessorClient processorClient) {
            this.senderClient = senderClient;
            this.processorClient = processorClient;
        }
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueApplication.class, args);
        }
    
        @Override
        public void run(String... args) throws Exception {
            // send one message to the queue
            senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
            System.out.printf("Sent a message to the queue");
            senderClient.close();
    
            System.out.printf("Starting the processor");
            processorClient.start();
            TimeUnit.SECONDS.sleep(10);
            System.out.printf("Stopping and closing the processor");
            processorClient.close();
        }
    
    }
    
  3. 启动应用程序。 将显示类似于以下示例的日志:

    Sent a message to the queue
    Starting the processor
    ...
    Processing message. Id: 6f405435200047069a3caf80893a80bc, Sequence #: 1. Contents: Hello, World!
    Stopping and closing the processor
    

以下列表显示了此代码不灵活或正常的原因:

  • 命名空间和队列/主题/订阅名称是硬编码的。
  • 如果用于@Value从 Spring 环境获取配置,则 application.properties 文件中不能有 IDE 提示
  • 如果有微服务方案,则必须复制每个项目中的代码,并且很容易出错且难以保持一致。

幸运的是,使用 Spring Cloud Azure 不需要自行构建客户端豆类。 相反,你可以直接注入豆类,并使用已熟悉的配置属性来配置服务总线。

Spring Cloud Azure 还为不同方案提供以下全局配置。 有关详细信息,请参阅 Spring Cloud Azure 配置的 Azure 服务 SDK全局配置部分。

  • 代理选项。
  • 重试选项。
  • AMQP 传输客户端选项。

还可以连接到不同的 Azure 云。 有关详细信息,请参阅连接到不同的 Azure 云

使用 Spring Cloud Azure 服务总线 JMS Starter

Spring Cloud Azure 服务总线 JMS Starter 模块提供 Spring JMS 与 服务总线 的集成。 以下视频介绍如何使用 JMS 2.0 将 Spring JMS 应用程序与 Azure 服务总线集成。


本指南介绍如何使用 Spring Cloud Azure 服务总线 Starter for JMS API 将消息发送到服务总线并从中接收消息。

添加服务总线依赖项

若要安装 Spring Cloud Azure 服务总线 JMS Starter 模块,请将以下依赖项添加到pom.xml文件:

  • Spring Cloud Azure 材料清单(BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.11.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.17.0。 此材料清单(BOM)应在pom.xml文件的部分中进行配置<dependencyManagement>。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅 我应使用哪个版本的 Spring Cloud Azure。

  • Spring Cloud Azure 服务总线 JMS 项目:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter-servicebus-jms</artifactId>
    </dependency>
    

编写应用程序代码以发送和接收消息

  1. 为服务总线配置连接字符串和定价层,如以下示例所示:

    spring.jms.servicebus.connection-string=<service-bus-namespace-connection-string>
    spring.jms.servicebus.pricing-tier=<service-bus-pricing-tier>
    
  2. 创建消息接收器。

    Spring 提供了将消息发布到任何 POJO(普通旧 Java 对象)的方法。 首先,定义一个用于存储和检索用户名的泛型 User 类,如以下示例所示:

    public class User implements Serializable {
    
        private static final long serialVersionUID = -295422703255886286L;
    
        private String name;
    
        public User() {
        }
    
        public User(String name) {
            setName(name);
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    }
    

    提示

    实现 Serializable 是为了使用 Spring 框架的 JmsTemplate 中的 send 方法。 否则,应定义自定义 MessageConverter bean 以文本格式将内容序列化为 JSON。 有关 MessageConverter 的详细信息,请参阅官方的 Spring JMS Starter 项目

  3. 在此处,可以创建新的 QueueReceiveService Java 类,如以下示例所示。 此类用于定义消息接收器。

    @Component
    public class QueueReceiveService {
    
        private static final String QUEUE_NAME = "<service-bus-queue-name>";
    
        @JmsListener(destination = QUEUE_NAME, containerFactory = "jmsListenerContainerFactory")
        public void receiveMessage(User user) {
            System.out.printf("Received a message from %s.", user.getName());
        }
    }
    

    注意

    请务必将<service-bus-queue-name>占位符替换为在服务总线命名空间中配置的自己的队列名称。

    如果使用主题/订阅,请将 destination 参数更改为主题名称,应 containerFactorytopicJmsListenerContainerFactory。 此外,请添加参数 subscription 来描述订阅名称。

  4. 使用 Spring 连接发送方和接收方以发送和接收消息,如以下示例所示:

    @SpringBootApplication
    @EnableJms
    public class ServiceBusJmsStarterApplication {
    
        private static final String QUEUE_NAME = "<service-bus-queue-name>";
    
        public static void main(String[] args) {
            ConfigurableApplicationContext context = SpringApplication.run(ServiceBusJMSQueueApplication.class, args);
            JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
    
            // Send a message with a POJO - the template reuse the message converter
            System.out.println("Sending a user message.");
            jmsTemplate.convertAndSend(QUEUE_NAME, new User("Tom"));
        }
    }
    

    注意

    请务必将<service-bus-queue-name>占位符替换为在服务总线命名空间中配置的自己的队列名称。

    提示

    请务必添加 @EnableIntegration 批注,这会触发使用 @JmsListener注释注释的方法的发现,从而在封面下创建消息侦听器容器。

  5. 启动应用程序。 将显示类似于以下示例的日志:

    Sending a user message.
    Received a message from Tom.
    

其他信息

有关详细信息,请参阅如何将 JMS API 与 服务总线 和 AMQP 1.0 配合使用。

使用 Spring Messaging Azure 服务总线

Spring Messaging Azure 服务总线 模块支持具有 服务总线 的 Spring Messaging 框架。

如果使用 Spring Messaging Azure 服务总线,则可以使用以下功能:

  • ServiceBusTemplate:以异步和同步方式将消息发送到服务总线队列和主题。
  • @ServiceBusListener:将方法标记为目标上服务总线消息侦听器的目标。

本指南介绍如何使用 Spring Messaging Azure 服务总线向服务总线发送消息以及从服务总线接收消息。

添加服务总线依赖项

若要安装 Spring Messaging Azure 服务总线 模块,请将以下依赖项添加到pom.xml文件:

  • Spring Cloud Azure 材料清单(BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.11.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.17.0。 此材料清单(BOM)应在pom.xml文件的部分中进行配置<dependencyManagement>。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅 我应使用哪个版本的 Spring Cloud Azure。

  • Spring Messaging 服务总线 和 Spring Cloud Azure 初学者项目:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-messaging-azure-servicebus</artifactId>
    </dependency>
    

编写应用程序代码以发送和接收消息

  1. 为服务总线配置命名空间和队列类型,如以下示例所示:

    spring.cloud.azure.servicebus.namespace=<service-bus-namespace-name>
    spring.cloud.azure.servicebus.entity-type=queue
    

    注意

    如果使用主题/订阅,请将 spring.cloud.azure.servicebus.entity-type 值更改为 topic.

  2. 创建一个新的 ConsumerService Java 类,如以下示例所示。 此类用于定义消息接收器。

    @Service
    public class ConsumerService {
    
        private static final String QUEUE_NAME = "<service-bus-queue-name>";
    
        @ServiceBusListener(destination = QUEUE_NAME)
        public void handleMessageFromServiceBus(String message) {
            System.out.printf("Consume message: %s%n", message);
        }
    
    }
    

    注意

    如果使用主题/订阅,请更改作为主题名称的批注参数 destination ,并添加 subscription 参数以描述订阅名称。

  3. 使用 Spring 连接发送方和接收方以发送和接收消息,如以下示例所示:

    @SpringBootApplication
    @EnableAzureMessaging
    public class Application {
    
        private static final String QUEUE_NAME = "<service-bus-queue-name>";
    
        public static void main(String[] args) {
            ConfigurableApplicationContext applicationContext = SpringApplication.run(Application.class);
            ServiceBusTemplate serviceBusTemplate = applicationContext.getBean(ServiceBusTemplate.class);
            System.out.println("Sending a message to the queue.");
            serviceBusTemplate.sendAsync(QUEUE_NAME, MessageBuilder.withPayload("Hello world").build()).subscribe();
        }
    }
    

    提示

    请务必添加 @EnableAzureMessaging 批注,这会触发使用 @ServiceBusListener注释注释的方法的发现,从而在封面下创建消息侦听器容器。

  4. 启动应用程序。 将显示类似于以下示例的日志:

    Sending a message to the queue.
    Consume message: Hello world.
    

使用 Spring Integration Azure 服务总线

Spring Integration Azure 服务总线 模块支持具有 服务总线 的 Spring Integration 框架。

如果 Spring 应用程序使用 Spring Integration 消息通道,则可以使用通道适配器在消息通道和服务总线之间路由消息。

入站通道适配器将消息从服务总线队列或订阅转发到消息通道。 出站通道适配器将消息从消息通道发布到服务总线队列和主题。

本指南介绍如何使用 Spring Integration Azure 服务总线 向服务总线发送消息并从服务总线接收消息。

添加服务总线依赖项

若要安装 Spring Cloud Azure 服务总线 Integration Starter 模块,请将以下依赖项添加到pom.xml文件:

  • Spring Cloud Azure 材料清单(BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.11.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.17.0。 此材料清单(BOM)应在pom.xml文件的部分中进行配置<dependencyManagement>。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅 我应使用哪个版本的 Spring Cloud Azure。

  • Spring Cloud Azure 服务总线集成项目:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
    </dependency>
    

编写应用程序代码以发送和接收消息

  1. 配置服务总线的命名空间,如以下示例所示:

    spring.cloud.azure.servicebus.namespace=<your-servicebus-namespace-name>
    
  2. 创建一个新的 QueueReceiveConfiguration Java 类,如以下示例所示。 此类用于定义消息接收器。

    @Configuration
    public class QueueReceiveConfiguration {
    
        private static final String INPUT_CHANNEL = "queue.input";
        private static final String QUEUE_NAME = "<your-servicebus-queue-name>";
        private static final String SERVICE_BUS_MESSAGE_LISTENER_CONTAINER = "queue-listener-container";
    
        /**
         * This message receiver binding with {@link ServiceBusInboundChannelAdapter}
         * via {@link MessageChannel} has name {@value INPUT_CHANNEL}
         */
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload) {
            String message = new String(payload);
            System.out.printf("New message received: '%s'%n", message);
        }
    
        @Bean(SERVICE_BUS_MESSAGE_LISTENER_CONTAINER)
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            @Qualifier(SERVICE_BUS_MESSAGE_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean(name = INPUT_CHANNEL)
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. 创建一个新的 QueueSendConfiguration Java 类,如以下示例所示。 此类用于定义消息发件人。

    @Configuration
    public class QueueSendConfiguration {
    
        private static final String OUTPUT_CHANNEL = "queue.output";
        private static final String QUEUE_NAME = "<your-servicebus-queue-name>";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    System.out.println("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    System.out.println("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    
        /**
         * Message gateway binding with {@link MessageHandler}
         * via {@link MessageChannel} has name {@value OUTPUT_CHANNEL}
         */
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  4. 使用 Spring 连接发送方和接收方以发送和接收消息,如以下示例所示:

    @SpringBootApplication
    @EnableIntegration
    @Configuration(proxyBeanMethods = false)
    public class ServiceBusIntegrationApplication {
    
        public static void main(String[] args) {
            ConfigurableApplicationContext applicationContext = SpringApplication.run(ServiceBusIntegrationApplication.class, args);
            QueueSendConfiguration.QueueOutboundGateway outboundGateway = applicationContext.getBean(QueueSendConfiguration.QueueOutboundGateway.class);
            System.out.println("Sending a message to the queue");
            outboundGateway.send("Hello World");
        }
    
    }
    

    提示

    请务必添加 @EnableIntegration 批注,以便启用 Spring Integration 基础结构。

  5. 启动应用程序。 将显示类似于以下示例的日志:

    Message was sent successfully.
    New message received: 'Hello World'
    

使用 Spring Cloud Stream 服务总线 Binder

若要在 Spring Cloud Stream 应用程序中调用服务总线 API,请使用 Spring Cloud Azure 服务总线 Stream Binder 模块。

本指南介绍如何使用 Spring Cloud Stream 服务总线 Binder 将消息发送到服务总线并从服务总线接收消息。

添加服务总线依赖项

若要安装 Spring Cloud Azure 服务总线 Stream Binder 模块,请将以下依赖项添加到pom.xml文件:

  • Spring Cloud Azure 材料清单(BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.11.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.17.0。 此材料清单(BOM)应在pom.xml文件的部分中进行配置<dependencyManagement>。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅 我应使用哪个版本的 Spring Cloud Azure。

  • Spring Cloud Azure 服务总线集成项目:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

编写应用程序代码以发送和接收消息

  1. 配置服务总线的命名空间,如以下示例所示:

    spring.cloud.azure.servicebus.namespace=<service-bus-namespace-name>
    
  2. 创建消息接收器。

    若要将应用程序用作事件接收器,请通过指定以下信息来配置输入绑定器:

    • 声明定义 Consumer 消息处理逻辑的 bean。 例如,以下 Consumer bean 命名 consume为:

      @Bean
      public Consumer<Message<String>> consume() {
          return message -> {
              System.out.printf("New message received: '%s'.%n", message.getPayload());
          };
      }
      
    • 添加配置以通过替换<service-bus-queue-name>占位符来指定queue使用的名称,如以下示例所示:

      # name for the `Consumer` bean
      spring.cloud.function.definition=consume
      spring.cloud.stream.bindings.consume-in-0.destination=<service-bus-queue-name>
      

      注意

      若要从服务总线订阅使用,请务必更改consume-in-0绑定属性,如以下示例所示:

      spring.cloud.stream.bindings.consume-in-0.destination=<service-bus-topic-name>
      spring.cloud.stream.bindings.consume-in-0.group=<service-bus-subscription-name>
      
  3. 创建邮件发件人。

    若要将应用程序用作事件源,请通过指定以下信息来配置输出绑定器:

    • 定义一个 Supplier bean,用于定义消息来自应用程序中的位置。

      @Bean
      return () -> {
              System.out.println("Sending a message.");
              return MessageBuilder.withPayload("Hello world").build();
          };
      }
      
    • 添加配置以 queue 指定发送的名称,方法是替换 <your-servicebus-queue-name> 以下示例中的占位符:

      # "consume" is added from the previous step
      spring.cloud.function.definition=consume;supply
      spring.cloud.stream.bindings.supply-out-0.destination=<your-servicebus-queue-name>
      spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
      

      注意

      若要发送到服务总线主题,请务必将其更改为 entity-typetopic

  4. 启动应用程序。 你会看到类似于以下示例的日志:

    Sending a message.
    New message received: 'Hello world'.
    

部署到 Azure Spring Apps

现在,你已在本地运行 Spring Boot 应用程序,现在可以将其移动到生产环境。 使用 Azure Spring Apps 可以轻松地将 Spring Boot 应用程序部署到 Azure,而无需进行任何代码更改。 该服务管理 Spring 应用程序的基础结构,让开发人员可以专注于代码。 Azure Spring Apps 可以通过以下方法提供生命周期管理:综合性监视和诊断、配置管理、服务发现、CI/CD 集成、蓝绿部署等。 若要将应用程序部署到 Azure Spring Apps,请参阅将 第一个应用程序部署到 Azure Spring Apps

后续步骤

另请参阅

有关适用于 Microsoft Azure 的 Spring Boot 初学者的详细信息,请参阅 什么是 Spring Cloud Azure?