Freigeben über


Spring Cloud Azure-Support für Spring Cloud Stream

Dieser Artikel bezieht sich auf: ✔️ Version 4.14.0 ✔️ Version 5.8.0

Spring Cloud Stream ist ein Framework für die Erstellung hoch skalierbarer ereignisgesteuerter Mikroservices, die mit freigegebenen Messagingsystemen verbunden sind.

Das Framework bietet ein flexibles Programmiermodell, das auf bereits etablierten und vertrauten Spring-Idioms und bewährten Methoden basiert. Zu diesen bewährten Methoden gehören Unterstützung für persistente Pub/Sub-Semantik, Consumergruppen und zustandsbehaftete Partitionen.

Zu den aktuellen Binderimplementierungen gehören:

Spring Cloud Stream Binder für Azure Event Hubs

Wichtige Begriffe

Der Spring Cloud Stream Binder für Azure Event Hubs stellt die Bindungsimplementierung für das Spring Cloud Stream-Framework bereit. Diese Implementierung verwendet Spring Integration Event Hubs-Kanaladapter auf der Grundlage. Aus Der Perspektive des Designs ist Event Hubs ähnlich wie Kafka. Außerdem kann über die Kafka-API auf Event Hubs zugegriffen werden. Wenn Ihr Projekt eine enge Abhängigkeit von der Kafka-API hat, können Sie den Event Hub mit dem Kafka-API-Beispiel ausprobieren .

Consumergruppe

Event Hubs bietet ähnliche Unterstützung von Consumergruppen wie Apache Kafka, aber mit leichter unterschiedlicher Logik. Während Kafka alle zugesicherten Offsets im Broker speichert, müssen Sie Offsets von Event Hubs-Nachrichten speichern, die manuell verarbeitet werden. Event Hubs SDK stellt die Funktion zum Speichern solcher Offsets in Azure Storage bereit.

Unterstützung der Partitionierung

Event Hubs bietet ein ähnliches Konzept der physischen Partition wie Kafka. Im Gegensatz zu Kafkas automatischer Rebalancing zwischen Verbrauchern und Partitionen bietet Event Hubs jedoch eine Art präemptiver Modus. Das Speicherkonto fungiert als Lease, um zu bestimmen, welcher Verbraucher welche Partition besitzt. Wenn ein neuer Verbraucher beginnt, versucht er, einige Partitionen von den am stärksten geladenen Verbrauchern zu stehlen, um die Arbeitsauslastungsbilanz zu erreichen.

Um die Lastenausgleichsstrategie anzugeben, werden Eigenschaften spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* bereitgestellt. Weitere Informationen finden Sie im Abschnitt "Consumer-Eigenschaften ".

Batch-Consumerunterstützung

Spring Cloud Azure Stream Event Hubs-Ordner unterstützt Spring Cloud Stream Batch Consumer-Feature.

Um mit dem Batch-Consumer-Modus zu arbeiten, legen Sie die spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode Eigenschaft auf true. Wenn diese Option aktiviert ist, wird eine Nachricht mit einer Nutzlast einer Liste von Batchereignissen empfangen und an die Consumer Funktion übergeben. Jeder Nachrichtenkopf wird auch in eine Liste konvertiert, deren Inhalt der zugeordnete Headerwert ist, der von jedem Ereignis analysiert wird. Die kommunalen Header der Partitions-ID, des Prüfpunkters und der letzten Enqueued-Eigenschaften werden als einzelner Wert dargestellt, da der gesamte Batch von Ereignissen denselben Wert aufweist. Weitere Informationen finden Sie im Abschnitt "Event Hubs-Nachrichtenkopfzeilen" von Spring Cloud Azure-Support for Spring Integration.

Hinweis

Der Prüfpunktheader ist nur vorhanden, wenn der MANUAL Prüfpunktmodus verwendet wird.

Die Prüfpunkterstellung von Batch-Consumer unterstützt zwei Modi: BATCH und MANUAL. BATCH der Modus ist ein automatischer Prüfpunktmodus, um den gesamten Batch von Ereignissen zusammen zu überwachen, sobald der Ordner sie empfängt. MANUAL der Modus besteht darin, die Ereignisse von Benutzern zu überprüfen. Wenn sie verwendet wird, wird die Checkpointer Nachricht an den Nachrichtenkopf übergeben, und Benutzer können sie verwenden, um Prüfpunkte zu erledigen.

Sie können die Batchgröße angeben, indem Sie die max-size Eigenschaften max-wait-time festlegen, die ein Präfix von spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. Die max-size Eigenschaft ist erforderlich, und die max-wait-time Eigenschaft ist optional. Weitere Informationen finden Sie im Abschnitt "Consumer-Eigenschaften ".

Setup von Abhängigkeiten

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Alternativ können Sie auch den Spring Cloud Azure Stream Event Hubs Starter verwenden, wie im folgenden Beispiel für Maven gezeigt:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Konfiguration

Der Ordner stellt die folgenden drei Konfigurationsoptionen bereit:

Verbinden ion-Konfigurationseigenschaften

Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Event Hubs verwendet werden.

Hinweis

Wenn Sie sich dafür entscheiden, einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource zu verwenden, lesen Sie "Autorisieren des Zugriffs mit Microsoft Entra-ID ", um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.

Verbinden ion konfigurierbare Eigenschaften von spring-cloud-azure-stream-binder-eventhubs:

Eigenschaft Typ Beschreibung
spring.cloud.azure.eventhubs.enabled boolean Gibt an, ob azure Event Hubs aktiviert ist.
spring.cloud.azure.eventhubs.connection-string String Event Hubs-Namespace Verbindungszeichenfolge Wert.
spring.cloud.azure.eventhubs.namespace String Event Hubs-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DoStandard Name bestehen.
spring.cloud.azure.eventhubs.do Standard-name String Do Standard Name eines Azure Event Hubs-Namespacewerts.
spring.cloud.azure.eventhubs.custom-endpoint-address String Benutzerdefinierte Endpunktadresse.

Tipp

Allgemeine Azure Service SDK-Konfigurationsoptionen können auch für den Feder Cloud Azure Stream Event Hubs-Ordner konfiguriert werden. Die unterstützten Konfigurationsoptionen werden in der Spring Cloud Azure-Konfiguration eingeführt und können entweder mit dem einheitlichen Präfix oder dem Präfix spring.cloud.azure. von spring.cloud.azure.eventhubs.konfiguriert werden.

Der Ordner unterstützt auch Spring Could Azure Resource Manager standardmäßig. Informationen zum Abrufen der Verbindungszeichenfolge mit Sicherheitsprinzipale, die nicht mit Data verwandten Rollen gewährt werden, finden Sie im Abschnitt "Grundlegende Verwendung" von Spring Could Azure Resource Manager.

Eigenschaften der Prüfpunktkonfiguration

Dieser Abschnitt enthält die Konfigurationsoptionen für den Speicher-Blobs-Dienst, der zum Beibehalten des Partitionsbesitzes und der Prüfpunktinformationen verwendet wird.

Hinweis

Ab Version 4.0.0 wird, wenn die Eigenschaft von spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists nicht manuell aktiviert ist, kein Speichercontainer automatisch mit dem Namen aus spring.cloud.stream.bindings.bindings.binding-name.destination erstellt.

Prüfpunkt konfigurierbarer Eigenschaften von spring-cloud-azure-stream-binder-eventhubs:

Eigenschaft Typ Beschreibung
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolean Gibt an, ob das Erstellen von Containern zulässig ist, falls nicht vorhanden.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name String Name für das Speicherkonto.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key String Zugriffsschlüssel für das Speicherkonto
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name String Name des Speichercontainers.

Tipp

Allgemeine Azure Service SDK-Konfigurationsoptionen sind auch für den Speicherspeicher für Speicher-Blob-Prüfpunktspeicher konfigurierbar. Die unterstützten Konfigurationsoptionen werden in der Spring Cloud Azure-Konfiguration eingeführt und können entweder mit dem einheitlichen Präfix oder dem Präfix spring.cloud.azure. von spring.cloud.azure.eventhubs.processor.checkpoint-storekonfiguriert werden.

Azure Event Hubs Binding-Konfigurationseigenschaften

Die folgenden Optionen sind in vier Abschnitte unterteilt: Consumer Properties, Advanced Consumer Configurations, Producer Properties and Advanced Producer Configurations.

Consumereigenschaften

Diese Eigenschaften werden über EventHubsConsumerProperties.

Konfigurierbare Verbrauchereigenschaften von spring-cloud-azure-stream-binder-eventhubs:

Eigenschaft Typ Beschreibung
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode Prüfpunktmodus, der verwendet wird, wenn Verbraucher entscheiden, wie die Prüfpunktmeldung ausgeführt werden soll
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Integer Bestimmt die Nachrichtenmenge für jede Partition, um einen Prüfpunkt zu erledigen. Wird nur wirksam, wenn PARTITION_COUNT der Prüfpunktmodus verwendet wird.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Duration Legt das Zeitintervall fest, um einen Prüfpunkt zu erledigen. Wird nur wirksam, wenn TIME der Prüfpunktmodus verwendet wird.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size Integer Die maximale Anzahl von Ereignissen in einem Batch. Erforderlich für den Batch-Consumer-Modus.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Duration Die maximale Zeitdauer für die Batchaufwärmung. Wird nur wirksam, wenn der Batch-Consumer-Modus aktiviert ist und optional ist.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Duration Die Intervallzeitdauer für die Aktualisierung.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy Die Lastenausgleichsstrategie.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Duration Die Zeitdauer, nach der der Besitz der Partition abläuft.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Boolean Gibt an, ob der Ereignisprozessor Informationen zum letzten enqueued-Ereignis auf der zugehörigen Partition anfordern soll und diese Informationen nachverfolgen soll, wenn Ereignisse empfangen werden.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Integer Die Anzahl, die vom Consumer verwendet wird, um die Anzahl der Ereignisse zu steuern, die der Event Hub-Consumer aktiv empfängt und lokal in die Warteschlange stellt.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Zuordnen des Schlüssels als Partitions-ID und Werte von StartPositionProperties Die Zuordnung, die die Ereignisposition enthält, die für jede Partition verwendet werden soll, wenn kein Prüfpunkt für die Partition im Prüfpunktspeicher vorhanden ist. Diese Zuordnung wird von der Partitions-ID abgeschlüsselt.

Hinweis

Die initial-partition-event-position Konfiguration akzeptiert eine map , um die Anfangsposition für jeden Event Hub anzugeben. Daher ist der Schlüssel die Partitions-ID, und der Wert enthält StartPositionProperties Eigenschaften von Offset, Sequenznummer, enqueuierte Datumszeit und ob einschließlich. Sie können sie z. B. als

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Erweiterte Consumerkonfiguration

Die oben genannten Verbindungen, Prüfpunkte und die allgemeine Azure SDK-Clientkonfiguration unterstützen Anpassungen für jeden Sammelordner-Consumer, die Sie mit dem Präfix spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.konfigurieren können.

Produzenteneigenschaften

Diese Eigenschaften werden über EventHubsProducerProperties.

Konfigurierbare Eigenschaften von spring-cloud-azure-stream-binder-eventhubs:

Eigenschaft Typ Beschreibung
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync boolean Die Switch-Kennzeichnung für die Synchronisierung des Produzenten. Wenn true, wartet der Produzent nach einem Sendevorgang auf eine Antwort.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout lang Die Zeitspanne, die nach einem Sendevorgang auf eine Antwort wartet. Wird nur wirksam, wenn ein Synchronisierungsproduzent aktiviert ist.
Erweiterte Produzentenkonfiguration

Die oben genannte Verbindung und die allgemeine Azure SDK-Clientkonfiguration unterstützen anpassungen für jeden Ordnerhersteller, den Sie mit dem Präfix spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.konfigurieren können.

Grundlegende Verwendung

Senden und Empfangen von Nachrichten von/an Event Hubs

  1. Füllen Sie die Konfigurationsoptionen mit Anmeldeinformationen aus.

    • Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in der Datei "application.yml":

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in der Datei "application.yml ":

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Hinweis

Die zulässigen tenant-id Werte sind: common, , organizations, consumers, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

  • Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in der Datei "application.yml ":

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Definieren Sie Lieferanten und Verbraucher.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Unterstützung der Partitionierung

Es wird eine PartitionSupplier mit vom Benutzer bereitgestellte Partitionsinformationen erstellt, um die Partitionsinformationen über die zu sendende Nachricht zu konfigurieren. Das folgende Flussdiagramm zeigt den Vorgang zum Abrufen verschiedener Prioritäten für die Partitions-ID und den Schlüssel:

Diagram showing a flowchart of the partitioning support process.

Batch-Consumerunterstützung

  1. Stellen Sie die Batchkonfigurationsoptionen bereit, wie im folgenden Beispiel gezeigt:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Definieren Sie Lieferanten und Verbraucher.

    Für den Prüfpunktmodus als BATCH, können Sie den folgenden Code verwenden, um Nachrichten zu senden und in Batches zu nutzen.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Für den Prüfpunktmodus als MANUAL, können Sie den folgenden Code verwenden, um Nachrichten zu senden und nachrichten zu verbrauchen/prüfpunkt in Batches zu verwenden.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Hinweis

Im Batch-Nutzungsmodus ist application/jsonder Standardinhaltstyp des Spring Cloud Stream-Sammelbands. Stellen Sie daher sicher, dass die Nachrichtennutzlast am Inhaltstyp ausgerichtet ist. Wenn Sie z. B. den Standardinhaltstyp zum Empfangen von application/json Nachrichten mit String Nutzlast verwenden, sollte die Nutzlast mit doppelten Anführungszeichen für den ursprünglichen String Text umgeben seinJSON String. Bei Inhaltstyp text/plain kann es sich um ein String Objekt direkt handeln. Weitere Informationen finden Sie unter Spring Cloud Stream Content Type Negotiation.

Behandeln von Fehlermeldungen

  • Behandeln von Ausgehenden Bindungsfehlermeldungen

    Standardmäßig erstellt Spring Integration einen globalen Fehlerkanal namens errorChannel. Konfigurieren Sie den folgenden Nachrichtenendpunkt für die Behandlung von Fehlermeldungen für ausgehende Bindung:

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Behandeln eingehender Bindungsfehlermeldungen

    Spring Cloud Stream Event Hubs Binder unterstützt zwei Lösungen zur Behandlung von Fehlern für eingehende Nachrichtenbindungen: benutzerdefinierte Fehlerkanäle und Handler.

    Fehlerkanal:

    Spring Cloud Stream stellt einen Fehlerkanal für jede eingehende Bindung bereit. Es ErrorMessage wird an den Fehlerkanal gesendet. Weitere Informationen finden Sie in der Spring Cloud Stream-Dokumentation zur Behandlung von Fehlern .

    • Standardfehlerkanal

      Sie können einen globalen Fehlerkanal verwenden, der benannt ist errorChannel , um alle eingehenden Bindungsfehlermeldungen zu nutzen. Um diese Nachrichten zu verarbeiten, konfigurieren Sie den folgenden Nachrichtenendpunkt:

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • Bindungsspezifischer Fehlerkanal

      Sie können einen bestimmten Fehlerkanal verwenden, um die spezifischen eingehenden Bindungsfehlermeldungen mit einer höheren Priorität zu nutzen als der Standardfehlerkanal. Um diese Nachrichten zu verarbeiten, konfigurieren Sie den folgenden Nachrichtenendpunkt:

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Hinweis

      Der bindungsspezifische Fehlerkanal schließt sich gegenseitig mit anderen bereitgestellten Fehlerhandlern und Kanälen aus.

    Fehlerhandler:

    Spring Cloud Stream macht einen Mechanismus verfügbar, mit dem Sie einen benutzerdefinierten Fehlerhandler bereitstellen können, indem Sie eine ConsumerErrorMessage Instanz hinzufügen. Weitere Informationen finden Sie in der Dokumentation zu Spring Cloud Stream unter "Fehlerbehandlung ".

    Hinweis

    Wenn ein Bindungsfehlerhandler konfiguriert ist, kann er mit dem Standardfehlerkanal arbeiten.

    • Bindungsstandardfehlerhandler

      Konfigurieren Sie eine einzelne Consumer Bohnen, um alle eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Standardfunktion abonniert jeden eingehenden Bindungsfehlerkanal:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Sie müssen die spring.cloud.stream.default.error-handler-definition Eigenschaft auch auf den Funktionsnamen festlegen.

    • Bindungsspezifischer Fehlerhandler

      Konfigurieren Sie eine Consumer Bohnen, um die spezifischen eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Funktion abonniert den spezifischen Fehlerkanal für eingehende Bindung und hat eine höhere Priorität als der Bindungsstandardfehlerhandler:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Sie müssen die spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition Eigenschaft auch auf den Funktionsnamen festlegen.

Event Hubs-Nachrichtenkopfzeilen

Die unterstützten grundlegenden Nachrichtenkopfzeilen finden Sie im Abschnitt "Event Hubs-Nachrichtenkopfzeilen" von Spring Cloud Azure-Support for Spring Integration.

Unterstützung mehrerer Ordner

Verbinden ion auf mehrere Event Hubs-Namespaces wird auch mithilfe mehrerer Ordner unterstützt. In diesem Beispiel wird ein Verbindungszeichenfolge als Beispiel akzeptiert. Anmeldeinformationen von Dienstprinzipalen und verwalteten Identitäten werden ebenfalls unterstützt. Sie können verwandte Eigenschaften in den Umgebungseinstellungen der einzelnen Ordner festlegen.

  1. Um mehrere Ordner mit Event Hubs zu verwenden, konfigurieren Sie die folgenden Eigenschaften in der Datei application.yml :

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Hinweis

    In der vorherigen Anwendungsdatei wird gezeigt, wie Sie einen einzelnen Standardabfrager für die Anwendung für alle Bindungen konfigurieren. Wenn Sie den Poller für eine bestimmte Bindung konfigurieren möchten, können Sie eine Konfiguration wie spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000z. B. verwenden.

  2. Wir müssen zwei Lieferanten und zwei Verbraucher definieren:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Ressourcenbereitstellung

Event Hubs-Ordner unterstützt die Bereitstellung von Event Hub- und Consumergruppen. Benutzer können die folgenden Eigenschaften verwenden, um die Bereitstellung zu aktivieren.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Hinweis

Die zulässigen tenant-id Werte sind: common, , organizations, consumers, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

Beispiele

Weitere Informationen finden Sie im Repository "azure-spring-boot-samples " auf GitHub.

Azure Spring Cloud Stream Binder für Azure Service Bus

Wichtige Begriffe

Der Spring Cloud Stream Binder für Azure Service Bus stellt die Bindungsimplementierung für das Spring Cloud Stream Framework bereit. Diese Implementierung verwendet Spring Integration Service Bus Channel Adapter auf der Grundlage.

Geplante Nachricht

Dieser Ordner unterstützt das Senden von Nachrichten an ein Thema zur verzögerten Verarbeitung. Benutzer können geplante Nachrichten mit Kopfzeilen x-delay senden, die in Millisekunden eine Verzögerungszeit für die Nachricht ausdrücken. Die Nachricht wird nach x-delay Millisekunden an die jeweiligen Themen übermittelt.

Consumergruppe

Service Bus Topic bietet ähnliche Unterstützung der Consumergruppe wie Apache Kafka, aber mit leichter unterschiedlicher Logik. Dieser Ordner basiert auf Subscription einem Thema, das als Verbrauchergruppe fungiert.

Setup von Abhängigkeiten

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Alternativ können Sie auch den Spring Cloud Azure Stream Service Bus Starter verwenden, wie im folgenden Beispiel für Maven gezeigt:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Konfiguration

Der Ordner stellt die folgenden beiden Konfigurationsoptionen bereit:

Verbinden ion-Konfigurationseigenschaften

Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Service Bus verwendet werden.

Hinweis

Wenn Sie sich dafür entscheiden, einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource zu verwenden, lesen Sie "Autorisieren des Zugriffs mit Microsoft Entra-ID ", um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.

Verbinden ion konfigurierbare Eigenschaften von spring-cloud-azure-stream-binder-servicebus:

Eigenschaft Typ Beschreibung
spring.cloud.azure.servicebus.enabled boolean Gibt an, ob ein Azure Service Bus aktiviert ist.
spring.cloud.azure.servicebus.connection-string String Service Bus Namespace Verbindungszeichenfolge Wert.
spring.cloud.azure.servicebus.namespace String Service Bus-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DoStandard Name bestehen.
spring.cloud.azure.servicebus.do Standard-name String Do Standard Name eines Azure Service Bus-Namespacewerts.

Hinweis

Allgemeine Azure Service SDK-Konfigurationsoptionen können auch für den Feder Cloud Azure Stream Service Bus-Ordner konfiguriert werden. Die unterstützten Konfigurationsoptionen werden in der Spring Cloud Azure-Konfiguration eingeführt und können entweder mit dem einheitlichen Präfix oder dem Präfix spring.cloud.azure. von spring.cloud.azure.servicebus.konfiguriert werden.

Der Ordner unterstützt auch Spring Could Azure Resource Manager standardmäßig. Informationen zum Abrufen der Verbindungszeichenfolge mit Sicherheitsprinzipale, die nicht mit Data verwandten Rollen gewährt werden, finden Sie im Abschnitt "Grundlegende Verwendung" von Spring Could Azure Resource Manager.

Azure Service Bus-Bindungskonfigurationseigenschaften

Die folgenden Optionen sind in vier Abschnitte unterteilt: Consumer Properties, Advanced Consumer Configurations, Producer Properties and Advanced Producer Configurations.

Consumereigenschaften

Diese Eigenschaften werden über ServiceBusConsumerProperties.

Konfigurierbare Verbrauchereigenschaften von spring-cloud-azure-stream-binder-servicebus:

Eigenschaft type Standard Beschreibung
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected boolean false Wenn die fehlgeschlagenen Nachrichten an den DLQ weitergeleitet werden.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Integer 1 Max. gleichzeitige Nachrichten, die der Dienstbusprozessorclient verarbeiten soll.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Integer NULL Maximale Anzahl gleichzeitiger Sitzungen, die zu einem bestimmten Zeitpunkt verarbeitet werden sollen.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Boolean NULL Gibt an, ob die Sitzung aktiviert ist.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Integer 0 Die Prefetch-Anzahl des Service Bus-Prozessorclients.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue SubQueue Keine Der Typ der Unterwarteschlange, mit der eine Verbindung hergestellt werden soll.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Duration 5 m Die Zeitspanne, um die automatische Verlängerung der Sperre fortzusetzen.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Der Empfangsmodus des Service Bus-Prozessorclients.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Boolesch true Gibt an, ob Nachrichten automatisch abgleichen sollen. Wenn dieser Wert auf "false" festgelegt ist, wird eine Nachrichtenkopfzeile Checkpointer hinzugefügt, mit der Entwickler Nachrichten manuell abgleichen können.
Erweiterte Consumerkonfiguration

Die oben genannte Verbindung und die allgemeine Azure SDK-Clientkonfiguration unterstützen anpassungen für jeden Sammelordner-Consumer, den Sie mit dem Präfix spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.konfigurieren können.

Produzenteneigenschaften

Diese Eigenschaften werden über ServiceBusProducerProperties.

Konfigurierbare Eigenschaften von spring-cloud-azure-stream-binder-servicebus:

Eigenschaft type Standard Beschreibung
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync boolean false Switch flag for sync of producer.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout lang 10000 Timeoutwert für das Senden des Produzenten.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType NULL Service Bus-Entitätstyp des Produzenten, erforderlich für den Bindungshersteller.

Wichtig

Bei Verwendung des Bindungsherstellers muss die Eigenschaft konfiguriert spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type werden.

Erweiterte Produzentenkonfiguration

Die oben genannte Verbindung und die allgemeine Azure SDK-Clientkonfiguration unterstützen anpassungen für jeden Ordnerhersteller, den Sie mit dem Präfix spring.cloud.stream.servicebus.bindings.<binding-name>.producer.konfigurieren können.

Grundlegende Verwendung

Senden und Empfangen von Nachrichten von/an Service Bus

  1. Füllen Sie die Konfigurationsoptionen mit Anmeldeinformationen aus.

    • Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in der Datei "application.yml":

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in der Datei "application.yml ":

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Hinweis

Die zulässigen tenant-id Werte sind: common, , organizations, consumers, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

  • Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in der Datei "application.yml ":

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Definieren Sie Lieferanten und Verbraucher.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Unterstützung des Partitionsschlüssels

Der Ordner unterstützt die Dienstbuspartitionierung , indem die Einstellung des Partitionsschlüssels und der Sitzungs-ID im Nachrichtenkopf zugelassen wird. In diesem Abschnitt wird erläutert, wie Sie den Partitionsschlüssel für Nachrichten festlegen.

Spring Cloud Stream stellt eine SpEL-Ausdruckseigenschaft spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expressiondes Partitionsschlüssels bereit. Legen Sie diese Eigenschaft z. B. als "'partitionKey-' + headers[<message-header-key>]" Kopfzeile fest, und fügen Sie eine Kopfzeile mit dem Namen "Message-header-key" hinzu. Spring Cloud Stream verwendet den Wert für diesen Header beim Auswerten des Ausdrucks, um einen Partitionsschlüssel zuzuweisen. Der folgende Code stellt einen Beispielhersteller bereit:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Sitzungsunterstützung

Der Ordner unterstützt Nachrichtensitzungen von Service Bus. Die Sitzungs-ID einer Nachricht kann über den Nachrichtenkopf festgelegt werden.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Hinweis

Gemäß der Dienstbuspartitionierung hat die Sitzungs-ID eine höhere Priorität als Partitionsschlüssel. Wenn also sowohl header ServiceBusMessageHeaders#SESSION_IDServiceBusMessageHeaders#PARTITION_KEY als auch Header festgelegt werden, wird der Wert der Sitzungs-ID schließlich verwendet, um den Wert des Partitionsschlüssels zu überschreiben.

Behandeln von Fehlermeldungen

  • Behandeln von Ausgehenden Bindungsfehlermeldungen

    Standardmäßig erstellt Spring Integration einen globalen Fehlerkanal namens errorChannel. Konfigurieren Sie den folgenden Nachrichtenendpunkt, um die Fehlermeldung für ausgehende Bindung zu behandeln.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Behandeln eingehender Bindungsfehlermeldungen

    Spring Cloud Stream Service Bus Binder unterstützt drei Lösungen zur Behandlung von Fehlern für eingehende Nachrichtenbindungen: der Sammelordnerfehlerhandler, benutzerdefinierte Fehlerkanäle und Handler.

    Sammelmappenfehlerhandler:

    Der Standardordnerfehlerhandler behandelt die eingehende Bindung. Sie verwenden diesen Handler, um fehlgeschlagene Nachrichten an die Warteschlange mit inaktiven Buchstaben zu senden, wenn spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected diese aktiviert ist. Andernfalls werden die fehlgeschlagenen Nachrichten abgebrochen. Mit Ausnahme der Konfiguration des bindungsspezifischen Fehlerkanals wird der Ordnerfehlerhandler immer wirksam, unabhängig davon, ob andere benutzerdefinierte Fehlerhandler oder Kanäle vorhanden sind.

    Fehlerkanal:

    Spring Cloud Stream stellt einen Fehlerkanal für jede eingehende Bindung bereit. Es ErrorMessage wird an den Fehlerkanal gesendet. Weitere Informationen finden Sie in der Spring Cloud Stream-Dokumentation zur Behandlung von Fehlern .

    • Standardfehlerkanal

      Sie können einen globalen Fehlerkanal verwenden, der benannt ist errorChannel , um alle eingehenden Bindungsfehlermeldungen zu nutzen. Um diese Nachrichten zu verarbeiten, konfigurieren Sie den folgenden Nachrichtenendpunkt:

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • Bindungsspezifischer Fehlerkanal

      Sie können einen bestimmten Fehlerkanal verwenden, um die spezifischen eingehenden Bindungsfehlermeldungen mit einer höheren Priorität zu nutzen als der Standardfehlerkanal. Um diese Nachrichten zu verarbeiten, konfigurieren Sie den folgenden Nachrichtenendpunkt:

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Hinweis

      Der bindungsspezifische Fehlerkanal schließt sich gegenseitig mit anderen bereitgestellten Fehlerhandlern und Kanälen aus.

    Fehlerhandler:

    Spring Cloud Stream macht einen Mechanismus verfügbar, mit dem Sie einen benutzerdefinierten Fehlerhandler bereitstellen können, indem Sie eine ConsumerErrorMessage Instanz hinzufügen. Weitere Informationen finden Sie in der Dokumentation zu Spring Cloud Stream unter "Fehlerbehandlung ".

    Hinweis

    Wenn ein Bindungsfehlerhandler konfiguriert ist, kann er mit dem Standardfehlerkanal und dem Ordnerfehlerhandler arbeiten.

    • Bindungsstandardfehlerhandler

      Konfigurieren Sie eine einzelne Consumer Bohnen, um alle eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Standardfunktion abonniert jeden eingehenden Bindungsfehlerkanal:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Sie müssen die spring.cloud.stream.default.error-handler-definition Eigenschaft auch auf den Funktionsnamen festlegen.

    • Bindungsspezifischer Fehlerhandler

      Konfigurieren Sie eine Consumer Bohnen, um die spezifischen eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Funktion abonniert den spezifischen Eingehenden Bindungsfehlerkanal mit einer höheren Priorität als der Bindungsstandardfehlerhandler.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Sie müssen die spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition Eigenschaft auch auf den Funktionsnamen festlegen.

Dienstbus-Nachrichtenkopfzeilen

Die unterstützten grundlegenden Nachrichtenkopfzeilen finden Sie im Abschnitt "Service Bus-Nachrichtenkopfzeilen" von Spring Cloud Azure-Support for Spring Integration.

Hinweis

Beim Festlegen des Partitionsschlüssels ist die Priorität des Nachrichtenkopfs höher als die Spring Cloud Stream-Eigenschaft. Wird also spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression nur wirksam, wenn keines der ServiceBusMessageHeaders#SESSION_ID Header konfiguriert ServiceBusMessageHeaders#PARTITION_KEY ist.

Unterstützung mehrerer Ordner

Verbinden ion an mehrere ServiceBus-Namespaces wird auch mithilfe mehrerer Ordner unterstützt. In diesem Beispiel wird Verbindungszeichenfolge als Beispiel akzeptiert. Anmeldeinformationen von Dienstprinzipalen und verwalteten Identitäten werden ebenfalls unterstützt, Benutzer können verwandte Eigenschaften in den Umgebungseinstellungen der einzelnen Ordner festlegen.

  1. Um mehrere Ordner von ServiceBus zu verwenden, konfigurieren Sie die folgenden Eigenschaften in der Datei application.yml :

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Hinweis

    In der vorherigen Anwendungsdatei wird gezeigt, wie Sie einen einzelnen Standardabfrager für die Anwendung für alle Bindungen konfigurieren. Wenn Sie den Poller für eine bestimmte Bindung konfigurieren möchten, können Sie eine Konfiguration wie spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000z. B. verwenden.

  2. wir müssen zwei Lieferanten und zwei Verbraucher definieren

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Ressourcenbereitstellung

Der Servicebusordner unterstützt die Bereitstellung von Warteschlangen, Themen und Abonnements. Benutzer können die folgenden Eigenschaften verwenden, um die Bereitstellung zu aktivieren.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Hinweis

Die zulässigen tenant-id Werte sind: common, , organizations, consumers, oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt "Verwendet" des falschen Endpunkts (persönliche und Organisationskonten) des Fehlers AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandanten vorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

Beispiele

Weitere Informationen finden Sie im Repository "azure-spring-boot-samples " auf GitHub.