Läsa data från Azure Cosmos DB API för Cassandra med Spark

GÄLLER för: API för Cassandra

I den här artikeln beskrivs hur du läser data som lagras Azure Cosmos DB API för Cassandra från Spark.

API för Cassandra konfiguration

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
// if using Spark 2.x
// spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

//Throughput-related...adjust as needed
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
//spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") // Spark 2.x
spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10") // Spark 3.x
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")

Anteckning

Om du använder Spark 3.0 eller senare behöver du inte installera Cosmos DB hjälp- och anslutningsfabrik. Du bör också använda remoteConnectionsPerExecutor i stället för för Spark connections_per_executor_max 3-anslutningsappen (se ovan). Anslutningsrelaterade egenskaper har definierats i anteckningsboken ovan. Med syntaxen nedan kan anslutningsegenskaper definieras på det här sättet utan att behöva definieras på klusternivå (Initiering av Spark-kontext).

Dataframe-API

Läs tabell med kommandot session.read.format

val readBooksDF = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load

readBooksDF.explain
readBooksDF.show

Läsa tabell med spark.read.cassandraFormat

val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()

Läsa specifika kolumner i tabellen

val readBooksDF = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_author", "book_pub_year")

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

Använda filter

Du kan skicka ned predikat till databasen för att möjliggöra bättre optimerade Spark-frågor. Ett predikat är ett villkor för en fråga som returnerar sant eller falskt, som vanligtvis finns i WHERE-satsen. En predikat-push nedåt filtrerar data i databasfrågan, vilket minskar antalet poster som hämtas från databasen och förbättrar frågeprestanda. Som standard pushar Spark Dataset-API:et automatiskt ned giltiga WHERE-satser till databasen.

val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

Avsnittet Cassandra Filters i den fysiska planen innehåller filtret som push-skickas ned.

partitioner

RDD-API

Läs tabell

val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)

Läsa specifika kolumner i tabellen

val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)

SQL-vyer

Skapa en tillfällig vy från en dataram

spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load.createOrReplaceTempView("books_vw")

Köra frågor mot vyn

select * from books_vw where book_pub_year > 1891

Nästa steg

Följande är ytterligare artiklar om att arbeta med Azure Cosmos DB API för Cassandra från Spark: