Migrate data from Cassandra to Azure Cosmos DB Cassandra API account using Azure Databricks

APPLIES TO: Cassandra API

Cassandra API in Azure Cosmos DB has become a great choice for enterprise workloads running on Apache Cassandra for a variety of reasons such as:

  • No overhead of managing and monitoring: It eliminates the overhead of managing and monitoring a myriad of settings across the OS, JVM, and YAML files, and their interactions.

  • Significant cost savings: You can save costs with Azure Cosmos DB, including the cost of VMs, bandwidth, and any applicable licenses. Additionally, you don't have to manage data centers, servers, SSD storage, networking, or electricity costs.

  • Ability to use existing code and tools: Azure Cosmos DB provides wire protocol level compatibility with existing Cassandra SDKs and tools. This compatibility ensures you can use your existing codebase with Azure Cosmos DB Cassandra API with trivial changes.
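
As a rough illustration of that compatibility, an existing connection built with the DataStax Java driver 3.x (shown here from Scala) typically only needs its contact point, port, credentials, and SSL settings changed to target an Azure Cosmos DB Cassandra API account. The placeholders below mirror the configuration used later in this article; treat this as a sketch rather than a required migration step:

import com.datastax.driver.core.Cluster

// Same driver API as for native Apache Cassandra - only the endpoint,
// port, credentials, and SSL settings change for Azure Cosmos DB.
val cluster = Cluster.builder()
  .addContactPoint("<USERNAME>.cassandra.cosmos.azure.com")
  .withPort(10350)
  .withCredentials("<USERNAME>", "<PASSWORD>")
  .withSSL()
  .build()

val session = cluster.connect("<KEYSPACE>")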

There are various ways to migrate database workloads from one platform to another. Azure Databricks is a platform-as-a-service (PaaS) offering for Apache Spark that provides a way to perform offline migrations at large scale. This article describes the steps required to migrate data from native Apache Cassandra keyspaces and tables into Azure Cosmos DB Cassandra API using Azure Databricks.

Prerequisites

Provision an Azure Databricks cluster

You can follow the instructions to Provision an Azure Databricks cluster. However, note that the Apache Spark Cassandra Connector does not currently support Apache Spark 3.x, so you will need to provision a Databricks runtime with a supported 2.x version of Apache Spark. We recommend selecting a Databricks runtime version that supports the latest release of Spark 2.x, with Scala version no later than 2.11:

Databricks runtime
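
Once the cluster is running, you can sanity-check from a notebook cell that the runtime meets these requirements. This is a minimal sketch using the pre-defined spark session and standard Scala properties; nothing in it is specific to this article:

// Confirm the cluster runs Spark 2.x with Scala 2.11 before installing the connector
println(s"Spark version: ${spark.version}")                        // expect a 2.x version
println(s"Scala version: ${scala.util.Properties.versionString}")  // expect version 2.11.x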

Add dependencies

You will need to add the Apache Spark Cassandra Connector library to your cluster to connect to both the native Apache Cassandra and the Azure Cosmos DB Cassandra endpoints. In your cluster, select Libraries -> Install New -> Maven -> Search Packages:

Databricks search packages

Type Cassandra in the search box, select the latest spark-cassandra-connector Maven package available, and then select Install:

Databricks select packages
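
If you prefer to install the library by Maven coordinate rather than by searching, you can enter a coordinate of the following form directly. The version shown is only an example; choose the latest 2.x connector build for Scala 2.11 that is available:

com.datastax.spark:spark-cassandra-connector_2.11:2.5.1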

Note

Ensure that you restart the Databricks cluster after the Cassandra Connector library has been installed.

Create Scala Notebook for migration

Create a Scala notebook in Databricks with the following code. Replace the source and target Cassandra configurations with the corresponding credentials and the source and target keyspaces and tables, and then run it:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val nativeCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val cosmosCassandra = Map( 
    "spark.cassandra.connection.host" -> "<USERNAME>.cassandra.cosmos.azure.com",
    "spark.cassandra.connection.port" -> "10350",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.connection.connections_per_executor_max" -> "10",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from native Cassandra
val DFfromNativeCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(nativeCassandra)
  .load
  
//Write to CosmosCassandra
DFfromNativeCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(cosmosCassandra)
  .save

Note

The spark.cassandra.output.batch.size.rows, spark.cassandra.output.concurrent.writes, and connections_per_executor_max configurations are important to avoid rate limiting, which happens when requests to Azure Cosmos DB exceed the provisioned throughput (request units). You may need to adjust these settings depending on the number of executors in the Spark cluster, and potentially the size (and therefore RU cost) of each record being written to the target tables.
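
For example, when the target table has modest provisioned throughput or the Spark cluster has many executors, a more conservative variant of the target options map can be used. The sketch below simply overrides the throughput-related keys already shown above; the specific values are illustrative assumptions, not recommendations:

// Conservative (illustrative) throughput settings to reduce the chance of rate
// limiting against a table with modest provisioned request units.
val cosmosCassandraLowThroughput = cosmosCassandra ++ Map(
    "spark.cassandra.output.concurrent.writes" -> "100",
    "spark.cassandra.connection.connections_per_executor_max" -> "2",
    "spark.cassandra.concurrent.reads" -> "64"
)

Pass cosmosCassandraLowThroughput to .options(...) in the write step above in place of cosmosCassandra.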

Troubleshooting

Rate limiting (429 error)

You may see a 429 error code, or request rate is large error text, even after reducing the above settings to their minimum values. The following are some such scenarios:

  • Throughput allocated to the table is less than 6000 request units. Even at minimum settings, Spark can execute writes at a rate of around 6000 request units or more. If you have provisioned a table in a keyspace that uses shared throughput, it is possible that this table has fewer than 6000 RUs available at runtime. Ensure that the table you are migrating to has at least 6000 RUs available when the migration runs, and if necessary allocate dedicated request units to that table.

  • Excessive data skew with large data volume. If you have a large amount of data (that is, table rows) to migrate into a given table but there is a significant skew in the data (that is, a large number of records being written for the same partition key value), then you may still experience rate limiting even if you have a large number of request units provisioned on your table. Request units are divided equally among physical partitions, and heavy data skew can create a bottleneck of requests to a single partition, causing rate limiting. In this scenario, it is advisable to reduce Spark to minimal throughput settings to avoid rate limiting and force the migration to run slowly. This scenario is more common when migrating reference or control tables, where access is less frequent but skew can be high. However, if a significant skew is present in any other type of table, it may also be advisable to review your data model to avoid hot partition issues for your workload during steady-state operations.

  • Unable to get count on a large table. Running select count(*) from table is not currently supported for large tables. You can get the count from the metrics in the Azure portal (see our troubleshooting article), but if you need to determine the count of a large table from within the context of a Spark job, you can copy the data to a temporary view and then use Spark SQL to get the count, as shown below (replace <primary key> with some field from the resulting temporary view).

    // Read the migrated data back from the Azure Cosmos DB Cassandra API table
    val ReadFromCosmosCassandra = sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(cosmosCassandra)
      .load

    // Register a temporary view so the data can be queried with Spark SQL
    ReadFromCosmosCassandra.createOrReplaceTempView("CosmosCassandraResult")

    // Run the following in a separate notebook cell; the %sql magic must be the first line of its cell
    %sql
    select count(<primary key>) from CosmosCassandraResult
    

Next steps