الترحيل إلى مثيل Azure Managed Instance المدار لـ Apache Cassandra باستخدام Apache Spark
حيثما أمكن، نوصي باستخدام النسخ المتماثل الأصلي لـ Apache Cassandra لترحيل البيانات من المجموعة الحالية إلى Azure Managed Instance لـ Apache Cassandra عن طريق تكوين مجموعة مختلطة. سيستخدم هذا النهج بروتوكول التناقل الخاص بـ Apache Cassandra لنسخ البيانات من مركز بيانات المصدر الخاص بك إلى مركز بيانات المثيل الجديد المُدار. ومع ذلك، قد تكون هناك بعض السيناريوهات حيث لا يكون إصدار قاعدة البيانات المصدر متوافقًا، أو يكون إعداد المجموعة المختلط غير ممكن.
يصف هذا البرنامج التعليمي كيفية ترحيل البيانات إلى مثيل Azure Managed Instance المُدار لـ Apache Cassandra بطريقة غير متصلة بالإنترنت باستخدام موصل Cassandra Spark، وAzure Databricks لـ Apache Spark.
المتطلبات الأساسية
توفير مثيل Azure Managed Instance المُدار لمجموعة Apache Cassandra باستخدام مدخل Microsoft Azure أو Azure CLI، وتأكد من أنه يمكنك الاتصال بالمجموعة باستخدام CQLSH.
قم بتوفير حساب Azure Databricks داخل Cassandra VNet المُدارة. تأكد من أن لديه أيضًا وصول للشبكة إلى مجموعة Cassandra المصدر الخاصة بك.
تأكد من أنك قمت بالفعل بترحيل المساحة الرئيسية/ مخطط الجدول من قاعدة بيانات Cassandra المصدر إلى قاعدة بيانات Cassandra Managed Instance المستهدفة.
توفير مجموعة Azure Databricks
نوصي بتحديد Databricks runtime الإصدار 7.5، والذي يدعم Spark 3.0.
إضافة التبعيات
أضف مكتبة Apache Spark Cassandra Connector إلى مجموعتك للاتصال بنقاط نهاية Azure Cosmos DB Cassandra الأصلية. في نظام المجموعة، حدد Libraries>Install New>Maven ثم أضف com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0
إحداثيات Maven.
حدد Install، ثم أعد تشغيل نظام المجموعة عند اكتمال التثبيت.
ملاحظة
تأكد من إعادة تشغيل كتلة Databricks بعد تثبيت مكتبة Cassandra Connector.
إنشاء Scala Notebook للترحيل
قم بإنشاء Scala Notebook في Databricks. استبدل المصدر والهدف تكوينات Cassandra بمعلومات تسجيل الدخول المقابلة، ومسافات المفاتيح والجداول المصدر والهدف. ثم قم بتشغيل الكود التالي:
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext
// source cassandra configs
val sourceCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "9042",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "false",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>"
)
//target cassandra configs
val targetCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "9042",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "true",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>",
//throughput related settings below - tweak these depending on data volumes.
"spark.cassandra.output.batch.size.rows"-> "1",
"spark.cassandra.output.concurrent.writes" -> "1000",
"spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
"spark.cassandra.concurrent.reads" -> "512",
"spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
"spark.cassandra.connection.keep_alive_ms" -> "600000000"
)
//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(sourceCassandra)
.load
//Write to target Cassandra
DFfromSourceCassandra
.write
.format("org.apache.spark.sql.cassandra")
.options(targetCassandra)
.mode(SaveMode.Append) // only required for Spark 3.x
.save
ملاحظة
إذا كانت لديك حاجة للحفاظ على الأصلي writetime
لكل صف، فراجع عينة مرحل cassandra .