Spring Cloud Stream com Hubs de Eventos do Azure

Este tutorial demonstra como enviar e receber mensagens usando Hubs de Eventos do Azure e Eventhubs do Spring Cloud Stream Binder em um aplicativo Spring Boot.

Pré-requisitos

Observação

Para conceder à sua conta acesso a recursos, nos Hubs de Eventos do Azure, atribua a Azure Event Hubs Data Receiver função e Azure Event Hubs Data Sender à conta do Microsoft Entra que você está usando no momento. Em seguida, na conta de Armazenamento do Azure, atribua a Storage Blob Data Contributor função à conta do Microsoft Entra que você está usando no momento. Para obter mais informações sobre como conceder funções de acesso, consulte Atribuir funções do Azure usando o portal do Azure e Autorizar acesso a recursos dos Hubs de Eventos usando a ID do Microsoft Entra.

Importante

O Spring Boot versão 2.5 ou superior é necessário para concluir as etapas deste tutorial.

Enviar e receber mensagens dos Hubs de Eventos do Azure

Com uma Conta de Armazenamento do Azure e um hub de Eventos do Azure, você pode enviar e receber mensagens usando os Hubs de Eventos do Spring Cloud Azure Stream Binder.

Para instalar o módulo Hubs de Eventos do Spring Cloud Azure Stream Binder, adicione as seguintes dependências ao arquivo pom.xml :

  • A lista de materiais do Spring Cloud Azure (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.11.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Observação

    Se você estiver usando o Spring Boot 2.x, certifique-se de definir a spring-cloud-azure-dependencies versão como 4.17.0. Esta lista de materiais (BOM) deve ser configurada <dependencyManagement> na seção do arquivo pom.xml . Isso garante que todas as dependências do Spring Cloud Azure estejam usando a mesma versão. Para obter mais informações sobre a versão usada para esta BOM, consulte Qual versão do Spring Cloud Azure devo usar.

  • O artefato dos Hubs de Eventos do Spring Cloud Azure Stream Binder:

    <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
    </dependency>
    

Codificar o aplicativo

Use as etapas a seguir para configurar seu aplicativo para produzir e consumir mensagens usando os Hubs de Eventos do Azure.

  1. Configure as credenciais do hub de eventos adicionando as seguintes propriedades ao arquivo application.properties .

     spring.cloud.azure.eventhubs.namespace=${AZURE_EVENTHUBS_NAMESPACE}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=${AZURE_STORAGE_ACCOUNT_NAME}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=${AZURE_STORAGE_CONTAINER_NAME}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.bindings.consume-in-0.group=${AZURE_EVENTHUB_CONSUMER_GROUP}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.initial-delay=0
     spring.cloud.stream.poller.fixed-delay=1000
    

    A tabela a seguir descreve os campos na configuração:

    Campo Descrição
    spring.cloud.azure.eventhubs.namespace Especifique o namespace obtido no hub de eventos do portal do Azure.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Especifique a conta de armazenamento criada neste tutorial.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Especifique o contêiner de sua conta de armazenamento.
    spring.cloud.stream.bindings.consume-in-0.destination Especifique o hub de eventos usado neste tutorial.
    spring.cloud.stream.bindings.consume-in-0.group Especifique os grupos de consumidores em sua instância dos Hubs de Eventos.
    spring.cloud.stream.bindings.supply-out-0.destination Especifique o mesmo hub de eventos usado neste tutorial.
    spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode Especifique MANUAL.
    spring.cloud.function.definition Especifique qual bean funcional deve ser associado aos destinos externos expostos pelas associações.
    spring.cloud.stream.poller.initial-delay Especifique o atraso inicial para gatilhos periódicos. O valor padrão é 0.
    spring.cloud.stream.poller.fixed-delay Especifique o atraso fixo para o poller padrão em milissegundos. O valor padrão é 1000 L.
  2. Edite o arquivo de classe de inicialização para mostrar o conteúdo a seguir.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class EventHubBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued "
                        +"time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
                );
                checkpointer.success()
                            .doOnSuccess(success->LOGGER.info("Message '{}' successfully checkpointed",
                                message.getPayload()))
                            .doOnError(error->LOGGER.error("Exception found", error))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to sendMessage.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Dica

    Neste tutorial, não há operações de autenticação nas configurações ou no código. No entanto, a conexão com os serviços do Azure requer autenticação. Para concluir a autenticação, você precisa usar a Identidade do Azure. O Spring Cloud Azure usa DefaultAzureCredentialo , que a biblioteca de Identidade do Azure fornece para ajudá-lo a obter credenciais sem alterações de código.

    DefaultAzureCredential dá suporte a vários métodos de autenticação e determina qual método usar no runtime. Essa abordagem permite que seu aplicativo use métodos de autenticação diferentes em ambientes diferentes (como ambientes locais e de produção) sem implementar código específico do ambiente. Para obter mais informações, consulte DefaultAzureCredential.

    Para concluir a autenticação em ambientes de desenvolvimento local, você pode usar a CLI do Azure, o Visual Studio Code, o PowerShell ou outros métodos. Para obter mais informações, consulte Autenticação do Azure em ambientes de desenvolvimento Java. Para concluir a autenticação em ambientes de hospedagem do Azure, recomendamos usar a identidade gerenciada atribuída pelo usuário. Para obter mais informações, confira O que são as identidades gerenciadas para recursos do Azure?

  3. Inicie o aplicativo. Mensagens como essa serão postadas no log do aplicativo, conforme mostrado na saída de exemplo a seguir:

    New message received: 'Hello World', partition key: 107207233, sequence number: 458, offset: 94256, enqueued time: 2023-02-17T08:27:59.641Z
    Message 'Hello World!' successfully checkpointed
    

Implantar no Azure Spring Apps

Agora que você tem o aplicativo Spring Boot em execução localmente, é hora de movê-lo para a produção. Os Aplicativos Spring do Azure facilitam a implantação de aplicativos Spring Boot no Azure sem alterações de código. O serviço gerencia a infraestrutura dos aplicativos do Spring para que os desenvolvedores possam se concentrar no código. O Azure Spring Apps fornece gerenciamento de ciclo de vida usando monitoramento e diagnóstico abrangentes, gerenciamento de configuração, descoberta de serviços, integração de CI/CD, implantações em “blue-green” e muito mais. Para implantar seu aplicativo nos Aplicativos Spring do Azure, consulte Implantar seu primeiro aplicativo nos Aplicativos Spring do Azure.

Próximas etapas