Spring Cloud Stream with Azure Service Bus

この記事では、Spring Cloud Stream Binder を使用して Service Bus queues および topics との間でメッセージを送受信する方法について説明します。

Azure には、Azure Service Bus ("Service Bus") という、Advanced Message Queueing Protocol 1.0 ("AMQP 1.0") 標準に基づいた非同期のメッセージング プラットフォームが用意されています。 Service Bus は、サポートされている Azure プラットフォームの範囲全体で使用することができます。

前提条件

Note

アカウントに Azure Service Bus リソースへのアクセス権を付与するには、現在使用している Microsoft Entra アカウントに Azure Service Bus Data SenderAzure Service Bus Data Receiver のロールを割り当てます。 アクセス ロールの付与の詳細については、「Azure portal を使用して Azure ロールを割り当てる」および「Microsoft Entra ID を使用してアプリケーションを認証および承認して Azure Service Bus エンティティにアクセスする」を参照してください。

重要

この記事の手順を完了するには、Spring Boot 2.5 以降のバージョンが必要です。

Azure Service Bus からメッセージを送受信する

Azure Service Bus のキューまたはトピックを使用すると、Spring Cloud Azure Stream Binder Service Bus を使用してメッセージを送受信できます。

Spring Cloud Azure Stream Binder Service Bus モジュールをインストールするには、次の依存関係を pom.xml ファイルに追加します。

  • Spring Cloud Azure 部品表 (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.11.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Note

    Spring Boot 2.xを使用している場合は、spring-cloud-azure-dependenciesバージョンを4.17.0に設定してください。 この部品表(BOM)は、<dependencyManagement>pom.xmlファイルのセクションで設定する必要があります。 これにより、すべてのSpring Cloud Azure依存関係が同じバージョンを使用していることが保証されます。 このBOMに使用されるバージョンの詳細については、「Spring Cloud Azureのどのバージョンを使うべきか」を参照してください。

  • Spring Cloud Azure Stream Binder Service Bus アーティファクト:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

アプリケーションをコーディングする

次の手順を実行して、Service Bus キューまたはトピックを使用してメッセージを送受信するようにアプリケーションを構成します。

  1. 構成ファイル application.propertiesで Service Bus の資格情報を構成します。

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    次の表では、構成のフィールドについて説明します。

    フィールド 説明
    spring.cloud.azure.servicebus.namespace Azure portal の自分の Service Bus で取得した名前空間を指定します。
    spring.cloud.stream.bindings.consume-in-0.destination このチュートリアルで自分が使用した Service Bus キューまたは Service Bus トピックを指定します。
    spring.cloud.stream.bindings.supply-out-0.destination 入力先に使用したものと同じ値を指定します。
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete メッセージを自動的に取得するかどうかを指定します。 false に設定すると、開発者がメッセージを手動で決済できるように、メッセージ ヘッダー Checkpointer が追加されます。
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type 出力バインドのエンティティの種類を指定します。queue または topic に指定できます。
    spring.cloud.function.definition バインドによって公開されている外部送信先にバインドする、機能 Bean を指定します。
    spring.cloud.stream.poller.fixed-delay デフォルトのポーラーの固定遅延をミリ秒単位で指定します。 既定値は 1000 Lで、推奨値は 60000 です。
    spring.cloud.stream.poller.initial-delay 定期的なトリガーの初期遅延を指定します。 既定値は0です。
  2. スタートアップ クラス ファイルを編集して、次の内容を表示します。

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    ヒント

    このチュートリアルでは、構成またはコードに認証操作はありません。 ただし、Azure サービスに接続するには認証が必要です。 認証を完了するには、Azure ID を使用する必要があります。 Spring Cloud Azure では、DefaultAzureCredential を使用します。これは、コードを変更せずに資格情報を取得できるようにするために、Azure ID ライブラリで提供されます。

    DefaultAzureCredential は複数の認証方法をサポートしており、実行時に使用する方法が決定されます。 このアプローチを採用すると、環境固有のコードを実装することなく、異なる環境 (ローカルと運用環境など) で異なる認証方法をアプリに使用できます。 詳細については、DefaultAzureCredential を参照してください。

    ローカル開発環境で認証を完了するには、Azure CLI、Visual Studio Code、PowerShell、またはその他の方法を使用できます。 詳細については、「Java 開発環境での Azure 認証」を参照してください。 Azure ホスティング環境で認証を完了するには、ユーザー割り当てマネージド ID を使用することをお勧めします。 詳細については、「Azure リソースのマネージド ID とは」を参照してください。

  3. アプリケーションを起動します。 次の例のようなメッセージは、アプリケーション ログに投稿されます。

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

次のステップ