Spring Cloud Azure-támogatás a Spring Integrationhez
Ez a cikk a következőre vonatkozik: ✔️ 4.14.0 ✔️ 5.8.0-s verzió
Az Azure Spring Integration Extension spring integration adaptereket biztosít a Java-hoz készült Azure SDK különböző szolgáltatásaihoz. Spring Integration-támogatást biztosítunk ezekhez az Azure-szolgáltatásokhoz: Event Hubs, Service Bus, Storage Queue. A támogatott adapterek listája a következő:
spring-cloud-azure-starter-integration-eventhubs
– további információ: Spring Integration with Azure Event Hubsspring-cloud-azure-starter-integration-servicebus
– további információ: Spring Integration with Azure Service Busspring-cloud-azure-starter-integration-storage-queue
– további információ: Spring Integration with Azure Storage Queue
Spring Integration with Azure Event Hubs
Fő fogalmak
Az Azure Event Hubs egy big data streamelési platform és eseménybetöltési szolgáltatás. Másodpercenként több millió eseményt képes fogadni és feldolgozni. Az eseményközpontokba elküldött adatok bármilyen valós idejű elemzési szolgáltató vagy kötegelési/tárolóadapter segítségével átalakíthatók és tárolhatók.
A Spring Integration lehetővé teszi az egyszerű üzenetkezelést a Spring-alapú alkalmazásokban, és deklaratív adaptereken keresztül támogatja a külső rendszerekkel való integrációt. Ezek az adapterek magasabb szintű absztrakciót biztosítanak a Spring újraegyeztetési, üzenetkezelési és ütemezési támogatásával szemben. Az Event Hubs Spring Integration bővítményprojektje bejövő és kimenő csatornaadaptereket és átjárókat biztosít az Azure Event Hubshoz.
Megjegyzés:
Az RxJava támogatási API-k a 4.0.0-s verzióról törlődnek. Részletekért lásd: Javadoc.
Fogyasztói csoport
Az Event Hubs hasonló támogatást nyújt a fogyasztói csoportoknak, mint az Apache Kafka, de kismértékben eltérő logikával. Bár a Kafka az összes véglegesített eltolást a közvetítőben tárolja, a manuálisan feldolgozott Event Hubs-üzenetek eltolásait kell tárolnia. Az Event Hubs SDK az ilyen eltolások Azure Storage-ban való tárolására szolgál.
Particionálás támogatása
Az Event Hubs a Kafkához hasonló fizikai partíciót biztosít. A Kafka felhasználói és partíciói közötti automatikus újraelosztással ellentétben azonban az Event Hubs egyfajta megelőző módot biztosít. A tárfiók bérletként működik annak meghatározásához, hogy melyik partíció melyik felhasználó tulajdonában van. Amikor egy új felhasználó elindul, megpróbál ellopni néhány partíciót a legtöbb nehéz terhelésű felhasználótól a számítási feladatok kiegyensúlyozása érdekében.
A terheléselosztási stratégia megadásához a fejlesztők használhatják EventHubsContainerProperties
a konfigurációt. A konfigurálásra EventHubsContainerProperties
a következő szakaszban talál példát.
Batch fogyasztói támogatás
Ez EventHubsInboundChannelAdapter
támogatja a kötegfogyasztó módot. Az engedélyezéshez a felhasználók megadhatja a figyelő módot, mint ListenerMode.BATCH
egy EventHubsInboundChannelAdapter
példány létrehozásakor.
Ha engedélyezve van, a rendszer fogad egy üzenetet , amelynek hasznos adata a kötegelt események listája, és át lesz adva az alsóbb rétegbeli csatornának. Az egyes üzenetfejlécek is listaként lesznek konvertálva, amelyek tartalma az egyes eseményekhez tartozó fejlécérték. A partícióazonosító, az ellenőrzőpont és az utolsó lekérdezett tulajdonságok közösségi fejlécei egyetlen értékként jelennek meg a teljes eseménykötegben, amelyek ugyanazt a tulajdonságot használják. További információ: Event Hubs Üzenetfejlécek szakasz.
Megjegyzés:
Az ellenőrzőpont fejléce csak manuális ellenőrzőpont-mód használata esetén létezik.
A kötegfelhasználók ellenőrzőpontozása két módot támogat: BATCH
és MANUAL
. BATCH
A mód egy automatikus ellenőrzőpontozási mód, amely az események teljes kötegét együttesen ellenőrzi a beérkezés után. MANUAL
módban a felhasználók ellenőrzik az eseményeket. Használat esetén a checkpointer át lesz adva az üzenet fejlécébe, és a felhasználók használhatják az ellenőrzőpont-ellenőrzést.
A kötegfogyasztó házirendet megadhatja max-size
az and max-wait-time
, where max-size
is a necessary tulajdonság, de max-wait-time
nem kötelező.
A köteghasználati stratégia megadásához a fejlesztők használhatják EventHubsContainerProperties
a konfigurációt. A konfigurálásra EventHubsContainerProperties
a következő szakaszban talál példát.
Függőség beállítása
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Konfiguráció
Ez az alapindító a következő 3 konfigurációs lehetőséget biztosítja:
Csatlakozás ion konfigurációs tulajdonságai
Ez a szakasz az Azure Event Hubshoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.
Megjegyzés:
Ha úgy dönt, hogy biztonsági taggal hitelesít és engedélyez egy Azure-erőforráshoz való hozzáférést a Microsoft Entra-azonosítóval, olvassa el a Hozzáférés engedélyezése a Microsoft Entra-azonosítóval című témakört, amelyből megtudhatja, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.
a spring-cloud-azure-starter-integration-eventhubs Csatlakozás konfigurálható tulajdonságai:
Tulajdonság | Type | Description |
---|---|---|
spring.cloud.azure.eventhubs.enabled | Logikai | Az Azure Event Hubs engedélyezése. |
spring.cloud.azure.eventhubs.connection-string | Sztring | Event Hubs-névtér kapcsolati sztring érték. |
spring.cloud.azure.eventhubs.namespace | Sztring | Event Hubs-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie |
spring.cloud.azure.eventhubs.domain-name | Sztring | Egy Azure Event Hubs-névtérérték tartományneve. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Sztring | Egyéni végpont címe. |
spring.cloud.azure.eventhubs.shared-connection | Logikai | Azt, hogy a mögöttes EventProcessorClient és az EventHubProducerAsyncClient ugyanazt a kapcsolatot használja-e. Alapértelmezés szerint minden létrehozott Event Hub-ügyfélhez új kapcsolat jön létre és jön létre. |
Ellenőrzőpont konfigurációs tulajdonságai
Ez a szakasz a Storage Blobs szolgáltatás konfigurációs beállításait tartalmazza, amely a partíció tulajdonjogának és ellenőrzőpont-adatainak megőrzésére szolgál.
Megjegyzés:
A 4.0.0-s verzióból, ha a spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tulajdonság nincs manuálisan engedélyezve, a rendszer nem hoz létre automatikusan storage-tárolót.
A spring-cloud-azure-starter-integration-eventhubs konfigurálható tulajdonságainak ellenőrzése:
Tulajdonság | Type | Description |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Logikai | Engedélyezi-e a tárolók létrehozását, ha nem létezik. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Sztring | A tárfiók neve. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Sztring | Tárfiók hozzáférési kulcsa. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Sztring | Tároló neve. |
Az Azure Service SDK gyakori konfigurációs beállításai a Storage Blob CheckPoint Store-hoz is konfigurálhatók. A támogatott konfigurációs beállítások a Spring Cloud Azure-konfigurációban jelennek meg, és konfigurálhatók az egyesített előtaggal spring.cloud.azure.
vagy a spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Az Event Hub processzorkonfigurációs tulajdonságai
Az EventHubsInboundChannelAdapter
alkalmazás egy EventProcessorClient
eseményközpontból származó üzeneteket használ fel egy adott eseményközpont általános tulajdonságainak EventProcessorClient
konfigurálásához, a fejlesztők pedig használhatják EventHubsContainerProperties
a konfigurációt. Tekintse meg a következő szakaszt a munka menetéről EventHubsInboundChannelAdapter
.
Alapszintű használat
Üzenetek küldése az Azure Event Hubsba
Adja meg a hitelesítő adatok konfigurációs beállításait.
Az kapcsolati sztring hitelesítő adatokhoz konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
A hitelesítő adatok szolgáltatásnévként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Megjegyzés:
Az engedélyezett tenant-id
értékek a következők: common
, organizations
, consumers
vagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.
Hozzon létre
DefaultMessageHandler
egy babot,EventHubsTemplate
hogy üzeneteket küldjön az Event Hubsnak.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Hozzon létre egy üzenetátjáró-kötést a fenti üzenetkezelővel egy üzenetcsatornán keresztül.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Üzenetek küldése az átjáróval.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Üzenetek fogadása az Azure Event Hubsból
Adja meg a hitelesítő adatok konfigurációs beállításait.
Hozzon létre egy bab üzenetcsatornát bemeneti csatornaként.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Hozzon létre
EventHubsInboundChannelAdapter
egy babot azEventHubsMessageListenerContainer
Event Hubstól érkező üzenetek fogadásához.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Hozzon létre egy üzenet fogadó kötést az EventHubsInboundChannelAdapterrel a korábban létrehozott üzenetcsatornán keresztül.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Az EventHubsMessageConverter konfigurálása az objectMapper testreszabásához
EventHubsMessageConverter
konfigurálható babként készült, amely lehetővé teszi a felhasználók számára az ObjectMapper testreszabását.
Batch fogyasztói támogatás
Az Event Hubsból származó üzenetek kötegekben való felhasználása a fenti mintához hasonló, a felhasználókon kívül meg kell adni a kötegfogyasztókkal kapcsolatos konfigurációs beállításokat is EventHubsInboundChannelAdapter
.
Létrehozáskor EventHubsInboundChannelAdapter
a figyelő módnak a következőnek kell lennie BATCH
: . Amikor létrehoz egy babotEventHubsMessageListenerContainer
, állítsa be az ellenőrzőpont üzemmódot vagy BATCH
MANUAL
azt, és a kötegbeállítások igény szerint konfigurálhatók.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Event Hubs-üzenetfejlécek
Az alábbi táblázat bemutatja, hogyan vannak leképezve az Event Hubs üzenettulajdonságai a Spring-üzenetfejlécekre. Az Azure Event Hubs esetében az üzenet neve event
.
Leképezés az Event Hubs-üzenet/eseménytulajdonságok és a spring message fejlécek között rekordfigyelő módban:
Az Event Hubs eseménytulajdonságai | Spring Message Header Constants | Type | Description |
---|---|---|---|
Enqueued time | EventHubsHeaders#ENQUEUED_TIME | Azonnali | Az eseménynek az Event Hub partícióban való leküldésének pillanata UTC-ben. |
Eltolás | EventHubsHeaders#OFF Standard kiadás T | Long | Az esemény eltolása, amikor a társított Event Hub-partícióról érkezett. |
Partíciókulcs | AzureHeaders#PARTITION_KEY | Sztring | A partíció kivonatolási kulcsa, ha az esemény eredeti közzétételekor lett beállítva. |
Partícióazonosító | AzureHeaders#RAW_PARTITION_ID | Sztring | Az Event Hub partícióazonosítója. |
Sorozat száma | EventHubsHeaders#Standard kiadásQUENCE_NUMBER | Long | Az eseményhez rendelt sorszám, amikor a társított Event Hub-partícióban lekérdezték. |
Utolsó enqueued eseménytulajdonságok | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | A partíció utolsó lekérdezett eseményének tulajdonságai. |
NA | AzureHeaders#CHECKPOINTER | Ellenőrzőpont | Az adott üzenet ellenőrzőpontjának fejléce. |
A felhasználók elemezhetik az egyes események kapcsolódó információinak üzenetfejléceit. Az esemény üzenetfejlécének beállításához a program az összes testreszabott fejlécet egy esemény alkalmazástulajdonságaként helyezi el, ahol a fejléc tulajdonságkulcsként van beállítva. Amikor események érkeznek az Event Hubstól, az összes alkalmazástulajdonság az üzenetfejlécre lesz konvertálva.
Megjegyzés:
A partíciókulcs, a lekérdezett idő, az eltolás és a sorszám üzenetfejlécei nem használhatók manuálisan.
Ha engedélyezve van a batch-consumer mód, a kötegelt üzenetek adott fejlécei a következők, amelyek az egyes Event Hubs-események értékeinek listáját tartalmazzák.
Leképezés az Event Hubs-üzenet/eseménytulajdonságok és a Spring Message-fejlécek között Batch-figyelő módban:
Az Event Hubs eseménytulajdonságai | Spring Batch üzenetfejléc-állandók | Type | Description |
---|---|---|---|
Enqueued time | EventHubsHeaders#ENQUEUED_TIME | Azonnali lista | Annak a pillanatnak a listája (UTC), amikor az egyes eseményeket az Event Hub partícióba foglalták. |
Eltolás | EventHubsHeaders#OFF Standard kiadás T | Hosszú lista | Az egyes események eltolásának listája a társított Event Hub-partícióról való fogadáskor. |
Partíciókulcs | AzureHeaders#PARTITION_KEY | Sztringek listája | A partíció kivonatolási kulcsának listája, ha az az egyes események eredeti közzétételekor lett beállítva. |
Sorozat száma | EventHubsHeaders#Standard kiadásQUENCE_NUMBER | Hosszú lista | Az egyes eseményekhez rendelt sorszámok listája, amikor az eseményt a társított Event Hub-partícióban lekérdezték. |
Rendszertulajdonságok | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Térképlista | Az egyes események rendszertulajdonságainak listája. |
Az alkalmazás tulajdonságai | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Térképlista | Az egyes események alkalmazástulajdonságainak listája, ahol az összes testreszabott üzenetfejléc vagy eseménytulajdonság el lesz helyezve. |
Megjegyzés:
Üzenetek közzétételekor a fenti kötegfejlécek el lesznek távolítva az üzenetekből, ha léteznek.
Samples
További információ: Azure-Spring-Boot-Samples adattár a GitHubon.
Spring Integration with Azure Service Bus
Fő fogalmak
A Spring Integration lehetővé teszi az egyszerű üzenetkezelést a Spring-alapú alkalmazásokban, és deklaratív adaptereken keresztül támogatja a külső rendszerekkel való integrációt.
Az Azure Service Bus Spring Integration bővítményprojektje bejövő és kimenő csatornaadaptereket biztosít az Azure Service Bushoz.
Megjegyzés:
Az CompletableFuture támogatási API-k elavultak a 2.10.0-s verzióról, és a Reactor Core a 4.0.0-s verzióról váltja fel. Részletekért lásd: Javadoc.
Függőség beállítása
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Konfiguráció
Ez az alapindító a következő 2 konfigurációs lehetőséget biztosítja:
Csatlakozás ion konfigurációs tulajdonságai
Ez a szakasz az Azure Service Bushoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.
Megjegyzés:
Ha úgy dönt, hogy biztonsági taggal hitelesít és engedélyez egy Azure-erőforráshoz való hozzáférést a Microsoft Entra-azonosítóval, olvassa el a Hozzáférés engedélyezése a Microsoft Entra-azonosítóval című témakört, amelyből megtudhatja, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.
Csatlakozás spring-cloud-azure-starter-integration-servicebus konfigurálható tulajdonságai:
Tulajdonság | Type | Description |
---|---|---|
spring.cloud.azure.servicebus.enabled | Logikai | Az Azure Service Bus engedélyezése. |
spring.cloud.azure.servicebus.connection-string | Sztring | Service Bus-névtér kapcsolati sztring érték. |
spring.cloud.azure.servicebus.namespace | Sztring | Service Bus-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie |
spring.cloud.azure.servicebus.domain-name | Sztring | Egy Azure Service Bus-névtérérték tartományneve. |
A Service Bus processzor konfigurációs tulajdonságai
A ServiceBusInboundChannelAdapter
felhasználók az ServiceBusProcessorClient
üzenetek felhasználásával konfigurálják egy adott eszköz általános tulajdonságait ServiceBusProcessorClient
, a fejlesztők pedig használhatják ServiceBusContainerProperties
a konfigurációt. Tekintse meg a következő szakaszt a munka menetéről ServiceBusInboundChannelAdapter
.
Alapszintű használat
Üzenetek küldése az Azure Service Busba
Adja meg a hitelesítő adatok konfigurációs beállításait.
Az kapcsolati sztring hitelesítő adatokhoz konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Megjegyzés:
Az engedélyezett tenant-id
értékek a következők: common
, organizations
, consumers
vagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.
A hitelesítő adatok szolgáltatásnévként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Megjegyzés:
Az engedélyezett tenant-id
értékek a következők: common
, organizations
, consumers
vagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.
Hozzon létre
DefaultMessageHandler
aServiceBusTemplate
beannel, hogy üzeneteket küldjön a Service Busnak, állítsa be a ServiceBusTemplate entitástípusát. Ez a minta a Service Bus-üzenetsort veszi példaként.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Hozzon létre egy üzenetátjáró-kötést a fenti üzenetkezelővel egy üzenetcsatornán keresztül.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Üzenetek küldése az átjáróval.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Üzenetek fogadása az Azure Service Busból
Adja meg a hitelesítő adatok konfigurációs beállításait.
Hozzon létre egy bab üzenetcsatornát bemeneti csatornaként.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Hozzon létre
ServiceBusInboundChannelAdapter
egy babot,ServiceBusMessageListenerContainer
hogy üzeneteket fogadjon a Service Busnak. Ez a minta a Service Bus-üzenetsort veszi példaként.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Hozzon létre egy üzenet fogadó kötést a korábban létrehozott üzenetcsatornán
ServiceBusInboundChannelAdapter
keresztül.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
A ServiceBusMessageConverter konfigurálása az objectMapper testreszabásához
ServiceBusMessageConverter
konfigurálható babként készült, amely lehetővé teszi a felhasználók számára a testreszabást ObjectMapper
.
Service Bus-üzenetfejlécek
Egyes Service Bus-fejlécek esetében, amelyek több Spring-fejlécállandóra is leképezhetők, a különböző Spring-fejlécek prioritása megjelenik a listában.
Leképezés a Service Bus-fejlécek és a spring fejlécek között:
Service Bus-üzenetfejlécek és -tulajdonságok | Spring message header constants | Típus | Konfigurálható | Leírás |
---|---|---|---|---|
Tartalomtípus | MessageHeaders#CONTENT_TYPE |
Sztring | Igen | Az üzenet RFC2045 tartalomtípus-leírója. |
Correlation ID | ServiceBusMessageHeaders#CORRELATION_ID |
Sztring | Igen | Az üzenet korrelációs azonosítója |
Üzenetazonosító | ServiceBusMessageHeaders#MESSAGE_ID |
Sztring | Igen | Az üzenet üzenetazonosítója, amelynek fejléce nagyobb prioritással rendelkezik, mint MessageHeaders#ID a . |
Üzenetazonosító | MessageHeaders#ID |
UUID | Igen | Az üzenet üzenetazonosítója, amelynek fejléce alacsonyabb prioritással rendelkezik, mint ServiceBusMessageHeaders#MESSAGE_ID a . |
Partíciókulcs | ServiceBusMessageHeaders#PARTITION_KEY |
Sztring | Igen | Az üzenet particionált entitásnak való küldéséhez használt partíciókulcs. |
Válasz | MessageHeaders#REPLY_CHANNEL |
Sztring | Igen | Egy entitás címe, amelybe válaszokat szeretne küldeni. |
Válasz a munkamenet-azonosítóra | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
Sztring | Igen | Az üzenet ReplyToGroupId tulajdonságértéke. |
Ütemezett beiktatás időpontja utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Igen | Az a dátum, amikor az üzenetet a Service Busban kell lekérni, ez a fejléc nagyobb prioritással rendelkezik, mint AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE a . |
Ütemezett beiktatás időpontja utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Egész | Igen | Az a dátum, amikor az üzenetet a Service Busban kell lekérni, ez a fejléc alacsonyabb prioritással rendelkezik, mint ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME a . |
Munkamenet-azonosító | ServiceBusMessageHeaders#SESSION_ID |
Sztring | Igen | A munkamenet-vezérelt entitás munkamenet-IDentifier-azonosítója. |
Élettartam | ServiceBusMessageHeaders#TIME_TO_LIVE |
Duration | Igen | Az üzenet lejárata előtti időtartam. |
Végérték | ServiceBusMessageHeaders#TO |
Sztring | Igen | Az üzenet "címzett" címe, amely az útválasztási forgatókönyvekben való jövőbeli használatra van fenntartva, és amelyet jelenleg maga a közvetítő figyelmen kívül hagy. |
Subject | ServiceBusMessageHeaders#SUBJECT |
Sztring | Igen | Az üzenet tárgya. |
Holt betű hiba leírása | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
Sztring | Nem | Egy halott betűs üzenet leírása. |
Holt betű oka | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
Sztring | Nem | Az oka annak, hogy egy üzenet elhalt betűs volt. |
Holt betűforrás | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
Sztring | Nem | Az az entitás, amelyben az üzenet elhalt betűs volt. |
Kézbesítések száma | ServiceBusMessageHeaders#DELIVERY_COUNT |
hosszú | Nem | Az üzenet ügyfeleknek való kézbesítésének száma. |
Enqueued sorszám | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
hosszú | Nem | A Service Bus által egy üzenethez hozzárendelt lekérdezett sorszám. |
Enqueued time | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | Nem | Az az időpont, amikor az üzenet be lett iktatásra a Service Busban. |
A lejárat dátuma: | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | Nem | Az az időpont, amikor az üzenet lejár. |
Jogkivonat zárolása | ServiceBusMessageHeaders#LOCK_TOKEN |
Sztring | Nem | Az aktuális üzenet zárolási jogkivonata. |
Zárolva, amíg | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | Nem | Az az időpont, amikor az üzenet zárolása lejár. |
Sorozat száma | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
hosszú | Nem | A Service Bus által egy üzenethez rendelt egyedi szám. |
Állami | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | Nem | Az üzenet állapota, amely lehet aktív, késleltetett vagy ütemezett. |
Partíciókulcs támogatása
Ez a kezdő támogatja a Service Bus particionálását azáltal, hogy engedélyezi a partíciókulcs és a munkamenet-azonosító beállítását az üzenetfejlécben. Ez a szakasz bemutatja, hogyan állíthat be partíciókulcsot az üzenetekhez.
Ajánlott: Használja ServiceBusMessageHeaders.PARTITION_KEY
a fejléc kulcsaként.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Nem ajánlott, de jelenleg támogatott:AzureHeaders.PARTITION_KEY
a fejléc kulcsaként.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Megjegyzés:
Ha mindkettő ServiceBusMessageHeaders.PARTITION_KEY
AzureHeaders.PARTITION_KEY
be van állítva az üzenetfejlécekben, ServiceBusMessageHeaders.PARTITION_KEY
akkor a rendszer előnyben részesíti.
Munkamenet-támogatás
Ez a példa bemutatja, hogyan állíthatja be manuálisan egy üzenet munkamenet-azonosítóját az alkalmazásban.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Megjegyzés:
Ha az ServiceBusMessageHeaders.SESSION_ID
üzenetfejlécekben van beállítva, és egy másik ServiceBusMessageHeaders.PARTITION_KEY
fejléc is be van állítva, a munkamenet-azonosító értéke végül a partíciókulcs értékének felülírására lesz használva.
Samples
További információ: Azure-Spring-Boot-Samples adattár a GitHubon.
Spring Integration with Azure Storage Queue
Fő fogalmak
Az Azure Queue Storage szolgáltatás nagy számú üzenet tárolására szolgál. A világ bármely pontjáról elérheti az üzeneteket hitelesített hívásokon keresztül HTTP vagy HTTPS használatával. Az üzenetsor-üzenetek mérete legfeljebb 64 KB lehet. Az üzenetsorok több millió üzenetet tartalmazhatnak, akár a tárfiók teljes kapacitáskorlátját is. Az üzenetsorokat gyakran használják az aszinkron feldolgozáshoz használt teendőlista létrehozásához.
Függőség beállítása
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Konfiguráció
Ez az alapindító a következő konfigurációs beállításokat biztosítja:
Csatlakozás ion konfigurációs tulajdonságai
Ez a szakasz az Azure Storage Queuehoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.
Megjegyzés:
Ha úgy dönt, hogy biztonsági taggal hitelesít és engedélyez egy Azure-erőforráshoz való hozzáférést a Microsoft Entra-azonosítóval, olvassa el a Hozzáférés engedélyezése a Microsoft Entra-azonosítóval című témakört, amelyből megtudhatja, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.
Csatlakozás spring-cloud-azure-starter-integration-storage-queue konfigurálható tulajdonságai:
Tulajdonság | Type | Description |
---|---|---|
spring.cloud.azure.storage.queue.enabled | Logikai | Hogy engedélyezve van-e egy Azure Storage-üzenetsor. |
spring.cloud.azure.storage.queue.connection-string | Sztring | A Tárolósor névtere kapcsolati sztring érték. |
spring.cloud.azure.storage.queue.accountName | Sztring | Storage Queue-fiók neve. |
spring.cloud.azure.storage.queue.accountKey | Sztring | Storage Queue-fiókkulcs. |
spring.cloud.azure.storage.queue.endpoint | Sztring | Storage Queue szolgáltatásvégpont. |
spring.cloud.azure.storage.queue.sasToken | Sztring | Sas-jogkivonat hitelesítő adatai |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | Api-kérések készítésekor használt QueueServiceVersion. |
spring.cloud.azure.storage.queue.messageEncoding | Sztring | Üzenetsor üzenetkódolása. |
Alapszintű használat
Üzenetek küldése az Azure Storage-üzenetsorba
Adja meg a hitelesítő adatok konfigurációs beállításait.
Az kapcsolati sztring hitelesítő adatokhoz konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Megjegyzés:
Az engedélyezett tenant-id
értékek a következők: common
, organizations
, consumers
vagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.
A hitelesítő adatok szolgáltatásnévként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Megjegyzés:
Az engedélyezett tenant-id
értékek a következők: common
, organizations
, consumers
vagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.
Hozzon létre
DefaultMessageHandler
egy babot,StorageQueueTemplate
hogy üzeneteket küldjön a Storage-üzenetsorba.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Hozzon létre egy Üzenetátjáró-kötést a fenti üzenetkezelővel egy üzenetcsatornán keresztül.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Üzenetek küldése az átjáróval.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Üzenetek fogadása az Azure Storage-üzenetsorból
Adja meg a hitelesítő adatok konfigurációs beállításait.
Hozzon létre egy bab üzenetcsatornát bemeneti csatornaként.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Hozzon létre
StorageQueueMessageSource
egy babot aStorageQueueTemplate
Storage-üzenetsorba érkező üzenetek fogadásához.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Hozzon létre egy üzenet fogadó kötést a StorageQueueMessageSource használatával, amelyet az előző lépésben hoztunk létre a korábban létrehozott üzenetcsatornán keresztül.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Samples
További információ: Azure-Spring-Boot-Samples adattár a GitHubon.
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: