在 Spring 中使用Azure 事件网格

本文介绍如何使用Azure 事件网格将事件发送到主题,并使用 服务总线 队列作为事件处理程序在 Spring Boot 应用程序中接收。

Azure 事件网格服务是一种高度可缩放的完全托管的 Pub Sub 消息分发服务,它使用 MQTT 和 HTTP 协议提供灵活的消息使用模式。

先决条件

  • Azure 订阅 - 免费创建订阅

  • Java 开发工具包 (JDK) 版本 8 或更高版本。

  • Apache Maven 版本 3.0 或更高版本。

  • 事件网格主题实例。 如果没有自定义主题,请参阅Azure 事件网格中的“创建自定义主题”或“域”。

  • 服务总线队列实例。 如果没有队列,请参阅Azure 门户中创建队列。

  • Spring Boot 应用程序。 如果没有,请使用 Spring Initializr 创建一个 Maven 项目。 请务必选择 Maven 项目 ,然后选择 Java 版本 8 或更高版本。

订阅自定义主题

使用以下步骤创建事件订阅,以告知事件网格将事件发送到服务总线队列:

  1. 在Azure 门户中,导航到事件网格主题实例。
  2. 选择 工具栏上的事件订阅
  3. 在“ 创建事件订阅”页上,输入 事件订阅的名称 值。
  4. 对于终结点类型,请选择服务总线队列
  5. 选择一个终结点,然后选择前面创建的服务总线队列实例。

通过Azure 事件网格发送事件,并通过Azure 服务总线队列接收事件

使用Azure 事件网格资源,可以使用 Spring Cloud Azure 事件网格发送事件。 使用Azure 服务总线队列资源作为事件处理程序,可以使用 Spring Cloud Azure Stream Binder 接收事件,以便服务总线。

若要安装 Spring Cloud Azure 事件网格 Starter 模块和 Spring Cloud Azure Stream Binder 服务总线 模块,请将以下依赖项添加到pom.xml文件中:

  • Spring Cloud Azure 材料清单(BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.11.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.17.0。 此材料清单(BOM)应在pom.xml文件的部分中进行配置<dependencyManagement>。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅 我应使用哪个版本的 Spring Cloud Azure。

  • Spring Cloud Azure 事件网格 Starter 项目:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter-eventgrid</artifactId>
    </dependency>
    
  • Spring Cloud Azure Stream Binder 服务总线项目:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

编写应用程序代码

使用以下步骤将应用程序配置为使用事件网格发送事件,并使用服务总线队列接收事件。

  1. application.yaml 配置文件中配置Azure 事件网格和服务总线凭据,如以下示例所示:

    spring:
      cloud:
        azure:
          eventgrid:
            endpoint: ${AZURE_EVENTGRID_ENDPOINT}
            key: ${AZURE_EVENTGRID_KEY}
          servicebus:
            connection-string: ${AZURE_SERVICEBUS_CONNECTION_STRING}
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_SERVICEBUS_QUEUE_NAME}
          servicebus:
            bindings:
              consume-in-0:
                consumer:
                  auto-complete: false
    
  2. 编辑启动类文件以显示以下内容。 此代码生成完成。

    import com.azure.core.util.BinaryData;
    import com.azure.messaging.eventgrid.EventGridEvent;
    import com.azure.messaging.eventgrid.EventGridPublisherClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    
    import java.util.List;
    import java.util.function.Consumer;
    
    @SpringBootApplication
    public class EventGridSampleApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventGridSampleApplication.class);
    
        @Autowired
        EventGridPublisherClient<EventGridEvent> client;
    
        public static void main(String[] args) {
            SpringApplication.run(EventGridSampleApplication.class, args);
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message -> {
                List<EventGridEvent> eventData = EventGridEvent.fromString(message.getPayload());
                eventData.forEach(event -> {
                    LOGGER.info("New event received: '{}'", event.getData());
                });
            };
        }
    
        @Override
        public void run(String... args) throws Exception {
            String str = "FirstName: John, LastName: James";
            EventGridEvent event = new EventGridEvent("A user is created", "User.Created.Text", BinaryData.fromObject(str), "0.1");
    
            client.sendEvent(event);
            LOGGER.info("New event published: '{}'", event.getData());
        }
    }
    
    
  3. 启动应用程序。 启动后,应用程序生成类似于以下示例的日志:

    New event published: '"FirstName: John, LastName: James"'
    ...
    New event received: '"FirstName: John, LastName: James"'
    

部署到 Azure Spring Apps

现在,你已在本地运行 Spring Boot 应用程序,现在可以将其移动到生产环境。 使用 Azure Spring Apps 可以轻松地将 Spring Boot 应用程序部署到 Azure,而无需进行任何代码更改。 该服务管理 Spring 应用程序的基础结构,让开发人员可以专注于代码。 Azure Spring Apps 可以通过以下方法提供生命周期管理:综合性监视和诊断、配置管理、服务发现、CI/CD 集成、蓝绿部署等。 若要将应用程序部署到 Azure Spring Apps,请参阅将 第一个应用程序部署到 Azure Spring Apps

后续步骤

若要了解有关 Spring 和 Azure 的详细信息,请继续访问“Azure 上的 Spring”文档中心。