Apache Spark kullanarak Apache Cassandra için Azure yönetilen örneği 'ne geçiş

Mümkün olduğunda, karma bir kümeyapılandırarak, mevcut kümenizdeki verileri Apache Cassandra Için Azure yönetilen örneğine geçirmek üzere Apache Cassandra Native çoğaltmasını kullanmanızı öneririz. Bu yaklaşım, verileri kaynak veri merkezinizden yeni yönetilen örnek veri merkezinize çoğaltmak için Apache Cassandra 'nın GOSSIP protokolünü kullanır. Ancak, kaynak veritabanı sürümünüzün uyumlu olmadığı bazı senaryolar olabilir veya karma küme kurulumu başka bir şekilde uygulanamaz.

Bu öğreticide, Cassandra Spark bağlayıcısını kullanarak çevrimdışı bir biçimde Apache Cassandra için Azure yönetilen örneği 'ne geçiş yapmak üzere verilerin nasıl geçirileceği ve Apache Spark için Azure Databricks açıklanmaktadır.

Önkoşullar

Azure Databricks kümesi sağlama

Spark 3,0 ' yi destekleyen Databricks çalışma zamanı sürüm 7,5 ' i seçmenizi öneririz.

Databricks çalışma zamanı sürümünü bulmayı gösteren ekran görüntüsü.

Bağımlılık Ekle

hem yerel hem de Azure Cosmos DB cassandra uç noktalarına bağlanmak için Apache Spark cassandra bağlayıcı kitaplığını kümenize ekleyin. Kümenizde, Kitaplıklar > Yeni > Maven'i yükleyip Maven koordinatları ' nı seçin com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 .

Databricks içinde Maven paketleri aramasını gösteren ekran görüntüsü.

Yükleme tamamlandığında , yükleme ' yi seçin ve ardından kümeyi yeniden başlatın.

Not

Cassandra bağlayıcı kitaplığı yüklendikten sonra Databricks kümesini yeniden başlattığınızdan emin olun.

Geçiş için Scala Not defteri oluşturma

Databricks içinde bir Scala Not defteri oluşturun. Kaynak ve hedef Cassandra yapılandırmalarınızı ilgili kimlik bilgileriyle ve kaynak ve hedef anahtar alanları ve tablolar ile değiştirin. Ardından aşağıdaki kodu çalıştırın:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

Not

Her bir satırın korunması veya geri tarihi varsa writetime , Canlı geçiş makalesine başvurun.

Sonraki adımlar