Spring Cloud Azure-támogatás a Spring Integrationhez

Ez a cikk a következőre vonatkozik: ✔️ 4.14.0 ✔️ 5.8.0-s verzió

Az Azure Spring Integration Extension spring integration adaptereket biztosít a Java-hoz készült Azure SDK különböző szolgáltatásaihoz. Spring Integration-támogatást biztosítunk ezekhez az Azure-szolgáltatásokhoz: Event Hubs, Service Bus, Storage Queue. A támogatott adapterek listája a következő:

Spring Integration with Azure Event Hubs

Fő fogalmak

Az Azure Event Hubs egy big data streamelési platform és eseménybetöltési szolgáltatás. Másodpercenként több millió eseményt képes fogadni és feldolgozni. Az eseményközpontokba elküldött adatok bármilyen valós idejű elemzési szolgáltató vagy kötegelési/tárolóadapter segítségével átalakíthatók és tárolhatók.

A Spring Integration lehetővé teszi az egyszerű üzenetkezelést a Spring-alapú alkalmazásokban, és deklaratív adaptereken keresztül támogatja a külső rendszerekkel való integrációt. Ezek az adapterek magasabb szintű absztrakciót biztosítanak a Spring újraegyeztetési, üzenetkezelési és ütemezési támogatásával szemben. Az Event Hubs Spring Integration bővítményprojektje bejövő és kimenő csatornaadaptereket és átjárókat biztosít az Azure Event Hubshoz.

Megjegyzés:

Az RxJava támogatási API-k a 4.0.0-s verzióról törlődnek. Részletekért lásd: Javadoc.

Fogyasztói csoport

Az Event Hubs hasonló támogatást nyújt a fogyasztói csoportoknak, mint az Apache Kafka, de kismértékben eltérő logikával. Bár a Kafka az összes véglegesített eltolást a közvetítőben tárolja, a manuálisan feldolgozott Event Hubs-üzenetek eltolásait kell tárolnia. Az Event Hubs SDK az ilyen eltolások Azure Storage-ban való tárolására szolgál.

Particionálás támogatása

Az Event Hubs a Kafkához hasonló fizikai partíciót biztosít. A Kafka felhasználói és partíciói közötti automatikus újraelosztással ellentétben azonban az Event Hubs egyfajta megelőző módot biztosít. A tárfiók bérletként működik annak meghatározásához, hogy melyik partíció melyik felhasználó tulajdonában van. Amikor egy új felhasználó elindul, megpróbál ellopni néhány partíciót a legtöbb nehéz terhelésű felhasználótól a számítási feladatok kiegyensúlyozása érdekében.

A terheléselosztási stratégia megadásához a fejlesztők használhatják EventHubsContainerProperties a konfigurációt. A konfigurálásra EventHubsContainerPropertiesa következő szakaszban talál példát.

Batch fogyasztói támogatás

Ez EventHubsInboundChannelAdapter támogatja a kötegfogyasztó módot. Az engedélyezéshez a felhasználók megadhatja a figyelő módot, mint ListenerMode.BATCH egy EventHubsInboundChannelAdapter példány létrehozásakor. Ha engedélyezve van, a rendszer fogad egy üzenetet , amelynek hasznos adata a kötegelt események listája, és át lesz adva az alsóbb rétegbeli csatornának. Az egyes üzenetfejlécek is listaként lesznek konvertálva, amelyek tartalma az egyes eseményekhez tartozó fejlécérték. A partícióazonosító, az ellenőrzőpont és az utolsó lekérdezett tulajdonságok közösségi fejlécei egyetlen értékként jelennek meg a teljes eseménykötegben, amelyek ugyanazt a tulajdonságot használják. További információ: Event Hubs Üzenetfejlécek szakasz.

Megjegyzés:

Az ellenőrzőpont fejléce csak manuális ellenőrzőpont-mód használata esetén létezik.

A kötegfelhasználók ellenőrzőpontozása két módot támogat: BATCH és MANUAL. BATCH A mód egy automatikus ellenőrzőpontozási mód, amely az események teljes kötegét együttesen ellenőrzi a beérkezés után. MANUAL módban a felhasználók ellenőrzik az eseményeket. Használat esetén a checkpointer át lesz adva az üzenet fejlécébe, és a felhasználók használhatják az ellenőrzőpont-ellenőrzést.

A kötegfogyasztó házirendet megadhatja max-size az and max-wait-time, where max-size is a necessary tulajdonság, de max-wait-time nem kötelező. A köteghasználati stratégia megadásához a fejlesztők használhatják EventHubsContainerProperties a konfigurációt. A konfigurálásra EventHubsContainerPropertiesa következő szakaszban talál példát.

Függőség beállítása

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Konfiguráció

Ez az alapindító a következő 3 konfigurációs lehetőséget biztosítja:

Csatlakozás ion konfigurációs tulajdonságai

Ez a szakasz az Azure Event Hubshoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.

Megjegyzés:

Ha úgy dönt, hogy biztonsági taggal hitelesít és engedélyez egy Azure-erőforráshoz való hozzáférést a Microsoft Entra-azonosítóval, olvassa el a Hozzáférés engedélyezése a Microsoft Entra-azonosítóval című témakört, amelyből megtudhatja, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.

a spring-cloud-azure-starter-integration-eventhubs Csatlakozás konfigurálható tulajdonságai:

Tulajdonság Type Description
spring.cloud.azure.eventhubs.enabled Logikai Az Azure Event Hubs engedélyezése.
spring.cloud.azure.eventhubs.connection-string Sztring Event Hubs-névtér kapcsolati sztring érték.
spring.cloud.azure.eventhubs.namespace Sztring Event Hubs-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie
spring.cloud.azure.eventhubs.domain-name Sztring Egy Azure Event Hubs-névtérérték tartományneve.
spring.cloud.azure.eventhubs.custom-endpoint-address Sztring Egyéni végpont címe.
spring.cloud.azure.eventhubs.shared-connection Logikai Azt, hogy a mögöttes EventProcessorClient és az EventHubProducerAsyncClient ugyanazt a kapcsolatot használja-e. Alapértelmezés szerint minden létrehozott Event Hub-ügyfélhez új kapcsolat jön létre és jön létre.

Ellenőrzőpont konfigurációs tulajdonságai

Ez a szakasz a Storage Blobs szolgáltatás konfigurációs beállításait tartalmazza, amely a partíció tulajdonjogának és ellenőrzőpont-adatainak megőrzésére szolgál.

Megjegyzés:

A 4.0.0-s verzióból, ha a spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists tulajdonság nincs manuálisan engedélyezve, a rendszer nem hoz létre automatikusan storage-tárolót.

A spring-cloud-azure-starter-integration-eventhubs konfigurálható tulajdonságainak ellenőrzése:

Tulajdonság Type Description
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Logikai Engedélyezi-e a tárolók létrehozását, ha nem létezik.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Sztring A tárfiók neve.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Sztring Tárfiók hozzáférési kulcsa.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Sztring Tároló neve.

Az Azure Service SDK gyakori konfigurációs beállításai a Storage Blob CheckPoint Store-hoz is konfigurálhatók. A támogatott konfigurációs beállítások a Spring Cloud Azure-konfigurációban jelennek meg, és konfigurálhatók az egyesített előtaggal spring.cloud.azure. vagy a spring.cloud.azure.eventhubs.processor.checkpoint-store.

Az Event Hub processzorkonfigurációs tulajdonságai

Az EventHubsInboundChannelAdapter alkalmazás egy EventProcessorClient eseményközpontból származó üzeneteket használ fel egy adott eseményközpont általános tulajdonságainak EventProcessorClientkonfigurálásához, a fejlesztők pedig használhatják EventHubsContainerProperties a konfigurációt. Tekintse meg a következő szakaszt a munka menetéről EventHubsInboundChannelAdapter.

Alapszintű használat

Üzenetek küldése az Azure Event Hubsba

  1. Adja meg a hitelesítő adatok konfigurációs beállításait.

    • Az kapcsolati sztring hitelesítő adatokhoz konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • A hitelesítő adatok szolgáltatásnévként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Megjegyzés:

Az engedélyezett tenant-id értékek a következők: common, organizations, consumersvagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.

  1. Hozzon létre DefaultMessageHandler egy babot, EventHubsTemplate hogy üzeneteket küldjön az Event Hubsnak.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Hozzon létre egy üzenetátjáró-kötést a fenti üzenetkezelővel egy üzenetcsatornán keresztül.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Üzenetek küldése az átjáróval.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Üzenetek fogadása az Azure Event Hubsból

  1. Adja meg a hitelesítő adatok konfigurációs beállításait.

  2. Hozzon létre egy bab üzenetcsatornát bemeneti csatornaként.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Hozzon létre EventHubsInboundChannelAdapter egy babot az EventHubsMessageListenerContainer Event Hubstól érkező üzenetek fogadásához.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Hozzon létre egy üzenet fogadó kötést az EventHubsInboundChannelAdapterrel a korábban létrehozott üzenetcsatornán keresztül.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Az EventHubsMessageConverter konfigurálása az objectMapper testreszabásához

EventHubsMessageConverter konfigurálható babként készült, amely lehetővé teszi a felhasználók számára az ObjectMapper testreszabását.

Batch fogyasztói támogatás

Az Event Hubsból származó üzenetek kötegekben való felhasználása a fenti mintához hasonló, a felhasználókon kívül meg kell adni a kötegfogyasztókkal kapcsolatos konfigurációs beállításokat is EventHubsInboundChannelAdapter.

Létrehozáskor EventHubsInboundChannelAdaptera figyelő módnak a következőnek kell lennie BATCH: . Amikor létrehoz egy babotEventHubsMessageListenerContainer, állítsa be az ellenőrzőpont üzemmódot vagy BATCHMANUAL azt, és a kötegbeállítások igény szerint konfigurálhatók.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Event Hubs-üzenetfejlécek

Az alábbi táblázat bemutatja, hogyan vannak leképezve az Event Hubs üzenettulajdonságai a Spring-üzenetfejlécekre. Az Azure Event Hubs esetében az üzenet neve event.

Leképezés az Event Hubs-üzenet/eseménytulajdonságok és a spring message fejlécek között rekordfigyelő módban:

Az Event Hubs eseménytulajdonságai Spring Message Header Constants Type Description
Enqueued time EventHubsHeaders#ENQUEUED_TIME Azonnali Az eseménynek az Event Hub partícióban való leküldésének pillanata UTC-ben.
Eltolás EventHubsHeaders#OFF Standard kiadás T Long Az esemény eltolása, amikor a társított Event Hub-partícióról érkezett.
Partíciókulcs AzureHeaders#PARTITION_KEY Sztring A partíció kivonatolási kulcsa, ha az esemény eredeti közzétételekor lett beállítva.
Partícióazonosító AzureHeaders#RAW_PARTITION_ID Sztring Az Event Hub partícióazonosítója.
Sorozat száma EventHubsHeaders#Standard kiadásQUENCE_NUMBER Long Az eseményhez rendelt sorszám, amikor a társított Event Hub-partícióban lekérdezték.
Utolsó enqueued eseménytulajdonságok EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties A partíció utolsó lekérdezett eseményének tulajdonságai.
NA AzureHeaders#CHECKPOINTER Ellenőrzőpont Az adott üzenet ellenőrzőpontjának fejléce.

A felhasználók elemezhetik az egyes események kapcsolódó információinak üzenetfejléceit. Az esemény üzenetfejlécének beállításához a program az összes testreszabott fejlécet egy esemény alkalmazástulajdonságaként helyezi el, ahol a fejléc tulajdonságkulcsként van beállítva. Amikor események érkeznek az Event Hubstól, az összes alkalmazástulajdonság az üzenetfejlécre lesz konvertálva.

Megjegyzés:

A partíciókulcs, a lekérdezett idő, az eltolás és a sorszám üzenetfejlécei nem használhatók manuálisan.

Ha engedélyezve van a batch-consumer mód, a kötegelt üzenetek adott fejlécei a következők, amelyek az egyes Event Hubs-események értékeinek listáját tartalmazzák.

Leképezés az Event Hubs-üzenet/eseménytulajdonságok és a Spring Message-fejlécek között Batch-figyelő módban:

Az Event Hubs eseménytulajdonságai Spring Batch üzenetfejléc-állandók Type Description
Enqueued time EventHubsHeaders#ENQUEUED_TIME Azonnali lista Annak a pillanatnak a listája (UTC), amikor az egyes eseményeket az Event Hub partícióba foglalták.
Eltolás EventHubsHeaders#OFF Standard kiadás T Hosszú lista Az egyes események eltolásának listája a társított Event Hub-partícióról való fogadáskor.
Partíciókulcs AzureHeaders#PARTITION_KEY Sztringek listája A partíció kivonatolási kulcsának listája, ha az az egyes események eredeti közzétételekor lett beállítva.
Sorozat száma EventHubsHeaders#Standard kiadásQUENCE_NUMBER Hosszú lista Az egyes eseményekhez rendelt sorszámok listája, amikor az eseményt a társított Event Hub-partícióban lekérdezték.
Rendszertulajdonságok EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Térképlista Az egyes események rendszertulajdonságainak listája.
Az alkalmazás tulajdonságai EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Térképlista Az egyes események alkalmazástulajdonságainak listája, ahol az összes testreszabott üzenetfejléc vagy eseménytulajdonság el lesz helyezve.

Megjegyzés:

Üzenetek közzétételekor a fenti kötegfejlécek el lesznek távolítva az üzenetekből, ha léteznek.

Samples

További információ: Azure-Spring-Boot-Samples adattár a GitHubon.

Spring Integration with Azure Service Bus

Fő fogalmak

A Spring Integration lehetővé teszi az egyszerű üzenetkezelést a Spring-alapú alkalmazásokban, és deklaratív adaptereken keresztül támogatja a külső rendszerekkel való integrációt.

Az Azure Service Bus Spring Integration bővítményprojektje bejövő és kimenő csatornaadaptereket biztosít az Azure Service Bushoz.

Megjegyzés:

Az CompletableFuture támogatási API-k elavultak a 2.10.0-s verzióról, és a Reactor Core a 4.0.0-s verzióról váltja fel. Részletekért lásd: Javadoc.

Függőség beállítása

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Konfiguráció

Ez az alapindító a következő 2 konfigurációs lehetőséget biztosítja:

Csatlakozás ion konfigurációs tulajdonságai

Ez a szakasz az Azure Service Bushoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.

Megjegyzés:

Ha úgy dönt, hogy biztonsági taggal hitelesít és engedélyez egy Azure-erőforráshoz való hozzáférést a Microsoft Entra-azonosítóval, olvassa el a Hozzáférés engedélyezése a Microsoft Entra-azonosítóval című témakört, amelyből megtudhatja, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.

Csatlakozás spring-cloud-azure-starter-integration-servicebus konfigurálható tulajdonságai:

Tulajdonság Type Description
spring.cloud.azure.servicebus.enabled Logikai Az Azure Service Bus engedélyezése.
spring.cloud.azure.servicebus.connection-string Sztring Service Bus-névtér kapcsolati sztring érték.
spring.cloud.azure.servicebus.namespace Sztring Service Bus-névtérérték, amely a teljes tartománynév előtagja. Az FQDN-nek a NamespaceName.DomainName névből kell lennie
spring.cloud.azure.servicebus.domain-name Sztring Egy Azure Service Bus-névtérérték tartományneve.

A Service Bus processzor konfigurációs tulajdonságai

A ServiceBusInboundChannelAdapter felhasználók az ServiceBusProcessorClient üzenetek felhasználásával konfigurálják egy adott eszköz általános tulajdonságait ServiceBusProcessorClient, a fejlesztők pedig használhatják ServiceBusContainerProperties a konfigurációt. Tekintse meg a következő szakaszt a munka menetéről ServiceBusInboundChannelAdapter.

Alapszintű használat

Üzenetek küldése az Azure Service Busba

  1. Adja meg a hitelesítő adatok konfigurációs beállításait.

    • Az kapcsolati sztring hitelesítő adatokhoz konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Megjegyzés:

Az engedélyezett tenant-id értékek a következők: common, organizations, consumersvagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.

  • A hitelesítő adatok szolgáltatásnévként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Megjegyzés:

Az engedélyezett tenant-id értékek a következők: common, organizations, consumersvagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.

  1. Hozzon létre DefaultMessageHandler a ServiceBusTemplate beannel, hogy üzeneteket küldjön a Service Busnak, állítsa be a ServiceBusTemplate entitástípusát. Ez a minta a Service Bus-üzenetsort veszi példaként.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Hozzon létre egy üzenetátjáró-kötést a fenti üzenetkezelővel egy üzenetcsatornán keresztül.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Üzenetek küldése az átjáróval.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Üzenetek fogadása az Azure Service Busból

  1. Adja meg a hitelesítő adatok konfigurációs beállításait.

  2. Hozzon létre egy bab üzenetcsatornát bemeneti csatornaként.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Hozzon létre ServiceBusInboundChannelAdapter egy babot, ServiceBusMessageListenerContainer hogy üzeneteket fogadjon a Service Busnak. Ez a minta a Service Bus-üzenetsort veszi példaként.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Hozzon létre egy üzenet fogadó kötést a korábban létrehozott üzenetcsatornán ServiceBusInboundChannelAdapter keresztül.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

A ServiceBusMessageConverter konfigurálása az objectMapper testreszabásához

ServiceBusMessageConverter konfigurálható babként készült, amely lehetővé teszi a felhasználók számára a testreszabást ObjectMapper.

Service Bus-üzenetfejlécek

Egyes Service Bus-fejlécek esetében, amelyek több Spring-fejlécállandóra is leképezhetők, a különböző Spring-fejlécek prioritása megjelenik a listában.

Leképezés a Service Bus-fejlécek és a spring fejlécek között:

Service Bus-üzenetfejlécek és -tulajdonságok Spring message header constants Típus Konfigurálható Leírás
Tartalomtípus MessageHeaders#CONTENT_TYPE Sztring Igen Az üzenet RFC2045 tartalomtípus-leírója.
Correlation ID ServiceBusMessageHeaders#CORRELATION_ID Sztring Igen Az üzenet korrelációs azonosítója
Üzenetazonosító ServiceBusMessageHeaders#MESSAGE_ID Sztring Igen Az üzenet üzenetazonosítója, amelynek fejléce nagyobb prioritással rendelkezik, mint MessageHeaders#IDa .
Üzenetazonosító MessageHeaders#ID UUID Igen Az üzenet üzenetazonosítója, amelynek fejléce alacsonyabb prioritással rendelkezik, mint ServiceBusMessageHeaders#MESSAGE_IDa .
Partíciókulcs ServiceBusMessageHeaders#PARTITION_KEY Sztring Igen Az üzenet particionált entitásnak való küldéséhez használt partíciókulcs.
Válasz MessageHeaders#REPLY_CHANNEL Sztring Igen Egy entitás címe, amelybe válaszokat szeretne küldeni.
Válasz a munkamenet-azonosítóra ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Sztring Igen Az üzenet ReplyToGroupId tulajdonságértéke.
Ütemezett beiktatás időpontja utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Igen Az a dátum, amikor az üzenetet a Service Busban kell lekérni, ez a fejléc nagyobb prioritással rendelkezik, mint AzureHeaders#SCHEDULED_ENQUEUE_MESSAGEa .
Ütemezett beiktatás időpontja utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Egész Igen Az a dátum, amikor az üzenetet a Service Busban kell lekérni, ez a fejléc alacsonyabb prioritással rendelkezik, mint ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIMEa .
Munkamenet-azonosító ServiceBusMessageHeaders#SESSION_ID Sztring Igen A munkamenet-vezérelt entitás munkamenet-IDentifier-azonosítója.
Élettartam ServiceBusMessageHeaders#TIME_TO_LIVE Duration Igen Az üzenet lejárata előtti időtartam.
Végérték ServiceBusMessageHeaders#TO Sztring Igen Az üzenet "címzett" címe, amely az útválasztási forgatókönyvekben való jövőbeli használatra van fenntartva, és amelyet jelenleg maga a közvetítő figyelmen kívül hagy.
Subject ServiceBusMessageHeaders#SUBJECT Sztring Igen Az üzenet tárgya.
Holt betű hiba leírása ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Sztring Nem Egy halott betűs üzenet leírása.
Holt betű oka ServiceBusMessageHeaders#DEAD_LETTER_REASON Sztring Nem Az oka annak, hogy egy üzenet elhalt betűs volt.
Holt betűforrás ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Sztring Nem Az az entitás, amelyben az üzenet elhalt betűs volt.
Kézbesítések száma ServiceBusMessageHeaders#DELIVERY_COUNT hosszú Nem Az üzenet ügyfeleknek való kézbesítésének száma.
Enqueued sorszám ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER hosszú Nem A Service Bus által egy üzenethez hozzárendelt lekérdezett sorszám.
Enqueued time ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Nem Az az időpont, amikor az üzenet be lett iktatásra a Service Busban.
A lejárat dátuma: ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Nem Az az időpont, amikor az üzenet lejár.
Jogkivonat zárolása ServiceBusMessageHeaders#LOCK_TOKEN Sztring Nem Az aktuális üzenet zárolási jogkivonata.
Zárolva, amíg ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Nem Az az időpont, amikor az üzenet zárolása lejár.
Sorozat száma ServiceBusMessageHeaders#SEQUENCE_NUMBER hosszú Nem A Service Bus által egy üzenethez rendelt egyedi szám.
Állami ServiceBusMessageHeaders#STATE ServiceBusMessageState Nem Az üzenet állapota, amely lehet aktív, késleltetett vagy ütemezett.

Partíciókulcs támogatása

Ez a kezdő támogatja a Service Bus particionálását azáltal, hogy engedélyezi a partíciókulcs és a munkamenet-azonosító beállítását az üzenetfejlécben. Ez a szakasz bemutatja, hogyan állíthat be partíciókulcsot az üzenetekhez.

Ajánlott: Használja ServiceBusMessageHeaders.PARTITION_KEY a fejléc kulcsaként.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nem ajánlott, de jelenleg támogatott:AzureHeaders.PARTITION_KEY a fejléc kulcsaként.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Megjegyzés:

Ha mindkettő ServiceBusMessageHeaders.PARTITION_KEYAzureHeaders.PARTITION_KEY be van állítva az üzenetfejlécekben, ServiceBusMessageHeaders.PARTITION_KEY akkor a rendszer előnyben részesíti.

Munkamenet-támogatás

Ez a példa bemutatja, hogyan állíthatja be manuálisan egy üzenet munkamenet-azonosítóját az alkalmazásban.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Megjegyzés:

Ha az ServiceBusMessageHeaders.SESSION_ID üzenetfejlécekben van beállítva, és egy másik ServiceBusMessageHeaders.PARTITION_KEY fejléc is be van állítva, a munkamenet-azonosító értéke végül a partíciókulcs értékének felülírására lesz használva.

Samples

További információ: Azure-Spring-Boot-Samples adattár a GitHubon.

Spring Integration with Azure Storage Queue

Fő fogalmak

Az Azure Queue Storage szolgáltatás nagy számú üzenet tárolására szolgál. A világ bármely pontjáról elérheti az üzeneteket hitelesített hívásokon keresztül HTTP vagy HTTPS használatával. Az üzenetsor-üzenetek mérete legfeljebb 64 KB lehet. Az üzenetsorok több millió üzenetet tartalmazhatnak, akár a tárfiók teljes kapacitáskorlátját is. Az üzenetsorokat gyakran használják az aszinkron feldolgozáshoz használt teendőlista létrehozásához.

Függőség beállítása

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Konfiguráció

Ez az alapindító a következő konfigurációs beállításokat biztosítja:

Csatlakozás ion konfigurációs tulajdonságai

Ez a szakasz az Azure Storage Queuehoz való csatlakozáshoz használt konfigurációs beállításokat tartalmazza.

Megjegyzés:

Ha úgy dönt, hogy biztonsági taggal hitelesít és engedélyez egy Azure-erőforráshoz való hozzáférést a Microsoft Entra-azonosítóval, olvassa el a Hozzáférés engedélyezése a Microsoft Entra-azonosítóval című témakört, amelyből megtudhatja, hogy a biztonsági tag megkapta-e a megfelelő engedélyt az Azure-erőforrás eléréséhez.

Csatlakozás spring-cloud-azure-starter-integration-storage-queue konfigurálható tulajdonságai:

Tulajdonság Type Description
spring.cloud.azure.storage.queue.enabled Logikai Hogy engedélyezve van-e egy Azure Storage-üzenetsor.
spring.cloud.azure.storage.queue.connection-string Sztring A Tárolósor névtere kapcsolati sztring érték.
spring.cloud.azure.storage.queue.accountName Sztring Storage Queue-fiók neve.
spring.cloud.azure.storage.queue.accountKey Sztring Storage Queue-fiókkulcs.
spring.cloud.azure.storage.queue.endpoint Sztring Storage Queue szolgáltatásvégpont.
spring.cloud.azure.storage.queue.sasToken Sztring Sas-jogkivonat hitelesítő adatai
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion Api-kérések készítésekor használt QueueServiceVersion.
spring.cloud.azure.storage.queue.messageEncoding Sztring Üzenetsor üzenetkódolása.

Alapszintű használat

Üzenetek küldése az Azure Storage-üzenetsorba

  1. Adja meg a hitelesítő adatok konfigurációs beállításait.

    • Az kapcsolati sztring hitelesítő adatokhoz konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • A hitelesítő adatok felügyelt identitásként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Megjegyzés:

Az engedélyezett tenant-id értékek a következők: common, organizations, consumersvagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.

  • A hitelesítő adatok szolgáltatásnévként való megadásához konfigurálja az alábbi tulajdonságokat az application.yml fájlban:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Megjegyzés:

Az engedélyezett tenant-id értékek a következők: common, organizations, consumersvagy a bérlőazonosító. Ezekről az értékekről további információt a AADSTS50020 – Az identitásszolgáltatótól származó felhasználói fiók nem létezik a bérlőben című hiba helytelen végpontja (személyes és szervezeti fiókok) című szakaszában talál. Az egybérlős alkalmazás konvertálásával kapcsolatos információkért lásd : Egybérlős alkalmazás átalakítása több-bérlőssé a Microsoft Entra-azonosítón.

  1. Hozzon létre DefaultMessageHandler egy babot, StorageQueueTemplate hogy üzeneteket küldjön a Storage-üzenetsorba.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Hozzon létre egy Üzenetátjáró-kötést a fenti üzenetkezelővel egy üzenetcsatornán keresztül.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Üzenetek küldése az átjáróval.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Üzenetek fogadása az Azure Storage-üzenetsorból

  1. Adja meg a hitelesítő adatok konfigurációs beállításait.

  2. Hozzon létre egy bab üzenetcsatornát bemeneti csatornaként.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Hozzon létre StorageQueueMessageSource egy babot a StorageQueueTemplate Storage-üzenetsorba érkező üzenetek fogadásához.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Hozzon létre egy üzenet fogadó kötést a StorageQueueMessageSource használatával, amelyet az előző lépésben hoztunk létre a korábban létrehozott üzenetcsatornán keresztül.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Samples

További információ: Azure-Spring-Boot-Samples adattár a GitHubon.