将 Spring Kafka 与适用于 Kafka 的 Azure 事件中心 API 配合使用

本教程介绍如何将基于 Java 的 Spring Cloud Stream Binder 配置为使用 Kafka 的 Azure 事件中心通过 Azure 事件中心 发送和接收消息。 有关详细信息,请参阅从 Apache Kafka 应用程序使用Azure 事件中心

在本教程中,我们将包括两种身份验证方法: Microsoft Entra 身份验证共享访问签名(SAS)身份验证。 无密码选项卡显示 Microsoft Entra 身份验证,连接字符串选项卡显示 SAS 身份验证。

Microsoft Entra 身份验证是一种使用 Microsoft Entra ID 中定义的标识连接到 Kafka Azure 事件中心的机制。 通过 Microsoft Entra 身份验证,可以在一个中心位置集中管理数据库用户标识和其他 Microsoft 服务,从而简化权限管理。

SAS 身份验证使用Azure 事件中心命名空间的连接字符串来委派对 Kafka 的事件中心的访问权限。 如果选择使用共享访问签名作为凭据,则需要自行管理连接字符串。

先决条件

重要

完成本教程中的步骤需要 Spring Boot 2.5 或更高版本。

准备凭据

Azure 事件中心支持使用 Microsoft Entra ID 对事件中心资源请求进行授权。 使用 Microsoft Entra ID,可以使用 Azure 基于角色的访问控制 (Azure RBAC) 向安全主体(可能是用户或应用程序服务主体)授予权限。

如果要使用 Microsoft Entra 身份验证在本地运行此示例,请确保用户帐户已通过用于 IntelliJ 的 Azure 工具包、Visual Studio Code Azure 帐户插件或 Azure CLI 进行身份验证。 此外,请确保帐户已授予足够的权限。

注意

使用无密码连接时,需要授予帐户对资源的访问权限。 在Azure 事件中心中,将Azure Event Hubs Data ReceiverAzure Event Hubs Data Sender角色分配给当前使用的 Microsoft Entra 帐户。 有关授予访问权限角色的详细信息,请参阅使用 microsoft Entra ID 使用 Azure 门户 分配 Azure 角色并授权访问事件中心资源。

发送和接收来自Azure 事件中心的消息

使用 Azure 事件中心,可以使用 Spring Cloud Azure 发送和接收消息。

若要安装 Spring Cloud Azure Starter 模块,请将以下依赖项添加到 pom.xml 文件:

  • Spring Cloud Azure 材料清单(BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.10.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.16.0。 此材料清单(BOM)应在pom.xml文件的部分中进行配置<dependencyManagement>。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅 我应使用哪个版本的 Spring Cloud Azure。

  • Spring Cloud Azure Starter 项目:

    <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-starter</artifactId>
    </dependency>
    

编写应用程序代码

使用以下步骤将应用程序配置为使用Azure 事件中心生成和使用消息。

  1. 通过将以下属性添加到 application.properties 文件来配置事件中心凭据。

    spring.cloud.stream.kafka.binder.brokers=${AZ_EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093
    spring.cloud.function.definition=consume;supply
    spring.cloud.stream.bindings.consume-in-0.destination=${AZ_EVENTHUB_NAME}
    spring.cloud.stream.bindings.consume-in-0.group=$Default
    spring.cloud.stream.bindings.supply-out-0.destination=${AZ_EVENTHUB_NAME}
    

    提示

    如果使用版本spring-cloud-azure-dependencies:4.3.0,则应使用值com.azure.spring.cloud.autoconfigure.kafka.AzureKafkaSpringCloudStreamConfiguration添加属性spring.cloud.stream.binders.<kafka-binder-name>.environment.spring.main.sources

    由于 4.4.0会自动添加此属性,因此无需手动添加此属性。

    下表描述了配置中的字段:

    字段 说明
    spring.cloud.stream.kafka.binder.brokers 指定Azure 事件中心终结点。
    spring.cloud.stream.bindings.consume-in-0.destination 指定输入目标事件中心,本教程是之前创建的中心。
    spring.cloud.stream.bindings.consume-in-0.group 指定Azure 事件中心中的使用者组,该组可以设置为$Default使用创建Azure 事件中心实例时创建的基本使用者组。
    spring.cloud.stream.bindings.supply-out-0.destination 指定输出目标事件中心,本教程的输出目标事件中心与输入目标相同。

    注意

    如果启用自动主题创建,请确保添加配置项目 spring.cloud.stream.kafka.binder.replicationFactor,并将值设置为至少 1。 有关详细信息,请参阅 Spring Cloud Stream Kafka Binder Reference Guide(Spring Cloud Stream Kafka Binder 参考指南)。

  2. 编辑启动类文件以显示以下内容。

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.GenericMessage;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    
    @SpringBootApplication
    public class EventHubKafkaBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubKafkaBinderApplication.class);
    
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubKafkaBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->LOGGER.info("New message received: '{}'", message.getPayload());
        }
    
        @Override
        public void run(String... args) {
            many.emitNext(new GenericMessage<>("Hello World"), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    提示

    在本教程中,配置或代码中没有身份验证操作。 但是,连接到 Azure 服务需要身份验证。 要完成身份验证,需要使用 Azure 标识。 Spring Cloud Azure 使用 DefaultAzureCredentialAzure 标识库提供的帮助获取凭据,而无需进行任何代码更改。

    DefaultAzureCredential 支持多种身份验证方法,并确定应在运行时使用哪种方法。 此方法使应用能够在不同环境(如本地环境和生产环境)中使用不同的身份验证方法,而无需实现特定于环境的代码。 有关详细信息,请参阅 DefaultAzureCredential

    若要在本地开发环境中完成身份验证,可以使用 Azure CLI、Visual Studio Code、PowerShell 或其他方法。 有关详细信息,请参阅 Java 开发环境中的 Azure 身份验证。 若要在 Azure 托管环境中完成身份验证,建议使用用户分配的托管标识。 有关详细信息,请参阅什么是 Azure 资源的托管标识?

  3. 启动应用程序。 类似于以下示例的消息将发布到应用程序日志中:

    Kafka version: 3.0.1
    Kafka commitId: 62abe01bee039651
    Kafka startTimeMs: 1622616433956
    New message received: 'Hello World'
    

部署到 Azure Spring Apps

现在,你已在本地运行 Spring Boot 应用程序,现在可以将其移动到生产环境。 使用 Azure Spring Apps 可以轻松地将 Spring Boot 应用程序部署到 Azure,而无需进行任何代码更改。 该服务管理 Spring 应用程序的基础结构,让开发人员可以专注于代码。 Azure Spring Apps 可以通过以下方法提供生命周期管理:综合性监视和诊断、配置管理、服务发现、CI/CD 集成、蓝绿部署等。 若要将应用程序部署到 Azure Spring Apps,请参阅将 第一个应用程序部署到 Azure Spring Apps

后续步骤