Criar/inserir dados na Azure Cosmos DB Cassandra API da Spark

APLICA-SE A: Cassandra API

Este artigo descreve como inserir dados de amostra numa tabela em Azure Cosmos DB Cassandra API da Spark.

Configuração da API de Cassandra

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
// if using Spark 2.x
// spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

//Throughput-related...adjust as needed
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
//spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") // Spark 2.x
spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10") // Spark 3.x
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")

Nota

Se estiver a utilizar o Spark 3.0 ou superior, não precisa de instalar o ajudante e a fábrica de ligação Cosmos DB. Também deve utilizar remoteConnectionsPerExecutor em vez de para o connections_per_executor_max conector Spark 3 (ver acima). Verá que as propriedades relacionadas com a ligação estão definidas dentro do caderno acima. Utilizando a sintaxe abaixo, as propriedades de ligação podem ser definidas desta forma sem necessidade de ser definidas ao nível do cluster (inicialização do contexto de faísca).

Dataframe API

Criar um Dataframe com dados de amostra

// Generate a dataframe containing five records
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
).toDF("book_id", "book_author", "book_name", "book_pub_year")

//Review schema
booksDF.printSchema

//Print
booksDF.show

Nota

A funcionalidade "Criar se não existir", a nível de linha, ainda não está suportada.

Persistir para Azure Cosmos DB Cassandra API

Ao guardar dados, também pode definir as definições de política de tempo para viver e consistência, como mostrado no exemplo seguinte:

//Persist
booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Nota

O TTL de nível de coluna ainda não está suportado.

Validar em cqlsh

use books_ks;
select * from books;

API de Base de Dados Distribuída Resiliente (RDD)

Criar um RDD com dados de amostra

//Drop and re-create table to delete records created in the previous section 
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

//Create RDD
val booksRDD = sc.parallelize(Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
))

//Review
booksRDD.take(2).foreach(println)

Nota

Criar se não existir funcionalidade ainda não está suportado.

Persistir para Azure Cosmos DB Cassandra API

Ao guardar dados para a API de Cassandra, também pode definir as definições de política de tempo-a-viver e de consistência, como mostrado no exemplo seguinte:

import com.datastax.spark.connector.writer._

//Persist
booksRDD.saveToCassandra("books_ks", "books", SomeColumns("book_id", "book_author", "book_name", "book_pub_year"),writeConf = WriteConf(ttl = TTLOption.constant(900000),consistencyLevel = ConsistencyLevel.ALL))

Validar em cqlsh

use books_ks;
select * from books;

Passos seguintes

Depois de inserir dados na tabela API API da Azure Cosmos DB Cassandra, dirija-se aos seguintes artigos para efetuar outras operações sobre os dados armazenados na API de Cosmos DB Cassandra: