DDL-műveletek az Azure Cosmos DB-ben Cassandra API a Sparkból
A KÖVETKEZŐKRE VONATKOZIK:
Cassandra API
Ez a cikk a Sparkból származó Azure Cosmos DB-Cassandra API kulcstér- és tábla DDL-műveleteit ismerteti.
Spark-környezet
A Cassandra API összekötőjéhez a Cassandra kapcsolati adatait inicializálni kell a Spark-környezet részeként. Amikor elindít egy jegyzetfüzetet, a Spark-környezet már inicializálva van, és nem ajánlott leállítani és újrainicializálni. Az egyik megoldás a Cassandra API példány konfigurációjának hozzáadása fürtszinten, a fürt Spark-konfigurációjában. Fürtönként egyszeri tevékenység. Adja hozzá a következő kódot a Spark-konfigurációhoz szóközzel elválasztott kulcsértékpárként:
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Cassandra API kapcsolódó konfiguráció
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
Megjegyzés
Ha Spark 3.x-et használ, nem kell telepítenie a Cosmos DB segítőt és a kapcsolat-előállítót. A Spark 3-összekötő helyett connections_per_executor_max érdemes használnia remoteConnectionsPerExecutor (lásd fent).
Figyelmeztetés
A cikkben bemutatott Spark 3-mintákat a Spark 3.2.1-es verziójával és a megfelelő Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1-es verziójával teszteltük. Előfordulhat, hogy a Spark és/vagy a Cassandra-összekötő későbbi verziói nem a várt módon működnek.
Keyspace DDL-műveletek
Kulcstér létrehozása
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
Ellenőrzés a cqlsh-ban
Futtassa a következő parancsot a cqlsh-ban, és meg kell jelennie a korábban létrehozott kulcstérnek.
DESCRIBE keyspaces;
Kulcstér elvetése
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
Ellenőrzés a cqlsh-ban
DESCRIBE keyspaces;
Tábla DDL-műveletei
Megfontolások:
- Az átviteli sebesség a tábla szintjén rendelhető hozzá a tábla létrehozása utasítással.
- Egy partíciókulcs 20 GB adat tárolására képes.
- Egy rekord legfeljebb 2 MB adatot tárolhat.
- Egy partíciókulcs-tartomány több partíciókulcsot is tárolhat.
Tábla létrehozása
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
Ellenőrzés a cqlsh-ban
Futtassa a következő parancsot a cqlsh-ban, és meg kell jelennie a "books" nevű táblának:
USE books_ks;
DESCRIBE books;
A kiosztott átviteli sebesség és az alapértelmezett TTL-értékek nem jelennek meg az előző parancs kimenetében. Ezeket az értékeket a portálról szerezheti be.
Táblázat módosítása
Az alter table paranccsal módosíthatja a következő értékeket:
- kiosztott átviteli sebesség
- élettartam érték
Az oszlopmódosítások jelenleg nem támogatottak.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
Tábla elvetése
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
Ellenőrzés a cqlsh-ban
Futtassa a következő parancsot a cqlsh-ban, és látnia kell, hogy a "books" tábla már nem érhető el:
USE books_ks;
DESCRIBE tables;
Következő lépések
A kulcstér és a tábla létrehozása után folytassa a CRUD-műveletekkel kapcsolatos következő cikkekben és egyebekben: