如何使用适用于 Azure 服务总线 JMS 的 Spring Boot Starter

本文介绍如何使用适用于 Azure 服务总线 JMS 的 Spring Boot Starter 从服务总线 queuestopics 收发消息。

Azure 提供了一个异步消息平台,称为 Azure 服务总线(“服务总线”),该平台基于高级消息队列协议 1.0(“AMQP 1.0”)标准。 服务总线可用于各种受支持的 Azure 平台。

适用于 Azure 服务总线 JMS 的 Spring Boot Starter 可用于将 Spring 与服务总线相集成。

以下视频介绍如何使用 JMS 2.0 将 Spring 应用程序与 Azure 服务总线集成。


必备条件

在本文中,需要满足以下先决条件:

  1. Azure 订阅;如果没有 Azure 订阅,可激活 MSDN 订阅者权益或注册免费帐户

  2. 支持的 Java 开发工具包 (JDK) 8 或更高版本。 有关在 Azure 上进行开发时可供使用的 JDK 的详细信息,请参阅 Azure 和 Azure Stack 上的 Java 支持

  3. Apache Maven 3.2 或更高版本。

  4. 如果已有一个已配置的服务总线队列或服务总线主题,请确保服务总线命名空间满足以下要求:

    1. 允许来自所有网络的访问
    2. 是高级版(或更高版本)
    3. 具有一个有关队列和主题的读/写访问权限的访问策略
  5. 如果没有已配置的服务总线队列或服务总线主题,请使用 Azure 门户创建服务总线队列创建服务总线主题。 确保该命名空间满足上一步中指定的要求。 另外,请记下该命名空间中的连接字符串,因为本教程的测试应用需要用到它。

  6. 如果没有 Spring Boot 应用程序,请使用 Spring Initializr 创建一个 Maven项目。 请记得选择“Maven 项目” ,然后在“依赖项” 下方,添加“Web” 依赖项。

使用 Azure 服务总线 JMS Starter

  1. 在应用的父目录中找到 pom.xml 文件,例如:

    C:\SpringBoot\servicebus\pom.xml

    /users/example/home/servicebus/pom.xml

  2. 在文本编辑器中打开 pom.xml 文件 。

  3. 将 Spring Boot Azure 服务总线 JMS Starter 添加到 <dependencies> 的列表:

    <dependency>
        <groupId>com.azure.spring</groupId>
        <artifactId>azure-spring-boot-starter-servicebus-jms</artifactId>
        <version>3.10.0</version>
    </dependency>
    
  4. 保存并关闭 pom.xml 文件。

针对服务总线配置应用

在本部分中,了解如何将应用配置为使用服务总线队列或服务总线主题。

使用服务总线队列

  1. 在应用的 resources 目录中找到 application.properties,例如:

    C:\SpringBoot\servicebus\application.properties

    /users/example/home/servicebus/application.properties

  2. 在文本编辑器中打开 application.properties 文件 。

  3. 将以下代码追加到 application.properties 文件的末尾。 将占位符值替换为服务总线的适当值,并且不要在值周围加引号。

    spring.jms.servicebus.connection-string=<ServiceBusNamespaceConnectionString>
    spring.jms.servicebus.idle-timeout=<IdleTimeout>
    spring.jms.servicebus.pricing-tier=<ServiceBusPricingTier> 
    

    字段说明

    字段 说明
    spring.jms.servicebus.connection-string 指定从 Azure 门户内你的服务总线命名空间中获取的连接字符串。
    spring.jms.servicebus.idle-timeout 指定空闲超时时间(以毫秒为单位)。 在本教程中,建议的值为 1800000。
    spring.jms.servicebus.pricing-tier 指定服务总线的定价层。 支持的值为:“高级”、“标准”和“基本”。 高级层使用 Java Message Service (JMS) 2.0,而标准层和基本层使用 JMS 1.0 与 Azure 服务总线交互。
  4. 保存并关闭 application.properties 文件 。

使用服务总线主题

  1. 在应用的 resources 目录中找到 application.properties,例如:

    C:\SpringBoot\servicebus\application.properties

    /users/example/home/servicebus/application.properties

  2. 在文本编辑器中打开 application.properties 文件 。

  3. 将以下代码追加到 application.properties 文件的末尾。 将占位符值替换为服务总线的适当值,并且不要在值周围加引号。

    spring.jms.servicebus.connection-string=<ServiceBusNamespaceConnectionString>
    spring.jms.servicebus.topic-client-id=<ServiceBusSubscriptionID>
    spring.jms.servicebus.idle-timeout=<IdleTimeout>
    spring.jms.servicebus.pricing-tier=<ServiceBusPricingTier> 
    

    字段说明

    字段 说明
    spring.jms.servicebus.connection-string 指定从 Azure 门户内你的服务总线命名空间中获取的连接字符串。
    spring.jms.servicebus.topic-client-id 指定 JMS 客户端 ID,它是 Azure 门户中的服务总线订阅 ID。
    spring.jms.servicebus.idle-timeout 指定空闲超时时间(以毫秒为单位)。 在本教程中,建议的值为 1800000。
    spring.jms.servicebus.pricing-tier 指定服务总线的定价层。 支持的值为:“高级”、“标准”和“基本”。 高级层使用 Java Message Service (JMS) 2.0,而标准层和基本层使用 JMS 1.0 与 Azure 服务总线交互。
  4. 保存并关闭 application.properties 文件。

实现基本的服务总线功能

在本部分中,创建所需的 Java 类,用于将消息发送到服务总线队列或服务总线主题,并从相应的队列或主题订阅接收消息。

修改主应用程序类

  1. 在应用的程序包目录中找到主应用程序 Java 文件,例如:

    C:\SpringBoot\servicebus\src\main\java\com\wingtiptoys\servicebus\ServiceBusJmsStarterApplication.java

    /users/example/home/servicebus/src/main/java/com/wingtiptoys/servicebus/ServiceBusJmsStarterApplication.java

  2. 在文本编辑器中打开主应用程序 Java 文件。

  3. 将以下代码添加到该文件:

     package com.wingtiptoys.servicebus;
    
     import org.springframework.boot.SpringApplication;
     import org.springframework.boot.autoconfigure.SpringBootApplication;
    
     @SpringBootApplication
     public class ServiceBusJmsStarterApplication {
    
         public static void main(String[] args) {
             SpringApplication.run(ServiceBusJmsStarterApplication.class, args);
         }
     }
    
  4. 保存并关闭该文件。

定义测试 Java 类

  1. 使用文本编辑器,在应用的包目录中创建名为 User.java 的 Java 文件。

  2. 定义一个泛型用户类,以存储和检索用户名:

    package com.wingtiptoys.servicebus;
    
    import java.io.Serializable;
    
    // Define a generic User class.
    public class User implements Serializable {
    
        private static final long serialVersionUID = -295422703255886286L;
    
        private String name;
    
        public User() {
        }
    
        public User(String name) {
            setName(name);
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
    }
    

    实现 Serializable 是为了使用 Spring 框架的 JmsTemplate 中的 send 方法。 否则,应定义自定义的 MessageConverter bean,以将内容序列化为文本格式的 json。 有关 MessageConverter 的详细信息,请参阅官方的 Spring JMS Starter 项目

  3. 保存并关闭 User.java 文件 。

为消息发送控制器创建新类

  1. 使用文本编辑器,在应用的包目录中创建名为 SendController.java 的 Java 文件

  2. 将以下代码添加到新文件:

    package com.wingtiptoys.servicebus;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class SendController {
    
        private static final String DESTINATION_NAME = "<DestinationName>";
    
        private static final Logger logger = LoggerFactory.getLogger(SendController.class);
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        @PostMapping("/messages")
        public String postMessage(@RequestParam String message) {
            logger.info("Sending message");
            jmsTemplate.convertAndSend(DESTINATION_NAME, new User(message));
            return message;
        }
    }
    

    备注

    <DestinationName> 替换为在服务总线命名空间中配置的你自己的队列名称或主题名称。

  3. 保存并关闭 SendController.java 。

为消息接收控制器创建类

从服务总线队列接收消息

  1. 使用文本编辑器,在应用的包目录中创建名为 QueueReceiveController.java 的 Java 文件

  2. 将以下代码添加到新文件:

    package com.wingtiptoys.servicebus;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class QueueReceiveController {
    
        private static final String QUEUE_NAME = "<ServiceBusQueueName>";
    
        private final Logger logger = LoggerFactory.getLogger(QueueReceiveController.class);
    
        @JmsListener(destination = QUEUE_NAME, containerFactory = "jmsListenerContainerFactory")
        public void receiveMessage(User user) {
            logger.info("Received message: {}", user.getName());
        }
    }
    

    备注

    <ServiceBusQueueName> 替换为在服务总线命名空间中配置的你自己的队列名称。

  3. 保存并关闭 QueueReceiveController.java 文件。

从服务总线订阅接收消息

  1. 使用文本编辑器,在应用的包目录中创建名为 TopicReceiveController.java 的 Java 文件。

  2. 将以下代码添加到新文件。 将 <ServiceBusTopicName> 占位符替换为在服务总线命名空间中配置的你自己的主题名称。 将 <ServiceBusSubscriptionName> 占位符替换为服务总线主题中的你自己的订阅名称。

    package com.wingtiptoys.servicebus;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class TopicReceiveController {
    
        private static final String TOPIC_NAME = "<ServiceBusTopicName>";
    
        private static final String SUBSCRIPTION_NAME = "<ServiceBusSubscriptionName>";
    
        private final Logger logger = LoggerFactory.getLogger(TopicReceiveController.class);
    
        @JmsListener(destination = TOPIC_NAME, containerFactory = "topicJmsListenerContainerFactory",
                subscription = SUBSCRIPTION_NAME)
        public void receiveMessage(User user) {
            logger.info("Received message: {}", user.getName());
        }
    }
    
  3. 保存并关闭 TopicReceiveController.java 文件。

可选的服务总线功能

可以使用自定义的 MessageConverter bean 在 Java 对象与 JMS 消息之间进行转换。

设置消息的内容类型

下面的代码示例将 BytesMessage 内容类型设置为 application/json。 有关详细信息,请参阅消息、有效负载和序列化

package com.wingtiptoys.servicebus;

import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.qpid.jms.message.JmsBytesMessage;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
import org.apache.qpid.proton.amqp.Symbol;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.stereotype.Component;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Session;
import java.io.IOException;

@Component
public class CustomMessageConverter extends MappingJackson2MessageConverter {

    private static final String TYPE_ID_PROPERTY = "_type";
    private static final Symbol CONTENT_TYPE = Symbol.valueOf("application/json");

    public CustomMessageConverter() {
        this.setTargetType(MessageType.BYTES);
        this.setTypeIdPropertyName(TYPE_ID_PROPERTY);
    }

    @Override
    protected BytesMessage mapToBytesMessage(Object object, Session session, ObjectWriter objectWriter)
        throws JMSException, IOException {
        final BytesMessage bytesMessage = super.mapToBytesMessage(object, session, objectWriter);
        JmsBytesMessage jmsBytesMessage = (JmsBytesMessage) bytesMessage;
        AmqpJmsMessageFacade facade = (AmqpJmsMessageFacade) jmsBytesMessage.getFacade();
        facade.setContentType(CONTENT_TYPE);
        return jmsBytesMessage;
    }
}

有关 MessageConverter 的详细信息,请参阅官方的 Spring JMS 指南

生成和测试应用程序

  1. 打开命令提示符,然后将目录更改为 pom.xml 的位置;例如:

    cd C:\SpringBoot\servicebus 
    

    -或-

    cd /users/example/home/servicebus 
    
  2. 使用 Maven 构建 Spring Boot 应用程序,然后运行该应用程序:

    mvn clean spring-boot:run
    
  3. 在应用程序运行后,你可以使用 curl 对其进行测试:

    curl -X POST localhost:8080/messages?message=hello
    

    此时会显示“发送消息”和“hello”已发布到应用程序日志:

    [nio-8080-exec-1] com.wingtiptoys.servicebus.SendController : Sending message
    [enerContainer-1] com.wingtiptoys.servicebus.ReceiveController : Received message: hello
    

清理资源

如果不再需要,请使用 Azure 门户删除本文中创建的资源,以避免产生意外的费用。

后续步骤