Skapa/infoga data i Azure Cosmos DB API för Cassandra från Spark

GÄLLER för: API för Cassandra

Den här artikeln beskriver hur du infogar exempeldata i en tabell i Azure Cosmos DB API för Cassandra från Spark.

API för Cassandra konfiguration

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
// if using Spark 2.x
// spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

//Throughput-related...adjust as needed
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
//spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") // Spark 2.x
spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10") // Spark 3.x
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")

Anteckning

Om du använder Spark 3.0 eller senare behöver du inte installera Cosmos DB hjälp- och anslutningsfabrik. Du bör också använda remoteConnectionsPerExecutor i stället för för Spark connections_per_executor_max 3-anslutningsappen (se ovan). Du ser att anslutningsrelaterade egenskaper har definierats i anteckningsboken ovan. Med syntaxen nedan kan anslutningsegenskaper definieras på det här sättet utan att behöva definieras på klusternivå (Initiering av Spark-kontext).

Dataframe-API

Skapa en dataram med exempeldata

// Generate a dataframe containing five records
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
).toDF("book_id", "book_author", "book_name", "book_pub_year")

//Review schema
booksDF.printSchema

//Print
booksDF.show

Anteckning

Funktionen "Skapa om det inte finns" på radnivå stöds inte ännu.

Spara i Azure Cosmos DB API för Cassandra

När du sparar data kan du även ange principinställningar för time to live och konsekvens som du ser i följande exempel:

//Persist
booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Anteckning

TTL på kolumnnivå stöds inte ännu.

Verifiera i cqlsh

use books_ks;
select * from books;

RDD-API (Resilient Distributed Database)

Skapa en RDD med exempeldata

//Drop and re-create table to delete records created in the previous section 
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

//Create RDD
val booksRDD = sc.parallelize(Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
))

//Review
booksRDD.take(2).foreach(println)

Anteckning

Funktionen Skapa om den inte finns stöds inte ännu.

Spara i Azure Cosmos DB API för Cassandra

När du sparar data API för Cassandra kan du även ange principinställningar för time to live och konsekvens som du ser i följande exempel:

import com.datastax.spark.connector.writer._

//Persist
booksRDD.saveToCassandra("books_ks", "books", SomeColumns("book_id", "book_author", "book_name", "book_pub_year"),writeConf = WriteConf(ttl = TTLOption.constant(900000),consistencyLevel = ConsistencyLevel.ALL))

Verifiera i cqlsh

use books_ks;
select * from books;

Nästa steg

När du har infogat data Azure Cosmos DB API för Cassandra tabellen fortsätter du till följande artiklar för att utföra andra åtgärder på de data som lagras i Cosmos DB API för Cassandra: