Spring Cloud Azure-Support für Spring Cloud Stream
Dieser Artikel bezieht sich auf: ✔️ Version 4.14.0 ✔️ Version 5.8.0
Spring Cloud Stream ist ein Framework für die Erstellung hoch skalierbarer ereignisgesteuerter Mikroservices, die mit freigegebenen Messagingsystemen verbunden sind.
Das Framework bietet ein flexibles Programmiermodell, das auf bereits etablierten und vertrauten Spring-Idioms und bewährten Methoden basiert. Zu diesen bewährten Methoden gehören Unterstützung für persistente Pub/Sub-Semantik, Consumergruppen und zustandsbehaftete Partitionen.
Zu den aktuellen Binderimplementierungen gehören:
spring-cloud-azure-stream-binder-eventhubs
– weitere Informationen finden Sie unter Spring Cloud Stream Binder für Azure Event Hubsspring-cloud-azure-stream-binder-servicebus
– weitere Informationen finden Sie unter Spring Cloud Stream Binder für Azure Service Bus
Spring Cloud Stream Binder für Azure Event Hubs
Wichtige Begriffe
Der Spring Cloud Stream Binder für Azure Event Hubs stellt die Bindungsimplementierung für das Spring Cloud Stream-Framework bereit. Diese Implementierung verwendet Spring Integration Event Hubs-Kanaladapter auf der Grundlage. Aus Der Perspektive des Designs ist Event Hubs ähnlich wie Kafka. Außerdem kann über die Kafka-API auf Event Hubs zugegriffen werden. Wenn Ihr Projekt eine enge Abhängigkeit von der Kafka-API hat, können Sie den Event Hub mit dem Kafka-API-Beispiel ausprobieren .
Consumergruppe
Event Hubs bietet ähnliche Unterstützung von Consumergruppen wie Apache Kafka, aber mit leichter unterschiedlicher Logik. Während Kafka alle zugesicherten Offsets im Broker speichert, müssen Sie Offsets von Event Hubs-Nachrichten speichern, die manuell verarbeitet werden. Event Hubs SDK stellt die Funktion zum Speichern solcher Offsets in Azure Storage bereit.
Unterstützung der Partitionierung
Event Hubs bietet ein ähnliches Konzept der physischen Partition wie Kafka. Im Gegensatz zu Kafkas automatischer Rebalancing zwischen Verbrauchern und Partitionen bietet Event Hubs jedoch eine Art präemptiver Modus. Das Speicherkonto fungiert als Lease, um zu bestimmen, welcher Verbraucher welche Partition besitzt. Wenn ein neuer Verbraucher beginnt, versucht er, einige Partitionen von den am stärksten geladenen Verbrauchern zu stehlen, um die Arbeitsauslastungsbilanz zu erreichen.
Um die Lastenausgleichsstrategie anzugeben, werden Eigenschaften spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
bereitgestellt. Weitere Informationen finden Sie im Abschnitt "Consumer-Eigenschaften ".
Batch-Consumerunterstützung
Spring Cloud Azure Stream Event Hubs-Ordner unterstützt Spring Cloud Stream Batch Consumer-Feature.
Um mit dem Batch-Consumer-Modus zu arbeiten, legen Sie die spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
Eigenschaft auf true
. Wenn diese Option aktiviert ist, wird eine Nachricht mit einer Nutzlast einer Liste von Batchereignissen empfangen und an die Consumer
Funktion übergeben. Jeder Nachrichtenkopf wird auch in eine Liste konvertiert, deren Inhalt der zugeordnete Headerwert ist, der von jedem Ereignis analysiert wird. Die kommunalen Header der Partitions-ID, des Prüfpunkters und der letzten Enqueued-Eigenschaften werden als einzelner Wert dargestellt, da der gesamte Batch von Ereignissen denselben Wert aufweist. Weitere Informationen finden Sie im Abschnitt "Event Hubs-Nachrichtenkopfzeilen" von Spring Cloud Azure-Support for Spring Integration.
Hinweis
Der Prüfpunktheader ist nur vorhanden, wenn der MANUAL
Prüfpunktmodus verwendet wird.
Die Prüfpunkterstellung von Batch-Consumer unterstützt zwei Modi: BATCH
und MANUAL
. BATCH
der Modus ist ein automatischer Prüfpunktmodus, um den gesamten Batch von Ereignissen zusammen zu überwachen, sobald der Ordner sie empfängt. MANUAL
der Modus besteht darin, die Ereignisse von Benutzern zu überprüfen. Wenn sie verwendet wird, wird die Checkpointer
Nachricht an den Nachrichtenkopf übergeben, und Benutzer können sie verwenden, um Prüfpunkte zu erledigen.
Sie können die Batchgröße angeben, indem Sie die max-size
Eigenschaften max-wait-time
festlegen, die ein Präfix von spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
. Die max-size
Eigenschaft ist erforderlich, und die max-wait-time
Eigenschaft ist optional. Weitere Informationen finden Sie im Abschnitt "Consumer-Eigenschaften ".
Setup von Abhängigkeiten
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Alternativ können Sie auch den Spring Cloud Azure Stream Event Hubs Starter verwenden, wie im folgenden Beispiel für Maven gezeigt:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Konfiguration
Der Ordner stellt die folgenden drei Konfigurationsoptionen bereit:
Verbinden ion-Konfigurationseigenschaften
Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Event Hubs verwendet werden.
Hinweis
Wenn Sie sich dafür entscheiden, einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource zu verwenden, lesen Sie "Autorisieren des Zugriffs mit Microsoft Entra-ID ", um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.
Verbinden ion konfigurierbare Eigenschaften von spring-cloud-azure-stream-binder-eventhubs:
Eigenschaft | Typ | Beschreibung |
---|---|---|
spring.cloud.azure.eventhubs.enabled | boolean | Gibt an, ob azure Event Hubs aktiviert ist. |
spring.cloud.azure.eventhubs.connection-string | String | Event Hubs-Namespace Verbindungszeichenfolge Wert. |
spring.cloud.azure.eventhubs.namespace | String | Event Hubs-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DoStandard Name bestehen. |
spring.cloud.azure.eventhubs.do Standard-name | String | Do Standard Name eines Azure Event Hubs-Namespacewerts. |
spring.cloud.azure.eventhubs.custom-endpoint-address | String | Benutzerdefinierte Endpunktadresse. |
Tipp
Allgemeine Azure Service SDK-Konfigurationsoptionen können auch für den Feder Cloud Azure Stream Event Hubs-Ordner konfiguriert werden. Die unterstützten Konfigurationsoptionen werden in der Spring Cloud Azure-Konfiguration eingeführt und können entweder mit dem einheitlichen Präfix oder dem Präfix spring.cloud.azure.
von spring.cloud.azure.eventhubs.
konfiguriert werden.
Der Ordner unterstützt auch Spring Could Azure Resource Manager standardmäßig. Informationen zum Abrufen der Verbindungszeichenfolge mit Sicherheitsprinzipale, die nicht mit Data
verwandten Rollen gewährt werden, finden Sie im Abschnitt "Grundlegende Verwendung" von Spring Could Azure Resource Manager.
Eigenschaften der Prüfpunktkonfiguration
Dieser Abschnitt enthält die Konfigurationsoptionen für den Speicher-Blobs-Dienst, der zum Beibehalten des Partitionsbesitzes und der Prüfpunktinformationen verwendet wird.
Hinweis
Ab Version 4.0.0 wird, wenn die Eigenschaft von spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists nicht manuell aktiviert ist, kein Speichercontainer automatisch mit dem Namen aus spring.cloud.stream.bindings.bindings.binding-name.destination erstellt.
Prüfpunkt konfigurierbarer Eigenschaften von spring-cloud-azure-stream-binder-eventhubs:
Eigenschaft | Typ | Beschreibung |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Boolean | Gibt an, ob das Erstellen von Containern zulässig ist, falls nicht vorhanden. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | String | Name für das Speicherkonto. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | String | Zugriffsschlüssel für das Speicherkonto |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | String | Name des Speichercontainers. |
Tipp
Allgemeine Azure Service SDK-Konfigurationsoptionen sind auch für den Speicherspeicher für Speicher-Blob-Prüfpunktspeicher konfigurierbar. Die unterstützten Konfigurationsoptionen werden in der Spring Cloud Azure-Konfiguration eingeführt und können entweder mit dem einheitlichen Präfix oder dem Präfix spring.cloud.azure.
von spring.cloud.azure.eventhubs.processor.checkpoint-store
konfiguriert werden.
Azure Event Hubs Binding-Konfigurationseigenschaften
Die folgenden Optionen sind in vier Abschnitte unterteilt: Consumer Properties, Advanced Consumer Configurations, Producer Properties and Advanced Producer Configurations.
Consumereigenschaften
Diese Eigenschaften werden über EventHubsConsumerProperties
.
Konfigurierbare Verbrauchereigenschaften von spring-cloud-azure-stream-binder-eventhubs:
Eigenschaft | Typ | Beschreibung |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | CheckpointMode | Prüfpunktmodus, der verwendet wird, wenn Verbraucher entscheiden, wie die Prüfpunktmeldung ausgeführt werden soll |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Integer | Bestimmt die Nachrichtenmenge für jede Partition, um einen Prüfpunkt zu erledigen. Wird nur wirksam, wenn PARTITION_COUNT der Prüfpunktmodus verwendet wird. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Duration | Legt das Zeitintervall fest, um einen Prüfpunkt zu erledigen. Wird nur wirksam, wenn TIME der Prüfpunktmodus verwendet wird. |
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size | Integer | Die maximale Anzahl von Ereignissen in einem Batch. Erforderlich für den Batch-Consumer-Modus. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Duration | Die maximale Zeitdauer für die Batchaufwärmung. Wird nur wirksam, wenn der Batch-Consumer-Modus aktiviert ist und optional ist. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | Duration | Die Intervallzeitdauer für die Aktualisierung. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | LoadBalancingStrategy | Die Lastenausgleichsstrategie. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | Duration | Die Zeitdauer, nach der der Besitz der Partition abläuft. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Boolean | Gibt an, ob der Ereignisprozessor Informationen zum letzten enqueued-Ereignis auf der zugehörigen Partition anfordern soll und diese Informationen nachverfolgen soll, wenn Ereignisse empfangen werden. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Integer | Die Anzahl, die vom Consumer verwendet wird, um die Anzahl der Ereignisse zu steuern, die der Event Hub-Consumer aktiv empfängt und lokal in die Warteschlange stellt. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Zuordnen des Schlüssels als Partitions-ID und Werte von StartPositionProperties |
Die Zuordnung, die die Ereignisposition enthält, die für jede Partition verwendet werden soll, wenn kein Prüfpunkt für die Partition im Prüfpunktspeicher vorhanden ist. Diese Zuordnung wird von der Partitions-ID abgeschlüsselt. |
Hinweis
Die initial-partition-event-position
Konfiguration akzeptiert eine map
, um die Anfangsposition für jeden Event Hub anzugeben. Daher ist der Schlüssel die Partitions-ID, und der Wert enthält StartPositionProperties
Eigenschaften von Offset, Sequenznummer, enqueuierte Datumszeit und ob einschließlich. Sie können sie z. B. als
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Erweiterte Consumerkonfiguration
Die oben genannten Verbindungen, Prüfpunkte und die allgemeine Azure SDK-Clientkonfiguration unterstützen Anpassungen für jeden Sammelordner-Consumer, die Sie mit dem Präfix spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
konfigurieren können.
Produzenteneigenschaften
Diese Eigenschaften werden über EventHubsProducerProperties
.
Konfigurierbare Eigenschaften von spring-cloud-azure-stream-binder-eventhubs:
Eigenschaft | Typ | Beschreibung |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | boolean | Die Switch-Kennzeichnung für die Synchronisierung des Produzenten. Wenn true, wartet der Produzent nach einem Sendevorgang auf eine Antwort. |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | lang | Die Zeitspanne, die nach einem Sendevorgang auf eine Antwort wartet. Wird nur wirksam, wenn ein Synchronisierungsproduzent aktiviert ist. |
Erweiterte Produzentenkonfiguration
Die oben genannte Verbindung und die allgemeine Azure SDK-Clientkonfiguration unterstützen anpassungen für jeden Ordnerhersteller, den Sie mit dem Präfix spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
konfigurieren können.
Grundlegende Verwendung
Senden und Empfangen von Nachrichten von/an Event Hubs
Füllen Sie die Konfigurationsoptionen mit Anmeldeinformationen aus.
Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in der Datei "application.yml":
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in der Datei "application.yml ":
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Hinweis
Die zulässigen tenant-id
Werte sind: common
, , organizations
, consumers
, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.
Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in der Datei "application.yml ":
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Definieren Sie Lieferanten und Verbraucher.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Unterstützung der Partitionierung
Es wird eine PartitionSupplier
mit vom Benutzer bereitgestellte Partitionsinformationen erstellt, um die Partitionsinformationen über die zu sendende Nachricht zu konfigurieren. Das folgende Flussdiagramm zeigt den Vorgang zum Abrufen verschiedener Prioritäten für die Partitions-ID und den Schlüssel:
Batch-Consumerunterstützung
Stellen Sie die Batchkonfigurationsoptionen bereit, wie im folgenden Beispiel gezeigt:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
Definieren Sie Lieferanten und Verbraucher.
Für den Prüfpunktmodus als
BATCH
, können Sie den folgenden Code verwenden, um Nachrichten zu senden und in Batches zu nutzen.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Für den Prüfpunktmodus als
MANUAL
, können Sie den folgenden Code verwenden, um Nachrichten zu senden und nachrichten zu verbrauchen/prüfpunkt in Batches zu verwenden.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Hinweis
Im Batch-Nutzungsmodus ist application/json
der Standardinhaltstyp des Spring Cloud Stream-Sammelbands. Stellen Sie daher sicher, dass die Nachrichtennutzlast am Inhaltstyp ausgerichtet ist. Wenn Sie z. B. den Standardinhaltstyp zum Empfangen von application/json
Nachrichten mit String
Nutzlast verwenden, sollte die Nutzlast mit doppelten Anführungszeichen für den ursprünglichen String
Text umgeben seinJSON String
. Bei Inhaltstyp text/plain
kann es sich um ein String
Objekt direkt handeln. Weitere Informationen finden Sie unter Spring Cloud Stream Content Type Negotiation.
Behandeln von Fehlermeldungen
Behandeln von Ausgehenden Bindungsfehlermeldungen
Standardmäßig erstellt Spring Integration einen globalen Fehlerkanal namens
errorChannel
. Konfigurieren Sie den folgenden Nachrichtenendpunkt für die Behandlung von Fehlermeldungen für ausgehende Bindung:@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Behandeln eingehender Bindungsfehlermeldungen
Spring Cloud Stream Event Hubs Binder unterstützt zwei Lösungen zur Behandlung von Fehlern für eingehende Nachrichtenbindungen: benutzerdefinierte Fehlerkanäle und Handler.
Fehlerkanal:
Spring Cloud Stream stellt einen Fehlerkanal für jede eingehende Bindung bereit. Es
ErrorMessage
wird an den Fehlerkanal gesendet. Weitere Informationen finden Sie in der Spring Cloud Stream-Dokumentation zur Behandlung von Fehlern .Standardfehlerkanal
Sie können einen globalen Fehlerkanal verwenden, der benannt ist
errorChannel
, um alle eingehenden Bindungsfehlermeldungen zu nutzen. Um diese Nachrichten zu verarbeiten, konfigurieren Sie den folgenden Nachrichtenendpunkt:@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Bindungsspezifischer Fehlerkanal
Sie können einen bestimmten Fehlerkanal verwenden, um die spezifischen eingehenden Bindungsfehlermeldungen mit einer höheren Priorität zu nutzen als der Standardfehlerkanal. Um diese Nachrichten zu verarbeiten, konfigurieren Sie den folgenden Nachrichtenendpunkt:
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group @ServiceActivator(inputChannel = "{destination}.{group}.errors") public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Hinweis
Der bindungsspezifische Fehlerkanal schließt sich gegenseitig mit anderen bereitgestellten Fehlerhandlern und Kanälen aus.
Fehlerhandler:
Spring Cloud Stream macht einen Mechanismus verfügbar, mit dem Sie einen benutzerdefinierten Fehlerhandler bereitstellen können, indem Sie eine
Consumer
ErrorMessage
Instanz hinzufügen. Weitere Informationen finden Sie in der Dokumentation zu Spring Cloud Stream unter "Fehlerbehandlung ".Hinweis
Wenn ein Bindungsfehlerhandler konfiguriert ist, kann er mit dem Standardfehlerkanal arbeiten.
Bindungsstandardfehlerhandler
Konfigurieren Sie eine einzelne
Consumer
Bohnen, um alle eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Standardfunktion abonniert jeden eingehenden Bindungsfehlerkanal:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Sie müssen die
spring.cloud.stream.default.error-handler-definition
Eigenschaft auch auf den Funktionsnamen festlegen.Bindungsspezifischer Fehlerhandler
Konfigurieren Sie eine
Consumer
Bohnen, um die spezifischen eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Funktion abonniert den spezifischen Fehlerkanal für eingehende Bindung und hat eine höhere Priorität als der Bindungsstandardfehlerhandler:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
Sie müssen die
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
Eigenschaft auch auf den Funktionsnamen festlegen.
Event Hubs-Nachrichtenkopfzeilen
Die unterstützten grundlegenden Nachrichtenkopfzeilen finden Sie im Abschnitt "Event Hubs-Nachrichtenkopfzeilen" von Spring Cloud Azure-Support for Spring Integration.
Unterstützung mehrerer Ordner
Verbinden ion auf mehrere Event Hubs-Namespaces wird auch mithilfe mehrerer Ordner unterstützt. In diesem Beispiel wird ein Verbindungszeichenfolge als Beispiel akzeptiert. Anmeldeinformationen von Dienstprinzipalen und verwalteten Identitäten werden ebenfalls unterstützt. Sie können verwandte Eigenschaften in den Umgebungseinstellungen der einzelnen Ordner festlegen.
Um mehrere Ordner mit Event Hubs zu verwenden, konfigurieren Sie die folgenden Eigenschaften in der Datei application.yml :
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
Hinweis
In der vorherigen Anwendungsdatei wird gezeigt, wie Sie einen einzelnen Standardabfrager für die Anwendung für alle Bindungen konfigurieren. Wenn Sie den Poller für eine bestimmte Bindung konfigurieren möchten, können Sie eine Konfiguration wie
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
z. B. verwenden.Wir müssen zwei Lieferanten und zwei Verbraucher definieren:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Ressourcenbereitstellung
Event Hubs-Ordner unterstützt die Bereitstellung von Event Hub- und Consumergruppen. Benutzer können die folgenden Eigenschaften verwenden, um die Bereitstellung zu aktivieren.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Hinweis
Die zulässigen tenant-id
Werte sind: common
, , organizations
, consumers
, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.
Beispiele
Weitere Informationen finden Sie im Repository "azure-spring-boot-samples " auf GitHub.
Azure Spring Cloud Stream Binder für Azure Service Bus
Wichtige Begriffe
Der Spring Cloud Stream Binder für Azure Service Bus stellt die Bindungsimplementierung für das Spring Cloud Stream Framework bereit. Diese Implementierung verwendet Spring Integration Service Bus Channel Adapter auf der Grundlage.
Geplante Nachricht
Dieser Ordner unterstützt das Senden von Nachrichten an ein Thema zur verzögerten Verarbeitung. Benutzer können geplante Nachrichten mit Kopfzeilen x-delay
senden, die in Millisekunden eine Verzögerungszeit für die Nachricht ausdrücken. Die Nachricht wird nach x-delay
Millisekunden an die jeweiligen Themen übermittelt.
Consumergruppe
Service Bus Topic bietet ähnliche Unterstützung der Consumergruppe wie Apache Kafka, aber mit leichter unterschiedlicher Logik.
Dieser Ordner basiert auf Subscription
einem Thema, das als Verbrauchergruppe fungiert.
Setup von Abhängigkeiten
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Alternativ können Sie auch den Spring Cloud Azure Stream Service Bus Starter verwenden, wie im folgenden Beispiel für Maven gezeigt:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Konfiguration
Der Ordner stellt die folgenden beiden Konfigurationsoptionen bereit:
Verbinden ion-Konfigurationseigenschaften
Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Service Bus verwendet werden.
Hinweis
Wenn Sie sich dafür entscheiden, einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource zu verwenden, lesen Sie "Autorisieren des Zugriffs mit Microsoft Entra-ID ", um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.
Verbinden ion konfigurierbare Eigenschaften von spring-cloud-azure-stream-binder-servicebus:
Eigenschaft | Typ | Beschreibung |
---|---|---|
spring.cloud.azure.servicebus.enabled | boolean | Gibt an, ob ein Azure Service Bus aktiviert ist. |
spring.cloud.azure.servicebus.connection-string | String | Service Bus Namespace Verbindungszeichenfolge Wert. |
spring.cloud.azure.servicebus.namespace | String | Service Bus-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DoStandard Name bestehen. |
spring.cloud.azure.servicebus.do Standard-name | String | Do Standard Name eines Azure Service Bus-Namespacewerts. |
Hinweis
Allgemeine Azure Service SDK-Konfigurationsoptionen können auch für den Feder Cloud Azure Stream Service Bus-Ordner konfiguriert werden. Die unterstützten Konfigurationsoptionen werden in der Spring Cloud Azure-Konfiguration eingeführt und können entweder mit dem einheitlichen Präfix oder dem Präfix spring.cloud.azure.
von spring.cloud.azure.servicebus.
konfiguriert werden.
Der Ordner unterstützt auch Spring Could Azure Resource Manager standardmäßig. Informationen zum Abrufen der Verbindungszeichenfolge mit Sicherheitsprinzipale, die nicht mit Data
verwandten Rollen gewährt werden, finden Sie im Abschnitt "Grundlegende Verwendung" von Spring Could Azure Resource Manager.
Azure Service Bus-Bindungskonfigurationseigenschaften
Die folgenden Optionen sind in vier Abschnitte unterteilt: Consumer Properties, Advanced Consumer Configurations, Producer Properties and Advanced Producer Configurations.
Consumereigenschaften
Diese Eigenschaften werden über ServiceBusConsumerProperties
.
Konfigurierbare Verbrauchereigenschaften von spring-cloud-azure-stream-binder-servicebus:
Eigenschaft | type | Standard | Beschreibung |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | boolean | false | Wenn die fehlgeschlagenen Nachrichten an den DLQ weitergeleitet werden. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Integer | 1 | Max. gleichzeitige Nachrichten, die der Dienstbusprozessorclient verarbeiten soll. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions | Integer | NULL | Maximale Anzahl gleichzeitiger Sitzungen, die zu einem bestimmten Zeitpunkt verarbeitet werden sollen. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | Boolean | NULL | Gibt an, ob die Sitzung aktiviert ist. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Integer | 0 | Die Prefetch-Anzahl des Service Bus-Prozessorclients. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | SubQueue | Keine | Der Typ der Unterwarteschlange, mit der eine Verbindung hergestellt werden soll. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Duration | 5 m | Die Zeitspanne, um die automatische Verlängerung der Sperre fortzusetzen. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | Der Empfangsmodus des Service Bus-Prozessorclients. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | Boolesch | true | Gibt an, ob Nachrichten automatisch abgleichen sollen. Wenn dieser Wert auf "false" festgelegt ist, wird eine Nachrichtenkopfzeile Checkpointer hinzugefügt, mit der Entwickler Nachrichten manuell abgleichen können. |
Erweiterte Consumerkonfiguration
Die oben genannte Verbindung und die allgemeine Azure SDK-Clientkonfiguration unterstützen anpassungen für jeden Sammelordner-Consumer, den Sie mit dem Präfix spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
konfigurieren können.
Produzenteneigenschaften
Diese Eigenschaften werden über ServiceBusProducerProperties
.
Konfigurierbare Eigenschaften von spring-cloud-azure-stream-binder-servicebus:
Eigenschaft | type | Standard | Beschreibung |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | boolean | false | Switch flag for sync of producer. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | lang | 10000 | Timeoutwert für das Senden des Produzenten. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | NULL | Service Bus-Entitätstyp des Produzenten, erforderlich für den Bindungshersteller. |
Wichtig
Bei Verwendung des Bindungsherstellers muss die Eigenschaft konfiguriert spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
werden.
Erweiterte Produzentenkonfiguration
Die oben genannte Verbindung und die allgemeine Azure SDK-Clientkonfiguration unterstützen anpassungen für jeden Ordnerhersteller, den Sie mit dem Präfix spring.cloud.stream.servicebus.bindings.<binding-name>.producer.
konfigurieren können.
Grundlegende Verwendung
Senden und Empfangen von Nachrichten von/an Service Bus
Füllen Sie die Konfigurationsoptionen mit Anmeldeinformationen aus.
Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in der Datei "application.yml":
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in der Datei "application.yml ":
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Hinweis
Die zulässigen tenant-id
Werte sind: common
, , organizations
, consumers
, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.
Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in der Datei "application.yml ":
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Definieren Sie Lieferanten und Verbraucher.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Unterstützung des Partitionsschlüssels
Der Ordner unterstützt die Dienstbuspartitionierung , indem die Einstellung des Partitionsschlüssels und der Sitzungs-ID im Nachrichtenkopf zugelassen wird. In diesem Abschnitt wird erläutert, wie Sie den Partitionsschlüssel für Nachrichten festlegen.
Spring Cloud Stream stellt eine SpEL-Ausdruckseigenschaft spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
des Partitionsschlüssels bereit. Legen Sie diese Eigenschaft z. B. als "'partitionKey-' + headers[<message-header-key>]"
Kopfzeile fest, und fügen Sie eine Kopfzeile mit dem Namen "Message-header-key" hinzu. Spring Cloud Stream verwendet den Wert für diesen Header beim Auswerten des Ausdrucks, um einen Partitionsschlüssel zuzuweisen. Der folgende Code stellt einen Beispielhersteller bereit:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Sitzungsunterstützung
Der Ordner unterstützt Nachrichtensitzungen von Service Bus. Die Sitzungs-ID einer Nachricht kann über den Nachrichtenkopf festgelegt werden.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Hinweis
Gemäß der Dienstbuspartitionierung hat die Sitzungs-ID eine höhere Priorität als Partitionsschlüssel. Wenn also sowohl header ServiceBusMessageHeaders#SESSION_ID
ServiceBusMessageHeaders#PARTITION_KEY
als auch Header festgelegt werden, wird der Wert der Sitzungs-ID schließlich verwendet, um den Wert des Partitionsschlüssels zu überschreiben.
Behandeln von Fehlermeldungen
Behandeln von Ausgehenden Bindungsfehlermeldungen
Standardmäßig erstellt Spring Integration einen globalen Fehlerkanal namens
errorChannel
. Konfigurieren Sie den folgenden Nachrichtenendpunkt, um die Fehlermeldung für ausgehende Bindung zu behandeln.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Behandeln eingehender Bindungsfehlermeldungen
Spring Cloud Stream Service Bus Binder unterstützt drei Lösungen zur Behandlung von Fehlern für eingehende Nachrichtenbindungen: der Sammelordnerfehlerhandler, benutzerdefinierte Fehlerkanäle und Handler.
Sammelmappenfehlerhandler:
Der Standardordnerfehlerhandler behandelt die eingehende Bindung. Sie verwenden diesen Handler, um fehlgeschlagene Nachrichten an die Warteschlange mit inaktiven Buchstaben zu senden, wenn
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
diese aktiviert ist. Andernfalls werden die fehlgeschlagenen Nachrichten abgebrochen. Mit Ausnahme der Konfiguration des bindungsspezifischen Fehlerkanals wird der Ordnerfehlerhandler immer wirksam, unabhängig davon, ob andere benutzerdefinierte Fehlerhandler oder Kanäle vorhanden sind.Fehlerkanal:
Spring Cloud Stream stellt einen Fehlerkanal für jede eingehende Bindung bereit. Es
ErrorMessage
wird an den Fehlerkanal gesendet. Weitere Informationen finden Sie in der Spring Cloud Stream-Dokumentation zur Behandlung von Fehlern .Standardfehlerkanal
Sie können einen globalen Fehlerkanal verwenden, der benannt ist
errorChannel
, um alle eingehenden Bindungsfehlermeldungen zu nutzen. Um diese Nachrichten zu verarbeiten, konfigurieren Sie den folgenden Nachrichtenendpunkt:@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Bindungsspezifischer Fehlerkanal
Sie können einen bestimmten Fehlerkanal verwenden, um die spezifischen eingehenden Bindungsfehlermeldungen mit einer höheren Priorität zu nutzen als der Standardfehlerkanal. Um diese Nachrichten zu verarbeiten, konfigurieren Sie den folgenden Nachrichtenendpunkt:
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group @ServiceActivator(inputChannel = "{destination}.{group}.errors") public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Hinweis
Der bindungsspezifische Fehlerkanal schließt sich gegenseitig mit anderen bereitgestellten Fehlerhandlern und Kanälen aus.
Fehlerhandler:
Spring Cloud Stream macht einen Mechanismus verfügbar, mit dem Sie einen benutzerdefinierten Fehlerhandler bereitstellen können, indem Sie eine
Consumer
ErrorMessage
Instanz hinzufügen. Weitere Informationen finden Sie in der Dokumentation zu Spring Cloud Stream unter "Fehlerbehandlung ".Hinweis
Wenn ein Bindungsfehlerhandler konfiguriert ist, kann er mit dem Standardfehlerkanal und dem Ordnerfehlerhandler arbeiten.
Bindungsstandardfehlerhandler
Konfigurieren Sie eine einzelne
Consumer
Bohnen, um alle eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Standardfunktion abonniert jeden eingehenden Bindungsfehlerkanal:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Sie müssen die
spring.cloud.stream.default.error-handler-definition
Eigenschaft auch auf den Funktionsnamen festlegen.Bindungsspezifischer Fehlerhandler
Konfigurieren Sie eine
Consumer
Bohnen, um die spezifischen eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Funktion abonniert den spezifischen Eingehenden Bindungsfehlerkanal mit einer höheren Priorität als der Bindungsstandardfehlerhandler.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Sie müssen die
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
Eigenschaft auch auf den Funktionsnamen festlegen.
Dienstbus-Nachrichtenkopfzeilen
Die unterstützten grundlegenden Nachrichtenkopfzeilen finden Sie im Abschnitt "Service Bus-Nachrichtenkopfzeilen" von Spring Cloud Azure-Support for Spring Integration.
Hinweis
Beim Festlegen des Partitionsschlüssels ist die Priorität des Nachrichtenkopfs höher als die Spring Cloud Stream-Eigenschaft. Wird also spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
nur wirksam, wenn keines der ServiceBusMessageHeaders#SESSION_ID
Header konfiguriert ServiceBusMessageHeaders#PARTITION_KEY
ist.
Unterstützung mehrerer Ordner
Verbinden ion an mehrere ServiceBus-Namespaces wird auch mithilfe mehrerer Ordner unterstützt. In diesem Beispiel wird Verbindungszeichenfolge als Beispiel akzeptiert. Anmeldeinformationen von Dienstprinzipalen und verwalteten Identitäten werden ebenfalls unterstützt, Benutzer können verwandte Eigenschaften in den Umgebungseinstellungen der einzelnen Ordner festlegen.
Um mehrere Ordner von ServiceBus zu verwenden, konfigurieren Sie die folgenden Eigenschaften in der Datei application.yml :
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
Hinweis
In der vorherigen Anwendungsdatei wird gezeigt, wie Sie einen einzelnen Standardabfrager für die Anwendung für alle Bindungen konfigurieren. Wenn Sie den Poller für eine bestimmte Bindung konfigurieren möchten, können Sie eine Konfiguration wie
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
z. B. verwenden.wir müssen zwei Lieferanten und zwei Verbraucher definieren
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Ressourcenbereitstellung
Der Servicebusordner unterstützt die Bereitstellung von Warteschlangen, Themen und Abonnements. Benutzer können die folgenden Eigenschaften verwenden, um die Bereitstellung zu aktivieren.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Hinweis
Die zulässigen tenant-id
Werte sind: common
, , organizations
, consumers
, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.
Beispiele
Weitere Informationen finden Sie im Repository "azure-spring-boot-samples " auf GitHub.
Feedback
https://aka.ms/ContentUserFeedback.
Bald verfügbar: Im Laufe des Jahres 2024 werden wir GitHub-Issues stufenweise als Feedbackmechanismus für Inhalte abbauen und durch ein neues Feedbacksystem ersetzen. Weitere Informationen finden Sie unterFeedback senden und anzeigen für