Spring Cloud Azure-támogatás a Spring Cloud Streamhez
Ez a cikk a következőre vonatkozik: ✔️ 4.14.0 ✔️ 5.8.0-s verzió
A Spring Cloud Stream egy keretrendszer, amely nagy mértékben skálázható, eseményvezérelt mikroszolgáltatásokat hoz létre, amelyek megosztott üzenetkezelési rendszerekkel vannak összekapcsolva.
A keretrendszer egy rugalmas programozási modellt biztosít, amely már meglévő és jól ismert Spring-kifejezésekre és ajánlott eljárásokra épül. Ezek az ajánlott eljárások közé tartozik az állandó pub/al szemantikák, a fogyasztói csoportok és az állapotalapú partíciók támogatása.
A binder jelenlegi implementációi a következők:
spring-cloud-azure-stream-binder-eventhubs
– további információ: Spring Cloud Stream Binder for Azure Event Hubsspring-cloud-azure-stream-binder-servicebus
– további információ: Spring Cloud Stream Binder for Azure Service Bus
Spring Cloud Stream Binder az Azure Event Hubshoz
Fő fogalmak
Az Azure Event Hubshoz készült Spring Cloud Stream Binder biztosítja a Spring Cloud Stream-keretrendszer kötési implementációját. Ez az implementáció a Spring Integration Event Hubs-csatornaadaptereket használja az alapításakor. A tervezés szempontjából az Event Hubs hasonló a Kafkához. Az Event Hubs a Kafka API-n keresztül is elérhető. Ha a projekt szorosan függ a Kafka API-tól, kipróbálhatja az Event Hubot a Kafka API-mintával
Fogyasztói csoport
Az Event Hubs hasonló támogatást nyújt a fogyasztói csoportoknak, mint az Apache Kafka, de kismértékben eltérő logikával. Bár a Kafka az összes véglegesített eltolást a közvetítőben tárolja, a manuálisan feldolgozott Event Hubs-üzenetek eltolásait kell tárolnia. Az Event Hubs SDK az ilyen eltolások Azure Storage-ban való tárolására szolgál.
Particionálás támogatása
Az Event Hubs a Kafkához hasonló fizikai partíciót biztosít. A Kafka által a felhasználók és a partíciók közötti automatikus újraegyensúlyozástól eltérően az Event Hubs egyfajta megelőző módot biztosít. A tárfiók bérletként működik annak meghatározásához, hogy melyik felhasználó melyik partíció tulajdonosa. Amikor egy új felhasználó elindul, megpróbál ellopni néhány partíciót a legnagyobb terhelésű felhasználóktól a számítási feladatok egyensúlyának elérése érdekében.
A terheléselosztási stratégia megadásához meg kell adni a spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
tulajdonságokat. További információ: Fogyasztói tulajdonságok szakasz.
Batch fogyasztói támogatás
A Spring Cloud Azure Stream Event Hubs binder támogatja a Spring Cloud Stream Batch fogyasztói funkcióját.
A batch-consumer mód használatához állítsa a tulajdonságot a spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
következőre true
: . Ha engedélyezve van, a rendszer a kötegelt események listájának hasznos adatait tartalmazó üzenetet fogad, és átadja a Consumer
függvénynek. A rendszer az egyes üzenetfejléceket is listává alakítja, amelyek tartalma az egyes eseményekhez tartozó fejlécérték. A partícióazonosító, az ellenőrzőpont és az utolsó lekérdezett tulajdonságok közös fejlécei egyetlen értékként jelennek meg, mivel az események teljes kötegének ugyanaz az értéke. További információkért tekintse meg a Spring Cloud Azure-támogatás a Spring Integrationhez készült Event Hubs üzenetfejléceit ismertető szakaszt.
Megjegyzés:
Az ellenőrzőpont fejléce csak az MANUAL
ellenőrzőpont mód használata esetén létezik.
A kötegfelhasználók ellenőrzőpontozása két módot támogat: BATCH
és MANUAL
. BATCH
A mód egy automatikus ellenőrzőpontozási mód, a kötőanyag fogadása után az események teljes kötegének közös ellenőrzéséhez. MANUAL
módban a felhasználók ellenőrzik az eseményeket. Használat esetén a rendszer átadja az Checkpointer
üzenet fejlécét, és a felhasználók használhatják az ellenőrzőpont-ellenőrzést.
A köteg méretét az előtaggal spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
rendelkező tulajdonságok és max-wait-time
tulajdonságok max-size
beállításával adhatja meg. A max-size
tulajdonság szükséges, és a max-wait-time
tulajdonság nem kötelező. További információ: Fogyasztói tulajdonságok szakasz.
Függőség beállítása
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Másik lehetőségként használhatja a Spring Cloud Azure Stream Event Hubs Startert is, ahogyan az a Maven esetében az alábbi példában is látható:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Konfiguráció
A kötőanyag a konfigurációs beállítások alábbi három részét biztosítja:
Csatlakozás ion konfigurációs tulajdonságai
Ez a szakasz az Azure Event Hubshoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.
Megjegyzés:
Ha úgy dönt, hogy biztonsági taggal hitelesít és engedélyez egy Azure-erőforráshoz való hozzáférést a Microsoft Entra-azonosítóval, olvassa el a Hozzáférés engedélyezése a Microsoft Entra-azonosítóval című témakört, amelyből megtudhatja, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.
Csatlakozás spring-cloud-azure-stream-binder-eventhubs konfigurálható tulajdonságai:
Tulajdonság | Type | Description |
---|---|---|
spring.cloud.azure.eventhubs.enabled | Logikai | Az Azure Event Hubs engedélyezése. |
spring.cloud.azure.eventhubs.connection-string | Sztring | Event Hubs-névtér kapcsolati sztring érték. |
spring.cloud.azure.eventhubs.namespace | Sztring | Event Hubs-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie |
spring.cloud.azure.eventhubs.domain-name | Sztring | Egy Azure Event Hubs-névtérérték tartományneve. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Sztring | Egyéni végpont címe. |
Tipp.
Az Azure Service SDK gyakori konfigurációs beállításai a Spring Cloud Azure Stream Event Hubs iratgyűjtőhöz is konfigurálhatók. A támogatott konfigurációs beállítások a Spring Cloud Azure-konfigurációban jelennek meg, és konfigurálhatók az egyesített előtaggal spring.cloud.azure.
vagy a spring.cloud.azure.eventhubs.
.
A kötőanyag alapértelmezés szerint támogatja a Spring Could Azure Resource Managert is. A kapcsolati sztring a kapcsolódó szerepkörökkel nem rendelkező biztonsági tagokkal Data
való lekéréséről a Spring Could Azure Resource Manager alapszintű használati szakaszában olvashat.
Ellenőrzőpont konfigurációs tulajdonságai
Ez a szakasz a Storage Blobs szolgáltatás konfigurációs beállításait tartalmazza, amely a partíció tulajdonjogának és ellenőrzőpont-adatainak megőrzésére szolgál.
Megjegyzés:
A 4.0.0-s verzióból, amikor a spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tulajdonság nincs manuálisan engedélyezve, a rendszer nem hoz létre automatikusan egy Storage-tárolót a spring.cloud.stream.bindings.binding-name.destination névvel.
A spring-cloud-azure-stream-binder-eventhubs konfigurálható tulajdonságainak ellenőrzése:
Tulajdonság | Type | Description |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Logikai | Engedélyezi-e a tárolók létrehozását, ha nem létezik. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Sztring | A tárfiók neve. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Sztring | Tárfiók hozzáférési kulcsa. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Sztring | Tároló neve. |
Tipp.
Az Azure Service SDK gyakori konfigurációs beállításai a Storage Blob CheckPoint Store-hoz is konfigurálhatók. A támogatott konfigurációs beállítások a Spring Cloud Azure-konfigurációban jelennek meg, és konfigurálhatók az egyesített előtaggal spring.cloud.azure.
vagy a spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Az Azure Event Hubs-kötés konfigurációs tulajdonságai
A következő lehetőségek négy részre vannak osztva: Fogyasztói tulajdonságok, Speciális fogyasztói konfigurációk, Gyártói tulajdonságok és Speciális gyártói konfigurációk.
Fogyasztói tulajdonságok
Ezek a tulajdonságok a következőn keresztül EventHubsConsumerProperties
érhetők el: .
A spring-cloud-azure-stream-binder-eventhubs fogyasztói konfigurálható tulajdonságai:
Tulajdonság | Type | Description |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | CheckpointMode | Ellenőrzőpont mód, amelyet akkor használnak, amikor a fogyasztó dönti el, hogyan kell ellenőrizni az ellenőrzőpont-üzenetet |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Egész | Az egyes partíciók üzenetmennyiségét határozza meg egy ellenőrzőponthoz. Csak ellenőrzőpont mód használata esetén PARTITION_COUNT lép érvénybe. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Duration | Egy ellenőrzőponthoz megadott időintervallumot határozza meg. Csak ellenőrzőpont mód használata esetén TIME lép érvénybe. |
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size | Egész | A kötegben lévő események maximális száma. A kötegfelhasználói módhoz szükséges. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Duration | A köteghasználat maximális időtartama. Csak akkor lép érvénybe, ha a batch-consumer mód engedélyezve van, és nem kötelező. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | Duration | A frissítés időközi időtartama. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | LoadBalancingStrategy | A terheléselosztási stratégia. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | Duration | Az az időtartam, amely után a partíció tulajdonjoga lejár. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Logikai | Azt jelzi, hogy az eseményfeldolgozónak adatokat kell-e kérnie a társított partíció utolsó lekéréses eseményéről, és nyomon kell-e követnie ezeket az információkat az események fogadása során. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Egész | A fogyasztó által azon események számának szabályozására használt szám, amelyeket az Event Hub-fogyasztó helyileg aktívan fogad és vár. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Leképezés a kulcssal partícióazonosítóként, és a StartPositionProperties |
Az egyes partíciókhoz használandó eseményhelyzetet tartalmazó térkép, ha a partíció ellenőrzőpontja nem létezik az Ellenőrzőpont-tárolóban. Ez a térkép a partícióazonosítóból van kulcsra kapcsolva. |
Megjegyzés:
A initial-partition-event-position
konfiguráció elfogadja az egyes eseményközpontok map
kezdeti pozíciójának megadását. Így a kulcs a partícióazonosító, és az StartPositionProperties
érték tartalmazza az eltolás tulajdonságait, a sorszámot, a lekérdezett dátumidőt és a befogadót. Beállíthatja például úgy, hogy
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Speciális fogyasztói konfiguráció
A fenti kapcsolat, ellenőrzőpont és gyakori Azure SDK-ügyfélkonfiguráció támogatja az egyes iratgyűjtő-felhasználók testreszabását, amelyet az előtaggal spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
konfigurálhat.
Gyártói tulajdonságok
Ezek a tulajdonságok a következőn keresztül EventHubsProducerProperties
érhetők el: .
A spring-cloud-azure-stream-binder-eventhubs gyártó által konfigurálható tulajdonságai:
Tulajdonság | Type | Description |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | Logikai | A gyártó szinkronizálásának kapcsolójelzője. Ha igaz, a gyártó a küldési művelet után várja meg a választ. |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | hosszú | A küldési művelet után a válaszra váró idő. Csak akkor lép érvénybe, ha egy szinkronizálási gyártó engedélyezve van. |
Speciális gyártókonfiguráció
A fenti kapcsolat és az Azure SDK-ügyfélkonfiguráció támogatja az egyes iratgyűjtő-előállítók testreszabását, amelyeket az előtaggal spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
konfigurálhat.
Alapszintű használat
Üzenetek küldése és fogadása az Event Hubsból vagy az eseményközpontba
Adja meg a konfigurációs beállításokat hitelesítő adatokkal.
Az kapcsolati sztring hitelesítő adatokhoz konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
A hitelesítő adatok szolgáltatásnévként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Megjegyzés:
Az engedélyezett tenant-id
értékek a következők: common
, organizations
, consumers
vagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.
A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Szállító és fogyasztó meghatározása.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Particionálás támogatása
A rendszer létrehoz egy PartitionSupplier
felhasználó által megadott partícióadatokat az elküldendő üzenet partícióadatainak konfigurálásához. Az alábbi folyamatábra a partícióazonosító és a kulcs különböző prioritásainak beszerzését mutatja be:
Batch fogyasztói támogatás
Adja meg a kötegkonfigurációs beállításokat az alábbi példában látható módon:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
Szállító és fogyasztó meghatározása.
Az ellenőrzőpont-módban
BATCH
az alábbi kód használatával küldhet üzeneteket, és kötegekben használhatja azokat.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Az ellenőrzőpont-módban
MANUAL
az alábbi kóddal küldhet üzeneteket, és kötegekben használhatja a felhasználást/ellenőrzőpontot.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Megjegyzés:
Kötegfelhasználó módban a Spring Cloud Stream-iratgyűjtő alapértelmezett tartalomtípusa az application/json
, ezért győződjön meg arról, hogy az üzenet hasznos adatai igazodnak a tartalomtípushoz. Ha például az alapértelmezett tartalomtípust application/json
használja az üzenetek hasznos adatokkal String
való fogadásához, a hasznos adatokat dupla idézőjelekkel kell körülvenni JSON String
az eredeti String
szöveghez. Míg a text/plain
tartalom típusa, ez lehet egy String
objektum közvetlenül. További információ: Spring Cloud Stream tartalomtípus egyeztetése.
Hibaüzenetek kezelése
Kimenő kötési hibaüzenetek kezelése
Alapértelmezés szerint a Spring Integration létrehoz egy globális hibacsatornát .
errorChannel
Konfigurálja a következő üzenetvégpontot a kimenő kötési hibaüzenetek kezeléséhez:@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Bejövő kötési hibaüzenetek kezelése
A Spring Cloud Stream Event Hubs Binder két megoldást támogat a bejövő üzenetkötések hibáinak kezelésére: egyéni hibacsatornák és kezelők.
Hibacsatorna:
A Spring Cloud Stream hibacsatornát biztosít minden bejövő kötéshez. A
ErrorMessage
rendszer egy hibaüzenetet küld a hibacsatornának. További információ: Hibák kezelése a Spring Cloud Stream dokumentációjában.Alapértelmezett hibacsatorna
Egy globális hibacsatornát használhat, amely az
errorChannel
összes bejövő kötési hibaüzenetet felhasználja. Az üzenetek kezeléséhez konfigurálja a következő üzenetvégpontot:@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Kötésspecifikus hibacsatorna
Egy adott hibacsatornával az alapértelmezett hibacsatornánál magasabb prioritású bejövő kötési hibaüzeneteket használhat. Az üzenetek kezeléséhez konfigurálja a következő üzenetvégpontot:
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group @ServiceActivator(inputChannel = "{destination}.{group}.errors") public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Megjegyzés:
A kötésspecifikus hibacsatorna kölcsönösen kizárja a többi megadott hibakezelőt és csatornát.
Hibakezelő:
A Spring Cloud Stream egy olyan mechanizmust tesz elérhetővé, amellyel egyéni hibakezelőt biztosíthat egy
Consumer
példányokat elfogadóErrorMessage
elem hozzáadásával. További információ: Hibakezelés a Spring Cloud Stream dokumentációjában.Megjegyzés:
Ha bármilyen kötési hibakezelő konfigurálva van, az az alapértelmezett hibacsatornával is működik.
Kötés alapértelmezett hibakezelője
Konfiguráljon egyetlen
Consumer
babot az összes bejövő kötési hibaüzenet felhasználásához. Az alábbi alapértelmezett függvény feliratkozik az egyes bejövő kötési hibacsatornákra:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
A tulajdonságot a
spring.cloud.stream.default.error-handler-definition
függvény nevére is be kell állítania.Kötésspecifikus hibakezelő
Konfiguráljon egy babot
Consumer
az adott bejövő kötési hibaüzenetek felhasználásához. Az alábbi függvény feliratkozik az adott bejövő kötési hibacsatornára, és magasabb prioritással rendelkezik, mint a kötés alapértelmezett hibakezelője:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
A tulajdonságot a
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
függvény nevére is be kell állítania.
Event Hubs-üzenetfejlécek
A támogatott alapvető üzenetfejléceket a Spring Cloud Azure-támogatás Event Hubs-üzenetfejlécek szakaszában találja a Spring Integrationhez.
Több kötőanyag támogatása
Csatlakozás több Event Hubs-névtérre való átengedést több kötőanyag is támogatja. Ez a minta példaként egy kapcsolati sztring vesz igénybe. A szolgáltatásnevek és a felügyelt identitások hitelesítő adatai is támogatottak. A kapcsolódó tulajdonságokat az egyes iratgyűjtők környezeti beállításaiban állíthatja be.
Ha több kötést szeretne használni az Event Hubsban, konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
Megjegyzés:
Az előző alkalmazásfájl bemutatja, hogyan konfigurálhat egyetlen alapértelmezett lekérdezést az alkalmazáshoz az összes kötésre. Ha egy adott kötéshez szeretné konfigurálni a lekérdezést, használhat olyan konfigurációt, mint a
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.Két szállítót és két fogyasztót kell definiálnunk:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Az erőforrások kiépítése
Az Event Hubs-iratgyűjtő támogatja az eseményközpont és a fogyasztói csoport kiépítését, a felhasználók az alábbi tulajdonságok használatával engedélyezhetik a kiépítést.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Megjegyzés:
Az engedélyezett tenant-id
értékek a következők: common
, organizations
, consumers
vagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.
Samples
További információ: Azure-Spring-Boot-Samples adattár a GitHubon.
Spring Cloud Stream Binder az Azure Service Bushoz
Fő fogalmak
Az Azure Service Bushoz készült Spring Cloud Stream Binder biztosítja a Spring Cloud Stream-keretrendszer kötési implementációt. Ez az implementáció a Spring Integration Service Bus-csatornaadaptereket használja az alapításakor.
Ütemezett üzenet
Ez a kötés támogatja az üzenetek küldését egy témakörbe késleltetett feldolgozás céljából. A felhasználók ezredmásodpercben küldhetnek ütemezett üzeneteket az üzenet fejlécének x-delay
kifejezésével. Az üzenet ezredmásodperc után x-delay
jelenik meg a megfelelő témakörökben.
Fogyasztói csoport
A Service Bus-témakör hasonló támogatást nyújt a fogyasztói csoportoknak, mint az Apache Kafka, de kismértékben eltérő logikával.
Ez a kötőanyag egy témakörre támaszkodik Subscription
, amely fogyasztói csoportként működik.
Függőség beállítása
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Másik lehetőségként használhatja a Spring Cloud Azure Stream Service Bus Startert is, ahogyan az a Maven esetében az alábbi példában is látható:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Konfiguráció
Az iratgyűjtő a konfigurációs beállítások alábbi két részét biztosítja:
Csatlakozás ion konfigurációs tulajdonságai
Ez a szakasz az Azure Service Bushoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.
Megjegyzés:
Ha úgy dönt, hogy biztonsági taggal hitelesít és engedélyez egy Azure-erőforráshoz való hozzáférést a Microsoft Entra-azonosítóval, olvassa el a Hozzáférés engedélyezése a Microsoft Entra-azonosítóval című témakört, amelyből megtudhatja, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.
Csatlakozás spring-cloud-azure-stream-binder-servicebus konfigurálható tulajdonságai:
Tulajdonság | Type | Description |
---|---|---|
spring.cloud.azure.servicebus.enabled | Logikai | Az Azure Service Bus engedélyezése. |
spring.cloud.azure.servicebus.connection-string | Sztring | Service Bus-névtér kapcsolati sztring érték. |
spring.cloud.azure.servicebus.namespace | Sztring | Service Bus-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie |
spring.cloud.azure.servicebus.domain-name | Sztring | Egy Azure Service Bus-névtérérték tartományneve. |
Megjegyzés:
Az Azure Service SDK gyakori konfigurációs beállításai a Spring Cloud Azure Stream Service Bus-iratgyűjtőhöz is konfigurálhatók. A támogatott konfigurációs beállítások a Spring Cloud Azure-konfigurációban jelennek meg, és konfigurálhatók az egyesített előtaggal spring.cloud.azure.
vagy a spring.cloud.azure.servicebus.
.
A kötőanyag alapértelmezés szerint támogatja a Spring Could Azure Resource Managert is. A kapcsolati sztring a kapcsolódó szerepkörökkel nem rendelkező biztonsági tagokkal Data
való lekéréséről a Spring Could Azure Resource Manager alapszintű használati szakaszában olvashat.
Az Azure Service Bus kötéskonfigurációs tulajdonságai
A következő lehetőségek négy részre vannak osztva: Fogyasztói tulajdonságok, Speciális fogyasztói konfigurációk, Gyártói tulajdonságok és Speciális gyártói konfigurációk.
Fogyasztói tulajdonságok
Ezek a tulajdonságok a következőn keresztül ServiceBusConsumerProperties
érhetők el: .
A spring-cloud-azure-stream-binder-servicebus fogyasztói konfigurálható tulajdonságai:
Tulajdonság | Type | Alapértelmezett | Leírás |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | Logikai | false | Ha a sikertelen üzeneteket a rendszer a DLQ-hoz irányítja. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Egész | 1 | A Service Bus processzorügyfél által feldolgozandó egyidejű üzenetek maximális beállítása. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions | Egész | null | Egyszerre feldolgozandó munkamenetek maximális száma. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | Logikai | null | Engedélyezve van-e a munkamenet. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Egész | 0 | A Service Bus processzorügyfél előzetes száma. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | SubQueue | Nincs | Az alsor típusa, amelyhez csatlakozni szeretne. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Duration | 5m | A zárolás automatikus megújításának folytatásához szükséges idő. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | A Service Bus processzorügyfél fogadási módja. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | Logikai | true | Az üzenetek automatikus rendezése. Ha hamisként van beállítva, a rendszer hozzáad egy üzenetfejlécet Checkpointer , hogy a fejlesztők manuálisan rendezhessék az üzeneteket. |
Speciális fogyasztói konfiguráció
A fenti kapcsolat és a gyakori Azure SDK-ügyfélkonfiguráció támogatja az egyes iratgyűjtő-felhasználók testreszabását, amelyeket az előtaggal spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
konfigurálhat.
Gyártói tulajdonságok
Ezek a tulajdonságok a következőn keresztül ServiceBusProducerProperties
érhetők el: .
A spring-cloud-azure-stream-binder-servicebus gyártó által konfigurálható tulajdonságai:
Tulajdonság | Type | Alapértelmezett | Leírás |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | Logikai | false | Kapcsolójelző a gyártó szinkronizálásához. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | hosszú | 10000 | Időtúllépési érték a gyártó elküldéséhez. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | null | A gyártó Service Bus-entitástípusa, amely a kötelező gyártóhoz szükséges. |
Fontos
A kötésgyártó használatakor konfigurálni kell a tulajdonságát spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
.
Speciális gyártókonfiguráció
A fenti kapcsolat és az Azure SDK-ügyfélkonfiguráció támogatja az egyes iratgyűjtő-előállítók testreszabását, amelyeket az előtaggal spring.cloud.stream.servicebus.bindings.<binding-name>.producer.
konfigurálhat.
Alapszintű használat
Üzenetek küldése és fogadása a Service Busból vagy a Service Busba
Adja meg a konfigurációs beállításokat hitelesítő adatokkal.
Az kapcsolati sztring hitelesítő adatokhoz konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
A hitelesítő adatok szolgáltatásnévként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Megjegyzés:
Az engedélyezett tenant-id
értékek a következők: common
, organizations
, consumers
vagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.
A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Szállító és fogyasztó meghatározása.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Partíciókulcs támogatása
A kötés támogatja a Service Bus particionálását azáltal, hogy engedélyezi a partíciókulcs és a munkamenet-azonosító beállítását az üzenetfejlécben. Ez a szakasz bemutatja, hogyan állíthat be partíciókulcsot az üzenetekhez.
A Spring Cloud Stream egy partíciókulcs SpEL-kifejezéstulajdonságát spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
biztosítja. Például állítsa be ezt a tulajdonságot, "'partitionKey-' + headers[<message-header-key>]"
és adjon hozzá egy message-header-key nevű fejlécet. A Spring Cloud Stream a fejléc értékét használja a kifejezés kiértékelésekor egy partíciókulcs hozzárendeléséhez. Az alábbi kód egy példakészítőt tartalmaz:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Munkamenet-támogatás
A kötés támogatja a Service Bus üzenet-munkameneteit . Az üzenet munkamenet-azonosítója az üzenet fejlécén keresztül állítható be.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Megjegyzés:
A Service Bus particionálása szerint a munkamenet-azonosító magasabb prioritással rendelkezik, mint a partíciókulcs. Így ha mindkét ServiceBusMessageHeaders#SESSION_ID
fejléc ServiceBusMessageHeaders#PARTITION_KEY
be van állítva, a munkamenet-azonosító értéke idővel felülírja a partíciókulcs értékét.
Hibaüzenetek kezelése
Kimenő kötési hibaüzenetek kezelése
Alapértelmezés szerint a Spring Integration létrehoz egy globális hibacsatornát .
errorChannel
Konfigurálja a következő üzenetvégpontot a kimenő kötési hibaüzenet kezeléséhez.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Bejövő kötési hibaüzenetek kezelése
A Spring Cloud Stream Service Bus Binder három megoldást támogat a bejövő üzenetkötések hibáinak kezelésére: a binder hibakezelője, az egyéni hibacsatornák és a kezelők.
Binder hibakezelő:
Az alapértelmezett kötéskezelő hibakezelő kezeli a bejövő kötést. Ezzel a kezelő használatával sikertelen üzeneteket küldhet a kézbesítetlen levelek üzenetsorába, ha
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
engedélyezve van. Ellenkező esetben a sikertelen üzenetek elhagyva lesznek. A kötésspecifikus hibacsatorna konfigurálásának kivételével a kötéskezelő hibakezelő mindig érvénybe lép, függetlenül attól, hogy vannak-e más egyéni hibakezelők vagy csatornák.Hibacsatorna:
A Spring Cloud Stream hibacsatornát biztosít minden bejövő kötéshez. A
ErrorMessage
rendszer egy hibaüzenetet küld a hibacsatornának. További információ: Hibák kezelése a Spring Cloud Stream dokumentációjában.Alapértelmezett hibacsatorna
Egy globális hibacsatornát használhat, amely az
errorChannel
összes bejövő kötési hibaüzenetet felhasználja. Az üzenetek kezeléséhez konfigurálja a következő üzenetvégpontot:@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Kötésspecifikus hibacsatorna
Egy adott hibacsatornával az alapértelmezett hibacsatornánál magasabb prioritású bejövő kötési hibaüzeneteket használhat. Az üzenetek kezeléséhez konfigurálja a következő üzenetvégpontot:
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group @ServiceActivator(inputChannel = "{destination}.{group}.errors") public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Megjegyzés:
A kötésspecifikus hibacsatorna kölcsönösen kizárja a többi megadott hibakezelőt és csatornát.
Hibakezelő:
A Spring Cloud Stream egy olyan mechanizmust tesz elérhetővé, amellyel egyéni hibakezelőt biztosíthat egy
Consumer
példányokat elfogadóErrorMessage
elem hozzáadásával. További információ: Hibakezelés a Spring Cloud Stream dokumentációjában.Megjegyzés:
Ha bármilyen kötési hibakezelő konfigurálva van, az az alapértelmezett hibacsatornával és a kötéskezelő hibakezelővel is használható.
Kötés alapértelmezett hibakezelője
Konfiguráljon egyetlen
Consumer
babot az összes bejövő kötési hibaüzenet felhasználásához. Az alábbi alapértelmezett függvény feliratkozik az egyes bejövő kötési hibacsatornákra:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
A tulajdonságot a
spring.cloud.stream.default.error-handler-definition
függvény nevére is be kell állítania.Kötésspecifikus hibakezelő
Konfiguráljon egy babot
Consumer
az adott bejövő kötési hibaüzenetek felhasználásához. Az alábbi függvény előfizet az adott bejövő kötési hibacsatornára, amelynek prioritása magasabb, mint a kötés alapértelmezett hibakezelője.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
A tulajdonságot a
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
függvény nevére is be kell állítania.
Service Bus-üzenetfejlécek
A támogatott alapszintű üzenetfejlécekért tekintse meg a Spring Cloud Azure-támogatás a Spring Integration Service Bus-üzenetfejléceit ismertető szakaszt.
Megjegyzés:
A partíciókulcs beállításakor az üzenetfejléc prioritása magasabb, mint a Spring Cloud Stream tulajdonság. Így spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
csak akkor lép érvénybe, ha egyik ServiceBusMessageHeaders#SESSION_ID
ServiceBusMessageHeaders#PARTITION_KEY
fejléc sincs konfigurálva.
Több kötőanyag támogatása
Csatlakozás több Service Bus-névtérre való átengedést több kötőanyag is támogatja. Ez a minta példaként kapcsolati sztring. A szolgáltatásnevek és a felügyelt identitások hitelesítő adatai is támogatottak, a felhasználók a kapcsolódó tulajdonságokat az egyes iratgyűjtők környezeti beállításaiban állíthatják be.
A ServiceBus több kötésének használatához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
Megjegyzés:
Az előző alkalmazásfájl bemutatja, hogyan konfigurálhat egyetlen alapértelmezett lekérdezést az alkalmazáshoz az összes kötésre. Ha egy adott kötéshez szeretné konfigurálni a lekérdezést, használhat olyan konfigurációt, mint a
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.két beszállítót és két fogyasztót kell meghatároznunk
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Az erőforrások kiépítése
A Service Bus binder támogatja az üzenetsor, a témakör és az előfizetés kiépítését, a felhasználók az alábbi tulajdonságok használatával engedélyezhetik a kiépítést.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Megjegyzés:
Az engedélyezett tenant-id
értékek a következők: common
, organizations
, consumers
vagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.
Samples
További információ: Azure-Spring-Boot-Samples adattár a GitHubon.
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: