Spring Cloud Azure-támogatás a Spring Cloud Streamhez

Ez a cikk a következőre vonatkozik: ✔️ 4.14.0 ✔️ 5.8.0-s verzió

A Spring Cloud Stream egy keretrendszer, amely nagy mértékben skálázható, eseményvezérelt mikroszolgáltatásokat hoz létre, amelyek megosztott üzenetkezelési rendszerekkel vannak összekapcsolva.

A keretrendszer egy rugalmas programozási modellt biztosít, amely már meglévő és jól ismert Spring-kifejezésekre és ajánlott eljárásokra épül. Ezek az ajánlott eljárások közé tartozik az állandó pub/al szemantikák, a fogyasztói csoportok és az állapotalapú partíciók támogatása.

A binder jelenlegi implementációi a következők:

Spring Cloud Stream Binder az Azure Event Hubshoz

Fő fogalmak

Az Azure Event Hubshoz készült Spring Cloud Stream Binder biztosítja a Spring Cloud Stream-keretrendszer kötési implementációját. Ez az implementáció a Spring Integration Event Hubs-csatornaadaptereket használja az alapításakor. A tervezés szempontjából az Event Hubs hasonló a Kafkához. Az Event Hubs a Kafka API-n keresztül is elérhető. Ha a projekt szorosan függ a Kafka API-tól, kipróbálhatja az Event Hubot a Kafka API-mintával

Fogyasztói csoport

Az Event Hubs hasonló támogatást nyújt a fogyasztói csoportoknak, mint az Apache Kafka, de kismértékben eltérő logikával. Bár a Kafka az összes véglegesített eltolást a közvetítőben tárolja, a manuálisan feldolgozott Event Hubs-üzenetek eltolásait kell tárolnia. Az Event Hubs SDK az ilyen eltolások Azure Storage-ban való tárolására szolgál.

Particionálás támogatása

Az Event Hubs a Kafkához hasonló fizikai partíciót biztosít. A Kafka által a felhasználók és a partíciók közötti automatikus újraegyensúlyozástól eltérően az Event Hubs egyfajta megelőző módot biztosít. A tárfiók bérletként működik annak meghatározásához, hogy melyik felhasználó melyik partíció tulajdonosa. Amikor egy új felhasználó elindul, megpróbál ellopni néhány partíciót a legnagyobb terhelésű felhasználóktól a számítási feladatok egyensúlyának elérése érdekében.

A terheléselosztási stratégia megadásához meg kell adni a spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* tulajdonságokat. További információ: Fogyasztói tulajdonságok szakasz.

Batch fogyasztói támogatás

A Spring Cloud Azure Stream Event Hubs binder támogatja a Spring Cloud Stream Batch fogyasztói funkcióját.

A batch-consumer mód használatához állítsa a tulajdonságot a spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode következőre true: . Ha engedélyezve van, a rendszer a kötegelt események listájának hasznos adatait tartalmazó üzenetet fogad, és átadja a Consumer függvénynek. A rendszer az egyes üzenetfejléceket is listává alakítja, amelyek tartalma az egyes eseményekhez tartozó fejlécérték. A partícióazonosító, az ellenőrzőpont és az utolsó lekérdezett tulajdonságok közös fejlécei egyetlen értékként jelennek meg, mivel az események teljes kötegének ugyanaz az értéke. További információkért tekintse meg a Spring Cloud Azure-támogatás a Spring Integrationhez készült Event Hubs üzenetfejléceit ismertető szakaszt.

Megjegyzés:

Az ellenőrzőpont fejléce csak az MANUAL ellenőrzőpont mód használata esetén létezik.

A kötegfelhasználók ellenőrzőpontozása két módot támogat: BATCH és MANUAL. BATCH A mód egy automatikus ellenőrzőpontozási mód, a kötőanyag fogadása után az események teljes kötegének közös ellenőrzéséhez. MANUAL módban a felhasználók ellenőrzik az eseményeket. Használat esetén a rendszer átadja az Checkpointer üzenet fejlécét, és a felhasználók használhatják az ellenőrzőpont-ellenőrzést.

A köteg méretét az előtaggal spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.rendelkező tulajdonságok és max-wait-time tulajdonságok max-size beállításával adhatja meg. A max-size tulajdonság szükséges, és a max-wait-time tulajdonság nem kötelező. További információ: Fogyasztói tulajdonságok szakasz.

Függőség beállítása

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Másik lehetőségként használhatja a Spring Cloud Azure Stream Event Hubs Startert is, ahogyan az a Maven esetében az alábbi példában is látható:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Konfiguráció

A kötőanyag a konfigurációs beállítások alábbi három részét biztosítja:

Csatlakozás ion konfigurációs tulajdonságai

Ez a szakasz az Azure Event Hubshoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.

Megjegyzés:

Ha úgy dönt, hogy biztonsági taggal hitelesít és engedélyez egy Azure-erőforráshoz való hozzáférést a Microsoft Entra-azonosítóval, olvassa el a Hozzáférés engedélyezése a Microsoft Entra-azonosítóval című témakört, amelyből megtudhatja, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.

Csatlakozás spring-cloud-azure-stream-binder-eventhubs konfigurálható tulajdonságai:

Tulajdonság Type Description
spring.cloud.azure.eventhubs.enabled Logikai Az Azure Event Hubs engedélyezése.
spring.cloud.azure.eventhubs.connection-string Sztring Event Hubs-névtér kapcsolati sztring érték.
spring.cloud.azure.eventhubs.namespace Sztring Event Hubs-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie
spring.cloud.azure.eventhubs.domain-name Sztring Egy Azure Event Hubs-névtérérték tartományneve.
spring.cloud.azure.eventhubs.custom-endpoint-address Sztring Egyéni végpont címe.

Tipp.

Az Azure Service SDK gyakori konfigurációs beállításai a Spring Cloud Azure Stream Event Hubs iratgyűjtőhöz is konfigurálhatók. A támogatott konfigurációs beállítások a Spring Cloud Azure-konfigurációban jelennek meg, és konfigurálhatók az egyesített előtaggal spring.cloud.azure. vagy a spring.cloud.azure.eventhubs..

A kötőanyag alapértelmezés szerint támogatja a Spring Could Azure Resource Managert is. A kapcsolati sztring a kapcsolódó szerepkörökkel nem rendelkező biztonsági tagokkal Data való lekéréséről a Spring Could Azure Resource Manager alapszintű használati szakaszában olvashat.

Ellenőrzőpont konfigurációs tulajdonságai

Ez a szakasz a Storage Blobs szolgáltatás konfigurációs beállításait tartalmazza, amely a partíció tulajdonjogának és ellenőrzőpont-adatainak megőrzésére szolgál.

Megjegyzés:

A 4.0.0-s verzióból, amikor a spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tulajdonság nincs manuálisan engedélyezve, a rendszer nem hoz létre automatikusan egy Storage-tárolót a spring.cloud.stream.bindings.binding-name.destination névvel.

A spring-cloud-azure-stream-binder-eventhubs konfigurálható tulajdonságainak ellenőrzése:

Tulajdonság Type Description
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Logikai Engedélyezi-e a tárolók létrehozását, ha nem létezik.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Sztring A tárfiók neve.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Sztring Tárfiók hozzáférési kulcsa.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Sztring Tároló neve.

Tipp.

Az Azure Service SDK gyakori konfigurációs beállításai a Storage Blob CheckPoint Store-hoz is konfigurálhatók. A támogatott konfigurációs beállítások a Spring Cloud Azure-konfigurációban jelennek meg, és konfigurálhatók az egyesített előtaggal spring.cloud.azure. vagy a spring.cloud.azure.eventhubs.processor.checkpoint-store.

Az Azure Event Hubs-kötés konfigurációs tulajdonságai

A következő lehetőségek négy részre vannak osztva: Fogyasztói tulajdonságok, Speciális fogyasztói konfigurációk, Gyártói tulajdonságok és Speciális gyártói konfigurációk.

Fogyasztói tulajdonságok

Ezek a tulajdonságok a következőn keresztül EventHubsConsumerPropertiesérhetők el: .

A spring-cloud-azure-stream-binder-eventhubs fogyasztói konfigurálható tulajdonságai:

Tulajdonság Type Description
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode Ellenőrzőpont mód, amelyet akkor használnak, amikor a fogyasztó dönti el, hogyan kell ellenőrizni az ellenőrzőpont-üzenetet
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Egész Az egyes partíciók üzenetmennyiségét határozza meg egy ellenőrzőponthoz. Csak ellenőrzőpont mód használata esetén PARTITION_COUNT lép érvénybe.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Duration Egy ellenőrzőponthoz megadott időintervallumot határozza meg. Csak ellenőrzőpont mód használata esetén TIME lép érvénybe.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size Egész A kötegben lévő események maximális száma. A kötegfelhasználói módhoz szükséges.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Duration A köteghasználat maximális időtartama. Csak akkor lép érvénybe, ha a batch-consumer mód engedélyezve van, és nem kötelező.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Duration A frissítés időközi időtartama.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy A terheléselosztási stratégia.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Duration Az az időtartam, amely után a partíció tulajdonjoga lejár.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Logikai Azt jelzi, hogy az eseményfeldolgozónak adatokat kell-e kérnie a társított partíció utolsó lekéréses eseményéről, és nyomon kell-e követnie ezeket az információkat az események fogadása során.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Egész A fogyasztó által azon események számának szabályozására használt szám, amelyeket az Event Hub-fogyasztó helyileg aktívan fogad és vár.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Leképezés a kulcssal partícióazonosítóként, és a StartPositionProperties Az egyes partíciókhoz használandó eseményhelyzetet tartalmazó térkép, ha a partíció ellenőrzőpontja nem létezik az Ellenőrzőpont-tárolóban. Ez a térkép a partícióazonosítóból van kulcsra kapcsolva.

Megjegyzés:

A initial-partition-event-position konfiguráció elfogadja az egyes eseményközpontok map kezdeti pozíciójának megadását. Így a kulcs a partícióazonosító, és az StartPositionProperties érték tartalmazza az eltolás tulajdonságait, a sorszámot, a lekérdezett dátumidőt és a befogadót. Beállíthatja például úgy, hogy

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Speciális fogyasztói konfiguráció

A fenti kapcsolat, ellenőrzőpont és gyakori Azure SDK-ügyfélkonfiguráció támogatja az egyes iratgyűjtő-felhasználók testreszabását, amelyet az előtaggal spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.konfigurálhat.

Gyártói tulajdonságok

Ezek a tulajdonságok a következőn keresztül EventHubsProducerPropertiesérhetők el: .

A spring-cloud-azure-stream-binder-eventhubs gyártó által konfigurálható tulajdonságai:

Tulajdonság Type Description
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync Logikai A gyártó szinkronizálásának kapcsolójelzője. Ha igaz, a gyártó a küldési művelet után várja meg a választ.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout hosszú A küldési művelet után a válaszra váró idő. Csak akkor lép érvénybe, ha egy szinkronizálási gyártó engedélyezve van.
Speciális gyártókonfiguráció

A fenti kapcsolat és az Azure SDK-ügyfélkonfiguráció támogatja az egyes iratgyűjtő-előállítók testreszabását, amelyeket az előtaggal spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.konfigurálhat.

Alapszintű használat

Üzenetek küldése és fogadása az Event Hubsból vagy az eseményközpontba

  1. Adja meg a konfigurációs beállításokat hitelesítő adatokkal.

    • Az kapcsolati sztring hitelesítő adatokhoz konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • A hitelesítő adatok szolgáltatásnévként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Megjegyzés:

Az engedélyezett tenant-id értékek a következők: common, organizations, consumersvagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.

  • A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Szállító és fogyasztó meghatározása.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Particionálás támogatása

A rendszer létrehoz egy PartitionSupplier felhasználó által megadott partícióadatokat az elküldendő üzenet partícióadatainak konfigurálásához. Az alábbi folyamatábra a partícióazonosító és a kulcs különböző prioritásainak beszerzését mutatja be:

Diagram showing a flowchart of the partitioning support process.

Batch fogyasztói támogatás

  1. Adja meg a kötegkonfigurációs beállításokat az alábbi példában látható módon:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Szállító és fogyasztó meghatározása.

    Az ellenőrzőpont-módban BATCHaz alábbi kód használatával küldhet üzeneteket, és kötegekben használhatja azokat.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Az ellenőrzőpont-módban MANUALaz alábbi kóddal küldhet üzeneteket, és kötegekben használhatja a felhasználást/ellenőrzőpontot.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Megjegyzés:

Kötegfelhasználó módban a Spring Cloud Stream-iratgyűjtő alapértelmezett tartalomtípusa az application/json, ezért győződjön meg arról, hogy az üzenet hasznos adatai igazodnak a tartalomtípushoz. Ha például az alapértelmezett tartalomtípust application/json használja az üzenetek hasznos adatokkal String való fogadásához, a hasznos adatokat dupla idézőjelekkel kell körülvenni JSON Stringaz eredeti String szöveghez. Míg a text/plain tartalom típusa, ez lehet egy String objektum közvetlenül. További információ: Spring Cloud Stream tartalomtípus egyeztetése.

Hibaüzenetek kezelése

  • Kimenő kötési hibaüzenetek kezelése

    Alapértelmezés szerint a Spring Integration létrehoz egy globális hibacsatornát .errorChannel Konfigurálja a következő üzenetvégpontot a kimenő kötési hibaüzenetek kezeléséhez:

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Bejövő kötési hibaüzenetek kezelése

    A Spring Cloud Stream Event Hubs Binder két megoldást támogat a bejövő üzenetkötések hibáinak kezelésére: egyéni hibacsatornák és kezelők.

    Hibacsatorna:

    A Spring Cloud Stream hibacsatornát biztosít minden bejövő kötéshez. A ErrorMessage rendszer egy hibaüzenetet küld a hibacsatornának. További információ: Hibák kezelése a Spring Cloud Stream dokumentációjában.

    • Alapértelmezett hibacsatorna

      Egy globális hibacsatornát használhat, amely az errorChannel összes bejövő kötési hibaüzenetet felhasználja. Az üzenetek kezeléséhez konfigurálja a következő üzenetvégpontot:

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • Kötésspecifikus hibacsatorna

      Egy adott hibacsatornával az alapértelmezett hibacsatornánál magasabb prioritású bejövő kötési hibaüzeneteket használhat. Az üzenetek kezeléséhez konfigurálja a következő üzenetvégpontot:

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Megjegyzés:

      A kötésspecifikus hibacsatorna kölcsönösen kizárja a többi megadott hibakezelőt és csatornát.

    Hibakezelő:

    A Spring Cloud Stream egy olyan mechanizmust tesz elérhetővé, amellyel egyéni hibakezelőt biztosíthat egy Consumer példányokat elfogadó ErrorMessage elem hozzáadásával. További információ: Hibakezelés a Spring Cloud Stream dokumentációjában.

    Megjegyzés:

    Ha bármilyen kötési hibakezelő konfigurálva van, az az alapértelmezett hibacsatornával is működik.

    • Kötés alapértelmezett hibakezelője

      Konfiguráljon egyetlen Consumer babot az összes bejövő kötési hibaüzenet felhasználásához. Az alábbi alapértelmezett függvény feliratkozik az egyes bejövő kötési hibacsatornákra:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      A tulajdonságot a spring.cloud.stream.default.error-handler-definition függvény nevére is be kell állítania.

    • Kötésspecifikus hibakezelő

      Konfiguráljon egy babot Consumer az adott bejövő kötési hibaüzenetek felhasználásához. Az alábbi függvény feliratkozik az adott bejövő kötési hibacsatornára, és magasabb prioritással rendelkezik, mint a kötés alapértelmezett hibakezelője:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      A tulajdonságot a spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition függvény nevére is be kell állítania.

Event Hubs-üzenetfejlécek

A támogatott alapvető üzenetfejléceket a Spring Cloud Azure-támogatás Event Hubs-üzenetfejlécek szakaszában találja a Spring Integrationhez.

Több kötőanyag támogatása

Csatlakozás több Event Hubs-névtérre való átengedést több kötőanyag is támogatja. Ez a minta példaként egy kapcsolati sztring vesz igénybe. A szolgáltatásnevek és a felügyelt identitások hitelesítő adatai is támogatottak. A kapcsolódó tulajdonságokat az egyes iratgyűjtők környezeti beállításaiban állíthatja be.

  1. Ha több kötést szeretne használni az Event Hubsban, konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Megjegyzés:

    Az előző alkalmazásfájl bemutatja, hogyan konfigurálhat egyetlen alapértelmezett lekérdezést az alkalmazáshoz az összes kötésre. Ha egy adott kötéshez szeretné konfigurálni a lekérdezést, használhat olyan konfigurációt, mint a spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. Két szállítót és két fogyasztót kell definiálnunk:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Az erőforrások kiépítése

Az Event Hubs-iratgyűjtő támogatja az eseményközpont és a fogyasztói csoport kiépítését, a felhasználók az alábbi tulajdonságok használatával engedélyezhetik a kiépítést.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Megjegyzés:

Az engedélyezett tenant-id értékek a következők: common, organizations, consumersvagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.

Samples

További információ: Azure-Spring-Boot-Samples adattár a GitHubon.

Spring Cloud Stream Binder az Azure Service Bushoz

Fő fogalmak

Az Azure Service Bushoz készült Spring Cloud Stream Binder biztosítja a Spring Cloud Stream-keretrendszer kötési implementációt. Ez az implementáció a Spring Integration Service Bus-csatornaadaptereket használja az alapításakor.

Ütemezett üzenet

Ez a kötés támogatja az üzenetek küldését egy témakörbe késleltetett feldolgozás céljából. A felhasználók ezredmásodpercben küldhetnek ütemezett üzeneteket az üzenet fejlécének x-delay kifejezésével. Az üzenet ezredmásodperc után x-delay jelenik meg a megfelelő témakörökben.

Fogyasztói csoport

A Service Bus-témakör hasonló támogatást nyújt a fogyasztói csoportoknak, mint az Apache Kafka, de kismértékben eltérő logikával. Ez a kötőanyag egy témakörre támaszkodik Subscription , amely fogyasztói csoportként működik.

Függőség beállítása

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Másik lehetőségként használhatja a Spring Cloud Azure Stream Service Bus Startert is, ahogyan az a Maven esetében az alábbi példában is látható:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Konfiguráció

Az iratgyűjtő a konfigurációs beállítások alábbi két részét biztosítja:

Csatlakozás ion konfigurációs tulajdonságai

Ez a szakasz az Azure Service Bushoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.

Megjegyzés:

Ha úgy dönt, hogy biztonsági taggal hitelesít és engedélyez egy Azure-erőforráshoz való hozzáférést a Microsoft Entra-azonosítóval, olvassa el a Hozzáférés engedélyezése a Microsoft Entra-azonosítóval című témakört, amelyből megtudhatja, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.

Csatlakozás spring-cloud-azure-stream-binder-servicebus konfigurálható tulajdonságai:

Tulajdonság Type Description
spring.cloud.azure.servicebus.enabled Logikai Az Azure Service Bus engedélyezése.
spring.cloud.azure.servicebus.connection-string Sztring Service Bus-névtér kapcsolati sztring érték.
spring.cloud.azure.servicebus.namespace Sztring Service Bus-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie
spring.cloud.azure.servicebus.domain-name Sztring Egy Azure Service Bus-névtérérték tartományneve.

Megjegyzés:

Az Azure Service SDK gyakori konfigurációs beállításai a Spring Cloud Azure Stream Service Bus-iratgyűjtőhöz is konfigurálhatók. A támogatott konfigurációs beállítások a Spring Cloud Azure-konfigurációban jelennek meg, és konfigurálhatók az egyesített előtaggal spring.cloud.azure. vagy a spring.cloud.azure.servicebus..

A kötőanyag alapértelmezés szerint támogatja a Spring Could Azure Resource Managert is. A kapcsolati sztring a kapcsolódó szerepkörökkel nem rendelkező biztonsági tagokkal Data való lekéréséről a Spring Could Azure Resource Manager alapszintű használati szakaszában olvashat.

Az Azure Service Bus kötéskonfigurációs tulajdonságai

A következő lehetőségek négy részre vannak osztva: Fogyasztói tulajdonságok, Speciális fogyasztói konfigurációk, Gyártói tulajdonságok és Speciális gyártói konfigurációk.

Fogyasztói tulajdonságok

Ezek a tulajdonságok a következőn keresztül ServiceBusConsumerPropertiesérhetők el: .

A spring-cloud-azure-stream-binder-servicebus fogyasztói konfigurálható tulajdonságai:

Tulajdonság Type Alapértelmezett Leírás
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected Logikai false Ha a sikertelen üzeneteket a rendszer a DLQ-hoz irányítja.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Egész 1 A Service Bus processzorügyfél által feldolgozandó egyidejű üzenetek maximális beállítása.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Egész null Egyszerre feldolgozandó munkamenetek maximális száma.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Logikai null Engedélyezve van-e a munkamenet.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Egész 0 A Service Bus processzorügyfél előzetes száma.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue SubQueue Nincs Az alsor típusa, amelyhez csatlakozni szeretne.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Duration 5m A zárolás automatikus megújításának folytatásához szükséges idő.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock A Service Bus processzorügyfél fogadási módja.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Logikai true Az üzenetek automatikus rendezése. Ha hamisként van beállítva, a rendszer hozzáad egy üzenetfejlécet Checkpointer , hogy a fejlesztők manuálisan rendezhessék az üzeneteket.
Speciális fogyasztói konfiguráció

A fenti kapcsolat és a gyakori Azure SDK-ügyfélkonfiguráció támogatja az egyes iratgyűjtő-felhasználók testreszabását, amelyeket az előtaggal spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.konfigurálhat.

Gyártói tulajdonságok

Ezek a tulajdonságok a következőn keresztül ServiceBusProducerPropertiesérhetők el: .

A spring-cloud-azure-stream-binder-servicebus gyártó által konfigurálható tulajdonságai:

Tulajdonság Type Alapértelmezett Leírás
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync Logikai false Kapcsolójelző a gyártó szinkronizálásához.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout hosszú 10000 Időtúllépési érték a gyártó elküldéséhez.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType null A gyártó Service Bus-entitástípusa, amely a kötelező gyártóhoz szükséges.

Fontos

A kötésgyártó használatakor konfigurálni kell a tulajdonságát spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type .

Speciális gyártókonfiguráció

A fenti kapcsolat és az Azure SDK-ügyfélkonfiguráció támogatja az egyes iratgyűjtő-előállítók testreszabását, amelyeket az előtaggal spring.cloud.stream.servicebus.bindings.<binding-name>.producer.konfigurálhat.

Alapszintű használat

Üzenetek küldése és fogadása a Service Busból vagy a Service Busba

  1. Adja meg a konfigurációs beállításokat hitelesítő adatokkal.

    • Az kapcsolati sztring hitelesítő adatokhoz konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • A hitelesítő adatok szolgáltatásnévként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Megjegyzés:

Az engedélyezett tenant-id értékek a következők: common, organizations, consumersvagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.

  • A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Szállító és fogyasztó meghatározása.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Partíciókulcs támogatása

A kötés támogatja a Service Bus particionálását azáltal, hogy engedélyezi a partíciókulcs és a munkamenet-azonosító beállítását az üzenetfejlécben. Ez a szakasz bemutatja, hogyan állíthat be partíciókulcsot az üzenetekhez.

A Spring Cloud Stream egy partíciókulcs SpEL-kifejezéstulajdonságát spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expressionbiztosítja. Például állítsa be ezt a tulajdonságot, "'partitionKey-' + headers[<message-header-key>]" és adjon hozzá egy message-header-key nevű fejlécet. A Spring Cloud Stream a fejléc értékét használja a kifejezés kiértékelésekor egy partíciókulcs hozzárendeléséhez. Az alábbi kód egy példakészítőt tartalmaz:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Munkamenet-támogatás

A kötés támogatja a Service Bus üzenet-munkameneteit . Az üzenet munkamenet-azonosítója az üzenet fejlécén keresztül állítható be.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Megjegyzés:

A Service Bus particionálása szerint a munkamenet-azonosító magasabb prioritással rendelkezik, mint a partíciókulcs. Így ha mindkét ServiceBusMessageHeaders#SESSION_ID fejléc ServiceBusMessageHeaders#PARTITION_KEY be van állítva, a munkamenet-azonosító értéke idővel felülírja a partíciókulcs értékét.

Hibaüzenetek kezelése

  • Kimenő kötési hibaüzenetek kezelése

    Alapértelmezés szerint a Spring Integration létrehoz egy globális hibacsatornát .errorChannel Konfigurálja a következő üzenetvégpontot a kimenő kötési hibaüzenet kezeléséhez.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Bejövő kötési hibaüzenetek kezelése

    A Spring Cloud Stream Service Bus Binder három megoldást támogat a bejövő üzenetkötések hibáinak kezelésére: a binder hibakezelője, az egyéni hibacsatornák és a kezelők.

    Binder hibakezelő:

    Az alapértelmezett kötéskezelő hibakezelő kezeli a bejövő kötést. Ezzel a kezelő használatával sikertelen üzeneteket küldhet a kézbesítetlen levelek üzenetsorába, ha spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected engedélyezve van. Ellenkező esetben a sikertelen üzenetek elhagyva lesznek. A kötésspecifikus hibacsatorna konfigurálásának kivételével a kötéskezelő hibakezelő mindig érvénybe lép, függetlenül attól, hogy vannak-e más egyéni hibakezelők vagy csatornák.

    Hibacsatorna:

    A Spring Cloud Stream hibacsatornát biztosít minden bejövő kötéshez. A ErrorMessage rendszer egy hibaüzenetet küld a hibacsatornának. További információ: Hibák kezelése a Spring Cloud Stream dokumentációjában.

    • Alapértelmezett hibacsatorna

      Egy globális hibacsatornát használhat, amely az errorChannel összes bejövő kötési hibaüzenetet felhasználja. Az üzenetek kezeléséhez konfigurálja a következő üzenetvégpontot:

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • Kötésspecifikus hibacsatorna

      Egy adott hibacsatornával az alapértelmezett hibacsatornánál magasabb prioritású bejövő kötési hibaüzeneteket használhat. Az üzenetek kezeléséhez konfigurálja a következő üzenetvégpontot:

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Megjegyzés:

      A kötésspecifikus hibacsatorna kölcsönösen kizárja a többi megadott hibakezelőt és csatornát.

    Hibakezelő:

    A Spring Cloud Stream egy olyan mechanizmust tesz elérhetővé, amellyel egyéni hibakezelőt biztosíthat egy Consumer példányokat elfogadó ErrorMessage elem hozzáadásával. További információ: Hibakezelés a Spring Cloud Stream dokumentációjában.

    Megjegyzés:

    Ha bármilyen kötési hibakezelő konfigurálva van, az az alapértelmezett hibacsatornával és a kötéskezelő hibakezelővel is használható.

    • Kötés alapértelmezett hibakezelője

      Konfiguráljon egyetlen Consumer babot az összes bejövő kötési hibaüzenet felhasználásához. Az alábbi alapértelmezett függvény feliratkozik az egyes bejövő kötési hibacsatornákra:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      A tulajdonságot a spring.cloud.stream.default.error-handler-definition függvény nevére is be kell állítania.

    • Kötésspecifikus hibakezelő

      Konfiguráljon egy babot Consumer az adott bejövő kötési hibaüzenetek felhasználásához. Az alábbi függvény előfizet az adott bejövő kötési hibacsatornára, amelynek prioritása magasabb, mint a kötés alapértelmezett hibakezelője.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      A tulajdonságot a spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition függvény nevére is be kell állítania.

Service Bus-üzenetfejlécek

A támogatott alapszintű üzenetfejlécekért tekintse meg a Spring Cloud Azure-támogatás a Spring Integration Service Bus-üzenetfejléceit ismertető szakaszt.

Megjegyzés:

A partíciókulcs beállításakor az üzenetfejléc prioritása magasabb, mint a Spring Cloud Stream tulajdonság. Így spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression csak akkor lép érvénybe, ha egyik ServiceBusMessageHeaders#SESSION_IDServiceBusMessageHeaders#PARTITION_KEY fejléc sincs konfigurálva.

Több kötőanyag támogatása

Csatlakozás több Service Bus-névtérre való átengedést több kötőanyag is támogatja. Ez a minta példaként kapcsolati sztring. A szolgáltatásnevek és a felügyelt identitások hitelesítő adatai is támogatottak, a felhasználók a kapcsolódó tulajdonságokat az egyes iratgyűjtők környezeti beállításaiban állíthatják be.

  1. A ServiceBus több kötésének használatához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Megjegyzés:

    Az előző alkalmazásfájl bemutatja, hogyan konfigurálhat egyetlen alapértelmezett lekérdezést az alkalmazáshoz az összes kötésre. Ha egy adott kötéshez szeretné konfigurálni a lekérdezést, használhat olyan konfigurációt, mint a spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. két beszállítót és két fogyasztót kell meghatároznunk

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Az erőforrások kiépítése

A Service Bus binder támogatja az üzenetsor, a témakör és az előfizetés kiépítését, a felhasználók az alábbi tulajdonságok használatával engedélyezhetik a kiépítést.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Megjegyzés:

Az engedélyezett tenant-id értékek a következők: common, organizations, consumersvagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.

Samples

További információ: Azure-Spring-Boot-Samples adattár a GitHubon.