Spark'tan Azure Cosmos DB Cassandra API’sine bağlama

Uygulama hedefı: Cassandra API

Bu makale, Spark'tan tümleştirme için Azure Cosmos DB Cassandra API makalelerden biri. Makalelerde bağlantı, Veri Tanımlama Dili (DDL) işlemleri, temel Veri İşleme Dili (DML) işlemleri ve Spark'tan gelişmiş Azure Cosmos DB Cassandra API tümleştirmesi yer alır.

Önkoşullar

Bağlantı bağımlılıkları

  • Cassandra için Spark bağlayıcısı: Spark bağlayıcısı, Azure Cosmos DB Cassandra API. Maven Central'da bulunan ve Spark ortamının Spark ve Scala sürümleriyle uyumlu bağlayıcı sürümünü tanımlama ve kullanma. Spark 3.0 veya üstlerini destekleyen bir ortam ve maven koordinatlarında kullanılabilen spark bağlayıcısı com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 önerilir. Spark 2.x kullanıyorsanız, Maven koordinatlarında Spark bağlayıcısı kullanarak Spark sürüm 2.4.5 ile bir ortam com.datastax.spark:spark-cassandra-connector_2.11:2.4.3 öneririz.

  • Cassandra API için Azure Cosmos DB yardımcı Cassandra API: Spark bağlayıcıya ek olarak Spark 2.x sürümünü kullanıyorsanız hız sınırlamasını işlemek için Azure Cosmos DB'den maven koordinatlarıyla azure-cosmos-cassandra-spark-helper adlı başka bir kitaplık com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 gerekir. Bu kitaplık özel bağlantı fabrikası ve yeniden deneme ilkesi sınıfları içerir.

    Azure Cosmos DB'de yeniden deneme ilkesi, HTTP durum kodu 429("İstek Oranı Büyük") özel durumlarını işecek şekilde yapılandırılmıştır. Azure Cosmos DB Cassandra API bu özel durumları Cassandra yerel protokolünde aşırı yüklenmiş hatalara çevirir ve geri almalarla yeniden sınabilirsiniz. Azure Cosmos DB sağlanan işleme hızı modelini kullandığı için, giriş/çıkış hızları artmışsa istek hızı sınırlama özel durumları oluşur. Yeniden deneme ilkesi, spark işlerinizi kapsayıcınız için ayrılan aktarım hızını kısa süre içinde aşan ani veri artışlarına karşı korur. Spark 3.x bağlayıcısı kullanıyorsanız, bu kitaplığın uygulanması gerekmez.

    Not

    Yeniden deneme ilkesi spark işlerinizi yalnızca anlık ani artışlara karşı koruyabilir. İş yüklerinizi çalıştırmak için gereken ru'ları yapılandırmadıysanız, yeniden deneme ilkesi geçerli değildir ve yeniden deneme ilkesi sınıfı özel durumu yeniden oluşturur.

  • Azure Cosmos DB hesabı bağlantı ayrıntıları: Azure Cassandra API adı, hesap uç noktası ve anahtarı içerir.

Spark bağlayıcısı aktarım hızı yapılandırmasını iyileştirme

Sonraki bölümde Cassandra için Spark Bağlayıcısı kullanarak aktarım hızını denetlemeye ilişkin tüm ilgili parametreler listelenmiştir. Spark işleri için aktarım hızını en üst düzeye çıkarmak üzere parametreleri iyileştirmek amacıyla, çok fazla azaltma ve gerileme (bu da daha düşük aktarım hızına yol açsa da) önlemek için , ve yapılandırmaların doğru yapılandırılması spark.cassandra.output.concurrent.writes spark.cassandra.concurrent.reads spark.cassandra.input.reads_per_sec gerekir.

Bu yapılandırmaların en uygun değeri 4 faktöre bağlıdır:

  • Verilerin alınıyor olduğu tablo için yapılandırılmış aktarım hızı (İstek Birimleri) miktarı.
  • Spark kümenizin çalışan sayısı.
  • Spark işi için yapılandırılan yürütücü sayısı (Spark sürümü kullanılarak veya spark.cassandra.connection.connections_per_executor_max buna bağlı olarak denetlen spark.cassandra.connection.remoteConnectionsPerExecutor kullanılabilir)
  • Aynı Veri Merkezi'nde birlikte bulunduysanız cosmos DB'ye her isteğin ortalama gecikme süresi. Bu değerin yazmalar için 10 ms ve okumalar için 3 ms olduğunu varsayalım.

Örneğin, 5 çalışan ve = 1 değeri ve = 1 değerimiz varsa, her biri 1 iş parçacığıyla aynı anda tabloya yazan 5 çalışan spark.cassandra.output.concurrent.writes spark.cassandra.connection.remoteConnectionsPerExecutor vardır. Tek yazma işlemi 10 ms sürerse, iş parçacığı başına saniye başına 100 istek (1000 milisaniye bölünmüş 10 milisaniye) gönderebiliriz. 5 çalışanla bu, saniye başına 500 yazma olur. Yazma başına ortalama 5 istek birimi (RU) maliyetinde hedef tabloda en az 2500 istek biriminin sağlanması gerekir (saniye başına 5 RU x 500 yazma).

Yürütücü sayısını artırmak belirli bir işteki iş parçacığı sayısını artırabilir ve bu da aktarım hızını artırabilir. Ancak, bunun tam etkisi işe bağlı olarak değişken olabilirken, çalışan sayısıyla aktarım hızını denetlemek daha belirleyicidir. İstek Birimi (RU) ücreti almak için belirli bir isteğin tam maliyetini belirlemek için bu isteğin profilini de çıkarabilirsiniz. Bu, tablo veya anahtar alanınız için aktarım hızı sağlarken daha doğru bir şekilde karar alamanıza yardımcı olur. İstek düzeyinde istek birimi ücretlerini nasıl alasınız anlamak için buradaki makalemize göz atabilirsiniz.

Veritabanında aktarım hızını ölçeklendirme

Cassandra Spark bağlayıcısı, Azure Veritabanı'Cosmos verimli bir şekilde aktarım hızını artırır. Sonuç olarak, etkili yeniden denemelerde bile, hız sınırlamayla ilgili hataları önlemek için tabloda veya anahtar alanı düzeyinde yeterli aktarım hızı (RU) sağ olduğundan emin olun. Verilen bir tablo veya anahtar alanı için en az 400 RU ayarı yeterli olmayacaktır. En düşük aktarım hızı yapılandırma ayarlarında bile Spark bağlayıcısı yaklaşık 6000 istek birimine veya daha fazla işleme birimine karşılık gelen bir hızla yazabilir.

Spark kullanarak veri taşıma için gereken RU ayarı, kararlı durum iş yükünüz için gerekenden daha yüksekse, azure Cosmos DB'de aktarım hızının ölçeğini sistematik olarak artırarak ve azaltırken, belirtilen süre boyunca iş yüklerinin ihtiyaçlarını karşılayacak şekilde kolayca ölçeklendirebilirsiniz. Program aracılığıyla ve dinamik olarak ölçeklendirmeye Cassandra API seçenekleri anlamak için esnek ölçekle ilgili makalemizi okuyun.

Not

Yukarıdaki kılavuzda, verilerin makul bir şekilde tekdüz dağıtımı varsayma. Verilerde önemli bir çarpıklık varsa (başka bir ifadeyle aynı bölüm anahtarı değerine yönelik çok sayıda okuma/yazma varsa), tabloda çok sayıda istek birimi sağlandı olsa bile performans sorunlarıyla karşı karşınıza çıktı. İstek birimleri fiziksel bölümler arasında eşit olarak bölündü ve yoğun veri çarpıklıkları tek bir bölüme yapılan isteklerde performans sorununa neden olabilir.

Spark bağlayıcısı aktarım hızı yapılandırma parametreleri

Aşağıdaki tabloda, bağlayıcı tarafından Cosmos db Cassandra API aktarım hızına özgü yapılandırma parametreleri listelanmıştır. Tüm yapılandırma parametrelerinin ayrıntılı listesi için Spark Cassandra Bağlayıcısı veri deposunun yapılandırma başvurusu GitHub bakın.

Özellik Adı Varsayılan değer Açıklama
spark.cassandra.output.batch.size.rows 1 Tek bir toplu iş başına satır sayısı. Bu parametreyi 1 olarak ayarlayın. Bu parametre, ağır iş yükleri için daha yüksek aktarım hızı elde etmek için kullanılır.
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) Hiçbiri Yürütücü başına düğüm başına en fazla bağlantı sayısı. 10*n, n düğümlü cassandra kümesinde düğüm başına 10 bağlantıya eşdeğerdir. Bu nedenle, 5 düğümlü cassandra kümesi için yürütücü başına düğüm başına 5 bağlantı gerekirse, bu yapılandırmayı 25 olarak ayarlayabilirsiniz. Bu değeri paralellik derecesine veya Spark işlerinin yapılandırılan yürütücü sayısına göre değiştirebilirsiniz.
spark.cassandra.output.concurrent.writes 100 Yürütücü başına meydana gelen paralel yazma sayısını tanımlar. "batch.size.rows" değerini 1 olarak ayar satırınız olduğundan, bu değerin ölçeğini uygun şekilde ölçeklendirin. Bu değeri, paralellik derecesine veya iş yükünüz için elde etmek istediğiniz aktarım hızına göre değiştirebilirsiniz.
spark.cassandra.concurrent.reads 512 Yürütücü başına meydana gelen paralel okuma sayısını tanımlar. Bu değeri, paralellik derecesine veya iş yükünüz için elde etmek istediğiniz aktarım hızına göre değiştirme
spark.cassandra.output.throughput_mb_per_sec Hiçbiri Yürütücü başına toplam yazma aktarım hızını tanımlar. Bu parametre, spark işi aktarım hızınız için üst sınır olarak kullanılabilir ve bu parametreyi kapsayıcınız için sağlanan aktarım Cosmos temel alandır.
spark.cassandra.input.reads_per_sec Hiçbiri Yürütücü başına toplam okuma aktarım hızını tanımlar. Bu parametre, spark işi aktarım hızınız için üst sınır olarak kullanılabilir ve bu parametreyi kapsayıcınız için sağlanan aktarım Cosmos temel alandır.
spark.cassandra.output.batch.grouping.buffer.size 1000 Bir spark görevine göndermeden önce bellekte depolanmış tek bir Spark görevi başına toplu Cassandra API
spark.cassandra.connection.keep_alive_ms 60000 Kullanılmayan bağlantıların kullanılabilir olduğu süreyi tanımlar.

Bu parametrelerin aktarım hızını ve paralellik derecesini, spark işleriniz için beklediğiniz iş yüküne ve Cosmos DB hesabınız için sağlanan işleme hızına göre ayarlayın.

Spark'tan Azure Cosmos DB Cassandra API’sine bağlama

cqlsh

Aşağıdaki komutlar, cqlsh'den Azure CosmosDB Cassandra API bağlanmayı ayrıntılı olarak açıklar. Bu, Spark'ta örneklerin üzerinden geçerek doğrulama için yararlıdır.
Linux/Unix/Mac'ten:

export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl

1. Azure Databricks

Aşağıdaki makalede küme Azure Databricks, Azure Cosmos DB veritabanına bağlanmak için küme yapılandırması Cassandra API DDL işlemleri, DML işlemleri ve daha fazlasını kapsayan çeşitli örnek not defterleri yer alır.
Azure Cosmos DB Cassandra API ile Azure Databricks

2. Azure HDInsight-Spark

aşağıdaki makalede HDinsight-Spark hizmeti, sağlama, Azure Cosmos DB Cassandra API bağlamak için küme yapılandırması ve DDL işlemlerini, DML işlemlerini ve daha fazlasını kapsayan çeşitli örnek not defterleri ele alınmaktadır.
Azure hdınsight 'tan Azure Cosmos DB Cassandra API çalışma-Spark

3. Spark ortamı genel

Yukarıdaki bölümler Azure Spark tabanlı PaaS hizmetlerine özgü olduğundan, bu bölümde tüm genel Spark ortamları ele alınmaktadır. Bağlayıcı bağımlılıkları, içeri aktarmalar ve Spark oturum yapılandırması aşağıda ayrıntılı olarak verilmiştir. "Sonraki adımlar" bölümü, DDL işlemlerine yönelik kod örneklerini, DML işlemlerini ve daha fazlasını içerir.

Bağlayıcı bağımlılıkları:

  1. Spark Için Cassandra bağlayıcısını almak üzere Maven koordinatlarını ekleyin
  2. Cassandra API için Azure Cosmos DB yardımcısı kitaplığı için maven koordinatlarını ekleyin

İşlemlerinin

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

Spark oturum yapılandırması:

//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

//Throughput-related. You can adjust the values as needed
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
//spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") // Spark 2.x
spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10") // Spark 3.x
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")

Sonraki adımlar

aşağıdaki makalelerde Azure Cosmos DB Cassandra API Spark tümleştirmesi gösterilmektedir.