Quickstart: Gegevens beheren met Azure Cosmos DB Spark 3 OLTP-connector voor SQL API

VAN TOEPASSING OP: SQL-API

Deze zelfstudie is een snelstartgids om te laten zien hoe u met Cosmos DB Spark-connector kunt lezen van of schrijven naar Cosmos DB. Cosmos DB Spark-connector is gebaseerd op Spark 3.1.x.

In deze snelle zelfstudie vertrouwen we op Azure Databricks Runtime 8.0 met Spark 3.1.1 en een Jupyter Notebook om te laten zien hoe u de Cosmos DB Spark-connector gebruikt.

U kunt ook elke andere Spark 3.1.1 Spark-aanbieding gebruiken. U moet ook elke taal kunnen gebruiken die wordt ondersteund door Spark (PySpark, Scala, Java, enzovoort) of een Spark-interface die u kent (Jupyter Notebook, Livy, enzovoort).

Vereisten

SLF4J is alleen nodig als u logboekregistratie wilt gebruiken. Download ook een SLF4J-binding, die de SLF4J-API koppelt aan de implementatie van de logboekregistratie van uw keuze. Zie de SLF4J-gebruikershandleiding voor meer informatie.

Installeer Cosmos DB Spark-connector in uw Spark-cluster azure-cosmos-spark_3-1_2-12-4.3.1.jar

De aan de slag-handleiding is gebaseerd op PySpark, maar u kunt ook de equivalente scala-versie gebruiken en u kunt het volgende codefragment uitvoeren in een Azure Databricks PySpark-notebook.

Databases en containers maken

Stel eerst de Cosmos DB in en de Cosmos DB databasenaam en containernaam.

cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
}

Vervolgens kunt u de nieuwe Catalog-API gebruiken om een Cosmos DB database en container te maken via Spark.

# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))

# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))

Wanneer u containers maakt met de Catalog-API, kunt u het doorvoer- en partitiesleutelpad instellen voor de container die moet worden gemaakt.

Zie de volledige catalogi-API-documentatie voor meer informatie.

Gegevens opnemen

De naam van de gegevensbron is en in het volgende voorbeeld ziet u hoe u een geheugengegevensframe kunt schrijven dat bestaat uit twee items die cosmos.oltp u Cosmos DB:

spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
  .toDF("id","name","age","isAlive") \
   .write\
   .format("cosmos.oltp")\
   .options(**cfg)\
   .mode("APPEND")\
   .save()

Houd er id rekening mee dat een verplicht veld voor Cosmos DB.

Zie de volledige documentatie voor schrijfconfiguratie voor meer informatie over het opnemen van gegevens.

Querygegevens

Met dezelfde gegevensbron kunnen we gegevens cosmos.oltp opvragen en gebruiken om filter filters omlaag te pushen:

from pyspark.sql.functions import col

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.filter(col("isAlive") == True)\
 .show()

Zie de volledige documentatie over queryconfiguratie voor meer informatie over het uitvoeren van query's op gegevens.

Schema-deductie

Bij het uitvoeren van query's op gegevens kan de Spark-connector het schema afleiden op basis van het nemen van steekproeven van bestaande items door in te spark.cosmos.read.inferSchema.enabled stellen op true .

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()
 
df.printSchema()

U kunt ook het aangepaste schema doorgeven dat u wilt gebruiken om de gegevens te lezen:

customSchema = StructType([
      StructField("id", StringType()),
      StructField("name", StringType()),
      StructField("type", StringType()),
      StructField("age", IntegerType()),
      StructField("isAlive", BooleanType())
    ])

df = spark.read.schema(schema).format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

Als er geen aangepast schema is opgegeven en schemadeferentie is uitgeschakeld, retourneren de resulterende gegevens de onbewerkte JSON-inhoud van de items:

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

Zie de volledige documentatie voor de configuratie van schemadeferentie voor meer informatie met betrekking tot schemadeferentie.

Configuratieverwijzing

Algemene configuratie

Naam van configuratie-eigenschap Standaard Beschrijving
spark.cosmos.accountEndpoint Geen Cosmos DB account-eindpunt-URI
spark.cosmos.accountKey Geen Cosmos DB accountsleutel
spark.cosmos.database Geen Cosmos DB databasenaam
spark.cosmos.container Geen Cosmos DB containernaam

Extra afstemming

Naam van configuratie-eigenschap Standaard Beschrijving
spark.cosmos.useGatewayMode false Gatewaymodus gebruiken voor de clientbewerkingen
spark.cosmos.read.forceEventualConsistency true Zorgt ervoor dat de client de consistentie Uiteindelijk gebruikt voor leesbewerkingen in plaats van de consistentie op standaardaccountniveau te gebruiken
spark.cosmos.applicationName Geen De naam van de toepassing
spark.cosmos.preferredRegionsList Geen Lijst met voorkeursregio's die moeten worden gebruikt voor een account voor Cosmos DB meerdere regio's. Dit is een door komma's gescheiden waarde (bijvoorbeeld of ) opgegeven voorkeursregio's [East US, West US] East US, West US worden gebruikt als hint. Gebruik een co-ocated Spark-cluster met uw Cosmos DB-account en geef de spark-clusterregio door als voorkeursregio. Bekijk hier de lijst met Azure-regio's. U kunt ook gebruiken spark.cosmos.preferredRegions als alias
spark.cosmos.diagnostics Geen Kan worden gebruikt om uitgebreidere diagnostische gegevens in te stellen. Momenteel is de enige ondersteunde optie om deze eigenschap in te stellen op . Hierdoor worden extra logboeken als logboeken in de logboeken stuurprogramma's simple INFO en uitvoerders uitgezonden.

Configuratie schrijven

Naam van configuratie-eigenschap Standaard Beschrijving
spark.cosmos.write.strategy ItemOverwrite Cosmos DB Schrijfstrategie voor items: (met upsert), (met behulp van maken, bestaande items negeren die ItemOverwrite ItemAppend conflicten zijn), (alle documenten ItemDelete verwijderen) (alle documenten verwijderen waarvoor de etag niet is ItemDeleteIfNotModified gewijzigd)
spark.cosmos.write.maxRetryCount 10 Cosmos DB Aantal nieuwe pogingen schrijven bij mislukte pogingen die opnieuw kunnen worden geprobeerd (bijvoorbeeld verbindingsfout)
spark.cosmos.write.point.maxConcurrency Geen Cosmos DB Maximale gelijktijdigheid van item schrijven. Als dit niet wordt opgegeven, wordt dit bepaald op basis van de VM-grootte van de Spark-uitvoerder
spark.cosmos.write.bulk.maxPendingOperations Geen Cosmos DB voor de modus Item schrijven maximaal aantal in behandeling zijnde bewerkingen. Hiermee definieert u een limiet voor het aantal bulkbewerkingen dat gelijktijdig wordt verwerkt. Als dit niet wordt opgegeven, wordt dit bepaald op basis van de VM-grootte van de Spark-uitvoerder. Als de hoeveelheid gegevens groot is voor de inrichtende doorvoer op de doelcontainer, kan deze instelling worden aangepast aan de hand van de schatting van 1000 x Cores
spark.cosmos.write.bulk.enabled true Cosmos DB Item schrijven bulksgewijs ingeschakeld

Query-configuratie

Naam van configuratie-eigenschap Standaard Beschrijving
spark.cosmos.read.customQuery Geen Wanneer deze wordt opgegeven, wordt de aangepaste query verwerkt op het Cosmos-eindpunt in plaats van de query dynamisch te genereren via push-down predicate. Meestal is het raadzaam om te vertrouwen op de predicaat-push van Spark, omdat hierdoor de meest efficiënte set filters kan worden gegenereerd op basis van het queryplan. Maar er zijn een aantal predicaten zoals statistische gegevens (aantal, groeperen op, gemiddelde, som, enzovoort) die nog niet kunnen worden gepusteld (ten minste in Spark 3.1). De aangepaste query is dus een terugval om ze te pushen naar de query die naar Cosmos wordt verzonden. Als dit is opgegeven, wordt de aangepaste query ook gebruikt om het schema af te ronden als schemadeferentie is ingeschakeld.
spark.cosmos.read.maxItemCount 1000 Overschrijvingen het maximum aantal documenten dat kan worden geretourneerd voor één query- of wijzigingsfeedaanvraag. De standaardwaarde is . Overweeg dit alleen te verhogen voor gemiddelde documentgrootten kleiner dan 1 kB of wanneer projectie het aantal eigenschappen dat in query's is geselecteerd aanzienlijk vermindert (bijvoorbeeld wanneer u alleen 'ID' van documenten 1000 selecteert, enzovoort).

Configuratie van schemadeferentie

Bij het uitvoeren van leesbewerkingen kunnen gebruikers een aangepast schema opgeven of de connector toestaan om dit af te lezen. Schemadeferentie is standaard ingeschakeld.

Naam van configuratie-eigenschap Standaard Beschrijving
spark.cosmos.read.inferSchema.enabled true Wanneer schemadeferentie is uitgeschakeld en de gebruiker geen schema oplevert, wordt onbewerkte json geretourneerd.
spark.cosmos.read.inferSchema.query SELECT * FROM r Wanneer schemadeferentie is ingeschakeld, gebruikt als aangepaste query om deze af te afleiden. Als u bijvoorbeeld meerdere entiteiten met verschillende schema's in een container opgeslagen en u ervoor wilt zorgen dat de deferentie alleen bepaalde documenttypen bekijkt of als u alleen bepaalde kolommen wilt projecteren.
spark.cosmos.read.inferSchema.samplingSize 1000 Steekproefgrootte die moet worden gebruikt bij het afleiden van een schema en niet voor het gebruik van een query.
spark.cosmos.read.inferSchema.includeSystemProperties false Wanneer schemadeferentie is ingeschakeld, of het resulterende schema alle systeemeigenschappen Cosmos DB bevat.
spark.cosmos.read.inferSchema.includeTimestamp false Wanneer schemadeferentie is ingeschakeld, of het resulterende schema de tijdstempel van het document bevat ( _ts ). Niet vereist als spark.cosmos.read.inferSchema.includeSystemProperties is ingeschakeld, omdat deze al alle systeemeigenschappen bevat.
spark.cosmos.read.inferSchema.forceNullableProperties true Wanneer schemadeferentie is ingeschakeld, of het resulterende schema ervoor zorgt dat alle kolommen null zijn. Standaard worden alle kolommen (met uitzondering van cosmos-systeemeigenschappen) beschouwd als null-waarden, zelfs als alle rijen in de voorbeeldset niet-null-waarden hebben. Wanneer deze functie is uitgeschakeld, worden de afgeleide kolommen beschouwd als null-waarden of niet, afhankelijk van of een record in de voorbeeldset null-waarden in een kolom heeft.

Serialisatie-configuratie

Wordt gebruikt om het json-serialisatie-/deserialisatiegedrag te beïnvloeden

Naam van configuratie-eigenschap Standaard Beschrijving
spark.cosmos.serialization.inclusionMode Always Hiermee bepaalt u of null-/standaardwaarden worden geseraliseerd naar json of dat eigenschappen met een null-/standaardwaarde worden overgeslagen. Het gedrag volgt dezelfde ideeën als JsonInclude.Includevan Ideas. Always betekent dat json-eigenschappen worden gemaakt, zelfs voor null- en standaardwaarden. NonNull betekent dat er geen json-eigenschappen worden gemaakt voor expliciete null-waarden. NonEmpty betekent dat json-eigenschappen niet worden gemaakt voor lege tekenreekswaarden of lege matrices/mpas. NonDefault betekent dat json-eigenschappen niet alleen worden overgeslagen voor null/leeg, maar ook wanneer de waarde bijvoorbeeld identiek is aan de standaardwaarde voor 0 numerieke eigenschappen.

Feed wijzigen (alleen voor Spark-Streaming van cosmos.oltp.changeFeed een gegevensbron, die alleen-lezen is)

Naam van configuratie-eigenschap Standaard Beschrijving
spark.cosmos.changeFeed.startFrom Beginning ChangeFeed Start from settings ( , of een bepaald Now Beginning tijdstip (UTC) bijvoorbeeld 2020-02-10T14:15:03 ) - de standaardwaarde is Beginning . Als de schrijf config een bevat en er controlepunten bestaan, wordt de stroom altijd voortgezet onafhankelijk van de instellingen. U moet controlepunten wijzigen of verwijderen om de stroom opnieuw te starten als dat de bedoeling checkpointLocation spark.cosmos.changeFeed.startFrom checkpointLocation is.
spark.cosmos.changeFeed.mode Incremental ChangeFeed mode ( Incremental of FullFidelity ) - OPMERKING: is momenteel FullFidelity experimenteel. Hiervoor is vereist dat het abonnement/account is ingeschakeld voor de privépreview en dat er bekende belangrijke wijzigingen zullen plaatsvinden voor (schema van FullFidelity de geretourneerde documenten). Het is raadzaam om op dit moment alleen te gebruiken voor FullFidelity niet-productiescenario's.
spark.cosmos.changeFeed.itemCountPerTriggerHint Geen Bij benadering maximum aantal items dat uit de wijzigingsfeed wordt gelezen voor elke microbatch/trigger

Configuratie van JSON-conversie

Naam van configuratie-eigenschap Standaard Beschrijving
spark.cosmos.read.schemaConversionMode Relaxed Het gedrag van schemaconversie ( Relaxed , Strict ). Als een document bij het lezen van JSON-documenten een kenmerk bevat dat niet is toe te schrijven aan het schematype, kan de gebruiker beslissen of een waarde (Relaxed) of een uitzondering null (Strict) moet worden gebruikt.

Configuratie van partitioneringsstrategie

Naam van configuratie-eigenschap Standaard Beschrijving
spark.cosmos.read.partitioning.strategy Default De gebruikte partitioneringsstrategie (standaard, aangepast, beperkend of agressief)
spark.cosmos.partitioning.targetedCount Geen Het beoogde aantal partities. Deze parameter is optioneel en wordt genegeerd tenzij strategy==Custom wordt gebruikt. In dit geval berekent de Spark-connector niet dynamisch het aantal partities, maar houdt deze waarde aan.

Configuratie van doorvoerbeheer

Naam van configuratie-eigenschap Standaard Beschrijving
spark.cosmos.throughputControl.enabled false Of doorvoerbeheer is ingeschakeld
spark.cosmos.throughputControl.name Geen Naam van besturingselementgroep voor doorvoer
spark.cosmos.throughputControl.targetThroughput Geen Doeldoorvoer van controlegroep voor doorvoer
spark.cosmos.throughputControl.targetThroughputThreshold Geen Doeldoorvoerdrempel voor doorvoerbeheergroep
spark.cosmos.throughputControl.globalControl.database Geen Database, die wordt gebruikt voor globale doorvoercontrole
spark.cosmos.throughputControl.globalControl.container Geen Container, die wordt gebruikt voor globale doorvoercontrole
spark.cosmos.throughputControl.globalControl.renewIntervalInMS 5s Hoe vaak de client het doorvoergebruik van zichzelf gaat bijwerken
spark.cosmos.throughputControl.globalControl.expireIntervalInMS 11s Hoe snel een offline client wordt gedetecteerd

Volgende stappen