Spring Cloud Stream se službou Azure Service Bus

V tomto článku se dozvíte, jak pomocí Spring Cloud Stream Binderu odesílat zprávy do front (queues) a témat (topics) služby Service Bus a přijímat je z ní.

Azure poskytuje platformu pro asynchronní zasílání zpráv s názvem Azure Service Bus („služba Service Bus“), která je založena na standardu Advanced Message Queueing Protocol 1.0 („protokol AMQP 1.0“). Službu Service Bus je možné používat na mnoha podporovaných platformách Azure.

Požadavky

Poznámka:

Pokud chcete účtu udělit přístup k prostředkům služby Azure Service Bus, přiřaďte Azure Service Bus Data SenderAzure Service Bus Data Receiver tuto roli k účtu Microsoft Entra, který aktuálně používáte. Další informace o udělení přístupových rolí najdete v tématu Přiřazení rolí Azure pomocí webu Azure Portal a ověřování a autorizace aplikace s ID Microsoft Entra pro přístup k entitě Azure Service Bus.

Důležité

K dokončení kroků v tomto článku se vyžaduje Spring Boot verze 2.5 nebo vyšší.

Odesílání a příjem zpráv ze služby Azure Service Bus

Pomocí fronty nebo tématu služby Azure Service Bus můžete odesílat a přijímat zprávy pomocí služby Spring Cloud Azure Stream Binder Service Bus.

Pokud chcete nainstalovat modul Spring Cloud Azure Stream Binder Service Bus, přidejte do souboru pom.xml následující závislosti:

  • Kusovník materiálů (BOM) Spring Cloud v Azure:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.11.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Poznámka:

    Pokud používáte Spring Boot 2.x, nezapomeňte nastavit spring-cloud-azure-dependencies verzi na 4.17.0. Tato faktura materiálu (BOM) by měla být nakonfigurována v <dependencyManagement> části vašeho pom.xml souboru. Tím se zajistí, že všechny závislosti Azure Spring Cloudu budou používat stejnou verzi. Další informace o verzi použité pro tuto kusovníku najdete v tématu Jakou verzi Spring Cloud Azure mám použít.

  • Artefakt Spring Cloud Azure Stream Binder Service Bus:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

Vytvoření kódu aplikace

Pomocí následujících kroků nakonfigurujte aplikaci tak, aby k odesílání a příjmu zpráv používala frontu nebo téma služby Service Bus.

  1. Nakonfigurujte přihlašovací údaje služby Service Bus v konfiguračním souboru application.properties.

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    Následující tabulka popisuje pole v konfiguraci:

    Pole Popis
    spring.cloud.azure.servicebus.namespace Zadejte obor názvů, který jste získali ve službě Service Bus z webu Azure Portal.
    spring.cloud.stream.bindings.consume-in-0.destination Zadejte frontu nebo téma služby Service Bus, které jste použili v tomto kurzu.
    spring.cloud.stream.bindings.supply-out-0.destination Zadejte stejnou hodnotu jako u cíle vstupu.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Určete, zda se mají zprávy automaticky urovnat. Pokud je nastavená hodnota false, přidá se hlavička Checkpointer zprávy, která vývojářům umožní ručně urovnat zprávy.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Zadejte typ entity pro výstupní vazbu, může být queue nebo topic.
    spring.cloud.function.definition Určete, která funkční bean se má svázat s externími cíli vystavenými vazbami.
    spring.cloud.stream.poller.fixed-delay Zadejte pevné zpoždění pro výchozí poller v milisekundách. Výchozí hodnota je 1000 L. Doporučená hodnota je 60000.
    spring.cloud.stream.poller.initial-delay Zadejte počáteční zpoždění pro pravidelné aktivační události. Výchozí hodnota je 0.
  2. Upravte soubor spouštěcí třídy, aby se zobrazil následující obsah.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Tip

    V tomto kurzu nejsou v konfiguracích ani kódu žádné ověřovací operace. Připojení ke službám Azure ale vyžaduje ověření. K dokončení ověřování je potřeba použít identitu Azure. Spring Cloud Azure používá DefaultAzureCredential, kterou poskytuje knihovna identit Azure, která vám pomůže získat přihlašovací údaje bez jakýchkoli změn kódu.

    DefaultAzureCredential podporuje více metod ověřování a určuje, kterou metodu použít za běhu. Tento přístup umožňuje vaší aplikaci používat různé metody ověřování v různých prostředích (například v místních a produkčních prostředích) bez implementace kódu specifického pro prostředí. Další informace naleznete v tématu DefaultAzureCredential.

    K dokončení ověřování v místních vývojových prostředích můžete použít Azure CLI, Visual Studio Code, PowerShell nebo jiné metody. Další informace najdete v tématu Ověřování Azure ve vývojových prostředích Java. K dokončení ověřování v hostitelských prostředích Azure doporučujeme použít spravovanou identitu přiřazenou uživatelem. Další informace najdete v tématu Co jsou spravované identity pro prostředky Azure?

  3. Spusťte aplikaci. Zprávy jako v následujícím příkladu se publikuje v protokolu vaší aplikace:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Další kroky