Ler dados do Azure Cosmos DB para tabelas do Apache Cassandra com o Spark

APLICA-SE A: Cassandra

Este artigo descreve como ler dados armazenados no Azure Cosmos DB para Apache Cassandra a partir do Spark.

API para configuração do Cassandra

Defina a configuração do Spark abaixo no cluster de blocos de notas. É uma atividade única.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Nota

Se estiver a utilizar o Spark 3.x, não precisa de instalar o programa auxiliar e a fábrica de ligação do Azure Cosmos DB. Também deve utilizar remoteConnectionsPerExecutor em vez de connections_per_executor_max para o conector spark 3 (ver acima).

Aviso

Os exemplos do Spark 3 apresentados neste artigo foram testados com a versão 3.2.1 do Spark e o conector do Cassandra Spark correspondente com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. As versões posteriores do Spark e/ou do conector cassandra podem não funcionar conforme esperado.

API DataFrame

Ler tabela com o comando session.read.format

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

val readBooksDF = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load

readBooksDF.explain
readBooksDF.show

Ler tabela com spark.read.cassandraFormat

val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()

Ler colunas específicas na tabela

val readBooksDF = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_author", "book_pub_year")

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

Aplicar filtros

Pode emitir predicados para baixo para a base de dados para permitir consultas do Spark otimizadas melhor. Um predicado é uma condição numa consulta que devolve verdadeiro ou falso, normalmente localizado na cláusula WHERE. Um push down predicado filtra os dados na consulta da base de dados, reduzindo o número de entradas obtidas da base de dados e melhorando o desempenho das consultas. Por predefinição, a API do Conjunto de Dados do Spark irá emitir automaticamente cláusulas WHERE válidas para a base de dados.

val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

A Cassandra Filters secção do plano físico inclui o filtro pushed down.

partições

RDD API

Ler tabela

val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)

Ler colunas específicas na tabela

val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)

Vistas SQL

Criar uma vista temporária a partir de um dataframe

spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load.createOrReplaceTempView("books_vw")

Executar consultas na vista

select * from books_vw where book_pub_year > 1891

Passos seguintes

Seguem-se artigos adicionais sobre como trabalhar com o Azure Cosmos DB para Apache Cassandra a partir do Spark: