Migrera till Azure Managed Instance för Apache Cassandra med Apache Spark

Om möjligt rekommenderar vi att du använder intern Apache Cassandra-replikering för att migrera data från ditt befintliga kluster till Azure Managed Instance för Apache Cassandra genom att konfigurera ett hybridkluster. Den här metoden använder Apache Cassandras skvallerprotokoll för att replikera data från ditt källdatacenter till ditt nya hanterade instansdatacenter. Det kan dock finnas vissa scenarier där din källdatabasversion inte är kompatibel, eller så är det inte möjligt att konfigurera ett hybridkluster.

Den här självstudien beskriver hur du migrerar data till Migrera till Azure Managed Instance för Apache Cassandra offline med cassandra Spark-anslutningsappen och Azure Databricks för Apache Spark.

Förutsättningar

Etablera ett Azure Databricks kluster

Vi rekommenderar att du väljer Databricks Runtime version 7.5, som stöder Spark 3.0.

Skärmbild som visar hur du hittar Databricks-körningsversionen.

Lägga till beroenden

Lägg till Apache Spark Cassandra Connector-biblioteket i klustret för att ansluta till både interna och Azure Cosmos DB Cassandra-slutpunkter. I klustret väljer du Bibliotek Installera > ny > Maven och lägger sedan till com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 i Maven-koordinater.

Skärmbild som visar sökning efter Maven-paket i Databricks.

Välj Installera och starta sedan om klustret när installationen är klar.

Anteckning

Se till att du startar om Databricks-klustret när Cassandra Connector-biblioteket har installerats.

Skapa Scala Notebook för migrering

Skapa en Scala Notebook i Databricks. Ersätt dina Cassandra-käll- och målkonfigurationer med motsvarande autentiseringsuppgifter samt käll- och målnyckelutrymmen och tabeller. Kör sedan följande kod:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Anteckning

Om du behöver bevara eller bakåtdatera för writetime varje rad kan du läsa artikeln om direktmigrering.

Nästa steg