Агрегированные операции в Azure Cosmos DB для таблиц Apache Cassandra из Spark

ПРИМЕНИМО К: Кассандра

В этой статье описываются основные операции агрегирования для таблиц Apache Cassandra в Azure Cosmos DB из Spark.

Примечание

Фильтрация на стороне сервера и агрегирование на стороне сервера в настоящее время не поддерживаются в Azure Cosmos DB для Apache Cassandra.

API для конфигурации Cassandra

Задайте следующую конфигурацию Spark в кластере записных книжек. Это разовое действие.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Примечание

Если вы используете Spark 3.x, вам не нужно устанавливать вспомогательное средство и фабрику подключений Azure Cosmos DB. Также необходимо использовать remoteConnectionsPerExecutor вместо connections_per_executor_max для соединителя Spark 3 (см. выше).

Предупреждение

Примеры для Spark 3 в этой статье протестированы с использованием Spark версии 3.2.1 и соответствующего соединителя Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Более поздние версии Spark и (или) соединителя Cassandra могут работать непредсказуемым образом.

Пример генератора данных

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

// Generate a simple dataset containing five values
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Операция подсчета

API RRD

sc.cassandraTable("books_ks", "books").count

Выходные данные:

count: Long = 5

API Dataframe

Подсчет кадров данных сейчас не поддерживается. В следующем примере показано, как выполнить подсчет кадров данных после сохранения кадра данных в память в качестве обходного решения.

Выберите вариант хранения из доступных вариантов, чтобы не столкнуться с проблемами нехватки памяти:

  • MEMORY_ONLY: это вариант хранилища по умолчанию. Хранит RDD как десериализованные объекты Java на виртуальной машине Java. Если RDD не помещается в памяти, некоторые секции не будут кэшироваться, и они будут вычисляться повторно в режиме реального времени каждый раз, когда требуется.

  • MEMORY_AND_DISK: хранит RDD как десериализованные объекты Java на виртуальной машине Java. Если RDD не помещается в памяти, секции, которые не помещаются в памяти, будут сохранены на диске, и, когда они потребуются, они будут считаны из расположения, в котором хранятся.

  • MEMORY_ONLY_SER (Java/Scala): хранит RDD как сериализованные объекты Java — в однобайтовом массиве для каждой секции. Этот вариант позволяет сэкономить пространство по сравнению с десериализованными объектами, особенно при использовании быстрого сериализатора, однако для чтения таких объектов более интенсивно используется ЦП.

  • MEMORY_AND_DISK_SER (Java/Scala): этот вариант похож на MEMORY_ONLY_SER, единственное различие состоит в том, что секции, которые не помещаются на диске, сбрасываются, вместо того чтобы вычислять их повторно, когда они необходимы.

  • DISK_ONLY: хранит секции RDD только на диске.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2…: то же, что и для уровней, указанных выше, однако каждая секция реплицируется на два узла кластера.

  • OFF_HEAP (экспериментальная функция): аналогично MEMORY_ONLY_SER, однако данные сохраняются в памяти вне кучи, и память вне кучи требуется предварительно включить.

//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()

//Explain plan
readBooksDF.explain

//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//Subsequent execution against this DF hits the cache 
readBooksDF.count

//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")

SQL

%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;

Операция усреднения

API RRD

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

Выходные данные:

res24: Double = 16.016000175476073

API Dataframe

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show

Выходные данные:

+------------------+
|   avg(book_price)|
+------------------+
|16.016000175476073|
+------------------+

SQL

select avg(book_price) from books_vw;

Выходные данные:

16.016000175476073

Операция определения минимума

API RRD

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

Выходные данные:

res31: Float = 11.33

API Dataframe

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_id","book_price")
  .agg(min("book_price"))
  .show

Выходные данные:

+---------------+
|min(book_price)|
+---------------+
|          11.33|
+---------------+

SQL

%sql
select avg(book_price) from books_vw;

Выходные данные:

11.33

Операция определения максимума

API RRD

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

API Dataframe

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

Выходные данные:

+---------------+
|max(book_price)|
+---------------+
|          22.45|
+---------------+

SQL

%sql
select max(book_price) from books_vw;

Выходные данные:

22.45

Операция суммирования

API RRD

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

Выходные данные:

res46: Double = 80.08000087738037

API Dataframe

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

Выходные данные:

+-----------------+
|  sum(book_price)|
+-----------------+
|80.08000087738037|
+-----------------+

SQL

select sum(book_price) from books_vw;

Выходные данные:

80.08000087738037

Операция определения максимального значения или сравнения

API RRD

val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.

Выходные данные:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1

API Dataframe

import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)

//Explain plan
readBooksDF.explain

//Top
readBooksDF.show

Выходные данные:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
|           book_name|book_price|
+--------------------+----------+
|      A sign of four|     22.45|
|The adventures of...|     19.83|
|The memoirs of Sh...|     14.22|
+--------------------+----------+

import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

Дальнейшие действия

Для выполнения операций копирования таблицы обратитесь к разделу: