Accelerate big data analytics by using the Apache Spark to Azure Cosmos DB connector

You can run Spark jobs against data stored in Azure Cosmos DB by using the Cosmos DB Spark connector. Azure Cosmos DB can be used for batch and stream processing, and as a serving layer for low-latency access.

You can use the connector with Azure Databricks or Azure HDInsight, which provide managed Spark clusters on Azure. The following table shows supported Spark versions.

Component                          Version
Apache Spark                       2.4.x, 2.3.x, 2.2.x, and 2.1.x
Scala                              2.11
Azure Databricks runtime version   > 3.4

Warning

This connector supports the core (SQL) API of Azure Cosmos DB. For Cosmos DB for MongoDB API, use the MongoDB Spark connector. For Cosmos DB Cassandra API, use the Cassandra Spark connector.

Quickstart

Batch reads from Cosmos DB

The following snippet shows how to create a Spark DataFrame to read from Cosmos DB in PySpark.

# Read Configuration
readConfig = {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_pcoll",
    "query_custom": "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'" // Optional
}

# Connect via azure-cosmosdb-spark to create Spark DataFrame
flights = spark.read.format(
    "com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()
flights.count()

And the same code snippet in Scala:

// Import Necessary Libraries
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Read Configuration
val readConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_pcoll",
  "query_custom" -> "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'" // Optional
))

// Connect via azure-cosmosdb-spark to create Spark DataFrame
val flights = spark.read.cosmosDB(readConfig)
flights.count()
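
Once loaded, the DataFrame behaves like any other Spark DataFrame. As a minimal PySpark sketch (assuming the flights DataFrame created above), you can register it as a temporary view and query it with Spark SQL:

# Sketch: query the `flights` DataFrame created above with Spark SQL
flights.createOrReplaceTempView("flights")

# Illustrative aggregation: average delay per destination for flights out of Seattle
spark.sql("""
    SELECT destination, AVG(delay) AS avg_delay
    FROM flights
    WHERE origin = 'SEA'
    GROUP BY destination
    ORDER BY avg_delay DESC
""").show()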

Batch writes to Cosmos DB

The following snippet shows how to write a DataFrame to Cosmos DB in PySpark.

# Write configuration
writeConfig = {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_fromsea",
    "Upsert": "true"
}

# Write to Cosmos DB from the flights DataFrame
flights.write.format("com.microsoft.azure.cosmosdb.spark").options(
    **writeConfig).save()

And the same code snippet in Scala:

// Write configuration

val writeConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_fromsea",
  "Upsert" : "true"
))

// Write to Cosmos DB from the flights DataFrame
import org.apache.spark.sql.SaveMode
flights.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)
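
One way to sanity-check the batch write is to read the target collection back with the same connector. A minimal PySpark sketch, reusing the endpoint, key, and database from the write configuration above (verifyConfig and written are illustrative names):

# Sketch: read back the flights_fromsea collection that was just written
verifyConfig = {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_fromsea"
}

written = spark.read.format(
    "com.microsoft.azure.cosmosdb.spark").options(**verifyConfig).load()
written.count()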

Streaming reads from Cosmos DB

The following snippet shows how to connect to and read from the Azure Cosmos DB change feed.

# Read Configuration
readConfig = {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_pcoll",
    "ReadChangeFeed": "true",
    "ChangeFeedQueryName": "Departure-Delays",
    "ChangeFeedStartFromTheBeginning": "false",
    "InferStreamSchema": "true",
    "ChangeFeedCheckpointLocation": "dbfs:/Departure-Delays"
}


# Open a read stream to the Cosmos DB Change Feed via azure-cosmosdb-spark to create Spark DataFrame
changes = (spark
           .readStream
           .format("com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSourceProvider")
           .options(**readConfig)
           .load())

And the same code snippet in Scala:

// Import Necessary Libraries
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config
import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSourceProvider

// Read Configuration
val readConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_pcoll",
  "ReadChangeFeed" -> "true",
  "ChangeFeedQueryName" -> "Departure-Delays",
  "ChangeFeedStartFromTheBeginning" -> "false",
  "InferStreamSchema" -> "true",
  "ChangeFeedCheckpointLocation" -> "dbfs:/Departure-Delays"
))

// Open a read stream to the Cosmos DB Change Feed via azure-cosmosdb-spark to create Spark DataFrame
val df = spark.readStream.format(classOf[CosmosDBSourceProvider].getName).options(readConfig).load()
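
Before wiring the change feed into a downstream sink, it can be helpful to inspect it interactively. A minimal PySpark sketch (assuming the changes streaming DataFrame created above; debugQuery is an illustrative name) that writes the stream to the built-in console sink:

# Sketch: write the change feed to the console sink for quick inspection
debugQuery = (changes
              .writeStream
              .format("console")
              .outputMode("append")
              .option("truncate", "false")
              .start())

# Stop the debug query when you're done inspecting the output
# debugQuery.stop()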

Streaming writes to Cosmos DB

The following snippet shows how to write a streaming DataFrame to Cosmos DB in PySpark.

# Write configuration
writeConfig = {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "YOUR-KEY-HERE",
    "Database": "DepartureDelays",
    "Collection": "flights_fromsea",
    "Upsert": "true",
    "WritingBatchSize": "500",
    "CheckpointLocation": "/checkpointlocation_write1"
}

# Write the change feed to Cosmos DB from the changes DataFrame
changeFeed = (changes
              .writeStream
              .format("com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider")
              .outputMode("append")
              .options(**writeConfig)
              .start())

And the same code snippet in Scala:

// Write configuration

val writeConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_fromsea",
  "Upsert" -> "true",
  "WritingBatchSize" -> "500",
  "CheckpointLocation" -> "/checkpointlocation_write1"
))

// Write the change feed to Cosmos DB from the df DataFrame
import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider

val streamingQuery = df
  .writeStream
  .format(classOf[CosmosDBSinkProvider].getName)
  .outputMode("append")
  .options(writeConfig)
  .start()
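
Starting the stream returns a StreamingQuery handle that you can use to monitor or stop the job. A minimal PySpark sketch, assuming the changeFeed query started in the PySpark snippet above:

# Sketch: monitor and manage the streaming query started above
print(changeFeed.status)        # current status of the query
print(changeFeed.lastProgress)  # metrics from the most recent micro-batch

# Block until the query terminates, or stop it explicitly:
# changeFeed.awaitTermination()
# changeFeed.stop()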

For more snippets and end-to-end samples, see the Jupyter notebook samples in the azure-cosmosdb-spark GitHub repository.

Working with the connector

You can build the connector from source on GitHub, or download the uber JARs from Maven by using the links in the following table.

Spark    Scala    Latest version
2.4.0    2.11     azure-cosmosdb-spark_2.4.0_2.11_1.4.0
2.3.0    2.11     azure-cosmosdb-spark_2.3.0_2.11_1.3.3
2.2.0    2.11     azure-cosmosdb-spark_2.2.0_2.11_1.1.1
2.1.0    2.11     azure-cosmosdb-spark_2.1.0_2.11_1.2.2

Using Databricks notebooks

Create a library in your Databricks workspace by following the guidance in the Azure Databricks Guide > Use the Azure Cosmos DB Spark connector.

Note

The Use the Azure Cosmos DB Spark Connector page is currently not up to date. Instead of downloading the six separate JARs into six different libraries, you can download the uber JAR from Maven at https://search.maven.org/artifact/com.microsoft.azure/azure-cosmosdb-spark_2.4.0_2.11/1.4.0/jar and install this single JAR as one library.
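
If you prefer to attach the library from the command line, the legacy Databricks CLI can install a Maven coordinate onto a cluster. A sketch, assuming the CLI is configured and CLUSTER_ID is a placeholder for your cluster's ID:

databricks libraries install --cluster-id CLUSTER_ID --maven-coordinates "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.4.0"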

Using spark-cli

To work with the connector from the spark-cli (that is, spark-shell, pyspark, or spark-submit), use the --packages parameter with the connector's Maven coordinates.

spark-shell --master yarn --packages "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.4.0"
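
The same --packages flag works when launching pyspark or submitting a job with spark-submit; for example:

pyspark --master yarn --packages "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.4.0"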

Using Jupyter notebooks

If you're using Jupyter notebooks within HDInsight, you can use the spark-magic %%configure cell to specify the connector's Maven coordinates.

{ "name":"Spark-to-Cosmos_DB_Connector",
  "conf": {
    "spark.jars.packages": "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.4.0",
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
   ...
}

The spark.jars.excludes setting is included specifically to remove potential conflicts between the connector, Apache Spark, and Livy.

Build the connector

Currently, the connector project uses Maven. To build it without dependencies, run:

mvn clean package
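
The build places the connector JAR under the target directory, which you can then attach to a Spark session with --jars. A sketch; the exact file name below is illustrative and depends on the version you build:

spark-shell --master yarn --jars target/azure-cosmosdb-spark_2.4.0_2.11-1.4.0-uber.jar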

Working with our samples

The Cosmos DB Spark GitHub repository has sample notebooks and scripts that you can try.

More Information

We have more information in the azure-cosmosdb-spark wiki, including:

Configuration and Setup

Troubleshooting

Performance

Change Feed

Monitoring

Next steps

If you haven't already, download the Spark to Azure Cosmos DB connector from the azure-cosmosdb-spark GitHub repository and explore the additional resources in the repo.

You might also want to review the Apache Spark SQL, DataFrames, and Datasets Guide, and the Apache Spark on Azure HDInsight article.