共用方式為


使用 Apache Spark 移轉至 Azure Managed Instance for Apache Cassandra

可能的話,建議您使用 Apache Cassandra 原生複寫,藉由設定混合式叢集,以將資料從現有的叢集移轉至 Azure Managed Instance for Apache Cassandra。 此方式將會使用 Apache Cassandra 的 Gossip 通訊協定,以將資料從來源資料中心複寫至新的受控執行個體資料中心。 不過,某些情況是您的來源資料庫版本不相容,或混合式叢集設定不可行。

本教學課程說明如何使用 Cassandra Spark 連接器和適用於 Apache Spark 的 Azure Databricks,以離線方式將資料移轉至 Azure Managed Instance for Apache Cassandra。

必要條件

佈建 Azure Databricks 叢集

建議您選取支援 Spark 3.0 的 Databricks Runtime 7.5 版本。

Screenshot that shows finding the Databricks runtime version.

新增相依性

將 Apache Spark Cassandra 連接器程式庫新增至您的叢集,以連線至原生和 Azure Cosmos DB Cassandra 端點。 在您的叢集中,選取 [程式庫] > [安裝新的] > [Maven],然後在 Maven 座標中新增 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0

Screenshot that shows searching for Maven packages in Databricks.

選取 [安裝],然後在安裝完成時重新啟動叢集。

注意

安裝 Cassandra 連接器程式庫之後,請務必重新啟動 Databricks 叢集。

建立 Scala 筆記本以進行移轉

在 Databricks 中建立 Scala 筆記本。 將您的來源和目標 Cassandra 設定取代為對應的認證以及來源和目標 keyspace 和資料表。 然後執行下列程式碼:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

注意

如果您需要保留每個資料列的原始 writetime,則請參閱 cassandra migrator 範例。

下一步