الترحيل إلى مثيل Azure Managed Instance المدار لـ Apache Cassandra باستخدام Apache Spark

حيثما أمكن، نوصي باستخدام النسخ المتماثل الأصلي لـ Apache Cassandra لترحيل البيانات من المجموعة الحالية إلى Azure Managed Instance لـ Apache Cassandra عن طريق تكوين ⁧⁩مجموعة مختلطة⁧⁩. سيستخدم هذا النهج بروتوكول التناقل الخاص بـ Apache Cassandra لنسخ البيانات من مركز بيانات المصدر الخاص بك إلى مركز بيانات المثيل الجديد المُدار. ومع ذلك، قد تكون هناك بعض السيناريوهات حيث لا يكون إصدار قاعدة البيانات المصدر متوافقًا، أو يكون إعداد المجموعة المختلط غير ممكن.

يصف هذا البرنامج التعليمي كيفية ترحيل البيانات إلى مثيل Azure Managed Instance المُدار لـ Apache Cassandra بطريقة غير متصلة بالإنترنت باستخدام موصل Cassandra Spark، وAzure Databricks لـ Apache Spark.

المتطلبات الأساسية

توفير مجموعة Azure Databricks

نوصي بتحديد Databricks runtime الإصدار 7.5، والذي يدعم Spark 3.0.

لقطة شاشة توضح العثور على نسخة وقت تشغيل Databricks.

إضافة التبعيات

أضف مكتبة Apache Spark Cassandra Connector إلى مجموعتك للاتصال بنقاط نهاية Azure Cosmos DB Cassandra الأصلية. في نظام المجموعة، حدد Libraries>Install New>Maven ثم أضف com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 إحداثيات Maven.

توضح لقطة شاشة البحث عن حزم Maven في Databricks.

حدد Install، ثم أعد تشغيل نظام المجموعة عند اكتمال التثبيت.

ملاحظة

تأكد من إعادة تشغيل كتلة Databricks بعد تثبيت مكتبة Cassandra Connector.

إنشاء Scala Notebook للترحيل

قم بإنشاء Scala Notebook في Databricks. استبدل المصدر والهدف تكوينات Cassandra بمعلومات تسجيل الدخول المقابلة، ومسافات المفاتيح والجداول المصدر والهدف. ثم قم بتشغيل الكود التالي:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

ملاحظة

إذا كانت لديك حاجة للحفاظ على الأصلي writetime لكل صف، فراجع عينة مرحل cassandra .

الخطوات التالية