Tutorial: Conexión a Azure Cosmos DB for NoSQL mediante Spark
SE APLICA A: NoSQL
En este tutorial, usará el conector Spark de Azure Cosmos DB para leer o escribir datos de una cuenta de Azure Cosmos DB for NoSQL. En este tutorial se usa Azure Databricks y un cuaderno de Jupyter para ilustrar cómo integrar la API para NoSQL desde Spark. Este tutorial se centra en Python y Scala, aunque puede usar cualquier lenguaje o interfaz compatible con Spark.
En este tutorial, aprenderá a:
- Conexión a una cuenta de API para NoSQL mediante Spark y un cuaderno de Jupyter
- Creación de recursos de base de datos y contenedor
- Ingesta de datos en el contenedor
- Consulta de datos en el contenedor
- Realización de operaciones comunes en elementos del contenedor
Requisitos previos
- Una cuenta existente de Azure Cosmos DB for NoSQL.
- Si tiene una suscripción de Azure, cree una nueva cuenta.
- ¿No tiene una suscripción de Azure? Puede probar Azure Cosmos DB de forma gratuita, sin necesidad de usar su tarjeta de crédito.
- Un área de trabajo existente de Azure Databricks.
Conexión mediante Spark y Jupyter
Use el área de trabajo existente de Azure Databricks para crear un clúster de proceso listo para usar Apache Spark 3.4.x para conectarse a la cuenta de Azure Cosmos DB for NoSQL.
Abra el área de trabajo de Azure Databricks.
En la interfaz del área de trabajo, cree un nuevo clúster. Configure el clúster con estas opciones, como mínimo:
Valor Versión del entorno de ejecución 13.3 LTS (Scala 2.12, Spark 3.4.1) Use la interfaz del área de trabajo para buscar paquetes de Maven en Maven Central con un identificador de grupo
com.azure.cosmos.spark
. Instale el paquete específico para Spark 3.4 con un identificador de artefacto con el prefijoazure-cosmos-spark_3-4
en el clúster.Por último, cree un nuevo cuaderno.
Sugerencia
De forma predeterminada, el cuaderno se asociará al clúster creado recientemente.
En el cuaderno, establezca las opciones de configuración de OLTP para el punto de conexión de la cuenta NoSQL, el nombre de la base de datos y el nombre del contenedor.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Creación de una base de datos y un contenedor
Use la API de catálogo para administrar los recursos de la cuenta, como las bases de datos y los contenedores. Después, puede usar OLTP para administrar los datos en los recursos de contenedor.
Configure la API de catálogo para administrar los recursos de API para NoSQL mediante Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Cree una base de datos denominada
cosmicworks
medianteCREATE DATABASE IF NOT EXISTS
.# Create a database using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Cree un nuevo contenedor denominado
products
medianteCREATE TABLE IF NOT EXISTS
. Asegúrese de establecer la ruta de acceso de la clave de partición en/category
y habilite la escalabilidad automática del rendimiento con un rendimiento máximo de1000
unidades de solicitud por segundo (RU/s).# Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Cree otro contenedor denominado
employees
mediante una configuración de clave de partición jerárquica con/organization
,/department
y/team
como conjunto de rutas de acceso de la clave de partición en ese orden específico. Además, establezca el rendimiento en una cantidad manual de400
RU/s.# Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Ejecute las celdas del cuaderno para validar que la base de datos y los contenedores se crean dentro de la cuenta de API para NoSQL.
Ingerir datos
Cree un conjunto de datos de muestra y use OLTP para ingerir esos datos en el contenedor de API para NoSQL.
Cree un conjunto de datos de muestra.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Use
spark.createDataFrame
y la configuración de OLTP guardada anteriormente para agregar datos de muestra al contenedor de destino.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Consultar datos
Cargue los datos de OLTP en un objeto DataFrame para realizar consultas comunes en los datos. Puede usar varias sintaxis para filtrar o consultar los datos.
Use
spark.read
para cargar los datos de OLTP en un objeto DataFrame. Use la misma configuración que se usó anteriormente en este tutorial. Además, establezcaspark.cosmos.read.inferSchema.enabled
en true para permitir que el conector Spark infiera el esquema mediante el muestreo de elementos existentes.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Represente el esquema de los datos cargados en el objeto DataFrame mediante
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Represente filas de datos en las que la columna
quantity
es menor que20
. Use las funcioneswhere
yshow
para realizar esta consulta.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Represente la primera fila de datos donde la columna
clearance
es true. Use la funciónfilter
para realizar esta consulta.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Represente cinco filas de datos sin filtro ni truncamiento. Use la función
show
para personalizar la apariencia y el número de filas que se representan.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Consulte los datos mediante esta cadena de consulta NoSQL sin formato:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Realización de operaciones comunes
Al trabajar con datos de la API para NoSQL en Spark, puede realizar actualizaciones parciales o trabajar con datos como JSON sin procesar.
Para realizar una actualización parcial de un elemento, siga estos pasos:
Copie la variable de configuración
config
existente y modifique las propiedades de la nueva copia. Específicamente, configure la estrategia de escritura enItemPatch
, deshabilite la compatibilidad masiva, establezca las columnas y las operaciones asignadas y, por último, establezca el tipo de operación predeterminado enSet
.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Cree variables para la clave de partición del elemento y el identificador único que quiere tener como destino como parte de esta operación de revisión.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Cree un conjunto de objetos de revisión para definir el elemento de destino y especifique los campos que se deben modificar.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Cree un objeto DataFrame mediante el conjunto de objetos de revisión y use
write
para realizar la operación de revisión.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Ejecute una consulta para revisar los resultados de la operación de revisión. El elemento ahora debería denominarse
Yamba New Surfboard
sin ningún otro cambio.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Para trabajar con datos JSON sin procesar, realice estos pasos:
Copie la variable de configuración
config
existente y modifique las propiedades de la nueva copia. Específicamente, cambie el contenedor de destino aemployees
y configure la columna o el campocontacts
para usar datos JSON sin procesar.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Cree un conjunto de empleados que se ingerirán en el contenedor.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Cree un objeto DataFrame y use
write
para ingerir los datos de los empleados.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Represente los datos del objeto DataFrame mediante
show
. Observe que la columnacontacts
son datos JSON sin procesar en la salida.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Contenido relacionado
- Spark de Apache
- API de catálogo de Azure Cosmos DB
- Referencia de parámetros de configuración
- Cuaderno de ejemplo "Datos de los taxis de Nueva York"
- Migración de Spark 2.4 a Spark 3.*
- Compatibilidad de versiones
- Notas de la versión
- Vínculos de descarga