Zelfstudie: Verbinding maken naar Azure Cosmos DB for NoSQL met behulp van Spark
VAN TOEPASSING OP: NoSQL
In deze zelfstudie gebruikt u de Azure Cosmos DB Spark-connector voor het lezen of schrijven van gegevens uit een Azure Cosmos DB for NoSQL-account. In deze zelfstudie wordt gebruikgemaakt van Azure Databricks en een Jupyter-notebook om te laten zien hoe u integreert met de API voor NoSQL vanuit Spark. Deze zelfstudie is gericht op Python en Scala, ook al kunt u elke taal of interface gebruiken die wordt ondersteund door Spark.
In deze zelfstudie leert u het volgende:
- Verbinding maken naar een API voor NoSQL-account met behulp van Spark en een Jupyter-notebook
- Database- en containerbronnen maken
- Gegevens opnemen in de container
- Query's uitvoeren op gegevens in de container
- Algemene bewerkingen uitvoeren op items in de container
Vereisten
- Een bestaand Azure Cosmos DB for NoSQL-account.
- Als u een bestaand Azure-abonnement hebt, maakt u een nieuw account.
- Geen Azure-abonnement? U kunt Azure Cosmos DB gratis proberen zonder dat er een creditcard is vereist.
- Een bestaande Azure Databricks-werkruimte.
Verbinding maken spark en Jupyter gebruiken
Gebruik uw bestaande Azure Databricks-werkruimte om een rekencluster te maken dat gereed is voor het gebruik van Apache Spark 3.4.x om verbinding te maken met uw Azure Cosmos DB for NoSQL-account.
Open uw Azure Databricks-werkruimte.
Maak een nieuw cluster in de werkruimte-interface. Configureer het cluster met deze instellingen minimaal:
Value Runtime-versie 13.3 LTS (Scala 2.12, Spark 3.4.1) Gebruik de werkruimte-interface om te zoeken naar Maven-pakketten van Maven Central met een groeps-id van
com.azure.cosmos.spark
. Installeer het pakket dat specifiek is voor Spark 3.4 met een artefact-id voorafgegaan doorazure-cosmos-spark_3-4
het cluster.Maak ten slotte een nieuw notitieblok.
Tip
Standaard wordt het notebook gekoppeld aan het onlangs gemaakte cluster.
Stel in het notebook OLTP-configuratie-instellingen in voor noSQL-accounteindpunt, databasenaam en containernaam.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Een database en container maken
Gebruik de Catalogus-API om accountbronnen, zoals databases en containers, te beheren. Vervolgens kunt u OLTP gebruiken om gegevens te beheren binnen de containerresource[s].
Configureer de Catalogus-API voor het beheren van API voor NoSQL-resources met behulp van Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Maak een nieuwe database met de naam
cosmicworks
.CREATE DATABASE IF NOT EXISTS
# Create a database using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Maak een nieuwe container met de naam
products
.CREATE TABLE IF NOT EXISTS
Zorg ervoor dat u het pad/category
naar de partitiesleutel instelt en doorvoer voor automatische schaalaanpassing inschakelt met een maximale doorvoer van1000
aanvraageenheden per seconde (RU/s).# Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Maak een andere container met de naam
employees
met behulp van een hiƫrarchische partitiesleutelconfiguratie met/organization
,/department
en/team
als de set partitiesleutelpaden in die specifieke volgorde. Stel de doorvoer ook in op een handmatige400
hoeveelheid RU/s# Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Voer de notebookcel[s] uit om te controleren of uw database en containers zijn gemaakt in uw API voor NoSQL-account.
Gegevens opnemen
Maak een voorbeeldgegevensset en gebruik vervolgens OLTP om die gegevens op te nemen in de API voor NoSQL-container.
Maak een voorbeeldgegevensset.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Gebruik
spark.createDataFrame
en de eerder opgeslagen OLTP-configuratie om voorbeeldgegevens toe te voegen aan de doelcontainer.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Querygegevens
LAAD OLTP-gegevens in een gegevensframe om algemene query's uit te voeren op de gegevens. U kunt verschillende syntaxisfilters gebruiken of gegevens opvragen.
Gebruik
spark.read
deze functie om de OLTP-gegevens in een dataframeobject te laden. Gebruik dezelfde configuratie die eerder in deze zelfstudie is gebruikt.spark.cosmos.read.inferSchema.enabled
Stel deze eigenschap ook in op true zodat de Spark-connector het schema kan afleiden door bestaande items te bemonsteren.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Geef het schema weer van de gegevens die in het dataframe zijn geladen met behulp van
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Gegevensrijen weergeven waarin de
quantity
kolom kleiner is dan20
. Gebruik dewhere
enshow
functies om deze query uit te voeren.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Geef de eerste gegevensrij weer waarin de
clearance
kolom waar is. Gebruik defilter
functie om deze query uit te voeren.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Geef vijf rijen met gegevens weer zonder filter of afkapping. Gebruik de
show
functie om het uiterlijk en het aantal rijen aan te passen dat wordt weergegeven.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Voer een query uit op uw gegevens met behulp van deze onbewerkte NoSQL-queryreeks:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Algemene bewerkingen uitvoeren
Wanneer u werkt met API voor NoSQL-gegevens in Spark, kunt u gedeeltelijke updates uitvoeren of met gegevens werken als onbewerkte JSON.
Voer de volgende stappen uit om een gedeeltelijke update van een item uit te voeren:
Kopieer de bestaande configuratievariabele
config
en wijzig de eigenschappen in de nieuwe kopie. Specifiek; configureer de schrijfstrategie om bulkondersteuning uit teItemPatch
schakelen, de kolommen en toegewezen bewerkingen in te stellen en ten slotte het standaardbewerkingstype in teSet
stellen op .# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Maak variabelen voor de partitiesleutel van het item en de unieke id die u wilt targeten als onderdeel van deze patchbewerking.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Maak een set patchobjecten om het doelitem op te geven en geef velden op die moeten worden gewijzigd.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Maak een gegevensframe met behulp van de set patchobjecten en gebruik
write
deze om de patchbewerking uit te voeren.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Voer een query uit om de resultaten van de patchbewerking te bekijken. Het item moet nu worden benoemd
Yamba New Surfboard
zonder andere wijzigingen.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Voer de volgende stappen uit om te werken met onbewerkte JSON-gegevens:
Kopieer de bestaande configuratievariabele
config
en wijzig de eigenschappen in de nieuwe kopie. Specifiek; wijzig de doelcontainer inemployees
en configureer de kolom/hetcontacts
veld om onbewerkte JSON-gegevens te gebruiken.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Maak een set werknemers die u wilt opnemen in de container.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Maak een gegevensframe en gebruik
write
deze om de werknemersgegevens op te nemen.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Geef de gegevens uit het gegevensframe weer met behulp van
show
. U ziet dat decontacts
kolom onbewerkte JSON in de uitvoer is.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Gerelateerde inhoud
- Apache Spark
- Catalogus-API voor Azure Cosmos DB
- Naslaginformatie over configuratieparameters
- Voorbeeld van notebook 'New York City Taxi-gegevens'
- Migreren van Spark 2.4 naar Spark 3.*
- Versiecompatibiliteit
- Opmerkingen bij de release
- Koppelingen downloaden