Aszinkron programozás a Javához készült Azure SDK-ban

Ez a cikk a Javához készült Azure SDK aszinkron programozási modelljét ismerteti.

Az Azure SDK kezdetben csak nem blokkoló, aszinkron API-kat tartalmazott az Azure-szolgáltatásokkal való interakcióhoz. Ezek az API-k lehetővé teszik az Azure SDK használatát a rendszererőforrásokat hatékonyan használó skálázható alkalmazások létrehozásához. A Java-hoz készült Azure SDK azonban szinkron ügyfeleket is tartalmaz a szélesebb közönség kiszolgálásához, és az ügyfélkódtárakat olyan felhasználók számára is elérhetővé teszi, akik nem ismerik az aszinkron programozást. (Lásd: Elérhető az Azure SDK tervezési irányelveiben.) Így az Azure SDK for Java összes Java-ügyfélkódtára aszinkron és szinkron ügyfeleket is kínál. Azt javasoljuk azonban, hogy az aszinkron ügyfeleket éles rendszerekhez használva maximalizálja a rendszererőforrások használatát.

Reaktív streamek

Ha a Java Azure SDK tervezési útmutatójában az Async Service Clients szakaszt tekinti át, láthatja, hogy a Java 8 által biztosítottak CompletableFuture helyett az aszinkron API-k reaktív típusokat használnak. Miért választottunk reaktív típusokat a JDK-ban natívan elérhető típusok közül?

A Java 8 olyan funkciókat vezetett be, mint a adatfolyamok, a Lambdas és az CompletableFuture. Ezek a funkciók számos képességet biztosítanak, de bizonyos korlátozásokkal rendelkeznek.

CompletableFuture visszahívásalapú, nem blokkoló képességeket biztosít, valamint az CompletionStage aszinkron műveletek egyszerű összeállításához engedélyezett felületet. A Lambdas olvashatóbbá teszi ezeket a leküldéses alapú API-kat. adatfolyamok funkcionális stílusú műveleteket biztosít az adatelemek gyűjteményének kezeléséhez. A streamek azonban szinkronok, és nem használhatók újra. CompletableFuturelehetővé teszi, hogy egyetlen kérést küldjön, támogatást nyújt a visszahíváshoz, és egyetlen választ vár. Számos felhőszolgáltatás azonban megköveteli az adatok streamelésének lehetőségét – például az Event Hubsot.

A reaktív streamek segíthetnek leküzdeni ezeket a korlátozásokat azáltal, hogy az elemeket egy forrásból egy előfizetőbe streamelik. Amikor egy előfizető adatokat kér egy forrástól, a forrás bármilyen számú eredményt küld vissza. Nem kell egyszerre elküldenie őket. Az átvitel egy adott időszakban történik, amikor a forrásnak van elküldendő adata.

Ebben a modellben az előfizető eseménykezelőket regisztrál az adatok beérkezésekor történő feldolgozásához. Ezek a leküldéses interakciók különböző jeleken keresztül értesítik az előfizetőt:

  • A onSubscribe() hívás azt jelzi, hogy az adatátvitel megkezdődött.
  • A onError() hívás azt jelzi, hogy hiba történt, ami az adatátvitel végét is jelzi.
  • A onComplete() hívás az adatátvitel sikeres befejezését jelzi.

A Java adatfolyamok ellentétben a reaktív streamek első osztályú eseményként kezelik a hibákat. A reaktív streamek dedikált csatornával rendelkeznek a forrás számára, a hibáknak az előfizetővel való közléséhez. A reaktív streamek lehetővé teszik, hogy az előfizető tárgyalja az adatátviteli sebességet, hogy ezeket a streameket leküldéses lekéréses modellté alakítsa.

A Reaktív adatfolyamok specifikáció szabványt biztosít az adatok átvitelének módjához. Magas szinten a specifikáció a következő négy interfészt határozza meg, és meghatározza az interfészek implementálásának szabályait.

  • A Publisher egy adatfolyam forrása.
  • Az előfizető egy adatfolyam fogyasztója.
  • Az előfizetés kezeli a közzétevő és az előfizető közötti adatátvitel állapotát.
  • A processzor egyaránt közzétevő és előfizető.

Vannak jól ismert Java-kódtárak, amelyek a specifikáció implementációit biztosítják, például az RxJava, az Akka adatfolyamok, a Vert.x és a Project Reactor.

A Java-hoz készült Azure SDK elfogadta a Project Reactort, hogy az aszinkron API-kat kínáljon. Ennek a döntésnek a fő mozgatórugója az volt, hogy zökkenőmentes integrációt biztosítson a Project Reactort is használó Spring Webfluxszal. A Project Reactor RxJava-n keresztül történő kiválasztásához az is hozzájárult, hogy a Project Reactor Java 8-at használ, de az RxJava akkor még a Java 7-nél volt. A Project Reactor számos olyan operátort is kínál, amelyek összeállíthatók, és lehetővé teszik deklaratív kód írását adatfeldolgozási folyamatok létrehozásához. A Project Reactor egy másik szép dolog, hogy adapterekkel rendelkezik a Project Reactor-típusok más népszerű implementálási típusokká való átalakításához.

Szinkron és aszinkron műveletek API-jainak összehasonlítása

Szó esett az aszinkron ügyfelek szinkron ügyfeleiről és lehetőségeiről. Az alábbi táblázat összefoglalja, hogyan néznek ki az API-k az alábbi lehetőségek használatával:

API Type Nincs érték Egyetlen érték Több érték
Standard Java – Szinkron API-k void T Iterable<T>
Standard Java – Aszinkron API-k CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Reaktív adatfolyamok interfészek Publisher<Void> Publisher<T> Publisher<T>
Reaktív adatfolyamok projektreaktor implementálása Mono<Void> Mono<T> Flux<T>

A teljesség kedvéért érdemes megemlíteni, hogy a Java 9 bevezette a Flow osztályt, amely tartalmazza a négy reaktív stream interfészt. Ez az osztály azonban nem tartalmaz implementációt.

Aszinkron API-k használata a Javához készült Azure SDK-ban

A reaktív streamek specifikációja nem tesz különbséget a közzétevők típusai között. A reaktív streamek specifikációjában a közzétevők egyszerűen nulla vagy több adatelemet állítanak elő. Sok esetben érdemes különbséget tenni a közzétevők között, amelyek legfeljebb egy adatelemet állítanak elő, szemben egy nullát vagy többet termelővel. A felhőalapú API-kban ez a különbség azt jelzi, hogy egy kérés egyetlen értékű választ vagy gyűjteményt ad vissza. A Project Reactor kétféleképpen teszi ezt a különbséget : Mono és Flux. Az a api, amely egy olyan választ ad Mono vissza, amely legfeljebb egy értékkel rendelkezik, és egy olyan API, amely egy olyan választ ad vissza Flux , amely nulla vagy több értéket tartalmaz.

Tegyük fel például, hogy egy ConfigurationAsyncClient használatával kéri le a Azure-alkalmazás Konfigurációs szolgáltatással tárolt konfigurációt. (További információ:Mi Azure-alkalmazás konfiguráció?.)

Ha létrehoz egy ügyfelet ConfigurationAsyncClient , és meghívja getConfigurationSetting() az ügyfelet, az egy Monoértéket ad vissza, amely azt jelzi, hogy a válasz egyetlen értéket tartalmaz. A metódus meghívása azonban nem tesz semmit. Az ügyfél még nem küldött kérelmet a Azure-alkalmazás Konfigurációs szolgáltatáshoz. Ebben a szakaszban az Mono<ConfigurationSetting> API által visszaadott adatok csak az adatfeldolgozási folyamat "szerelvényei". Ez azt jelenti, hogy az adatok felhasználásához szükséges beállítás befejeződött. Az adatátvitel tényleges aktiválásához (azaz a kérés szolgáltatáshoz való kéréséhez és a válasz lekéréséhez) elő kell fizetnie a visszaadott Mono. Tehát, amikor foglalkozik ezekkel a reaktív streamek, meg kell emlékeznie, hogy hívja subscribe() , mert semmi sem történik, amíg meg nem teszi.

Az alábbi példa bemutatja, hogyan iratkozhat fel a Mono konzolra, és hogyan nyomtathatja ki a konfigurációs értéket.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Figyelje meg, hogy az ügyfél meghívása getConfigurationSetting() után a példakód feliratkozik az eredményre, és három különálló lambdát biztosít. Az első lambda a szolgáltatástól kapott adatokat használja fel, amelyek sikeres válasz esetén aktiválódnak. A második lambda akkor aktiválódik, ha hiba történik a konfiguráció beolvasása közben. A harmadik lambda az adatfolyam befejezésekor lesz meghívva, ami azt jelenti, hogy nem várható több adatelem ebből a streamből.

Megjegyzés:

Az összes aszinkron programozáshoz hasonlóan az előfizetés létrehozása után a végrehajtás a szokásos módon folytatódik. Ha nincs semmi, a program aktív és végrehajtható marad, az aszinkron művelet befejeződése előtt leállhat. A hívott subscribe() fő szál nem várja meg, amíg a hálózati hívást Azure-alkalmazás konfigurációra, és választ kap. Az éles rendszerekben előfordulhat, hogy valami mást is feldolgoz, de ebben a példában egy kis késést adhat hozzá hívással Thread.sleep() vagy aszinkron CountDownLatch művelettel, hogy az aszinkron művelet befejeződjön.

Az alábbi példában látható módon a visszaadott Flux API-k szintén hasonló mintát követnek. A különbség az, hogy a metódushoz subscribe() megadott első visszahívást a rendszer többször hívja meg a válasz minden adateleméhez. A hiba vagy a befejezési visszahívások pontosan egyszer lesznek meghívva, és termináljeleknek minősülnek. A rendszer nem hív meg más visszahívásokat, ha ezek közül bármelyik jel a közzétevőtől érkezik.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Visszanyomás

Mi történik, ha a forrás gyorsabban készíti el az adatokat, mint amennyit az előfizető képes kezelni? Az előfizető túlterhelheti az adatokat, ami memóriakihasználtsághoz vezethet. Az előfizetőnek vissza kell kommunikálnia a közzétevővel, hogy lassítson, ha nem tud lépést tartani. Alapértelmezés szerint a fenti példában látható módon történő híváskor subscribe()Flux az előfizető kötetlen adatstreamet kér, jelezve a közzétevőnek, hogy a lehető leggyorsabban küldje el az adatokat. Ez a viselkedés nem mindig kívánatos, és előfordulhat, hogy az előfizetőnek szabályoznia kell a közzététel sebességét a "backpressure" használatával. A backpressure lehetővé teszi az előfizető számára az adatelemek áramlásának irányítását. Az előfizetők korlátozott számú adatelemet igényelnek, amelyeket kezelni tudnak. Miután az előfizető befejezte ezeknek az elemeknek a feldolgozását, az előfizető további kéréseket kérhet. A backpressure használatával leküldéses adatátviteli modellt alakíthat át leküldéses lekéréses modellté.

Az alábbi példa bemutatja, hogyan szabályozhatja, hogy az Eseményközpontok fogyasztója milyen sebességgel fogad eseményeket:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Amikor az előfizető először "csatlakozik" a közzétevőhöz, a közzétevő átad egy példányt Subscription az előfizetőnek, amely kezeli az adatátvitel állapotát. Ez Subscription az a közeg, amelyen keresztül az előfizető visszanyomást alkalmazhat, ha meghívja request() , hogy adja meg, hány további adatelemet képes kezelni.

Ha az előfizető minden híváskor onNext()több adatelemet kér, request(10) például a közzétevő azonnal elküldi a következő 10 elemet, ha elérhetők, vagy amikor elérhetővé válnak. Ezek az elemek egy pufferben halmozódnak fel az előfizető végén, és mivel minden onNext() hívás további 10-et kér, a hátralék mindaddig növekszik, amíg a közzétevőnek nincs több küldendő adateleme, vagy az előfizető pufferének túlcsordulása, ami memóriakimaradási hibákat eredményez.

Cancel a subscription

Az előfizetés kezeli a közzétevő és az előfizető közötti adatátvitel állapotát. Az előfizetés addig aktív, amíg a közzétevő nem végzi el az összes adat átvitelét az előfizetőnek, vagy az előfizető már nem szeretne adatokat kapni. Az alábbiakban látható módon lemondhatja az előfizetést.

Az alábbi példa megszakítja az előfizetést az előfizető kizárásával:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

Az alábbi példa a metódus meghívásával megszakítja az cancel() előfizetést Subscription:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Összefoglalás

A szálak költséges erőforrások, amelyeket nem szabad pazarolni a távoli szolgáltatáshívások válaszaira való várakozásra. A mikroszolgáltatási architektúrák bevezetésének növekedésével az erőforrások hatékony skálázásának és használatának szükségessége létfontosságúvá válik. Az aszinkron API-k akkor előnyösek, ha hálózathoz kötött műveletek vannak. Az Azure SDK for Java számos API-t kínál az aszinkron műveletekhez a rendszererőforrások maximalizálása érdekében. Javasoljuk, hogy próbálja ki az aszinkron ügyfeleinket.

Az adott feladatoknak leginkább megfelelő operátorokról további információt a Reactor 3 referencia-útmutatójában talál.

További lépések

Most, hogy jobban megismerte a különböző aszinkron programozási fogalmakat, fontos, hogy megtanulja, hogyan lehet iterálni az eredményeket. A legjobb iterációs stratégiákról és a lapozás működésének részleteiről további információt a Java Azure SDK-ban található Pagination és iteráció című témakörben talál.