Apache Spark を使用して Azure Managed Instance for Apache Cassandra に移行する

可能な限り、Apache Cassandra のネイティブ レプリケーションを使用し、ハイブリッド クラスターを構成することで、既存のクラスターから Azure Managed Instance for Apache Cassandra にデータを移行することをお勧めします。 このアプローチでは、Apache Cassandra のゴシップ プロトコルを使用して、ソース データセンターから新しいマネージド インスタンス データセンターにデータをレプリケートします。 ただし、ソース データベースのバージョンに互換性がない場合や、ハイブリッド クラスターのセットアップが実行できない場合があります。

このチュートリアルでは、Cassandra Spark コネクタと Apache Spark 向けの Azure Databricks を使用して、オフラインで Azure Managed Instance for Apache Cassandra に移行するためにデータを移行する方法について説明します。

前提条件

Azure Databricks クラスターのプロビジョニング

Spark 3.0 をサポートする Databricks ランタイム バージョン 7.5 を選択することをお勧めします。

Databricks ランタイムのバージョンを見つける方法を示すスクリーンショット。

依存関係を追加する

Apache Spark Cassandra コネクタ ライブラリをクラスターに追加して、ネイティブと Azure Cosmos DB Cassandra 両方のエンドポイントに接続します。 自分のクラスターで、 [ライブラリ]>[新規インストール]>[Maven] の順に選択し、Maven 座標に com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 を追加します。

Databricks で Maven パッケージを検索する方法を示すスクリーンショット。

[インストール] を選択し、インストールが完了したらクラスターを再起動します。

Note

Cassandra コネクタ ライブラリがインストールされたら、必ず Databricks クラスターを再起動してください。

移行用の Scala ノートブックを作成する

Databricks で Scala ノートブックを作成します。 ソースとターゲットの Cassandra 構成を、対応する資格情報、ソースとターゲットのキースペースおよびテーブルで置き換えます。 次に、下のコードを実行します。

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Note

各行の元の writetime を保持する必要がある場合は、cassandra migrator のサンプルを参照してください。

次のステップ