Azure Cosmos DB'deki DDL işlemleri Spark'tan Cassandra API

ŞUNLAR IÇIN GEÇERLIDIR: Cassandra API

Bu makalede Spark'tan Cassandra API Azure Cosmos DB'ye yönelik anahtar alanı ve tablo DDL işlemleri ayrıntılı olarak açıklanmaktadır.

Spark bağlamı

Cassandra API bağlayıcısı, Spark bağlamının bir parçası olarak Cassandra bağlantı ayrıntılarının başlatılmasını gerektirir. Bir not defterini başlattığınızda Spark bağlamı zaten başlatılır ve durdurmanız ve yeniden başlatmanız önerilmez. Çözümlerden biri, Cassandra API örneği yapılandırmasını küme düzeyinde, küme spark yapılandırmasına eklemektir. Küme başına bir kerelik etkinliktir. Spark yapılandırmasına aşağıdaki kodu boşlukla ayrılmış anahtar değer çifti olarak ekleyin:

spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY

//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

Not

Spark 3.x kullanıyorsanız Cosmos DB yardımcısını ve bağlantı fabrikasını yüklemeniz gerekmez. Spark 3 bağlayıcısı connections_per_executor_max için yerine de kullanmanız remoteConnectionsPerExecutor gerekir (yukarıya bakın).

Uyarı

Bu makalede gösterilen Spark 3 örnekleri Spark sürüm 3.2.1 ve ilgili Cassandra Spark Bağlayıcısı com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1 ile test edilmiştir. Spark ve/veya Cassandra bağlayıcısının sonraki sürümleri beklendiği gibi çalışmayabilir.

Keyspace DDL işlemleri

Anahtar alanı oluşturma

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))

cqlsh'de doğrulama

cqlsh'de aşağıdaki komutu çalıştırın; daha önce oluşturduğunuz anahtar alanı görmeniz gerekir.

DESCRIBE keyspaces;

Anahtar alanı bırakma

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))

cqlsh'de doğrulama

DESCRIBE keyspaces;

Tablo DDL işlemleri

Husus -lar:

  • Aktarım hızı, create table deyimi kullanılarak tablo düzeyinde atanabilir.
  • Bir bölüm anahtarı 20 GB veri depolayabilir.
  • Bir kayıt en fazla 2 MB veri depolayabilir.
  • Bir bölüm anahtarı aralığı birden çok bölüm anahtarını depolayabilir.

Bir tablo oluşturma

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

cqlsh'de doğrulama

cqlsh'de aşağıdaki komutu çalıştırın ve "books:

USE books_ks;
DESCRIBE books;

Sağlanan aktarım hızı ve varsayılan TTL değerleri önceki komutun çıkışında gösterilmez, bu değerleri portaldan alabilirsiniz.

Tabloyu değiştirme

Alter table komutunu kullanarak aşağıdaki değerleri değiştirebilirsiniz:

  • sağlanan aktarım hızı
  • yaşam süresi değeri
    Sütun değişiklikleri şu anda desteklenmiyor.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))

Tabloyu bırakma

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

cqlsh'de doğrulama

cqlsh'de aşağıdaki komutu çalıştırın ve "books" tablosunun artık kullanılamadığını görmeniz gerekir:

USE books_ks;
DESCRIBE tables;

Sonraki adımlar

Anahtar alanını ve tabloyu oluşturduktan sonra CRUD işlemleri ve daha fazlası için aşağıdaki makalelere geçin: