Migrace do spravované instance Azure pro Apache Cassandra pomocí Apache Spark

Pokud je to možné, doporučujeme použít nativní replikaci Apache Cassandra k migraci dat ze stávajícího clusteru do spravované instance Azure pro Apache Cassandra konfigurací hybridního clusteru. Tento přístup bude používat protokol Gossip Apache Cassandra k replikaci dat ze zdrojových datových center do vašeho nového Datacenter spravované instance. Mohou však nastat situace, kdy verze zdrojové databáze není kompatibilní nebo pokud není možné nainstalovat hybridní cluster.

V tomto kurzu se dozvíte, jak migrovat data pro migraci na spravovanou instanci Azure pro Apache Cassandra v režimu offline pomocí konektoru Cassandra Spark a Azure Databricks pro Apache Spark.

Požadavky

Zřízení clusteru Azure Databricks

Doporučujeme vybrat modul runtime datacihly verze 7,5, který podporuje Spark 3,0.

Snímek obrazovky s informacemi o tom, jak najít verzi modulu runtime datacihly

Přidat závislosti

přidejte do clusteru knihovnu konektoru Apache Spark Cassandra, abyste se mohli připojit k nativním i Azure Cosmos DB koncovým bodům Cassandra. V clusteru vyberte knihovny > nainstalovat nový > Maven a pak přidejte com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 do souřadnic Maven.

Snímek obrazovky, který ukazuje hledání balíčků Maven v datacihlách.

Po dokončení instalace vyberte nainstalovat a pak cluster restartujte.

Poznámka

Po instalaci knihovny konektoru Cassandra se ujistěte, že restartujete cluster datacihly.

Vytvoření poznámkového bloku Scala pro migraci

V datacihlách vytvořte Scala Poznámkový blok. Své zdrojové a cílové Cassandra konfigurace nahraďte odpovídajícími přihlašovacími údaji a zdrojovými a cílovými oblastmi a tabulkami. Pak spusťte následující kód:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Poznámka

Pokud potřebujete zachovat nebo writetime navýšit datum každého řádku, přečtěte si článek migrace za provozu .

Další kroky