Azure Cosmos DB'deki DDL işlemleri Spark'tan Cassandra API
ŞUNLAR IÇIN GEÇERLIDIR:
Cassandra API
Bu makalede Spark'tan Cassandra API Azure Cosmos DB'ye yönelik anahtar alanı ve tablo DDL işlemleri ayrıntılı olarak açıklanmaktadır.
Spark bağlamı
Cassandra API bağlayıcısı, Spark bağlamının bir parçası olarak Cassandra bağlantı ayrıntılarının başlatılmasını gerektirir. Bir not defterini başlattığınızda Spark bağlamı zaten başlatılır ve durdurmanız ve yeniden başlatmanız önerilmez. Çözümlerden biri, Cassandra API örneği yapılandırmasını küme düzeyinde, küme spark yapılandırmasına eklemektir. Küme başına bir kerelik etkinliktir. Spark yapılandırmasına aşağıdaki kodu boşlukla ayrılmış anahtar değer çifti olarak ekleyin:
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Cassandra API ile ilgili yapılandırma
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
Not
Spark 3.x kullanıyorsanız Cosmos DB yardımcısını ve bağlantı fabrikasını yüklemeniz gerekmez. Spark 3 bağlayıcısı connections_per_executor_max için yerine de kullanmanız remoteConnectionsPerExecutor gerekir (yukarıya bakın).
Uyarı
Bu makalede gösterilen Spark 3 örnekleri Spark sürüm 3.2.1 ve ilgili Cassandra Spark Bağlayıcısı com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1 ile test edilmiştir. Spark ve/veya Cassandra bağlayıcısının sonraki sürümleri beklendiği gibi çalışmayabilir.
Keyspace DDL işlemleri
Anahtar alanı oluşturma
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
cqlsh'de doğrulama
cqlsh'de aşağıdaki komutu çalıştırın; daha önce oluşturduğunuz anahtar alanı görmeniz gerekir.
DESCRIBE keyspaces;
Anahtar alanı bırakma
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
cqlsh'de doğrulama
DESCRIBE keyspaces;
Tablo DDL işlemleri
Husus -lar:
- Aktarım hızı, create table deyimi kullanılarak tablo düzeyinde atanabilir.
- Bir bölüm anahtarı 20 GB veri depolayabilir.
- Bir kayıt en fazla 2 MB veri depolayabilir.
- Bir bölüm anahtarı aralığı birden çok bölüm anahtarını depolayabilir.
Bir tablo oluşturma
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
cqlsh'de doğrulama
cqlsh'de aşağıdaki komutu çalıştırın ve "books:
USE books_ks;
DESCRIBE books;
Sağlanan aktarım hızı ve varsayılan TTL değerleri önceki komutun çıkışında gösterilmez, bu değerleri portaldan alabilirsiniz.
Tabloyu değiştirme
Alter table komutunu kullanarak aşağıdaki değerleri değiştirebilirsiniz:
- sağlanan aktarım hızı
- yaşam süresi değeri
Sütun değişiklikleri şu anda desteklenmiyor.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
Tabloyu bırakma
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
cqlsh'de doğrulama
cqlsh'de aşağıdaki komutu çalıştırın ve "books" tablosunun artık kullanılamadığını görmeniz gerekir:
USE books_ks;
DESCRIBE tables;
Sonraki adımlar
Anahtar alanını ve tabloyu oluşturduktan sonra CRUD işlemleri ve daha fazlası için aşağıdaki makalelere geçin: