使用 Apache Spark 移轉至 Azure Managed Instance for Apache Cassandra
可能的話,建議您使用 Apache Cassandra 原生複寫,藉由設定混合式叢集,以將資料從現有的叢集移轉至 Azure Managed Instance for Apache Cassandra。 此方式將會使用 Apache Cassandra 的 Gossip 通訊協定,以將資料從來源資料中心複寫至新的受控執行個體資料中心。 不過,某些情況是您的來源資料庫版本不相容,或混合式叢集設定不可行。
本教學課程說明如何使用 Cassandra Spark 連接器和適用於 Apache Spark 的 Azure Databricks,以離線方式將資料移轉至 Azure Managed Instance for Apache Cassandra。
必要條件
使用 Azure 入口網站或 Azure CLI 來佈建 Azure Managed Instance for Apache Cassandra 叢集,並確定您可以使用 CQLSH 來連線至叢集。
在受控 Cassandra VNet 內佈建 Azure Databricks 帳戶。 也請確定其具有來源 Cassandra 叢集的網路存取權。
請確定您已將 keyspace/資料表配置從來源 Cassandra 資料庫移轉至目標 Cassandra 受控執行個體資料庫。
佈建 Azure Databricks 叢集
建議您選取支援 Spark 3.0 的 Databricks Runtime 7.5 版本。
新增相依性
將 Apache Spark Cassandra 連接器程式庫新增至您的叢集,以連線至原生和 Azure Cosmos DB Cassandra 端點。 在您的叢集中,選取 [程式庫] > [安裝新的] > [Maven],然後在 Maven 座標中新增 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0
。
選取 [安裝],然後在安裝完成時重新啟動叢集。
注意
安裝 Cassandra 連接器程式庫之後,請務必重新啟動 Databricks 叢集。
建立 Scala 筆記本以進行移轉
在 Databricks 中建立 Scala 筆記本。 將您的來源和目標 Cassandra 設定取代為對應的認證以及來源和目標 keyspace 和資料表。 然後執行下列程式碼:
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext
// source cassandra configs
val sourceCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "9042",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "false",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>"
)
//target cassandra configs
val targetCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "9042",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "true",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>",
//throughput related settings below - tweak these depending on data volumes.
"spark.cassandra.output.batch.size.rows"-> "1",
"spark.cassandra.output.concurrent.writes" -> "1000",
"spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
"spark.cassandra.concurrent.reads" -> "512",
"spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
"spark.cassandra.connection.keep_alive_ms" -> "600000000"
)
//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(sourceCassandra)
.load
//Write to target Cassandra
DFfromSourceCassandra
.write
.format("org.apache.spark.sql.cassandra")
.options(targetCassandra)
.mode(SaveMode.Append) // only required for Spark 3.x
.save
注意
如果您需要保留每個資料列的原始 writetime
,則請參閱 cassandra migrator 範例。