البرنامج التعليمي: الاتصال إلى Azure Cosmos DB ل NoSQL باستخدام Spark

ينطبق على: NoSQL

في هذا البرنامج التعليمي، يمكنك استخدام موصل Azure Cosmos DB Spark لقراءة البيانات أو كتابتها من حساب Azure Cosmos DB لحساب NoSQL. يستخدم هذا البرنامج التعليمي Azure Databricks ودفتر ملاحظات Jupyter لتوضيح كيفية التكامل مع واجهة برمجة التطبيقات ل NoSQL من Spark. يركز هذا البرنامج التعليمي على Python وSc scala، على الرغم من أنه يمكنك استخدام أي لغة أو واجهة يدعمها Spark.

في هذا البرنامج التعليمي، تتعلم كيفية:

  • الاتصال إلى API لحساب NoSQL باستخدام Spark ودفتر ملاحظات Jupyter.
  • إنشاء موارد حاويات وقواعد بيانات.
  • استيعاب البيانات إلى الحاوية.
  • الاستعلام عن البيانات في الحاوية.
  • تنفيذ العمليات الشائعة على العناصر الموجودة في الحاوية.

المتطلبات الأساسية

الاتصال باستخدام Spark وJupyter

استخدم مساحة عمل Azure Databricks الحالية لإنشاء مجموعة حوسبة جاهزة لاستخدام Apache Spark 3.4.x للاتصال بحساب Azure Cosmos DB الخاص بك ل NoSQL.

  1. افتح مساحة عمل Azure Databricks.

  2. في واجهة مساحة العمل، قم بإنشاء نظام مجموعة جديد. قم بتكوين نظام المجموعة باستخدام هذه الإعدادات، كحد أدنى:

    إصدار القيمة‬
    إصدار وقت التشغيل 13.3 LTS (Scala 2.12، Spark 3.4.1)
  3. استخدم واجهة مساحة العمل للبحث عن حزم Maven من Maven Central مع معرف مجموعة .com.azure.cosmos.spark قم بتثبيت الحزمة خصيصا ل Spark 3.4 مع معرف Artifact مسبوق بالمجموعة azure-cosmos-spark_3-4 .

  4. وأخيرا، قم بإنشاء دفتر ملاحظات جديد.

    تلميح

    بشكل افتراضي، يتم إرفاق دفتر الملاحظات بالمجموعة التي تم إنشاؤها مؤخرا.

  5. ضمن دفتر الملاحظات، قم بتعيين إعدادات تكوين معالجة المعاملات عبر الإنترنت (OLTP) لنقطة نهاية حساب NoSQL واسم قاعدة البيانات واسم الحاوية.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

إنشاء قاعدة بيانات وحاوية

استخدم واجهة برمجة تطبيقات الكتالوج لإدارة موارد الحساب مثل قواعد البيانات والحاويات. بعد ذلك، يمكنك استخدام OLTP لإدارة البيانات داخل موارد الحاوية.

  1. تكوين واجهة برمجة تطبيقات الكتالوج لإدارة واجهة برمجة التطبيقات لموارد NoSQL باستخدام Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. إنشاء قاعدة بيانات جديدة باسم cosmicworks باستخدام CREATE DATABASE IF NOT EXISTS.

    # Create a database by using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database by using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. إنشاء حاوية جديدة باسم products باستخدام CREATE TABLE IF NOT EXISTS. تأكد من تعيين مسار مفتاح القسم إلى /category وتمكين معدل نقل التحجيم التلقائي مع الحد الأقصى لمعدل نقل 1000 وحدات الطلب (RUs) في الثانية.

    # Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. إنشاء حاوية أخرى باسم employees باستخدام تكوين مفتاح قسم هرمي. استخدم /organizationو /departmentو /team كمسارات مفتاح القسم. اتبع هذا الترتيب المحدد. أيضا، قم بتعيين معدل النقل إلى كمية يدوية من 400 وحدات الطلب.

    # Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. قم بتشغيل خلايا دفتر الملاحظات للتحقق من إنشاء قاعدة البيانات والحاويات داخل واجهة برمجة التطبيقات لحساب NoSQL.

استيعاب البيانات

إنشاء عينة مجموعة بيانات. ثم استخدم OLTP لاستيعاب تلك البيانات إلى واجهة برمجة التطبيقات لحاوية NoSQL.

  1. إنشاء عينة مجموعة بيانات.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. استخدم spark.createDataFrame وتكوين OLTP المحفوظ مسبقا لإضافة بيانات نموذجية إلى الحاوية الهدف.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

بيانات الاستعلام

تحميل بيانات OLTP في إطار بيانات لإجراء استعلامات شائعة على البيانات. يمكنك استخدام بناء الجمل المختلفة لتصفية البيانات أو الاستعلام عن البيانات.

  1. استخدم spark.read لتحميل بيانات OLTP في كائن إطار بيانات. استخدم نفس التكوين الذي استخدمته سابقا في هذا البرنامج التعليمي. أيضا، قم بتعيين spark.cosmos.read.inferSchema.enabled إلى true للسماح لموصل Spark باستنتاج المخطط عن طريق أخذ عينات من العناصر الموجودة.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. عرض مخطط البيانات المحملة في إطار البيانات باستخدام printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. عرض صفوف البيانات حيث quantity يكون العمود أقل من 20. where استخدم الدالتين و show لتنفيذ هذا الاستعلام.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. عرض صف البيانات الأول حيث clearance يكون trueالعمود . استخدم الدالة filter لتنفيذ هذا الاستعلام.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. عرض خمسة صفوف من البيانات بدون عامل تصفية أو اقتطاع. استخدم الدالة show لتخصيص مظهر وعدد الصفوف التي يتم عرضها.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. استعلم عن بياناتك باستخدام سلسلة استعلام NoSQL الأولية هذه: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

تنفيذ العمليات الشائعة

عند العمل مع واجهة برمجة التطبيقات لبيانات NoSQL في Spark، يمكنك إجراء تحديثات جزئية أو العمل مع البيانات ك JSON أولي.

  1. لإجراء تحديث جزئي لعنصر:

    1. انسخ متغير التكوين الموجود config وعدل الخصائص في النسخة الجديدة. على وجه التحديد، قم بتكوين استراتيجية الكتابة إلى ItemPatch. ثم قم بتعطيل الدعم المجمع. تعيين الأعمدة والعمليات المعينة. وأخيرا، قم بتعيين نوع العملية الافتراضية إلى Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. إنشاء متغيرات لمفتاح قسم العنصر والمعرف الفريد الذي تنوي استهدافه كجزء من عملية التصحيح هذه.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. إنشاء مجموعة من كائنات التصحيح لتحديد العنصر الهدف وتحديد الحقول التي يجب تعديلها.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. إنشاء إطار بيانات باستخدام مجموعة من كائنات التصحيح. استخدم write لتنفيذ عملية التصحيح.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. قم بتشغيل استعلام لمراجعة نتائج عملية التصحيح. يجب الآن تسمية Yamba New Surfboard العنصر دون أي تغييرات أخرى.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. للعمل مع بيانات JSON الأولية:

    1. انسخ متغير التكوين الموجود config وعدل الخصائص في النسخة الجديدة. على وجه التحديد، قم بتغيير الحاوية الهدف إلى employees. ثم قم بتكوين contacts العمود/الحقل لاستخدام بيانات JSON الأولية.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. إنشاء مجموعة من الموظفين لاستيعابها في الحاوية.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. إنشاء إطار بيانات واستخدامه write لاستيعاب بيانات الموظف.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. عرض البيانات من إطار البيانات باستخدام show. لاحظ أن contacts العمود هو JSON الخام في الإخراج.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

الخطوة التالية