Руководство по Подключение в Azure Cosmos DB для NoSQL с помощью Spark

ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL

В этом руководстве вы используете соединитель Spark Azure Cosmos DB для чтения или записи данных из учетной записи Azure Cosmos DB для NoSQL. В этом руководстве используется Azure Databricks и записная книжка Jupyter для иллюстрации интеграции с API для NoSQL из Spark. В этом руководстве основное внимание уделяется Python и Scala, даже если вы можете использовать любой язык или интерфейс, поддерживаемый Spark.

В этом руководстве описано следующее:

  • Подключение в учетную запись API для NoSQL с помощью Spark и записной книжки Jupyter
  • Создание ресурсов базы данных и контейнеров
  • Прием данных в контейнер
  • Запрос данных в контейнере
  • Выполнение распространенных операций с элементами в контейнере

Необходимые компоненты

Подключение с помощью Spark и Jupyter

Используйте существующую рабочую область Azure Databricks для создания вычислительного кластера, готового к использованию Apache Spark 3.4.x для подключения к учетной записи Azure Cosmos DB для NoSQL.

  1. Откройте рабочую область Azure Databricks.

  2. В интерфейсе рабочей области создайте новый кластер. Настройте кластер с этими параметрами как минимум:

    Value
    Версия среды выполнения 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Используйте интерфейс рабочей области для поиска пакетов Maven из Maven Central с идентификатором com.azure.cosmos.sparkгруппы. Установите пакет, характерный для Spark 3.4, с префиксом Идентификатора артефакта, заданным в azure-cosmos-spark_3-4 кластере.

  4. Наконец, создайте новую записную книжку.

    Совет

    По умолчанию записная книжка будет присоединена к недавно созданному кластеру.

  5. В записной книжке задайте параметры конфигурации OLTP для конечной точки учетной записи NoSQL, имени базы данных и имени контейнера.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Создание базы данных и контейнера

Используйте API каталога для управления ресурсами учетной записи, такими как базы данных и контейнеры. Затем можно использовать OLTP для управления данными в ресурсах контейнера].

  1. Настройте API каталога для управления ресурсами API NoSQL с помощью Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Создайте новую базу данных с именем cosmicworks с помощью CREATE DATABASE IF NOT EXISTS.

    # Create a database using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Создайте новый контейнер с именем products с помощью CREATE TABLE IF NOT EXISTS. Убедитесь, что путь /category ключа секции задан и включена пропускная способность автомасштабирования с максимальной пропускной способностью 1000 единиц запросов в секунду (ЕЗ/с).

    # Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Создайте другой контейнер с именем employees с помощью конфигурации иерархического ключа секции и /organization/department/team в качестве набора путей ключа секции в определенном порядке. Кроме того, задайте пропускную способность вручную в 400 ЕЗ/с.

    # Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Запустите ячейки записной книжки, чтобы убедиться, что база данных и контейнеры созданы в вашей учетной записи API для NoSQL.

Прием данных

Создайте пример набора данных, а затем используйте OLTP для приема данных в контейнер API для NoSQL.

  1. Создайте пример набора данных.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Используйте spark.createDataFrame и ранее сохраненную конфигурацию OLTP для добавления примеров данных в целевой контейнер.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Запрос данных

Загрузите данные OLTP в кадр данных для выполнения общих запросов к данным. Вы можете использовать различные синтаксисы фильтрации или запроса данных.

  1. Используется spark.read для загрузки данных OLTP в объект кадра данных. Используйте ту же конфигурацию, используемую ранее в этом руководстве. Кроме того, задайте spark.cosmos.read.inferSchema.enabled значение true, чтобы разрешить соединителю Spark выводить схему путем выборки существующих элементов.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Отрисовка схемы данных, загруженных в кадр данных с помощью printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Отрисовка строк данных, quantity в которых столбец меньше 20. where Используйте функции и show функции для выполнения этого запроса.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Отрисовка первой строки данных, clearance в которой столбец имеет значение true. Используйте функцию filter для выполнения этого запроса.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Отрисовка пяти строк данных без фильтрации или усечения. Используйте функцию show для настройки внешнего вида и количества отображаемых строк.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Запросите данные с помощью этой необработанной строки запроса NoSQL: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Выполнение распространенных операций

При работе с данными API для NoSQL в Spark можно выполнять частичные обновления или работать с данными как необработанные JSON.

  1. Чтобы выполнить частичное обновление элемента, выполните следующие действия.

    1. Скопируйте существующую config переменную конфигурации и измените свойства в новой копии. Специально; настройте стратегию записи, чтобы ItemPatchотключить массовую поддержку, задать столбцы и сопоставленные операции и, наконец, задать тип Setоперации по умолчанию.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Создайте переменные для ключа секции элемента и уникальный идентификатор, предназначенный для выполнения этой операции исправления.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Создайте набор объектов исправлений, чтобы указать целевой элемент и указать поля, которые необходимо изменить.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Создайте кадр данных с помощью набора объектов исправлений и используйте write для выполнения операции исправления.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Запустите запрос, чтобы просмотреть результаты операции исправления. Теперь элемент должен быть назван Yamba New Surfboard без других изменений.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Чтобы работать с необработанными данными JSON, выполните следующие действия:

    1. Скопируйте существующую config переменную конфигурации и измените свойства в новой копии. Специально; измените целевой контейнер employeescontacts на столбец или поле для использования необработанных данных JSON.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Создайте набор сотрудников для приема в контейнер.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Создайте кадр данных и используйте write для приема данных сотрудника.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Отрисовка данных из кадра данных с помощью show. Обратите внимание, что contacts столбец является необработанным JSON в выходных данных.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Следующий шаг