Spring Cloud Stream con bus di servizio di Azure

Questo articolo illustra come usare Spring Cloud Stream Binder per inviare e ricevere messaggi da queues e topics del bus di servizio.

Azure fornisce una piattaforma di messaggistica asincrona denominata bus di servizio di Azure ("bus di servizio") basata sullo standard Advanced Message Queueing Protocol 1.0 ("AMQP 1.0"). Il bus di servizio può essere usato nella gamma di piattaforme di Azure supportate.

Prerequisiti

  • Una sottoscrizione di Azure: creare un account gratuitamente.

  • Java Development Kit (JDK) versione 8 o successiva.

  • Apache Maven versione 3.2 o successiva.

  • cURL o un'utilità HTTP simile per testare la funzionalità.

  • Coda o argomento per bus di servizio di Azure. Se non è disponibile, creare una coda di bus di servizio o creare un argomento bus di servizio.

  • Applicazione Spring Boot. Se non è disponibile, creare un progetto Maven con Spring Initializr. Assicurarsi di selezionare Progetto Maven e, in Dipendenze, aggiungere le dipendenze Spring Web e Supporto di Azure e quindi selezionare Java versione 8 o successiva.

Nota

Per concedere all'account l'accesso alle risorse bus di servizio di Azure, assegnare il Azure Service Bus Data Sender ruolo e Azure Service Bus Data Receiver all'account Microsoft Entra attualmente in uso. Per altre informazioni sulla concessione dei ruoli di accesso, vedere Assegnare i ruoli di Azure usando il portale di Azure e Autenticare e autorizzare un'applicazione con l'ID Microsoft Entra per accedere alle entità bus di servizio di Azure.

Importante

Spring Boot versione 2.5 o successiva è necessario per completare i passaggi descritti in questo articolo.

Inviare e ricevere messaggi da bus di servizio di Azure

Con una coda o un argomento per bus di servizio di Azure, è possibile inviare e ricevere messaggi usando Spring Cloud Azure Stream Binder bus di servizio.

Per installare il modulo Spring Cloud Azure Stream Binder bus di servizio, aggiungere le dipendenze seguenti al file pom.xml:

  • Spring Cloud Azure Bill of Materials (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.11.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Nota

    Se si usa Spring Boot 2.x, assicurarsi di impostare la spring-cloud-azure-dependencies versione su 4.17.0. Questa distinta base deve essere configurata nella <dependencyManagement> sezione del file di pom.xml . In questo modo tutte le dipendenze di Spring Cloud Azure usano la stessa versione. Per altre informazioni sulla versione usata per questa distinta base, vedere La versione di Spring Cloud azure da usare.

  • Spring Cloud Azure Stream Binder bus di servizio artefatto:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

Codice dell'applicazione

Usare la procedura seguente per configurare l'applicazione in modo da usare una coda o un argomento bus di servizio per inviare e ricevere messaggi.

  1. Configurare le credenziali di bus di servizio nel file application.propertiesdi configurazione .

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    La tabella seguente descrive i campi nella configurazione:

    Campo Descrizione
    spring.cloud.azure.servicebus.namespace Specificare lo spazio dei nomi ottenuto nel bus di servizio dal portale di Azure.
    spring.cloud.stream.bindings.consume-in-0.destination Specificare la coda o l'argomento del bus di servizio usato in questa esercitazione.
    spring.cloud.stream.bindings.supply-out-0.destination Specificare lo stesso valore usato per la destinazione input.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Specificare se risolvere automaticamente i messaggi. Se impostato su false, verrà aggiunta un'intestazione di messaggio di Checkpointer per consentire agli sviluppatori di risolvere manualmente i messaggi.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Specificare il tipo di entità per l'associazione di output, può essere queue o topic.
    spring.cloud.function.definition Specificare il bean funzionale da associare alle destinazioni esterne esposte dalle associazioni.
    spring.cloud.stream.poller.fixed-delay Specificare il ritardo fisso per il poller predefinito in millisecondi. Il valore predefinito è 1000 L. Il valore consigliato è 60000.
    spring.cloud.stream.poller.initial-delay Specificare il ritardo iniziale per i trigger periodici. Il valore predefinito è 0.
  2. Modificare il file della classe di avvio per visualizzare il contenuto seguente.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Suggerimento

    In questa esercitazione non sono presenti operazioni di autenticazione nelle configurazioni o nel codice. Tuttavia, la connessione ai servizi di Azure richiede l'autenticazione. Per completare l'autenticazione, è necessario usare Identità di Azure. Spring Cloud Azure usa DefaultAzureCredential, che la libreria di identità di Azure fornisce per ottenere le credenziali senza modifiche al codice.

    DefaultAzureCredential supporta più metodi di autenticazione e determina il metodo da usare in fase di esecuzione. Questo approccio consente all'app di usare metodi di autenticazione diversi in ambienti diversi (ad esempio ambienti locali e di produzione) senza implementare codice specifico dell'ambiente. Per altre informazioni, vedere DefaultAzureCredential.

    Per completare l'autenticazione negli ambienti di sviluppo locali, è possibile usare l'interfaccia della riga di comando di Azure, Visual Studio Code, PowerShell o altri metodi. Per altre informazioni, vedere Autenticazione di Azure in ambienti di sviluppo Java. Per completare l'autenticazione negli ambienti di hosting di Azure, è consigliabile usare l'identità gestita assegnata dall'utente. Per altre informazioni, vedere Informazioni sulle identità gestite per le risorse di Azure

  3. Avviare l’applicazione. I messaggi come l'esempio seguente verranno inseriti nel log applicazioni:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Passaggi successivi