Accelerate big data analytics by using the Apache Spark to Azure Cosmos DB connector

You can run Spark jobs with data stored in Azure Cosmos DB by using the Cosmos DB Spark connector. Cosmos DB can be used for batch and stream processing, and as a serving layer for low-latency access.

You can use the connector with Azure Databricks or Azure HDInsight, which provide managed Spark clusters on Azure. The following table shows supported Spark versions.

Component                           Version
Apache Spark                        2.4.x, 2.3.x, 2.2.x, and 2.1.x
Scala                               2.11
Azure Databricks runtime version    > 3.4

Warning

This connector supports the core (SQL) API of Azure Cosmos DB. For the Azure Cosmos DB API for MongoDB, use the MongoDB Spark connector. For the Azure Cosmos DB Cassandra API, use the Cassandra Spark connector.

Quickstart

Reading from Cosmos DB

The following snippet shows how to create a Spark DataFrame to read from Cosmos DB in PySpark.

# Read Configuration
readConfig = {
  "Endpoint" : "https://doctorwho.documents.azure.com:443/",
  "Masterkey" : "YOUR-KEY-HERE",
  "Database" : "DepartureDelays",
  "Collection" : "flights_pcoll",
  "query_custom" : "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'" // Optional
}

# Connect via azure-cosmosdb-spark to create Spark DataFrame
flights = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()
flights.count()

And the same code snippet in Scala:

// Import Necessary Libraries
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Configure connection to your collection
val readConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_pcoll",
  "query_custom" -> "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'" // Optional
))

// Connect via azure-cosmosdb-spark to create Spark DataFrame
val flights = spark.read.cosmosDB(readConfig)
flights.count()
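
Once loaded, the DataFrame behaves like any other Spark DataFrame. As a minimal PySpark sketch (assuming the flights DataFrame created above, with column names taken from the custom query), you can register a temporary view and query it with Spark SQL:

# Register the DataFrame as a temporary view so it can be queried with Spark SQL
flights.createOrReplaceTempView("flights")

# Example query: average delay per destination for flights departing SEA
spark.sql("""
  SELECT destination, AVG(delay) AS avg_delay
  FROM flights
  GROUP BY destination
  ORDER BY avg_delay DESC
""").show(10)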

Writing to Cosmos DB

The following snippet shows how to write a DataFrame to Cosmos DB in PySpark.

# Write configuration
writeConfig = {
  "Endpoint" : "https://doctorwho.documents.azure.com:443/",
  "Masterkey" : "YOUR-KEY-HERE",
  "Database" : "DepartureDelays",
  "Collection" : "flights_fromsea",
  "Upsert" : "true"
}

# Write to Cosmos DB from the flights DataFrame
flights.write.format("com.microsoft.azure.cosmosdb.spark").options(**writeConfig).save()

And the same code snippet in Scala:

// Configure connection to the sink collection
val writeConfig = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "YOUR-KEY-HERE",
  "Database" -> "DepartureDelays",
  "Collection" -> "flights_fromsea",
  "Upsert" : "true"
))

// Upsert the dataframe to Cosmos DB
import org.apache.spark.sql.SaveMode
flights.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)
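
As a quick check, you can read the sink collection back with the same connector. A minimal PySpark sketch, assuming the same account and the flights_fromsea collection written above:

# Read back the sink collection to verify the upsert
verifyConfig = {
  "Endpoint" : "https://doctorwho.documents.azure.com:443/",
  "Masterkey" : "YOUR-KEY-HERE",
  "Database" : "DepartureDelays",
  "Collection" : "flights_fromsea"
}

written = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**verifyConfig).load()
written.count()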

For more snippets and end-to-end samples, see Jupyter.

Working with the connector

You can build the connector from source on GitHub, or download the uber JARs from Maven using the links below.

Spark    Scala    Latest version
2.4.0    2.11     azure-cosmosdb-spark_2.4.0_2.11_1.3.5
2.3.0    2.11     azure-cosmosdb-spark_2.3.0_2.11_1.3.3
2.2.0    2.11     azure-cosmosdb-spark_2.2.0_2.11_1.1.1
2.1.0    2.11     azure-cosmosdb-spark_2.1.0_2.11_1.2.2

Using Databricks notebooks

Create a library in your Databricks workspace by following the guidance in the Azure Databricks Guide > Use the Azure Cosmos DB Spark connector.

Note

The Use the Azure Cosmos DB Spark Connector page is currently not up to date. Instead of downloading the six separate JARs into six different libraries, you can download the uber JAR from Maven at https://search.maven.org/artifact/com.microsoft.azure/azure-cosmosdb-spark_2.4.0_2.11/1.3.5/jar and install this one JAR/library.

Using spark-cli

To work with the connector using the spark-cli (that is, spark-shell, pyspark, spark-submit), you can use the --packages parameter with the connector's Maven coordinates.

spark-shell --master yarn --packages "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5"
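
The same --packages parameter works when launching pyspark or spark-submit. For example, a sketch of a pyspark launch, assuming a YARN cluster as above:

pyspark --master yarn --packages "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5"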

Using Jupyter notebooks

If you're using Jupyter notebooks within HDInsight, you can use the spark-magic %%configure cell to specify the connector's Maven coordinates.

{ "name":"Spark-to-Cosmos_DB_Connector",
  "conf": {
    "spark.jars.packages": "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5",
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
   ...
}

Note that spark.jars.excludes is included specifically to remove potential conflicts between the connector, Apache Spark, and Livy.
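
After the %%configure cell runs and the session restarts with the connector package available, later PySpark cells can use the same read pattern shown in the Quickstart. A minimal sketch, reusing the connection values from earlier:

# In a later notebook cell, read from Cosmos DB with the connector on the classpath
readConfig = {
  "Endpoint" : "https://doctorwho.documents.azure.com:443/",
  "Masterkey" : "YOUR-KEY-HERE",
  "Database" : "DepartureDelays",
  "Collection" : "flights_pcoll"
}

flights = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()
flights.show(5)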

Build the connector

Currently, this connector project uses Maven, so to build without dependencies, you can run:

mvn clean package

Working with our samples

The Cosmos DB Spark GitHub repository has sample notebooks and scripts that you can try.

More Information

We have more information in the azure-cosmosdb-spark wiki, including:

Configuration and Setup

Troubleshooting

Performance

Change Feed

Monitoring

Next steps

If you haven't already, download the Spark to Azure Cosmos DB connector from the azure-cosmosdb-spark GitHub repository and explore the additional resources in the repo.

You might also want to review the Apache Spark SQL, DataFrames, and Datasets Guide, and the Apache Spark on Azure HDInsight article.