Spring Cloud Azure-Support für spring integration
Dieser Artikel bezieht sich auf: ✔️ Version 4.14.0 ✔️ Version 5.8.0
Spring Integration Extension for Azure stellt Spring Integration Adapter für die verschiedenen Dienste bereit, die vom Azure SDK für Java bereitgestellt werden. Wir bieten Unterstützung für Spring Integration für diese Azure-Dienste: Event Hubs, Service Bus, Storage Queue. Es folgt eine Liste der unterstützten Adapter:
spring-cloud-azure-starter-integration-eventhubs
– weitere Informationen finden Sie unter Spring Integration mit Azure Event Hubsspring-cloud-azure-starter-integration-servicebus
– weitere Informationen finden Sie unter Spring Integration in Azure Service Busspring-cloud-azure-starter-integration-storage-queue
– weitere Informationen finden Sie unter Spring Integration in Azure Storage Queue
Federintegration mit Azure Event Hubs
Wichtige Begriffe
Bei Azure Event Hubs handelt es sich um eine Big Data-Streamingplattform und einen Ereigniserfassungsdienst. Mit diesem Dienst können Millionen von Ereignissen pro Sekunde empfangen und verarbeitet werden. An einen Event Hub gesendete Daten können transformiert und mit einem beliebigen Echtzeitanalyse-Anbieter oder Batchverarbeitungs-/Speicheradapter gespeichert werden.
Spring Integration ermöglicht einfaches Messaging in Spring-basierten Anwendungen und unterstützt die Integration mit externen Systemen über deklarative Adapter. Diese Adapter bieten eine höhere Abstraktionsebene über die Unterstützung von Spring für Remoting, Messaging und Planung. Das Erweiterungsprojekt Spring Integration für Event Hubs bietet ein- und ausgehende Kanaladapter und Gateways für Azure Event Hubs.
Hinweis
RxJava-Support-APIs werden ab Version 4.0.0 gelöscht. Weitere Informationen finden Sie unter Javadoc.
Consumergruppe
Event Hubs bietet ähnliche Unterstützung von Consumergruppen wie Apache Kafka, aber mit leichter unterschiedlicher Logik. Während Kafka alle zugesicherten Offsets im Broker speichert, müssen Sie Offsets von Event Hubs-Nachrichten speichern, die manuell verarbeitet werden. Event Hubs SDK stellt die Funktion zum Speichern solcher Offsets in Azure Storage bereit.
Unterstützung der Partitionierung
Event Hubs bietet ein ähnliches Konzept der physischen Partition wie Kafka. Im Gegensatz zu Kafkas automatischem Neuausgleich zwischen Verbrauchern und Partitionen bietet Event Hubs jedoch eine Art präemptiver Modus. Das Speicherkonto fungiert als Lease, um zu bestimmen, welche Partition im Besitz des Verbrauchers ist. Wenn ein neuer Verbraucher beginnt, versucht er, einige Partitionen von den meisten schwer geladenen Verbrauchern zu stehlen, um den Workloadausgleich zu erreichen.
Um die Lastenausgleichsstrategie anzugeben, können Entwickler für die Konfiguration verwenden EventHubsContainerProperties
. Ein Beispiel für die Konfiguration finden Sie im folgenden Abschnitt.EventHubsContainerProperties
Batch-Consumerunterstützung
Dies EventHubsInboundChannelAdapter
unterstützt den Batch-Nutzungsmodus. Um dies zu aktivieren, können Benutzer den Listenermodus so angeben, als ListenerMode.BATCH
ob eine EventHubsInboundChannelAdapter
Instanz erstellt wird.
Wenn diese Option aktiviert ist, wird eine Nachricht , von der die Nutzlast eine Liste von Batchereignissen ist, empfangen und an den nachgeschalteten Kanal übergeben. Jeder Nachrichtenkopf wird auch als Liste konvertiert, von denen der Inhalt der zugeordnete Headerwert ist, der von jedem Ereignis analysiert wird. Für die kommunalen Kopfzeilen der Partitions-ID, des Prüfpunkts und der letzten Queued-Eigenschaften werden sie als einzelner Wert für den gesamten Batch von Ereignissen gemeinsam dargestellt. Weitere Informationen finden Sie im Abschnitt "Event Hubs Message Headers ".
Hinweis
Der Prüfpunktheader ist nur vorhanden, wenn der MANUAL-Prüfpunktmodus verwendet wird.
Die Prüfpunkterstellung von Batch-Consumer unterstützt zwei Modi: BATCH
und MANUAL
. BATCH
der Modus ist ein modus für die automatische Prüfpunkterstellung, um den gesamten Batch von Ereignissen zusammen zu überwachen, sobald sie empfangen wurden. MANUAL
der Modus besteht darin, die Ereignisse von Benutzern zu überprüfen. Wenn der Prüfpunkt verwendet wird, wird der Prüfpunkt in den Nachrichtenkopf übergeben, und Benutzer können ihn für die Prüfpunkterstellung verwenden.
Die Batchaufwendungsrichtlinie kann durch Eigenschaften und max-size
max-wait-time
, bei denen max-size
es sich um eine erforderliche Eigenschaft handelt, angegeben werden, wenn max-wait-time
sie optional ist.
Um die Batchnutzungsstrategie anzugeben, können Entwickler für die Konfiguration verwenden EventHubsContainerProperties
. Ein Beispiel für die Konfiguration finden Sie im folgenden Abschnitt.EventHubsContainerProperties
Setup von Abhängigkeiten
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Konfiguration
Dieser Start bietet die folgenden drei Konfigurationsoptionen:
Verbinden ion-Konfigurationseigenschaften
Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Event Hubs verwendet werden.
Hinweis
Wenn Sie sich dafür entscheiden, einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource zu verwenden, lesen Sie "Autorisieren des Zugriffs mit Microsoft Entra-ID ", um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.
Verbinden ion konfigurierbare Eigenschaften von spring-cloud-azure-starter-integration-eventhubs:
Eigenschaft | Typ | Beschreibung |
---|---|---|
spring.cloud.azure.eventhubs.enabled | boolean | Gibt an, ob azure Event Hubs aktiviert ist. |
spring.cloud.azure.eventhubs.connection-string | String | Event Hubs-Namespace Verbindungszeichenfolge Wert. |
spring.cloud.azure.eventhubs.namespace | String | Event Hubs-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DoStandard Name bestehen. |
spring.cloud.azure.eventhubs.do Standard-name | String | Do Standard Name eines Azure Event Hubs-Namespacewerts. |
spring.cloud.azure.eventhubs.custom-endpoint-address | String | Benutzerdefinierte Endpunktadresse. |
spring.cloud.azure.eventhubs.shared-connection | Boolean | Gibt an, ob der zugrunde liegende EventProcessorClient und EventHubProducerAsyncClient dieselbe Verbindung verwenden. Standardmäßig wird eine neue Verbindung für jeden erstellten Event Hub-Client erstellt und verwendet. |
Prüfpunktkonfigurationseigenschaften
Dieser Abschnitt enthält die Konfigurationsoptionen für den Speicher-Blobs-Dienst, der zum Beibehalten des Partitionsbesitzes und der Prüfpunktinformationen verwendet wird.
Hinweis
Ab Version 4.0.0 wird, wenn die Eigenschaft von spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists nicht manuell aktiviert ist, kein Speichercontainer automatisch erstellt.
Prüfpunkt konfigurierbarer Eigenschaften von spring-cloud-azure-starter-integration-eventhubs:
Eigenschaft | Typ | Beschreibung |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Boolean | Gibt an, ob das Erstellen von Containern zulässig ist, falls nicht vorhanden. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | String | Name für das Speicherkonto. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | String | Zugriffsschlüssel für das Speicherkonto |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | String | Name des Speichercontainers. |
Allgemeine Azure Service SDK-Konfigurationsoptionen sind auch für den Speicherspeicher für Speicher-Blob-Prüfpunktspeicher konfigurierbar. Die unterstützten Konfigurationsoptionen werden in der Spring Cloud Azure-Konfiguration eingeführt und können entweder mit dem einheitlichen Präfix oder dem Präfix spring.cloud.azure.
von spring.cloud.azure.eventhubs.processor.checkpoint-store
konfiguriert werden.
Konfigurationseigenschaften des Event Hub-Prozessors
Die EventHubsInboundChannelAdapter
Verwendung der EventProcessorClient
Nachrichten von einem Event Hub verwendet, um die Gesamteigenschaften eines EventProcessorClient
Ereignisses zu konfigurieren, können Entwickler für die Konfiguration verwenden EventHubsContainerProperties
. Lesen Sie den folgenden Abschnitt zum Arbeiten mit EventHubsInboundChannelAdapter
.
Grundlegende Verwendung
Senden von Nachrichten an Azure Event Hubs
Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.
Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in der Datei "application.yml":
spring: cloud: azure: eventhubs: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in der Datei "application.yml ":
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in der Datei "application.yml ":
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Hinweis
Die zulässigen tenant-id
Werte sind: common
, , organizations
, consumers
, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.
Erstellen Sie
DefaultMessageHandler
mit derEventHubsTemplate
Bohnen, um Nachrichten an Event Hubs zu senden.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Erstellen Sie eine Nachrichtengatewaybindung mit dem obigen Nachrichtenhandler über einen Nachrichtenkanal.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Senden von Nachrichten mithilfe des Gateways.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Empfangen von Nachrichten von Azure Event Hubs
Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.
Erstellen Sie einen Nachrichtenkanal als Eingabekanal.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Erstellen Sie
EventHubsInboundChannelAdapter
mit derEventHubsMessageListenerContainer
Bohnen, um Nachrichten von Event Hubs zu empfangen.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Erstellen Sie eine Nachrichtenempfängerbindung mit EventHubsInboundChannelAdapter über den zuvor erstellten Nachrichtenkanal.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Konfigurieren von EventHubsMessageConverter zum Anpassen von objectMapper
EventHubsMessageConverter
wird als konfigurierbare Bohnen erstellt, damit Benutzer ObjectMapper anpassen können.
Batch-Consumerunterstützung
Um Nachrichten von Event Hubs in Batches zu nutzen, ähnelt dem obigen Beispiel, neben den Benutzern sollten die batchaufwendigen zugehörigen Konfigurationsoptionen für EventHubsInboundChannelAdapter
.
Beim Erstellen EventHubsInboundChannelAdapter
sollte der Listenermodus als BATCH
festgelegt werden. Legen Sie beim Erstellen der Beannung des EventHubsMessageListenerContainer
Prüfpunkts den Prüfpunktmodus als " MANUAL
oder BATCH
" fest, und die Batchoptionen können bei Bedarf konfiguriert werden.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Event Hubs-Nachrichtenkopfzeilen
In der folgenden Tabelle wird veranschaulicht, wie Die Nachrichteneigenschaften von Event Hubs den Kopfzeilen von Spring-Nachrichten zugeordnet werden. Für Azure Event Hubs wird die Nachricht als event
.
Zuordnung zwischen Event Hubs-Nachrichten-/Ereigniseigenschaften und Federnachrichtenkopfzeilen im Datensatzlistenermodus:
Event Hubs-Ereigniseigenschaften | Federnachrichtenkopfkonstanten | type | Beschreibung |
---|---|---|---|
Enqueued time | EventHubsHeaders#ENQUEUED_TIME | Sofort | Der Zeitpunkt, zu dem das Ereignis in UTC in der Event Hub-Partition enqueuiert wurde. |
Abstand | EventHubsHeaders#OFFSET | Long | Der Offset des Ereignisses, als es von der zugeordneten Event Hub-Partition empfangen wurde. |
Partitionsschlüssel | AzureHeaders#PARTITION_KEY | String | Der Partitionshashschlüssel, wenn er beim ursprünglichen Veröffentlichen des Ereignisses festgelegt wurde. |
Partitions-ID | AzureHeaders#RAW_PARTITION_ID | String | Die Partitions-ID des Event Hub. |
Sequenznummer | EventHubsHeaders#SEQUENCE_NUMBER | Long | Die Sequenznummer, die dem Ereignis zugewiesen wurde, als es in der zugeordneten Event Hub-Partition enqueuiert wurde. |
Letzte Queued-Ereigniseigenschaften | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | Die Eigenschaften des letzten enqueued-Ereignisses in dieser Partition. |
Nicht verfügbar | AzureHeaders#CHECKPOINTER | Prüfpunkt | Die Kopfzeile für den Prüfpunkt der jeweiligen Nachricht. |
Benutzer können die Nachrichtenkopfzeilen für die zugehörigen Informationen jedes Ereignisses analysieren. Um einen Nachrichtenkopf für das Ereignis festzulegen, werden alle angepassten Kopfzeilen als Anwendungseigenschaft eines Ereignisses platziert, wobei der Header als Eigenschaftsschlüssel festgelegt wird. Wenn Ereignisse von Event Hubs empfangen werden, werden alle Anwendungseigenschaften in den Nachrichtenkopf konvertiert.
Hinweis
Nachrichtenkopfzeilen des Partitionsschlüssels, enqueuierte Zeit, Offset- und Sequenznummer werden nicht unterstützt, um manuell festgelegt zu werden.
Wenn der Batch-Consumer-Modus aktiviert ist, werden die spezifischen Kopfzeilen von Batchnachrichten wie folgt aufgelistet, die eine Liste der Werte aus jedem einzelnen Event Hubs-Ereignis enthalten.
Zuordnung zwischen Event Hubs Message/Event Properties und Spring Message Headers im Batchlistener-Modus:
Event Hubs-Ereigniseigenschaften | Spring Batch-Nachrichtenkopfkonstanten | type | Beschreibung |
---|---|---|---|
Enqueued time | EventHubsHeaders#ENQUEUED_TIME | Liste der Sofortnachrichten | Liste der sofortigen Ereignisse in UTC, wann jedes Ereignis in der Event Hub-Partition enqueuiert wurde. |
Abstand | EventHubsHeaders#OFFSET | Liste der langen | Liste des Offsets jedes Ereignisses, wenn es von der zugeordneten Event Hub-Partition empfangen wurde. |
Partitionsschlüssel | AzureHeaders#PARTITION_KEY | Liste der Zeichenfolgen | Liste des Partitionshashingschlüssels, wenn er beim ursprünglichen Veröffentlichen jedes Ereignisses festgelegt wurde. |
Sequenznummer | EventHubsHeaders#SEQUENCE_NUMBER | Liste der langen | Liste der Sequenznummer, die jedem Ereignis zugewiesen wurde, als es in der zugeordneten Event Hub-Partition abgefragt wurde. |
Systemeigenschaften | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Liste der Karte | Liste der Systemeigenschaften jedes Ereignisses. |
Anwendungseigenschaften | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Liste der Karte | Liste der Anwendungseigenschaften jedes Ereignisses, in denen alle angepassten Nachrichtenkopfzeilen oder Ereigniseigenschaften platziert werden. |
Hinweis
Bei der Veröffentlichung von Nachrichten werden alle oben genannten Batchheader aus den Nachrichten entfernt, sofern vorhanden.
Beispiele
Weitere Informationen finden Sie im Repository "azure-spring-boot-samples " auf GitHub.
Spring-Integration in Azure Service Bus
Wichtige Begriffe
Spring Integration ermöglicht einfaches Messaging in Spring-basierten Anwendungen und unterstützt die Integration mit externen Systemen über deklarative Adapter.
Das Spring Integration for Azure Service Bus-Erweiterungsprojekt bietet ein- und ausgehende Kanaladapter für Azure Service Bus.
Hinweis
CompletableFuture-Support-APIs sind ab Version 2.10.0 veraltet und werden von Reaktorkern ab Version 4.0.0 ersetzt. Weitere Informationen finden Sie unter Javadoc.
Setup von Abhängigkeiten
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Konfiguration
Dieser Start bietet die folgenden 2 Teile der Konfigurationsoptionen:
Verbinden ion-Konfigurationseigenschaften
Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Service Bus verwendet werden.
Hinweis
Wenn Sie sich dafür entscheiden, einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource zu verwenden, lesen Sie "Autorisieren des Zugriffs mit Microsoft Entra-ID ", um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.
Verbinden ion konfigurierbare Eigenschaften von spring-cloud-azure-starter-integration-servicebus:
Eigenschaft | Typ | Beschreibung |
---|---|---|
spring.cloud.azure.servicebus.enabled | boolean | Gibt an, ob ein Azure Service Bus aktiviert ist. |
spring.cloud.azure.servicebus.connection-string | String | Service Bus Namespace Verbindungszeichenfolge Wert. |
spring.cloud.azure.servicebus.namespace | String | Service Bus-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DoStandard Name bestehen. |
spring.cloud.azure.servicebus.do Standard-name | String | Do Standard Name eines Azure Service Bus-Namespacewerts. |
Konfigurationseigenschaften des Dienstbusprozessors
Die ServiceBusInboundChannelAdapter
Verwendung der ServiceBusProcessorClient
zu verwendenden Nachrichten, zum Konfigurieren der allgemeinen Eigenschaften eines ServiceBusProcessorClient
, Entwickler können für die Konfiguration verwenden ServiceBusContainerProperties
. Lesen Sie den folgenden Abschnitt zum Arbeiten mit ServiceBusInboundChannelAdapter
.
Grundlegende Verwendung
Senden von Nachrichten an Azure Service Bus
Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.
Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in der Datei "application.yml":
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in der Datei "application.yml ":
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Hinweis
Die zulässigen tenant-id
Werte sind: common
, , organizations
, consumers
, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.
Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in der Datei "application.yml ":
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Hinweis
Die zulässigen tenant-id
Werte sind: common
, , organizations
, consumers
, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.
Erstellen Sie
DefaultMessageHandler
mit demServiceBusTemplate
Anker zum Senden von Nachrichten an Service Bus, legen Sie den Entitätstyp für die ServiceBusTemplate fest. In diesem Beispiel wird die Servicebuswarteschlange als Beispiel akzeptiert.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Erstellen Sie eine Nachrichtengatewaybindung mit dem obigen Nachrichtenhandler über einen Nachrichtenkanal.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Senden von Nachrichten mithilfe des Gateways.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Empfangen von Nachrichten von Azure Service Bus
Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.
Erstellen Sie einen Nachrichtenkanal als Eingabekanal.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Erstellen Sie
ServiceBusInboundChannelAdapter
mit derServiceBusMessageListenerContainer
Bohnen, um Nachrichten an Service Bus zu empfangen. In diesem Beispiel wird die Servicebuswarteschlange als Beispiel akzeptiert.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Erstellen Sie eine Nachrichtenempfängerbindung mit
ServiceBusInboundChannelAdapter
dem zuvor erstellten Nachrichtenkanal.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Konfigurieren von ServiceBusMessageConverter zum Anpassen von objectMapper
ServiceBusMessageConverter
wird als konfigurierbare Bohnen erstellt, damit Benutzer anpassen ObjectMapper
können.
Dienstbus-Nachrichtenkopfzeilen
Bei einigen ServiceBus-Headern, die mehreren Springheaderkonstanten zugeordnet werden können, wird die Priorität verschiedener Federheader aufgelistet.
Zuordnung zwischen ServiceBus-Headern und Federkopfzeilen:
Service Bus-Nachrichtenkopfzeilen und -eigenschaften | Federnachrichtenkopfkonstanten | Typ | Variantenkonfigurator verwenden | Beschreibung |
---|---|---|---|---|
Inhaltstyp | MessageHeaders#CONTENT_TYPE |
String | Ja | Der RFC2045 Inhaltstypdeskriptor der Nachricht. |
Korrelations-ID | ServiceBusMessageHeaders#CORRELATION_ID |
String | Ja | Die Korrelations-ID der Nachricht |
Meldungs-ID | ServiceBusMessageHeaders#MESSAGE_ID |
String | Ja | Die Nachrichten-ID der Nachricht, diese Kopfzeile hat eine höhere Priorität als MessageHeaders#ID . |
Meldungs-ID | MessageHeaders#ID |
UUID | Ja | Die Nachrichten-ID der Nachricht, diese Kopfzeile hat eine niedrigere Priorität als ServiceBusMessageHeaders#MESSAGE_ID . |
Partitionsschlüssel | ServiceBusMessageHeaders#PARTITION_KEY |
String | Ja | Der Partitionsschlüssel zum Senden der Nachricht an eine partitionierte Entität. |
Antwort an | MessageHeaders#REPLY_CHANNEL |
String | Ja | Die Adresse einer Entität, an die Antworten gesendet werden sollen. |
Antworten auf sitzungs-ID | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
String | Ja | Der Wert der ReplyToGroupId-Eigenschaft der Nachricht. |
Geplante Queue-Zeit utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Ja | Der Zeitpunkt, zu dem die Nachricht in Service Bus queuiert werden soll, hat diese Kopfzeile eine höhere Priorität als AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE . |
Geplante Queue-Zeit utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Integer | Ja | Der Zeitpunkt, zu dem die Nachricht in Service Bus queuiert werden soll, hat diese Kopfzeile eine niedrigere Priorität als ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME . |
Sitzungs-ID | ServiceBusMessageHeaders#SESSION_ID |
String | Ja | Der Sitzungs-IDentifier für eine sitzungsfähige Entität. |
Gültigkeitsdauer | ServiceBusMessageHeaders#TIME_TO_LIVE |
Duration | Ja | Die Dauer, bis diese Nachricht abläuft. |
Beschreibung | ServiceBusMessageHeaders#TO |
String | Ja | Die "to"-Adresse der Nachricht, die für die zukünftige Verwendung in Routingszenarien reserviert ist und derzeit vom Broker selbst ignoriert wird. |
Subject | ServiceBusMessageHeaders#SUBJECT |
String | Ja | Der Betreff für die Nachricht. |
Beschreibung des Fehlers "Inaktiver Buchstabe" | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
String | Nein | Die Beschreibung für eine Nachricht, die inaktiv war. |
Grund für inaktive Buchstaben | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
String | Nein | Der Grund, warum eine Nachricht in totschreibend war. |
Quelle für inaktive Buchstaben | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
String | Nein | Die Entität, in der die Nachricht inaktiv war. |
Lieferanzahl | ServiceBusMessageHeaders#DELIVERY_COUNT |
lang | Nein | Die Häufigkeit, mit der diese Nachricht an Clients übermittelt wurde. |
Nummer der Enqueued-Sequenz | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
lang | Nein | Die queuierte Sequenznummer, die einer Nachricht von Service Bus zugewiesen ist. |
Enqueued time | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | Nein | Das Datum, zu dem diese Nachricht in Service Bus enqueuiert wurde. |
Ablaufzeit | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | Nein | Das Datum, zu dem diese Nachricht abläuft. |
Sperrtoken | ServiceBusMessageHeaders#LOCK_TOKEN |
String | Nein | Das Sperrtoken für die aktuelle Nachricht. |
Gesperrt bis | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | Nein | Das Datum, zu dem die Sperre dieser Nachricht abläuft. |
Sequenznummer | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
lang | Nein | Die eindeutige Nummer, die einer Nachricht von Service Bus zugewiesen ist. |
Zustand | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | Nein | Der Status der Nachricht, die aktiv, verzögert oder geplant sein kann. |
Unterstützung des Partitionsschlüssels
Dieser Starter unterstützt die Dienstbuspartitionierung , indem die Einstellung des Partitionsschlüssels und der Sitzungs-ID im Nachrichtenkopf zugelassen wird. In diesem Abschnitt wird erläutert, wie Sie den Partitionsschlüssel für Nachrichten festlegen.
Empfohlen: Verwenden Sie ServiceBusMessageHeaders.PARTITION_KEY
als Schlüssel der Kopfzeile.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Nicht empfohlen, derzeit unterstützt:AzureHeaders.PARTITION_KEY
als Schlüssel der Kopfzeile.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Hinweis
Wenn beide ServiceBusMessageHeaders.PARTITION_KEY
und AzureHeaders.PARTITION_KEY
in den Nachrichtenkopfzeilen festgelegt werden, ServiceBusMessageHeaders.PARTITION_KEY
wird bevorzugt.
Sitzungsunterstützung
In diesem Beispiel wird veranschaulicht, wie Die Sitzungs-ID einer Nachricht in der Anwendung manuell festgelegt wird.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Hinweis
Wenn der ServiceBusMessageHeaders.SESSION_ID
Wert in den Nachrichtenkopfzeilen festgelegt ist und auch eine andere ServiceBusMessageHeaders.PARTITION_KEY
Kopfzeile festgelegt wird, wird der Wert der Sitzungs-ID schließlich verwendet, um den Wert des Partitionsschlüssels zu überschreiben.
Beispiele
Weitere Informationen finden Sie im Repository "azure-spring-boot-samples " auf GitHub.
Spring-Integration in Azure Storage-Warteschlangen
Wichtige Begriffe
Azure Queue Storage ist ein Dienst für die Speicherung großer Nachrichtenmengen. Sie können überall auf der Welt über authentifizierte Aufrufe mithilfe von HTTP oder HTTPS auf Nachrichten zugreifen. Eine Warteschlangennachricht kann bis zu 64 KB groß sein. Eine Warteschlange kann Millionen Nachrichten enthalten, bis die maximale Kapazität eines Speicherkontos erreicht ist. Warteschlangen werden häufig verwendet, um ein Arbeits-Backlog zur asynchronen Verarbeitung zu erstellen.
Setup von Abhängigkeiten
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Konfiguration
Dieser Start bietet die folgenden Konfigurationsoptionen:
Verbinden ion-Konfigurationseigenschaften
Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit der Azure Storage-Warteschlange verwendet werden.
Hinweis
Wenn Sie sich dafür entscheiden, einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource zu verwenden, lesen Sie "Autorisieren des Zugriffs mit Microsoft Entra-ID ", um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.
Verbinden ion konfigurierbare Eigenschaften von spring-cloud-azure-starter-integration-storage-queue:
Eigenschaft | Typ | Beschreibung |
---|---|---|
spring.cloud.azure.storage.queue.enabled | boolean | Gibt an, ob eine Azure Storage-Warteschlange aktiviert ist. |
spring.cloud.azure.storage.queue.connection-string | String | Speicherwarteschlangennamespace Verbindungszeichenfolge Wert. |
spring.cloud.azure.storage.queue.accountName | String | Name des Speicherwarteschlangenkontos. |
spring.cloud.azure.storage.queue.accountKey | String | Speicherwarteschlangenkontoschlüssel. |
spring.cloud.azure.storage.queue.endpoint | String | Endpunkt des Speicherwarteschlangendiensts. |
spring.cloud.azure.storage.queue.sasToken | String | Sas-Token-Anmeldeinformationen |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | QueueServiceVersion, die beim Erstellen von API-Anforderungen verwendet wird. |
spring.cloud.azure.storage.queue.messageEncoding | String | Codierung von Warteschlangennachrichten. |
Grundlegende Verwendung
Senden von Nachrichten an die Azure Storage-Warteschlange
Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.
Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in der Datei "application.yml":
spring: cloud: azure: storage: queue: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in der Datei "application.yml ":
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Hinweis
Die zulässigen tenant-id
Werte sind: common
, , organizations
, consumers
, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.
Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in der Datei "application.yml ":
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Hinweis
Die zulässigen tenant-id
Werte sind: common
, , organizations
, consumers
, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.
Erstellen Sie
DefaultMessageHandler
mit der Bohnen, um Nachrichten an dieStorageQueueTemplate
Speicherwarteschlange zu senden.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Erstellen Sie eine Nachrichtengatewaybindung mit dem obigen Nachrichtenhandler über einen Nachrichtenkanal.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Senden von Nachrichten mithilfe des Gateways.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Empfangen von Nachrichten aus der Azure Storage-Warteschlange
Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.
Erstellen Sie einen Nachrichtenkanal als Eingabekanal.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Erstellen Sie
StorageQueueMessageSource
mit der Bohnen, um Nachrichten an dieStorageQueueTemplate
Speicherwarteschlange zu empfangen.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Erstellen Sie eine Nachrichtenempfängerbindung mit StorageQueueMessageSource, die im letzten Schritt über den zuvor erstellten Nachrichtenkanal erstellt wurde.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Beispiele
Weitere Informationen finden Sie im Repository "azure-spring-boot-samples " auf GitHub.
Feedback
https://aka.ms/ContentUserFeedback.
Bald verfügbar: Im Laufe des Jahres 2024 werden wir GitHub-Issues stufenweise als Feedbackmechanismus für Inhalte abbauen und durch ein neues Feedbacksystem ersetzen. Weitere Informationen finden Sie unterFeedback senden und anzeigen für