Operações de agregação no Azure Cosmos DB para tabelas do Apache Cassandra a partir do Spark

APLICA-SE A: Cassandra

Este artigo descreve as operações de agregação básicas em tabelas do Azure Cosmos DB para Apache Cassandra do Spark.

Nota

A filtragem do lado do servidor e a agregação do lado do servidor não são atualmente suportadas no Azure Cosmos DB para Apache Cassandra.

API para configuração do Cassandra

Defina abaixo a configuração do Spark no seu cluster de blocos de notas. É uma atividade única.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Nota

Se estiver a utilizar o Spark 3.x, não precisa de instalar o programa auxiliar e a fábrica de ligações do Azure Cosmos DB. Também deve utilizar remoteConnectionsPerExecutor em vez de para o conector do connections_per_executor_max Spark 3 (ver acima).

Aviso

Os exemplos do Spark 3 apresentados neste artigo foram testados com a versão 3.2.1 do Spark e o Conector do Apache Spark para Cassandra correspondente com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. As versões posteriores do Spark e/ou do conector do Cassandra podem não funcionar conforme esperado.

Gerador de dados de exemplo

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a simple dataset containing five values
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Operação de contagem

RDD API

sc.cassandraTable("books_ks", "books").count

Saída:

count: Long = 5

API DataFrame

A contagem de dataframes não é atualmente suportada. O exemplo abaixo mostra como executar uma contagem de dataframe depois de manter o dataframe na memória como solução.

Escolha uma opção de armazenamento nas seguintes opções disponíveis para evitar problemas de "memória insuficiente":

  • MEMORY_ONLY: é a opção de armazenamento predefinida. Armazena o RDD como objetos Java serializados na JVM. Se o RDD não caber na memória, algumas partições não serão colocadas em cache e serão recomputadas de imediato sempre que forem necessárias.

  • MEMORY_AND_DISK: armazena o RDD como objetos Java serializados na JVM. Se o RDD não caber na memória, armazene as partições que não cabem no disco e, sempre que necessário, leia-as a partir da localização onde estão armazenadas.

  • MEMORY_ONLY_SER (Java/Scala): armazena RDD como objetos Java serializados– matriz de 1 byte por partição. Esta opção é eficiente em termos de espaço quando comparada com objetos serializados sem serialização, especialmente quando se utiliza um serializador rápido, mas que consome mais CPU para ler.

  • MEMORY_AND_DISK_SER (Java/Scala): esta opção de armazenamento é como MEMORY_ONLY_SER, a única diferença é que transposição de partições que não cabem na memória do disco em vez de as recomputar quando são necessárias.

  • DISK_ONLY: armazena apenas as partições RDD no disco.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2...: Igual aos níveis acima, mas replica cada partição em dois nós de cluster.

  • OFF_HEAP (experimental): semelhante ao MEMORY_ONLY_SER, mas armazena os dados na memória fora da área dinâmica para dados e requer que a memória fora da área dinâmica para dados seja ativada antecipadamente.

//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()

//Explain plan
readBooksDF.explain

//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//Subsequent execution against this DF hits the cache 
readBooksDF.count

//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")

SQL

%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;

Operação média

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

Saída:

res24: Double = 16.016000175476073

API DataFrame

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

Saída:

+------------------+
|   avg(book_price)|
+------------------+
|16.016000175476073|
+------------------+

SQL

select avg(book_price) from books_vw;

Saída:

16.016000175476073

Operação mínima

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

Saída:

res31: Float = 11.33

API DataFrame

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

Saída:

+---------------+
|min(book_price)|
+---------------+
|          11.33|
+---------------+

SQL

%sql
select avg(book_price) from books_vw;

Saída:

11.33

Operação máxima

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

API DataFrame

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

Saída:

+---------------+
|max(book_price)|
+---------------+
|          22.45|
+---------------+

SQL

%sql
select max(book_price) from books_vw;

Saída:

22.45

Operação soma

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

Saída:

res46: Double = 80.08000087738037

API DataFrame

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

Saída:

+-----------------+
|  sum(book_price)|
+-----------------+
|80.08000087738037|
+-----------------+

SQL

select sum(book_price) from books_vw;

Saída:

80.08000087738037

Operação superior ou comparável

RDD API

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

Saída:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1

API DataFrame

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

Saída:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
|           book_name|book_price|
+--------------------+----------+
|      A sign of four|     22.45|
|The adventures of...|     19.83|
|The memoirs of Sh...|     14.22|
+--------------------+----------+

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

Passos seguintes

Para efetuar operações de cópia de tabelas, veja: