التشغيل السريع: إدارة البيانات باستخدام Azure Cosmos DB Spark 3 OLTP Connector لواجهة برمجة تطبيقات SQL
ينطبق على:
واجهة برمجة تطبيقات SQL
هذا البرنامج التعليمي هو دليل بداية سريعة لإظهار كيفية استخدام Cosmos DB Spark Connector للقراءة من أو الكتابة إلى Cosmos DB. يدعم موصل Cosmos DB Spark Spark 3.1.x و 3.2.x.
خلال هذا البرنامج التعليمي السريع، نعتمد على Azure Databricks Runtime 8.0 مع Spark 3.1.1 و Jupyter Notebook لإظهار كيفية استخدام موصل Cosmos DB Spark، ولكن يمكنك أيضًا استخدام Azure Databricks Runtime 10.3 مع Spark 3.2.1.
يمكنك استخدام أي عرض Spark 3.1.1 أو Spark 3.2.1آخر كذلك، كما يجب أن تكون قادراً على استخدام أي لغة يدعمها Spark (PySpark، Scala، Java، إلخ)، أو أي واجهة Spark أنت على دراية بها (Jupyter Notebook Livy، إلخ).
المتطلبات الأساسية
حساب Azure نشط. إذا لم يكن لديك حساب، فيُمكنك تسجيل حساب مجاني. بدلاً من ذلك، يمكنك استخدام محاكي Azure Cosmos DB للتطوير والاختبار.
قواعد بيانات Azure وقت التشغيل 8.0 مع Spark 3.1.1 أو قواعد بيانات Azure وقت التشغيل 10.3 مع Spark 3.2.1.
(اختياري) يتم استخدام ربط SLF4J لربط إطار عمل تسجيل محدد مع SLF4J.
SLF4J مطلوب فقط إذا كنت تخطط لاستخدام التسجيل، وأيضاً تحميل ربط SLF4J، والذي سوف يربط API SLF4J مع تنفيذ للتسجيل من اختيارك. راجع دليل المستخدم SLF4J للحصول على مزيد من المعلومات.
قم بتثبيت موصل سبارك DB من Cosmos في مجموعة Spark باستخدام أحدث إصدار لـ Spark 3.1.x أو باستخدام أحدث إصدار لـ Spark 3.2.x .
dsjk دليل الشروع في العمل على PySpark، ولكن يمكنك استخدام الإصدار scala المكافئ كذلك، ويمكنك تشغيل مقتطف التعليمات البرمجية التالية في دفتر ملاحظات Azure Databricks PySpark.
إنشاء قواعد بيانات وحاويات
أولاً، تعيين بيانات اعتماد حساب Cosmos DB واسم قاعدة بيانات Cosmos DB واسم الحاوية.
cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"
cfg = {
"spark.cosmos.accountEndpoint" : cosmosEndpoint,
"spark.cosmos.accountKey" : cosmosMasterKey,
"spark.cosmos.database" : cosmosDatabaseName,
"spark.cosmos.container" : cosmosContainerName,
}
بعد ذلك، يمكنك استخدام واجهة برمجة تطبيقات Catalog الجديدة لإنشاء قاعدة بيانات وحاوية COSMOS DB خلال Spark.
# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)
# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))
# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))
عند إنشاء حاويات بواجهة برمجة تطبيقات Catalog، يمكنك تعيين معدل النقل ومسار مفتاح القسم للحاوية المراد إنشاؤها.
لمزيد من المعلومات، راجع وثائق واجهة برمجة التطبيقات لـ Catalog.
استيعاب البيانات
اسم مصدر البيانات هو cosmos.oltp، ويظهر المثال التالي كيف يمكنك كتابة حاسب بيانات ذاكرة يتكون من عنصرين إلى Cosmos DB:
spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
.toDF("id","name","age","isAlive") \
.write\
.format("cosmos.oltp")\
.options(**cfg)\
.mode("APPEND")\
.save()
لاحظ أن id حقل إلزامي لـ DB Cosmos.
لمزيد من المعلومات المتعلقة بالحصول على البيانات، راجع وثائق تكوين الكتابة الكاملة.
بيانات الاستعلام
باستخدام نفس مصدر البيانات cosmos.oltp، يمكننا الاستعلام عن البيانات واستخدام filter لدفع عوامل التصفية إلى الأسفل:
from pyspark.sql.functions import col
df = spark.read.format("cosmos.oltp").options(**cfg)\
.option("spark.cosmos.read.inferSchema.enabled", "true")\
.load()
df.filter(col("isAlive") == True)\
.show()
لمزيد من المعلومات المتعلقة بالاستعلام عن البيانات، راجع وثائق تكوين الاستعلام الكاملة.
استنتاج المخطط
عند الاستعلام عن البيانات، يمكن لـ Spark Connector استنتاج المخطط استناداً إلى أخذ عينات من العناصر الموجودة عن طريق إعداد spark.cosmos.read.inferSchema.enabled إلى true.
df = spark.read.format("cosmos.oltp").options(**cfg)\
.option("spark.cosmos.read.inferSchema.enabled", "true")\
.load()
df.printSchema()
بدلاً من ذلك، يمكنك تمرير المخطط المخصص الذي تريد استخدامه لقراءة البيانات:
customSchema = StructType([
StructField("id", StringType()),
StructField("name", StringType()),
StructField("type", StringType()),
StructField("age", IntegerType()),
StructField("isAlive", BooleanType())
])
df = spark.read.schema(customSchema).format("cosmos.oltp").options(**cfg)\
.load()
df.printSchema()
في حالة عدم تحديد مخطط مخصص وتعطيل استنتاج المخطط، ستقوم البيانات الناتجة بإرجاع محتوى Json الخام من العناصر:
df = spark.read.format("cosmos.oltp").options(**cfg)\
.load()
df.printSchema()
لمزيد من المعلومات المتعلقة بمرجع المخطط، راجع وثائق تكوين مرجع المخطط الكامل.
مرجع التكوين
يحتوي موصل Azure Cosmos DB Spark 3 OLTP لـ SQL API على مرجع تكوين كامل يوفر إعدادات إضافية ومتقدمة لكتابة البيانات والاستعلام عنها، وإنشاء تسلسل، ودفق باستخدام تغذية التغيير وإدارة التقسيم ومعدل النقل والمزيد. للحصول على قائمة كاملة بالتفاصيل، تحقق من مرجع تكوين Spark Connector على GitHub.
الخطوات التالية
- Azure Cosmos DB Apache Spark 3 OLTP Connector لواجهة برمجة تطبيقات Core (SQL): ملاحظات الإصدار والموارد
- تعرف على المزيد حول Apache Spark.