Share via


Java için Azure SDK'da zaman uyumsuz programlama

Bu makalede Java için Azure SDK'sında zaman uyumsuz programlama modeli açıklanmaktadır.

Azure SDK başlangıçta yalnızca Azure hizmetleriyle etkileşime yönelik engelleyici olmayan, zaman uyumsuz API'ler içeriyordu. Bu API'ler, sistem kaynaklarını verimli bir şekilde kullanan ölçeklenebilir uygulamalar oluşturmak için Azure SDK'sını kullanmanıza olanak tanır. Bununla birlikte , Java için Azure SDK daha geniş bir kitleye hitap eden zaman uyumlu istemciler de içerir ve istemci kitaplıklarımızı zaman uyumsuz programlama hakkında bilgi sahibi olmayan kullanıcılar için erişilebilir hale getirir. (Bkz. Azure SDK tasarım yönergelerinde ulaşılabilir.) Bu nedenle, Java için Azure SDK'daki tüm Java istemci kitaplıkları hem zaman uyumsuz hem de zaman uyumlu istemciler sunar. Ancak, sistem kaynaklarının kullanımını en üst düzeye çıkarmak için üretim sistemleri için zaman uyumsuz istemcileri kullanmanızı öneririz.

Reaktif akışlar

Java Azure SDK Tasarım Yönergeleri'nin Zaman Uyumsuz Hizmet İstemcileri bölümüne bakarsanız, Java 8 tarafından sağlananları kullanmak CompletableFuture yerine zaman uyumsuz API'lerimizin reaktif türler kullandığını fark edersiniz. JDK'da yerel olarak kullanılabilen türler yerine neden reaktif türleri seçtik?

Java 8, Akışlar, Lambdas ve CompletableFuture gibi özellikleri kullanıma sunar. Bu özellikler birçok özellik sağlar, ancak bazı sınırlamaları vardır.

CompletableFuture geri çağırma tabanlı, engelleyici olmayan özellikler ve CompletionStage bir dizi zaman uyumsuz işlemin kolay bileşimi için izin verilen arabirim sağlar. Lambdalar, bu gönderme tabanlı API'leri daha okunabilir hale getirir. Akışlar veri öğelerinin bir koleksiyonunu işlemek için işlevsel stil işlemleri sağlar. Ancak akışlar zaman uyumlu olur ve yeniden kullanılamaz. CompletableFuturetek bir istekte bulunmanıza olanak tanır, geri arama için destek sağlar ve tek bir yanıt bekler. Ancak birçok bulut hizmeti, örneğin Event Hubs gibi veri akışı yapabilmeyi gerektirir.

Reaktif akışlar, öğeleri kaynaktan aboneye aktararak bu sınırlamaların üstesinden gelmeye yardımcı olabilir. Abone bir kaynaktan veri istediğinde, kaynak herhangi bir sayıda sonucu geri gönderir. Hepsini aynı anda göndermesi gerekmez. Kaynakta gönderilecek veriler olduğunda aktarım belirli bir süre boyunca gerçekleşir.

Bu modelde abone, olay işleyicilerini geldiğinde verileri işlemek üzere kaydeder. Bu gönderme tabanlı etkileşimler, aboneyi farklı sinyallerle bilgilendirir:

  • Çağrı onSubscribe() , veri aktarımının başlamak üzere olduğunu gösterir.
  • Çağrı onError() , veri aktarımının sonunu da işaretleyen bir hata olduğunu gösterir.
  • Çağrı onComplete() , veri aktarımının başarıyla tamamlanmasını gösterir.

Java Akışlar aksine, reaktif akışlar hataları birinci sınıf olaylar olarak ele alır. Reaktif akışlar, kaynağın hataları aboneye iletmesi için ayrılmış bir kanala sahiptir. Ayrıca, reaktif akışlar abonenin bu akışları bir anında çekme modeline dönüştürmek için veri aktarım hızı konusunda anlaşmasına olanak sağlar.

Reactive Akışlar belirtimi, veri aktarımının nasıl gerçekleşmesi gerektiğine ilişkin bir standart sağlar. Belirtim, üst düzeyde aşağıdaki dört arabirimi tanımlar ve bu arabirimlerin nasıl uygulanması gerektiğine ilişkin kuralları belirtir.

  • Publisher bir veri akışının kaynağıdır.
  • Abone , bir veri akışının tüketicisidir.
  • Abonelik , yayımcı ve abone arasındaki veri aktarımının durumunu yönetir.
  • İşlemci hem yayımcı hem de abonedir.

RxJava, Akka Akışlar, Vert.x ve Project Reactor gibi bu belirtimin uygulamalarını sağlayan bazı iyi bilinen Java kitaplıkları vardır.

Java için Azure SDK, zaman uyumsuz API'lerini sunmak için Project Reactor'ı benimsedi. Bu kararı veren ana faktör, Project Reactor'ı da kullanan Spring Webflux ile sorunsuz tümleştirme sağlamaktı. Project Reactor'ı RxJava yerine seçmeye katkıda bulunan bir diğer faktör de Project Reactor'ın Java 8 kullanması ama O zamanlar RxJava'nın hala Java 7'de olmasıydı. Project Reactor ayrıca birleştirilebilir ve veri işleme işlem hatları oluşturmak için bildirim temelli kod yazmanıza olanak sağlayan zengin bir işleç kümesi sunar. Project Reactor'ın bir diğer güzel özelliği de Project Reactor türlerini diğer popüler uygulama türlerine dönüştürmek için bağdaştırıcılara sahip olmasıdır.

Zaman uyumlu ve zaman uyumsuz işlemlerin API'lerini karşılaştırma

Zaman uyumlu istemcileri ve zaman uyumsuz istemciler için seçenekleri ele aldık. Aşağıdaki tabloda, şu seçenekler kullanılarak tasarlanan API'lerin nasıl göründüğü özetlenmiştir:

API Türü Değer yok Tek değer Birden çok değer
Standart Java - Zaman Uyumlu API'ler void T Iterable<T>
Standart Java - Zaman Uyumsuz API'ler CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Reaktif Akışlar Arabirimleri Publisher<Void> Publisher<T> Publisher<T>
Reaktif Akışlar Proje Reaktörü uygulaması Mono<Void> Mono<T> Flux<T>

Eksiksizlik açısından, Java 9'un dört reaktif akış arabirimi içeren Flow sınıfını tanıttığını belirtmek gerekir. Ancak, bu sınıf herhangi bir uygulama içermez.

Java için Azure SDK'da zaman uyumsuz API'ler kullanma

Reaktif akış belirtimi, yayımcı türleri arasında ayrım yapmaz. Reaktif akış belirtiminde yayımcılar sıfır veya daha fazla veri öğesi üretir. Çoğu durumda, en fazla bir veri öğesi üreten yayımcı ile sıfır veya daha fazla veri üreten yayımcı arasında yararlı bir ayrım vardır. Bulut tabanlı API'lerde bu fark, isteğin tek değerli bir yanıt mı yoksa koleksiyon mu döndürdüğüne işaret eder. Project Reactor bu ayrımı yapmak için iki tür sağlar - Mono ve Flux. döndüren BIR Mono API en fazla bir değer içeren bir yanıt içerir ve döndüren bir Flux API sıfır veya daha fazla değer içeren bir yanıt içerir.

Örneğin, Azure Uygulaması Yapılandırma hizmeti kullanılarak depolanan bir yapılandırmayı almak için ConfigurationAsyncClient kullandığınızı varsayalım. (Daha fazla bilgi için bkz.Azure Uygulaması Yapılandırması nedir?.)

İstemcide bir ConfigurationAsyncClient ve çağrısı getConfigurationSetting() oluşturursanız, yanıtın tek bir Monodeğer içerdiğini gösteren bir döndürür. Ancak, bu yöntemi tek başına çağırmak hiçbir şey yapmaz. İstemci henüz Azure Uygulaması Yapılandırma hizmetine istekte bulunmadı. Bu aşamada, Mono<ConfigurationSetting> bu API tarafından döndürülen yalnızca veri işleme işlem hattının "derlemesi"dir. Bu, verileri tüketmek için gerekli kurulumun tamamlandığını gösterir. Veri aktarımını gerçekten tetikleyebilmek için (hizmete istekte bulunmak ve yanıtı almak için), döndürülen Monoöğesine abone olmanız gerekir. Bu nedenle, bu reaktif akışlarla ilgilenirken aramayı subscribe() unutmamalısınız çünkü siz bunu yapıncaya kadar hiçbir şey olmaz.

Aşağıdaki örnekte konsoluna abone olma Mono ve yapılandırma değerinin konsola yazdırılma işlemi gösterilmektedir.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

İstemcide çağrıldıktan getConfigurationSetting() sonra örnek kodun sonuba abone olduğuna ve üç ayrı lambda sağladığına dikkat edin. İlk lambda, hizmetten alınan verileri tüketir ve bu da başarılı bir yanıt üzerine tetikler. Yapılandırma alınırken bir hata olduğunda ikinci lambda tetikleniyor. Üçüncü lambda, veri akışı tamamlandığında çağrılır, yani bu akıştan başka veri öğesi beklenmez.

Dekont

Tüm zaman uyumsuz programlamalarda olduğu gibi, abonelik oluşturulduktan sonra yürütme her zamanki gibi devam eder. Programı etkin tutmak ve yürütmek için hiçbir şey yoksa, zaman uyumsuz işlem tamamlanmadan önce sonlandırılabilir. Çağrılan subscribe() ana iş parçacığı, Azure Uygulaması Yapılandırması'na ağ çağrısı yapıp bir yanıt alıncaya kadar beklemez. Üretim sistemlerinde başka bir şeyi işlemeye devam edebilirsiniz, ancak bu örnekte çağırarak Thread.sleep() küçük bir gecikme ekleyebilir veya zaman uyumsuz işlemin tamamlanması için bir şans vermek için bir CountDownLatch kullanabilirsiniz.

Aşağıdaki örnekte gösterildiği gibi, bir döndüren Flux API'ler de benzer bir desen izler. Aradaki fark, yönteme sağlanan ilk geri çağırmanın subscribe() yanıttaki her veri öğesi için birden çok kez çağrılır. Hata veya tamamlama geri çağırmaları tam olarak bir kez çağrılır ve terminal sinyalleri olarak kabul edilir. Bu sinyallerden biri yayımcıdan alınırsa başka bir geri çağırma çağrılmazsa.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Geri baskı

Kaynak verileri abonenin işleyebileceğinden daha hızlı ürettiğinde ne olur? Abone verilerden etkilenebilir ve bu da yetersiz bellek hatalarına yol açabilir. Abonenin, yetişemediğinde yavaşlayabilmesi için yayımcıyla geri iletişim kurması için bir yönteme ihtiyacı vardır. Varsayılan olarak, yukarıdaki örnekte gösterildiği gibi bir Flux çağrısı subscribe() yaptığınızda abone, yayımcıya verileri mümkün olan en kısa sürede göndermesini belirten, ilişkisiz bir veri akışı isteğinde bulunur. Bu davranış her zaman arzu edilmez ve abonenin "backpressure" aracılığıyla yayımlama hızını denetlemesi gerekebilir. Geri baskı, abonenin veri öğelerinin akışını denetlemesini sağlar. Abone, işleyebileceği sınırlı sayıda veri öğesi isteyecektir. Abone bu öğeleri işlemeyi tamamladıktan sonra, abone daha fazla istekte bulunabilir. Geri baskı kullanarak, veri aktarımı için bir gönderme modelini anında iletme modeline dönüştürebilirsiniz.

Aşağıdaki örnekte Event Hubs tüketicisi tarafından olayların alınma hızını nasıl denetleyebileceğiniz gösterilmektedir:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Abone yayımcıya ilk kez "bağlandığında", yayımcı aboneye veri aktarımının durumunu yöneten bir Subscription örnek iletir. Bu Subscription , abonenin kaç veri öğesi daha işleyebileceğini belirtmek için çağırarak request() geri baskı uygulayabileceği ortamdır.

Abone her çağırdığında onNext()request(10) birden fazla veri öğesi isterse( örneğin, yayımcı kullanılabilir olduğunda veya kullanılabilir olduğunda) sonraki 10 öğeyi hemen gönderir. Bu öğeler abonenin sonunda bir arabellekte birikir ve her onNext() çağrı 10 daha fazla istekte bulunacağı için, yayımcının göndereceği veri öğesi kalmayıncaya veya abonenin arabellek taşması nedeniyle bellek yetersiz hatalarına neden olana kadar kapsam büyümeye devam eder.

Aboneliği iptal etme

Abonelik, yayımcı ve abone arasındaki veri aktarımının durumunu yönetir. Yayımcı tüm verileri aboneye aktarmayı tamamlayana veya abone artık veri almakla ilgilenmeyene kadar abonelik etkindir. Aşağıda gösterildiği gibi aboneliği iptal etmenin birkaç yolu vardır.

Aşağıdaki örnek aboneyi atarak aboneliği iptal eder:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

Aşağıdaki örnek, üzerinde Subscriptionyöntemini çağırarak cancel() aboneliği iptal eder:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Sonuç

İş parçacıkları, uzak hizmet çağrılarından gelen yanıtları beklerken boşa harcamamanız gereken pahalı kaynaklardır. Mikro hizmet mimarilerinin benimsenmesi arttıkça kaynakları verimli bir şekilde ölçeklendirme ve kullanma gereksinimi de önemli hale gelir. Ağa bağlı işlemler olduğunda zaman uyumsuz API'ler uygundur. Java için Azure SDK, sistem kaynaklarınızı en üst düzeye çıkarmak için zaman uyumsuz işlemler için zengin bir API kümesi sunar. Zaman uyumsuz müşterilerimizi denemenizi kesinlikle öneririz.

Belirli görevlerinize en uygun operatörler hakkında daha fazla bilgi için Reactor 3 Referans Kılavuzu'nda hangi operatöre ihtiyacım var? bölümüne bakın.

Sonraki adımlar

Çeşitli zaman uyumsuz programlama kavramlarını daha iyi anladığınıza göre, sonuçlar üzerinde yineleme yapmayı öğrenmek önemlidir. En iyi yineleme stratejileri ve sayfalandırmanın nasıl çalıştığı hakkında daha fazla bilgi için bkz . Java için Azure SDK'da sayfalandırma ve yineleme.