Tutorial: Conexión a Azure Cosmos DB for NoSQL mediante Spark

SE APLICA A: NoSQL

En este tutorial, usará el conector Spark de Azure Cosmos DB para leer o escribir datos de una cuenta de Azure Cosmos DB for NoSQL. En este tutorial se usa Azure Databricks y un cuaderno de Jupyter para ilustrar cómo integrar la API para NoSQL desde Spark. Este tutorial se centra en Python y Scala, aunque puede usar cualquier lenguaje o interfaz compatible con Spark.

En este tutorial, aprenderá a:

  • Conexión a una cuenta de API para NoSQL mediante Spark y un cuaderno de Jupyter
  • Creación de recursos de base de datos y contenedor
  • Ingesta de datos en el contenedor
  • Consulta de datos en el contenedor
  • Realización de operaciones comunes en elementos del contenedor

Requisitos previos

Conexión mediante Spark y Jupyter

Use el área de trabajo existente de Azure Databricks para crear un clúster de proceso listo para usar Apache Spark 3.4.x para conectarse a la cuenta de Azure Cosmos DB for NoSQL.

  1. Abra el área de trabajo de Azure Databricks.

  2. En la interfaz del área de trabajo, cree un nuevo clúster. Configure el clúster con estas opciones, como mínimo:

    Valor
    Versión del entorno de ejecución 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Use la interfaz del área de trabajo para buscar paquetes de Maven en Maven Central con un identificador de grupocom.azure.cosmos.spark. Instale el paquete específico para Spark 3.4 con un identificador de artefacto con el prefijo azure-cosmos-spark_3-4 en el clúster.

  4. Por último, cree un nuevo cuaderno.

    Sugerencia

    De forma predeterminada, el cuaderno se asociará al clúster creado recientemente.

  5. En el cuaderno, establezca las opciones de configuración de OLTP para el punto de conexión de la cuenta NoSQL, el nombre de la base de datos y el nombre del contenedor.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Creación de una base de datos y un contenedor

Use la API de catálogo para administrar los recursos de la cuenta, como las bases de datos y los contenedores. Después, puede usar OLTP para administrar los datos en los recursos de contenedor.

  1. Configure la API de catálogo para administrar los recursos de API para NoSQL mediante Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Cree una base de datos denominada cosmicworks mediante CREATE DATABASE IF NOT EXISTS.

    # Create a database using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Cree un nuevo contenedor denominado products mediante CREATE TABLE IF NOT EXISTS. Asegúrese de establecer la ruta de acceso de la clave de partición en /category y habilite la escalabilidad automática del rendimiento con un rendimiento máximo de 1000 unidades de solicitud por segundo (RU/s).

    # Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Cree otro contenedor denominado employees mediante una configuración de clave de partición jerárquica con /organization, /department y /team como conjunto de rutas de acceso de la clave de partición en ese orden específico. Además, establezca el rendimiento en una cantidad manual de 400 RU/s.

    # Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Ejecute las celdas del cuaderno para validar que la base de datos y los contenedores se crean dentro de la cuenta de API para NoSQL.

Ingerir datos

Cree un conjunto de datos de muestra y use OLTP para ingerir esos datos en el contenedor de API para NoSQL.

  1. Cree un conjunto de datos de muestra.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Use spark.createDataFrame y la configuración de OLTP guardada anteriormente para agregar datos de muestra al contenedor de destino.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Consultar datos

Cargue los datos de OLTP en un objeto DataFrame para realizar consultas comunes en los datos. Puede usar varias sintaxis para filtrar o consultar los datos.

  1. Use spark.read para cargar los datos de OLTP en un objeto DataFrame. Use la misma configuración que se usó anteriormente en este tutorial. Además, establezca spark.cosmos.read.inferSchema.enabled en true para permitir que el conector Spark infiera el esquema mediante el muestreo de elementos existentes.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Represente el esquema de los datos cargados en el objeto DataFrame mediante printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Represente filas de datos en las que la columna quantity es menor que 20. Use las funciones where y show para realizar esta consulta.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Represente la primera fila de datos donde la columna clearance es true. Use la función filter para realizar esta consulta.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Represente cinco filas de datos sin filtro ni truncamiento. Use la función show para personalizar la apariencia y el número de filas que se representan.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Consulte los datos mediante esta cadena de consulta NoSQL sin formato: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Realización de operaciones comunes

Al trabajar con datos de la API para NoSQL en Spark, puede realizar actualizaciones parciales o trabajar con datos como JSON sin procesar.

  1. Para realizar una actualización parcial de un elemento, siga estos pasos:

    1. Copie la variable de configuración config existente y modifique las propiedades de la nueva copia. Específicamente, configure la estrategia de escritura en ItemPatch, deshabilite la compatibilidad masiva, establezca las columnas y las operaciones asignadas y, por último, establezca el tipo de operación predeterminado en Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Cree variables para la clave de partición del elemento y el identificador único que quiere tener como destino como parte de esta operación de revisión.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Cree un conjunto de objetos de revisión para definir el elemento de destino y especifique los campos que se deben modificar.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Cree un objeto DataFrame mediante el conjunto de objetos de revisión y use write para realizar la operación de revisión.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Ejecute una consulta para revisar los resultados de la operación de revisión. El elemento ahora debería denominarse Yamba New Surfboard sin ningún otro cambio.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Para trabajar con datos JSON sin procesar, realice estos pasos:

    1. Copie la variable de configuración config existente y modifique las propiedades de la nueva copia. Específicamente, cambie el contenedor de destino a employees y configure la columna o el campo contacts para usar datos JSON sin procesar.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Cree un conjunto de empleados que se ingerirán en el contenedor.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Cree un objeto DataFrame y use write para ingerir los datos de los empleados.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Represente los datos del objeto DataFrame mediante show. Observe que la columna contacts son datos JSON sin procesar en la salida.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Paso siguiente