Freigeben über


Spring Cloud Azure-Support für spring integration

Dieser Artikel bezieht sich auf: ✔️ Version 4.14.0 ✔️ Version 5.8.0

Spring Integration Extension for Azure stellt Spring Integration Adapter für die verschiedenen Dienste bereit, die vom Azure SDK für Java bereitgestellt werden. Wir bieten Unterstützung für Spring Integration für diese Azure-Dienste: Event Hubs, Service Bus, Storage Queue. Es folgt eine Liste der unterstützten Adapter:

Federintegration mit Azure Event Hubs

Wichtige Begriffe

Bei Azure Event Hubs handelt es sich um eine Big Data-Streamingplattform und einen Ereigniserfassungsdienst. Mit diesem Dienst können Millionen von Ereignissen pro Sekunde empfangen und verarbeitet werden. An einen Event Hub gesendete Daten können transformiert und mit einem beliebigen Echtzeitanalyse-Anbieter oder Batchverarbeitungs-/Speicheradapter gespeichert werden.

Spring Integration ermöglicht einfaches Messaging in Spring-basierten Anwendungen und unterstützt die Integration mit externen Systemen über deklarative Adapter. Diese Adapter bieten eine höhere Abstraktionsebene über die Unterstützung von Spring für Remoting, Messaging und Planung. Das Erweiterungsprojekt Spring Integration für Event Hubs bietet ein- und ausgehende Kanaladapter und Gateways für Azure Event Hubs.

Hinweis

RxJava-Support-APIs werden ab Version 4.0.0 gelöscht. Weitere Informationen finden Sie unter Javadoc.

Consumergruppe

Event Hubs bietet ähnliche Unterstützung von Consumergruppen wie Apache Kafka, aber mit leichter unterschiedlicher Logik. Während Kafka alle zugesicherten Offsets im Broker speichert, müssen Sie Offsets von Event Hubs-Nachrichten speichern, die manuell verarbeitet werden. Event Hubs SDK stellt die Funktion zum Speichern solcher Offsets in Azure Storage bereit.

Unterstützung der Partitionierung

Event Hubs bietet ein ähnliches Konzept der physischen Partition wie Kafka. Im Gegensatz zu Kafkas automatischem Neuausgleich zwischen Verbrauchern und Partitionen bietet Event Hubs jedoch eine Art präemptiver Modus. Das Speicherkonto fungiert als Lease, um zu bestimmen, welche Partition im Besitz des Verbrauchers ist. Wenn ein neuer Verbraucher beginnt, versucht er, einige Partitionen von den meisten schwer geladenen Verbrauchern zu stehlen, um den Workloadausgleich zu erreichen.

Um die Lastenausgleichsstrategie anzugeben, können Entwickler für die Konfiguration verwenden EventHubsContainerProperties . Ein Beispiel für die Konfiguration finden Sie im folgenden Abschnitt.EventHubsContainerProperties

Batch-Consumerunterstützung

Dies EventHubsInboundChannelAdapter unterstützt den Batch-Nutzungsmodus. Um dies zu aktivieren, können Benutzer den Listenermodus so angeben, als ListenerMode.BATCH ob eine EventHubsInboundChannelAdapter Instanz erstellt wird. Wenn diese Option aktiviert ist, wird eine Nachricht , von der die Nutzlast eine Liste von Batchereignissen ist, empfangen und an den nachgeschalteten Kanal übergeben. Jeder Nachrichtenkopf wird auch als Liste konvertiert, von denen der Inhalt der zugeordnete Headerwert ist, der von jedem Ereignis analysiert wird. Für die kommunalen Kopfzeilen der Partitions-ID, des Prüfpunkts und der letzten Queued-Eigenschaften werden sie als einzelner Wert für den gesamten Batch von Ereignissen gemeinsam dargestellt. Weitere Informationen finden Sie im Abschnitt "Event Hubs Message Headers ".

Hinweis

Der Prüfpunktheader ist nur vorhanden, wenn der MANUAL-Prüfpunktmodus verwendet wird.

Die Prüfpunkterstellung von Batch-Consumer unterstützt zwei Modi: BATCH und MANUAL. BATCH der Modus ist ein modus für die automatische Prüfpunkterstellung, um den gesamten Batch von Ereignissen zusammen zu überwachen, sobald sie empfangen wurden. MANUAL der Modus besteht darin, die Ereignisse von Benutzern zu überprüfen. Wenn der Prüfpunkt verwendet wird, wird der Prüfpunkt in den Nachrichtenkopf übergeben, und Benutzer können ihn für die Prüfpunkterstellung verwenden.

Die Batchaufwendungsrichtlinie kann durch Eigenschaften und max-sizemax-wait-time, bei denen max-size es sich um eine erforderliche Eigenschaft handelt, angegeben werden, wenn max-wait-time sie optional ist. Um die Batchnutzungsstrategie anzugeben, können Entwickler für die Konfiguration verwenden EventHubsContainerProperties . Ein Beispiel für die Konfiguration finden Sie im folgenden Abschnitt.EventHubsContainerProperties

Setup von Abhängigkeiten

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Konfiguration

Dieser Start bietet die folgenden drei Konfigurationsoptionen:

Verbinden ion-Konfigurationseigenschaften

Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Event Hubs verwendet werden.

Hinweis

Wenn Sie sich dafür entscheiden, einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource zu verwenden, lesen Sie "Autorisieren des Zugriffs mit Microsoft Entra-ID ", um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.

Verbinden ion konfigurierbare Eigenschaften von spring-cloud-azure-starter-integration-eventhubs:

Eigenschaft Typ Beschreibung
spring.cloud.azure.eventhubs.enabled boolean Gibt an, ob azure Event Hubs aktiviert ist.
spring.cloud.azure.eventhubs.connection-string String Event Hubs-Namespace Verbindungszeichenfolge Wert.
spring.cloud.azure.eventhubs.namespace String Event Hubs-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DoStandard Name bestehen.
spring.cloud.azure.eventhubs.do Standard-name String Do Standard Name eines Azure Event Hubs-Namespacewerts.
spring.cloud.azure.eventhubs.custom-endpoint-address String Benutzerdefinierte Endpunktadresse.
spring.cloud.azure.eventhubs.shared-connection Boolean Gibt an, ob der zugrunde liegende EventProcessorClient und EventHubProducerAsyncClient dieselbe Verbindung verwenden. Standardmäßig wird eine neue Verbindung für jeden erstellten Event Hub-Client erstellt und verwendet.

Prüfpunktkonfigurationseigenschaften

Dieser Abschnitt enthält die Konfigurationsoptionen für den Speicher-Blobs-Dienst, der zum Beibehalten des Partitionsbesitzes und der Prüfpunktinformationen verwendet wird.

Hinweis

Ab Version 4.0.0 wird, wenn die Eigenschaft von spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists nicht manuell aktiviert ist, kein Speichercontainer automatisch erstellt.

Prüfpunkt konfigurierbarer Eigenschaften von spring-cloud-azure-starter-integration-eventhubs:

Eigenschaft Typ Beschreibung
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolean Gibt an, ob das Erstellen von Containern zulässig ist, falls nicht vorhanden.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name String Name für das Speicherkonto.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key String Zugriffsschlüssel für das Speicherkonto
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name String Name des Speichercontainers.

Allgemeine Azure Service SDK-Konfigurationsoptionen sind auch für den Speicherspeicher für Speicher-Blob-Prüfpunktspeicher konfigurierbar. Die unterstützten Konfigurationsoptionen werden in der Spring Cloud Azure-Konfiguration eingeführt und können entweder mit dem einheitlichen Präfix oder dem Präfix spring.cloud.azure. von spring.cloud.azure.eventhubs.processor.checkpoint-storekonfiguriert werden.

Konfigurationseigenschaften des Event Hub-Prozessors

Die EventHubsInboundChannelAdapter Verwendung der EventProcessorClient Nachrichten von einem Event Hub verwendet, um die Gesamteigenschaften eines EventProcessorClientEreignisses zu konfigurieren, können Entwickler für die Konfiguration verwenden EventHubsContainerProperties . Lesen Sie den folgenden Abschnitt zum Arbeiten mit EventHubsInboundChannelAdapter.

Grundlegende Verwendung

Senden von Nachrichten an Azure Event Hubs

  1. Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.

    • Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in der Datei "application.yml":

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in der Datei "application.yml ":

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in der Datei "application.yml ":

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Hinweis

Die zulässigen tenant-id Werte sind: common, , organizations, consumers, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

  1. Erstellen Sie DefaultMessageHandler mit der EventHubsTemplate Bohnen, um Nachrichten an Event Hubs zu senden.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Erstellen Sie eine Nachrichtengatewaybindung mit dem obigen Nachrichtenhandler über einen Nachrichtenkanal.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Senden von Nachrichten mithilfe des Gateways.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Empfangen von Nachrichten von Azure Event Hubs

  1. Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.

  2. Erstellen Sie einen Nachrichtenkanal als Eingabekanal.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Erstellen Sie EventHubsInboundChannelAdapter mit der EventHubsMessageListenerContainer Bohnen, um Nachrichten von Event Hubs zu empfangen.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Erstellen Sie eine Nachrichtenempfängerbindung mit EventHubsInboundChannelAdapter über den zuvor erstellten Nachrichtenkanal.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Konfigurieren von EventHubsMessageConverter zum Anpassen von objectMapper

EventHubsMessageConverter wird als konfigurierbare Bohnen erstellt, damit Benutzer ObjectMapper anpassen können.

Batch-Consumerunterstützung

Um Nachrichten von Event Hubs in Batches zu nutzen, ähnelt dem obigen Beispiel, neben den Benutzern sollten die batchaufwendigen zugehörigen Konfigurationsoptionen für EventHubsInboundChannelAdapter.

Beim Erstellen EventHubsInboundChannelAdaptersollte der Listenermodus als BATCHfestgelegt werden. Legen Sie beim Erstellen der Beannung des EventHubsMessageListenerContainerPrüfpunkts den Prüfpunktmodus als " MANUAL oder BATCH" fest, und die Batchoptionen können bei Bedarf konfiguriert werden.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Event Hubs-Nachrichtenkopfzeilen

In der folgenden Tabelle wird veranschaulicht, wie Die Nachrichteneigenschaften von Event Hubs den Kopfzeilen von Spring-Nachrichten zugeordnet werden. Für Azure Event Hubs wird die Nachricht als event.

Zuordnung zwischen Event Hubs-Nachrichten-/Ereigniseigenschaften und Federnachrichtenkopfzeilen im Datensatzlistenermodus:

Event Hubs-Ereigniseigenschaften Federnachrichtenkopfkonstanten type Beschreibung
Enqueued time EventHubsHeaders#ENQUEUED_TIME Sofort Der Zeitpunkt, zu dem das Ereignis in UTC in der Event Hub-Partition enqueuiert wurde.
Abstand EventHubsHeaders#OFFSET Long Der Offset des Ereignisses, als es von der zugeordneten Event Hub-Partition empfangen wurde.
Partitionsschlüssel AzureHeaders#PARTITION_KEY String Der Partitionshashschlüssel, wenn er beim ursprünglichen Veröffentlichen des Ereignisses festgelegt wurde.
Partitions-ID AzureHeaders#RAW_PARTITION_ID String Die Partitions-ID des Event Hub.
Sequenznummer EventHubsHeaders#SEQUENCE_NUMBER Long Die Sequenznummer, die dem Ereignis zugewiesen wurde, als es in der zugeordneten Event Hub-Partition enqueuiert wurde.
Letzte Queued-Ereigniseigenschaften EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Die Eigenschaften des letzten enqueued-Ereignisses in dieser Partition.
Nicht verfügbar AzureHeaders#CHECKPOINTER Prüfpunkt Die Kopfzeile für den Prüfpunkt der jeweiligen Nachricht.

Benutzer können die Nachrichtenkopfzeilen für die zugehörigen Informationen jedes Ereignisses analysieren. Um einen Nachrichtenkopf für das Ereignis festzulegen, werden alle angepassten Kopfzeilen als Anwendungseigenschaft eines Ereignisses platziert, wobei der Header als Eigenschaftsschlüssel festgelegt wird. Wenn Ereignisse von Event Hubs empfangen werden, werden alle Anwendungseigenschaften in den Nachrichtenkopf konvertiert.

Hinweis

Nachrichtenkopfzeilen des Partitionsschlüssels, enqueuierte Zeit, Offset- und Sequenznummer werden nicht unterstützt, um manuell festgelegt zu werden.

Wenn der Batch-Consumer-Modus aktiviert ist, werden die spezifischen Kopfzeilen von Batchnachrichten wie folgt aufgelistet, die eine Liste der Werte aus jedem einzelnen Event Hubs-Ereignis enthalten.

Zuordnung zwischen Event Hubs Message/Event Properties und Spring Message Headers im Batchlistener-Modus:

Event Hubs-Ereigniseigenschaften Spring Batch-Nachrichtenkopfkonstanten type Beschreibung
Enqueued time EventHubsHeaders#ENQUEUED_TIME Liste der Sofortnachrichten Liste der sofortigen Ereignisse in UTC, wann jedes Ereignis in der Event Hub-Partition enqueuiert wurde.
Abstand EventHubsHeaders#OFFSET Liste der langen Liste des Offsets jedes Ereignisses, wenn es von der zugeordneten Event Hub-Partition empfangen wurde.
Partitionsschlüssel AzureHeaders#PARTITION_KEY Liste der Zeichenfolgen Liste des Partitionshashingschlüssels, wenn er beim ursprünglichen Veröffentlichen jedes Ereignisses festgelegt wurde.
Sequenznummer EventHubsHeaders#SEQUENCE_NUMBER Liste der langen Liste der Sequenznummer, die jedem Ereignis zugewiesen wurde, als es in der zugeordneten Event Hub-Partition abgefragt wurde.
Systemeigenschaften EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Liste der Karte Liste der Systemeigenschaften jedes Ereignisses.
Anwendungseigenschaften EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Liste der Karte Liste der Anwendungseigenschaften jedes Ereignisses, in denen alle angepassten Nachrichtenkopfzeilen oder Ereigniseigenschaften platziert werden.

Hinweis

Bei der Veröffentlichung von Nachrichten werden alle oben genannten Batchheader aus den Nachrichten entfernt, sofern vorhanden.

Beispiele

Weitere Informationen finden Sie im Repository "azure-spring-boot-samples " auf GitHub.

Spring-Integration in Azure Service Bus

Wichtige Begriffe

Spring Integration ermöglicht einfaches Messaging in Spring-basierten Anwendungen und unterstützt die Integration mit externen Systemen über deklarative Adapter.

Das Spring Integration for Azure Service Bus-Erweiterungsprojekt bietet ein- und ausgehende Kanaladapter für Azure Service Bus.

Hinweis

CompletableFuture-Support-APIs sind ab Version 2.10.0 veraltet und werden von Reaktorkern ab Version 4.0.0 ersetzt. Weitere Informationen finden Sie unter Javadoc.

Setup von Abhängigkeiten

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Konfiguration

Dieser Start bietet die folgenden 2 Teile der Konfigurationsoptionen:

Verbinden ion-Konfigurationseigenschaften

Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Service Bus verwendet werden.

Hinweis

Wenn Sie sich dafür entscheiden, einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource zu verwenden, lesen Sie "Autorisieren des Zugriffs mit Microsoft Entra-ID ", um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.

Verbinden ion konfigurierbare Eigenschaften von spring-cloud-azure-starter-integration-servicebus:

Eigenschaft Typ Beschreibung
spring.cloud.azure.servicebus.enabled boolean Gibt an, ob ein Azure Service Bus aktiviert ist.
spring.cloud.azure.servicebus.connection-string String Service Bus Namespace Verbindungszeichenfolge Wert.
spring.cloud.azure.servicebus.namespace String Service Bus-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DoStandard Name bestehen.
spring.cloud.azure.servicebus.do Standard-name String Do Standard Name eines Azure Service Bus-Namespacewerts.

Konfigurationseigenschaften des Dienstbusprozessors

Die ServiceBusInboundChannelAdapter Verwendung der ServiceBusProcessorClient zu verwendenden Nachrichten, zum Konfigurieren der allgemeinen Eigenschaften eines ServiceBusProcessorClient, Entwickler können für die Konfiguration verwenden ServiceBusContainerProperties . Lesen Sie den folgenden Abschnitt zum Arbeiten mit ServiceBusInboundChannelAdapter.

Grundlegende Verwendung

Senden von Nachrichten an Azure Service Bus

  1. Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.

    • Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in der Datei "application.yml":

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in der Datei "application.yml ":

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Hinweis

Die zulässigen tenant-id Werte sind: common, , organizations, consumers, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

  • Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in der Datei "application.yml ":

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Hinweis

Die zulässigen tenant-id Werte sind: common, , organizations, consumers, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

  1. Erstellen Sie DefaultMessageHandler mit dem ServiceBusTemplate Anker zum Senden von Nachrichten an Service Bus, legen Sie den Entitätstyp für die ServiceBusTemplate fest. In diesem Beispiel wird die Servicebuswarteschlange als Beispiel akzeptiert.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Erstellen Sie eine Nachrichtengatewaybindung mit dem obigen Nachrichtenhandler über einen Nachrichtenkanal.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Senden von Nachrichten mithilfe des Gateways.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Empfangen von Nachrichten von Azure Service Bus

  1. Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.

  2. Erstellen Sie einen Nachrichtenkanal als Eingabekanal.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Erstellen Sie ServiceBusInboundChannelAdapter mit der ServiceBusMessageListenerContainer Bohnen, um Nachrichten an Service Bus zu empfangen. In diesem Beispiel wird die Servicebuswarteschlange als Beispiel akzeptiert.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Erstellen Sie eine Nachrichtenempfängerbindung mit ServiceBusInboundChannelAdapter dem zuvor erstellten Nachrichtenkanal.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Konfigurieren von ServiceBusMessageConverter zum Anpassen von objectMapper

ServiceBusMessageConverter wird als konfigurierbare Bohnen erstellt, damit Benutzer anpassen ObjectMapperkönnen.

Dienstbus-Nachrichtenkopfzeilen

Bei einigen ServiceBus-Headern, die mehreren Springheaderkonstanten zugeordnet werden können, wird die Priorität verschiedener Federheader aufgelistet.

Zuordnung zwischen ServiceBus-Headern und Federkopfzeilen:

Service Bus-Nachrichtenkopfzeilen und -eigenschaften Federnachrichtenkopfkonstanten Typ Variantenkonfigurator verwenden Beschreibung
Inhaltstyp MessageHeaders#CONTENT_TYPE String Ja Der RFC2045 Inhaltstypdeskriptor der Nachricht.
Korrelations-ID ServiceBusMessageHeaders#CORRELATION_ID String Ja Die Korrelations-ID der Nachricht
Meldungs-ID ServiceBusMessageHeaders#MESSAGE_ID String Ja Die Nachrichten-ID der Nachricht, diese Kopfzeile hat eine höhere Priorität als MessageHeaders#ID.
Meldungs-ID MessageHeaders#ID UUID Ja Die Nachrichten-ID der Nachricht, diese Kopfzeile hat eine niedrigere Priorität als ServiceBusMessageHeaders#MESSAGE_ID.
Partitionsschlüssel ServiceBusMessageHeaders#PARTITION_KEY String Ja Der Partitionsschlüssel zum Senden der Nachricht an eine partitionierte Entität.
Antwort an MessageHeaders#REPLY_CHANNEL String Ja Die Adresse einer Entität, an die Antworten gesendet werden sollen.
Antworten auf sitzungs-ID ServiceBusMessageHeaders#REPLY_TO_SESSION_ID String Ja Der Wert der ReplyToGroupId-Eigenschaft der Nachricht.
Geplante Queue-Zeit utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Ja Der Zeitpunkt, zu dem die Nachricht in Service Bus queuiert werden soll, hat diese Kopfzeile eine höhere Priorität als AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Geplante Queue-Zeit utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Integer Ja Der Zeitpunkt, zu dem die Nachricht in Service Bus queuiert werden soll, hat diese Kopfzeile eine niedrigere Priorität als ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
Sitzungs-ID ServiceBusMessageHeaders#SESSION_ID String Ja Der Sitzungs-IDentifier für eine sitzungsfähige Entität.
Gültigkeitsdauer ServiceBusMessageHeaders#TIME_TO_LIVE Duration Ja Die Dauer, bis diese Nachricht abläuft.
Beschreibung ServiceBusMessageHeaders#TO String Ja Die "to"-Adresse der Nachricht, die für die zukünftige Verwendung in Routingszenarien reserviert ist und derzeit vom Broker selbst ignoriert wird.
Subject ServiceBusMessageHeaders#SUBJECT String Ja Der Betreff für die Nachricht.
Beschreibung des Fehlers "Inaktiver Buchstabe" ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION String Nein Die Beschreibung für eine Nachricht, die inaktiv war.
Grund für inaktive Buchstaben ServiceBusMessageHeaders#DEAD_LETTER_REASON String Nein Der Grund, warum eine Nachricht in totschreibend war.
Quelle für inaktive Buchstaben ServiceBusMessageHeaders#DEAD_LETTER_SOURCE String Nein Die Entität, in der die Nachricht inaktiv war.
Lieferanzahl ServiceBusMessageHeaders#DELIVERY_COUNT lang Nein Die Häufigkeit, mit der diese Nachricht an Clients übermittelt wurde.
Nummer der Enqueued-Sequenz ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER lang Nein Die queuierte Sequenznummer, die einer Nachricht von Service Bus zugewiesen ist.
Enqueued time ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Nein Das Datum, zu dem diese Nachricht in Service Bus enqueuiert wurde.
Ablaufzeit ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Nein Das Datum, zu dem diese Nachricht abläuft.
Sperrtoken ServiceBusMessageHeaders#LOCK_TOKEN String Nein Das Sperrtoken für die aktuelle Nachricht.
Gesperrt bis ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Nein Das Datum, zu dem die Sperre dieser Nachricht abläuft.
Sequenznummer ServiceBusMessageHeaders#SEQUENCE_NUMBER lang Nein Die eindeutige Nummer, die einer Nachricht von Service Bus zugewiesen ist.
Zustand ServiceBusMessageHeaders#STATE ServiceBusMessageState Nein Der Status der Nachricht, die aktiv, verzögert oder geplant sein kann.

Unterstützung des Partitionsschlüssels

Dieser Starter unterstützt die Dienstbuspartitionierung , indem die Einstellung des Partitionsschlüssels und der Sitzungs-ID im Nachrichtenkopf zugelassen wird. In diesem Abschnitt wird erläutert, wie Sie den Partitionsschlüssel für Nachrichten festlegen.

Empfohlen: Verwenden Sie ServiceBusMessageHeaders.PARTITION_KEY als Schlüssel der Kopfzeile.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nicht empfohlen, derzeit unterstützt:AzureHeaders.PARTITION_KEY als Schlüssel der Kopfzeile.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Hinweis

Wenn beide ServiceBusMessageHeaders.PARTITION_KEY und AzureHeaders.PARTITION_KEY in den Nachrichtenkopfzeilen festgelegt werden, ServiceBusMessageHeaders.PARTITION_KEY wird bevorzugt.

Sitzungsunterstützung

In diesem Beispiel wird veranschaulicht, wie Die Sitzungs-ID einer Nachricht in der Anwendung manuell festgelegt wird.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Hinweis

Wenn der ServiceBusMessageHeaders.SESSION_ID Wert in den Nachrichtenkopfzeilen festgelegt ist und auch eine andere ServiceBusMessageHeaders.PARTITION_KEY Kopfzeile festgelegt wird, wird der Wert der Sitzungs-ID schließlich verwendet, um den Wert des Partitionsschlüssels zu überschreiben.

Beispiele

Weitere Informationen finden Sie im Repository "azure-spring-boot-samples " auf GitHub.

Spring-Integration in Azure Storage-Warteschlangen

Wichtige Begriffe

Azure Queue Storage ist ein Dienst für die Speicherung großer Nachrichtenmengen. Sie können überall auf der Welt über authentifizierte Aufrufe mithilfe von HTTP oder HTTPS auf Nachrichten zugreifen. Eine Warteschlangennachricht kann bis zu 64 KB groß sein. Eine Warteschlange kann Millionen Nachrichten enthalten, bis die maximale Kapazität eines Speicherkontos erreicht ist. Warteschlangen werden häufig verwendet, um ein Arbeits-Backlog zur asynchronen Verarbeitung zu erstellen.

Setup von Abhängigkeiten

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Konfiguration

Dieser Start bietet die folgenden Konfigurationsoptionen:

Verbinden ion-Konfigurationseigenschaften

Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit der Azure Storage-Warteschlange verwendet werden.

Hinweis

Wenn Sie sich dafür entscheiden, einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource zu verwenden, lesen Sie "Autorisieren des Zugriffs mit Microsoft Entra-ID ", um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.

Verbinden ion konfigurierbare Eigenschaften von spring-cloud-azure-starter-integration-storage-queue:

Eigenschaft Typ Beschreibung
spring.cloud.azure.storage.queue.enabled boolean Gibt an, ob eine Azure Storage-Warteschlange aktiviert ist.
spring.cloud.azure.storage.queue.connection-string String Speicherwarteschlangennamespace Verbindungszeichenfolge Wert.
spring.cloud.azure.storage.queue.accountName String Name des Speicherwarteschlangenkontos.
spring.cloud.azure.storage.queue.accountKey String Speicherwarteschlangenkontoschlüssel.
spring.cloud.azure.storage.queue.endpoint String Endpunkt des Speicherwarteschlangendiensts.
spring.cloud.azure.storage.queue.sasToken String Sas-Token-Anmeldeinformationen
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion, die beim Erstellen von API-Anforderungen verwendet wird.
spring.cloud.azure.storage.queue.messageEncoding String Codierung von Warteschlangennachrichten.

Grundlegende Verwendung

Senden von Nachrichten an die Azure Storage-Warteschlange

  1. Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.

    • Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in der Datei "application.yml":

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in der Datei "application.yml ":

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Hinweis

Die zulässigen tenant-id Werte sind: common, , organizations, consumers, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

  • Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in der Datei "application.yml ":

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Hinweis

Die zulässigen tenant-id Werte sind: common, , organizations, consumers, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

  1. Erstellen Sie DefaultMessageHandler mit der Bohnen, um Nachrichten an die StorageQueueTemplate Speicherwarteschlange zu senden.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Erstellen Sie eine Nachrichtengatewaybindung mit dem obigen Nachrichtenhandler über einen Nachrichtenkanal.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Senden von Nachrichten mithilfe des Gateways.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Empfangen von Nachrichten aus der Azure Storage-Warteschlange

  1. Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.

  2. Erstellen Sie einen Nachrichtenkanal als Eingabekanal.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Erstellen Sie StorageQueueMessageSource mit der Bohnen, um Nachrichten an die StorageQueueTemplate Speicherwarteschlange zu empfangen.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Erstellen Sie eine Nachrichtenempfängerbindung mit StorageQueueMessageSource, die im letzten Schritt über den zuvor erstellten Nachrichtenkanal erstellt wurde.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Beispiele

Weitere Informationen finden Sie im Repository "azure-spring-boot-samples " auf GitHub.