Apache Spark kullanarak Apache Cassandra için Azure yönetilen örneği 'ne geçiş
Mümkün olduğunda, karma bir kümeyapılandırarak, mevcut kümenizdeki verileri Apache Cassandra Için Azure yönetilen örneğine geçirmek üzere Apache Cassandra Native çoğaltmasını kullanmanızı öneririz. Bu yaklaşım, verileri kaynak veri merkezinizden yeni yönetilen örnek veri merkezinize çoğaltmak için Apache Cassandra 'nın GOSSIP protokolünü kullanır. Ancak, kaynak veritabanı sürümünüzün uyumlu olmadığı bazı senaryolar olabilir veya karma küme kurulumu başka bir şekilde uygulanamaz.
Bu öğreticide, Cassandra Spark bağlayıcısını kullanarak çevrimdışı bir biçimde Apache Cassandra için Azure yönetilen örneği 'ne geçiş yapmak üzere verilerin nasıl geçirileceği ve Apache Spark için Azure Databricks açıklanmaktadır.
Önkoşullar
Azure Portal veya Azure CLI kullanarak Apache Cassandra kümesi için Azure yönetilen örneği sağlayın ve csınsh ile kümenize bağlanbileceğinizdenemin olun.
Managed Cassandra VNET 'iniz içinde Azure Databricks bir hesap sağlayın. Ayrıca, kaynak Cassandra kümenize ağ erişimi olduğundan emin olun.
Kaynak Cassandra veritabanından hedef Cassandra yönetilen örnek veritabanınıza anahtar alanı/tablo düzenini zaten geçirdiğinizden emin olun.
Azure Databricks kümesi sağlama
Spark 3,0 ' yi destekleyen Databricks çalışma zamanı sürüm 7,5 ' i seçmenizi öneririz.
Bağımlılık Ekle
hem yerel hem de Azure Cosmos DB cassandra uç noktalarına bağlanmak için Apache Spark cassandra bağlayıcı kitaplığını kümenize ekleyin. Kümenizde, Kitaplıklar > Yeni > Maven'i yükleyip Maven koordinatları ' nı seçin com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 .
Yükleme tamamlandığında , yükleme ' yi seçin ve ardından kümeyi yeniden başlatın.
Not
Cassandra bağlayıcı kitaplığı yüklendikten sonra Databricks kümesini yeniden başlattığınızdan emin olun.
Geçiş için Scala Not defteri oluşturma
Databricks içinde bir Scala Not defteri oluşturun. Kaynak ve hedef Cassandra yapılandırmalarınızı ilgili kimlik bilgileriyle ve kaynak ve hedef anahtar alanları ve tablolar ile değiştirin. Ardından aşağıdaki kodu çalıştırın:
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext
// source cassandra configs
val sourceCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "9042",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "false",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>"
)
//target cassandra configs
val targetCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "9042",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "true",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>",
//throughput related settings below - tweak these depending on data volumes.
"spark.cassandra.output.batch.size.rows"-> "1",
"spark.cassandra.output.concurrent.writes" -> "1000",
"spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
"spark.cassandra.concurrent.reads" -> "512",
"spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
"spark.cassandra.connection.keep_alive_ms" -> "600000000"
)
//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(sourceCassandra)
.load
//Write to target Cassandra
DFfromSourceCassandra
.write
.format("org.apache.spark.sql.cassandra")
.options(targetCassandra)
.mode(SaveMode.Append) // only required for Spark 3.x
.save
Not
Her bir satırın korunması veya geri tarihi varsa writetime , Canlı geçiş makalesine başvurun.