具有 Azure 服务总线 的 Spring Cloud Stream
本文介绍如何使用 Spring Cloud Stream Binder 通过服务总线 queues
和 topics
收发消息。
Azure 提供了一个异步消息平台,称为 Azure 服务总线(“服务总线”),该平台基于高级消息队列协议 1.0(“AMQP 1.0”)标准。 服务总线可用于各种受支持的 Azure 平台。
先决条件
Azure 订阅 - 免费创建订阅。
Java 开发工具包 (JDK) 版本 8 或更高版本。
Apache Maven 版本 3.2 或更高版本。
用来测试功能的 cURL 或类似的 HTTP 实用工具。
Spring Boot 应用程序。 如果没有,请使用 Spring Initializr 创建一个 Maven 项目。 请务必选择 Maven 项目,并在“依赖项”下添加 Spring Web 和 Azure 支持依赖项,然后选择 Java 版本 8 或更高版本。
注意
若要授予帐户对Azure 服务总线资源的访问权限,请将该Azure Service Bus Data Sender
帐户和Azure Service Bus Data Receiver
角色分配给当前使用的 Microsoft Entra 帐户。 有关授予访问权限角色的详细信息,请参阅使用 Azure 门户 分配 Azure 角色,并使用 Microsoft Entra ID 授权应用程序访问Azure 服务总线实体。
重要
完成本文中的步骤需要 Spring Boot 2.5 或更高版本。
从Azure 服务总线发送和接收消息
对于Azure 服务总线的队列或主题,可以使用 Spring Cloud Azure Stream Binder 服务总线发送和接收消息。
若要安装 Spring Cloud Azure Stream Binder 服务总线 模块,请将以下依赖项添加到pom.xml文件:
Spring Cloud Azure 材料清单(BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.11.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意
如果使用 Spring Boot 2.x,请确保将
spring-cloud-azure-dependencies
版本设置为4.17.0
。 此材料清单(BOM)应在pom.xml文件的部分中进行配置<dependencyManagement>
。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅 我应使用哪个版本的 Spring Cloud Azure。Spring Cloud Azure Stream Binder 服务总线项目:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
编写应用程序代码
使用以下步骤将应用程序配置为使用服务总线队列或主题发送和接收消息。
在配置文件
application.properties
中配置服务总线凭据。spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.fixed-delay=60000 spring.cloud.stream.poller.initial-delay=0
下表描述了配置中的字段:
字段 说明 spring.cloud.azure.servicebus.namespace
指定从Azure 门户服务总线中获取的命名空间。 spring.cloud.stream.bindings.consume-in-0.destination
指定在本教程中使用的服务总线队列或服务总线主题。 spring.cloud.stream.bindings.supply-out-0.destination
指定用于输入目标的相同值。 spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete
指定是否自动解决消息。 如果设置为 false,则会添加消息标头 Checkpointer
,使开发人员能够手动解决消息。spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type
指定输出绑定的实体类型,可以是 queue
或topic
。spring.cloud.function.definition
指定要将哪个功能 bean 绑定到由绑定公开的外部目标。 spring.cloud.stream.poller.fixed-delay
为默认轮询程序指定固定延迟(以毫秒为单位)。 默认值为 1000 L。建议的值为 60000。 spring.cloud.stream.poller.initial-delay
指定定期触发器的初始延迟。 默认值为 0。 编辑启动类文件以显示以下内容。
import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusQueueBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(ServiceBusQueueBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e->LOGGER.error("Error found", e)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World"); many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }
提示
在本教程中,配置或代码中没有身份验证操作。 但是,连接到 Azure 服务需要身份验证。 要完成身份验证,需要使用 Azure 标识。 Spring Cloud Azure 使用
DefaultAzureCredential
Azure 标识库提供的帮助获取凭据,而无需进行任何代码更改。DefaultAzureCredential
支持多种身份验证方法,并确定应在运行时使用哪种方法。 此方法使应用能够在不同环境(如本地环境和生产环境)中使用不同的身份验证方法,而无需实现特定于环境的代码。 有关详细信息,请参阅 DefaultAzureCredential。若要在本地开发环境中完成身份验证,可以使用 Azure CLI、Visual Studio Code、PowerShell 或其他方法。 有关详细信息,请参阅 Java 开发环境中的 Azure 身份验证。 若要在 Azure 托管环境中完成身份验证,建议使用用户分配的托管标识。 有关详细信息,请参阅什么是 Azure 资源的托管标识?
启动应用程序。 类似于以下示例的消息将发布到应用程序日志中:
New message received: 'Hello World' Message 'Hello World' successfully checkpointed
后续步骤
反馈
https://aka.ms/ContentUserFeedback。
即将发布:在整个 2024 年,我们将逐步淘汰作为内容反馈机制的“GitHub 问题”,并将其取代为新的反馈系统。 有关详细信息,请参阅:提交和查看相关反馈