Quickstart: Manage data with Azure Cosmos DB Spark 3 OLTP Connector for SQL API


Quick Start Guide for Cosmos DB Spark Connector

This tutorial is a quick start guide showing how to use the Cosmos DB Spark Connector to read from or write to Cosmos DB. The Cosmos DB Spark Connector is based on Spark 3.1.x.

Throughout this quick tutorial we rely on Azure Databricks Runtime 8.0 with Spark 3.1.1 and a Jupyter Notebook to show how to use the Cosmos DB Spark Connector.

You can use any other Spark 3.1.1 offering as well. You should also be able to use any language supported by Spark (PySpark, Scala, Java, etc.) and any Spark interface you are familiar with (Jupyter Notebook, Livy, etc.).


SLF4J is only needed if you plan to use logging. In that case, also download an SLF4J binding, which links the SLF4J API with the logging implementation of your choice. See the SLF4J user manual for more information.

Install the Cosmos DB Spark Connector in your Spark cluster by adding azure-cosmos-spark_3-1_2-12-4.3.1.jar.

The getting started guide is based on PySpark; however, you can use the equivalent Scala version as well. You can run the following code snippets in an Azure Databricks PySpark notebook.

Create databases and containers

First, set the Cosmos DB account credentials, and the Cosmos DB database name and container name.

cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,

Next, you can use the new Catalog API to create a Cosmos DB Database and Container through Spark.

# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))

# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))

When creating containers with the Catalog API, you can set the throughput and partition key path for the container to be created.
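
For example, a minimal sketch (assuming the same cosmosCatalog configuration as above, and that the autoScaleMaxThroughput table property is available as described in the Catalog API documentation) creates an additional container with autoscale throughput instead of manual throughput; the container name is a placeholder:

# create a cosmos container with autoscale throughput (sketch; container name is a placeholder)
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '4000')".format(cosmosDatabaseName, "autoscaleContainer"))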

For more details, see the full Catalog API documentation.

Ingesting data

The name of the data source is cosmos.oltp, and the following example shows how you can write an in-memory DataFrame consisting of two items to Cosmos DB:

spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
  .toDF("id","name","age","isAlive") \
  .write.format("cosmos.oltp").options(**cfg).mode("APPEND").save()

Note that id is a mandatory field for Cosmos DB.

For more details related to ingesting data, see the full write configuration documentation.

Querying data

Using the same cosmos.oltp data source, we can query data and use filter to push down filters:

from pyspark.sql.functions import col

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.filter(col("isAlive") == True)\
 .show()
For more details related to querying data, see the full query configuration documentation.

Schema inference

When querying data, the Spark Connector can infer the schema based on sampling existing items by setting spark.cosmos.read.inferSchema.enabled to true.

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

Alternatively, you can pass the custom schema you want to be used to read the data:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType

customSchema = StructType([
      StructField("id", StringType()),
      StructField("name", StringType()),
      StructField("type", StringType()),
      StructField("age", IntegerType()),
      StructField("isAlive", BooleanType())
    ])

df = spark.read.schema(customSchema).format("cosmos.oltp").options(**cfg)\
 .load()

If no custom schema is specified and schema inference is disabled, the resulting data will contain the raw JSON content of the items:

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .load()

For more details related to schema inference, see the full schema inference configuration documentation.

Configuration Reference

Generic Configuration

| Config Property Name | Default | Description |
| :--- | :--- | :--- |
| spark.cosmos.accountEndpoint | None | Cosmos DB Account Endpoint Uri |
| spark.cosmos.accountKey | None | Cosmos DB Account Key |
| spark.cosmos.database | None | Cosmos DB database name |
| spark.cosmos.container | None | Cosmos DB container name |

Additional Tuning

| Config Property Name | Default | Description |
| :--- | :--- | :--- |
| spark.cosmos.useGatewayMode | false | Use gateway mode for the client operations |
| spark.cosmos.read.forceEventualConsistency | true | Makes the client use Eventual consistency for read operations instead of using the default account level consistency |
| spark.cosmos.applicationName | None | Application name |
| spark.cosmos.preferredRegionsList | None | Preferred regions list to be used for a multi-region Cosmos DB account. This is a comma separated value (e.g., [East US, West US] or East US, West US); the provided preferred regions will be used as a hint. You should use a collocated Spark cluster with your Cosmos DB account and pass the Spark cluster region as the preferred region. See the list of Azure regions here. Note that you can also use spark.cosmos.preferredRegions as an alias |
| spark.cosmos.diagnostics | None | Can be used to enable more verbose diagnostics. Currently the only supported option is to set this property to simple, which will result in additional logs being emitted as INFO logs in the Driver and Executor logs |
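
For example, a minimal sketch that adds some of these tuning options to the cfg dictionary defined earlier (the region names are placeholders):

tuningCfg = dict(cfg)
tuningCfg.update({
  # route requests through the gateway instead of direct connectivity
  "spark.cosmos.useGatewayMode" : "true",
  # hint the regions collocated with the Spark cluster (placeholder values)
  "spark.cosmos.preferredRegionsList" : "East US, West US",
  # emit additional INFO-level diagnostics in driver and executor logs
  "spark.cosmos.diagnostics" : "simple"
})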

Write Config

| Config Property Name | Default | Description |
| :--- | :--- | :--- |
| spark.cosmos.write.strategy | ItemOverwrite | Cosmos DB Item write Strategy: ItemOverwrite (using upsert), ItemAppend (using create, ignore pre-existing items, i.e., Conflicts), ItemDelete (delete all documents), ItemDeleteIfNotModified (delete all documents for which the etag hasn't changed) |
| spark.cosmos.write.maxRetryCount | 10 | Cosmos DB Write Max Retry Attempts on retriable failures (e.g., connection errors) |
| spark.cosmos.write.point.maxConcurrency | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size |
| spark.cosmos.write.bulk.maxPendingOperations | None | Cosmos DB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified it will be determined based on the Spark executor VM Size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by following the estimation of 1000 x Cores |
| spark.cosmos.write.bulk.enabled | true | Cosmos DB Item Write bulk enabled |
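
As an illustration, a hedged sketch of a write that switches to the ItemAppend strategy (create instead of upsert), assuming df is a DataFrame that contains an id column:

appendCfg = dict(cfg)
appendCfg.update({
  # create items and ignore conflicts for pre-existing ids instead of upserting
  "spark.cosmos.write.strategy" : "ItemAppend",
  "spark.cosmos.write.bulk.enabled" : "true"
})

df.write.format("cosmos.oltp").options(**appendCfg).mode("APPEND").save()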

Query Config

| Config Property Name | Default | Description |
| :--- | :--- | :--- |
| spark.cosmos.read.customQuery | None | When provided, the custom query will be processed against the Cosmos endpoint instead of dynamically generating the query via predicate push down. Usually it is recommended to rely on Spark's predicate push down because that will generate the most efficient set of filters based on the query plan. But there are a couple of predicates like aggregates (count, group by, avg, sum, etc.) that cannot be pushed down yet (at least in Spark 3.1), so the custom query is a fallback to allow them to be pushed into the query sent to Cosmos. If specified, with schema inference enabled, the custom query will also be used to infer the schema |
| spark.cosmos.read.maxItemCount | 1000 | Overrides the maximum number of documents that can be returned for a single query or change feed request. The default value is 1000; consider increasing this only for average document sizes significantly smaller than 1KB or when projection reduces the number of properties selected in queries significantly (like when only selecting the "id" of documents, etc.) |
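
For example, a minimal sketch using spark.cosmos.read.customQuery so that the query sent to Cosmos DB already filters documents server-side (the query text is only an illustration):

queryCfg = dict(cfg)
# send a fixed SQL query to Cosmos DB instead of relying on predicate push down
queryCfg["spark.cosmos.read.customQuery"] = "SELECT * FROM c WHERE c.isAlive = true"

df = spark.read.format("cosmos.oltp").options(**queryCfg).load()
df.show()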

Schema Inference Config

When doing read operations, users can specify a custom schema or allow the connector to infer it. Schema inference is enabled by default.

| Config Property Name | Default | Description |
| :--- | :--- | :--- |
| spark.cosmos.read.inferSchema.enabled | true | When schema inference is disabled and the user is not providing a schema, raw JSON will be returned |
| spark.cosmos.read.inferSchema.query | SELECT * FROM r | When schema inference is enabled, used as a custom query to infer it. For example, if you store multiple entities with different schemas within a container, you can use this to ensure inference only looks at certain document types, or to project only particular columns |
| spark.cosmos.read.inferSchema.samplingSize | 1000 | Sampling size to use when inferring the schema and not using a query |
| spark.cosmos.read.inferSchema.includeSystemProperties | false | When schema inference is enabled, whether the resulting schema will include all Cosmos DB system properties |
| spark.cosmos.read.inferSchema.includeTimestamp | false | When schema inference is enabled, whether the resulting schema will include the document Timestamp (_ts). Not required if spark.cosmos.read.inferSchema.includeSystemProperties is enabled, as it will already include all system properties |
| spark.cosmos.read.inferSchema.forceNullableProperties | true | When schema inference is enabled, whether the resulting schema will make all columns nullable. By default, all columns (except Cosmos system properties) will be treated as nullable even if all rows within the sample set have non-null values. When disabled, the inferred columns are treated as nullable or not depending on whether any record in the sample set has null values within a column |
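
As an illustration, a sketch that restricts schema inference to a particular document type and includes the document timestamp (the WHERE clause is only an example):

inferCfg = dict(cfg)
inferCfg.update({
  "spark.cosmos.read.inferSchema.enabled" : "true",
  # only sample documents of one type when inferring the schema
  "spark.cosmos.read.inferSchema.query" : "SELECT * FROM r WHERE r.type = 'cat'",
  # include the document timestamp (_ts) in the inferred schema
  "spark.cosmos.read.inferSchema.includeTimestamp" : "true"
})

df = spark.read.format("cosmos.oltp").options(**inferCfg).load()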

Change feed configuration (only for Spark Streaming using the cosmos.oltp.changeFeed data source, which is read-only)

| Config Property Name | Default | Description |
| :--- | :--- | :--- |
| spark.cosmos.changeFeed.startFrom | Beginning | ChangeFeed start from settings (Now, Beginning, or a certain point in time (UTC), for example 2020-02-10T14:15:03). The default value is Beginning. If the write config contains a checkpointLocation and any checkpoints exist, the stream is always continued independent of the spark.cosmos.changeFeed.startFrom settings; you need to change the checkpointLocation or delete the checkpoints to restart the stream if that is the intention |
| spark.cosmos.changeFeed.mode | Incremental | ChangeFeed mode (Incremental or FullFidelity). NOTE: FullFidelity is in an experimental state right now. It requires that the subscription/account has been enabled for the private preview, and there are known breaking changes that will happen for FullFidelity (schema of the returned documents). It is recommended to only use FullFidelity for non-production scenarios at this point |
| spark.cosmos.changeFeed.itemCountPerTriggerHint | None | Approximate maximum number of items read from the change feed for each micro-batch/trigger |
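
For example, a minimal sketch of a streaming read from the change feed into a console sink (the checkpoint path is a placeholder):

changeFeedCfg = dict(cfg)
changeFeedCfg.update({
  "spark.cosmos.changeFeed.startFrom" : "Beginning",
  "spark.cosmos.changeFeed.mode" : "Incremental"
})

# the change feed data source is read-only and is used with Structured Streaming
streamDf = spark.readStream.format("cosmos.oltp.changeFeed").options(**changeFeedCfg).load()

query = streamDf.writeStream\
  .format("console")\
  .option("checkpointLocation", "/tmp/changefeed-checkpoint")\
  .start()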

Json conversion configuration

| Config Property Name | Default | Description |
| :--- | :--- | :--- |
| spark.cosmos.read.schemaConversionMode | Relaxed | The schema conversion behavior (Relaxed, Strict). When reading JSON documents, if a document contains an attribute that does not map to the schema type, the user can decide whether to use a null value (Relaxed) or an exception (Strict) |
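
As an illustration, a sketch that fails with an exception on schema mismatches instead of returning null values:

strictCfg = dict(cfg)
# raise an exception when a document attribute does not map to the schema type
strictCfg["spark.cosmos.read.schemaConversionMode"] = "Strict"

df = spark.read.format("cosmos.oltp").options(**strictCfg).load()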

Partitioning Strategy Config

| Config Property Name | Default | Description |
| :--- | :--- | :--- |
| spark.cosmos.read.partitioning.strategy | Default | The partitioning strategy used (Default, Custom, Restrictive or Aggressive) |
| spark.cosmos.partitioning.targetedCount | None | The targeted Partition Count. This parameter is optional and ignored unless strategy==Custom is used. In this case the Spark Connector won't dynamically calculate the number of partitions but stick with this value |
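
For example, a hedged sketch of a read that uses the Custom partitioning strategy with a fixed partition count (the count is a placeholder):

partitionCfg = dict(cfg)
partitionCfg.update({
  # use a fixed number of Spark partitions instead of the dynamically calculated one
  "spark.cosmos.read.partitioning.strategy" : "Custom",
  "spark.cosmos.partitioning.targetedCount" : "8"
})

df = spark.read.format("cosmos.oltp").options(**partitionCfg).load()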

Throughput Control Config

| Config Property Name | Default | Description |
| :--- | :--- | :--- |
| spark.cosmos.throughputControl.enabled | false | Whether throughput control is enabled |
| spark.cosmos.throughputControl.name | None | Throughput control group name |
| spark.cosmos.throughputControl.targetThroughput | None | Throughput control group target throughput |
| spark.cosmos.throughputControl.targetThroughputThreshold | None | Throughput control group target throughput threshold |
| spark.cosmos.throughputControl.globalControl.database | None | Database which will be used for throughput global control |
| spark.cosmos.throughputControl.globalControl.container | None | Container which will be used for throughput global control |
| spark.cosmos.throughputControl.globalControl.renewIntervalInMS | 5s | How often the client is going to update the throughput usage of itself |
| spark.cosmos.throughputControl.globalControl.expireIntervalInMS | 11s | How quickly an offline client will be detected |
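
As an illustration, a hedged sketch of a write configuration that keeps a job below a fraction of the container's provisioned throughput; the control group name and the global control database/container are placeholders, the global control container must already exist, and df is assumed to be a DataFrame to be written:

throughputCfg = dict(cfg)
throughputCfg.update({
  "spark.cosmos.throughputControl.enabled" : "true",
  # logical name for this throughput control group (placeholder)
  "spark.cosmos.throughputControl.name" : "SourceContainerThroughputControl",
  # use at most 95% of the provisioned throughput for this workload
  "spark.cosmos.throughputControl.targetThroughputThreshold" : "0.95",
  # database and container used to coordinate throughput usage across clients (placeholders)
  "spark.cosmos.throughputControl.globalControl.database" : "sampleDB",
  "spark.cosmos.throughputControl.globalControl.container" : "ThroughputControl"
})

df.write.format("cosmos.oltp").options(**throughputCfg).mode("APPEND").save()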