Snabbstart: Hantera data med Azure Cosmos DB Spark 3 OLTP-anslutningsapp för SQL API

GÄLLER FÖR: SQL API

Den här självstudien är en snabbstartsguide som visar hur du använder Cosmos DB Spark-anslutningsapp för att läsa från eller skriva till Cosmos DB. Cosmos DB Spark-anslutningsappen baseras på Spark 3.1.x.

I den här snabbskursen förlitar vi oss på Azure Databricks Runtime 8.0 med Spark 3.1.1 och en Jupyter Notebook som visar hur du använder Cosmos DB Spark Connector.

Du kan även använda andra Spark 3.1.1 Spark-erbjudanden. Du bör också kunna använda alla språk som stöds av Spark (PySpark, Scala, Java osv.) eller alla Spark-gränssnitt som du är bekant med (Jupyter Notebook, Livy osv.).

Förutsättningar

SLF4J behövs bara om du planerar att använda loggning. Du kan även ladda ned en SLF4J-bindning som länkar SLF4J-API:et med valfri loggningsimplementering. Mer information finns i användarhandboken för SLF4J.

Installera Cosmos DB Spark Connector i spark-klustret azure-cosmos-spark_3-1_2-12-4.3.1.jar

Kom igång-guiden baseras på PySpark, men du kan även använda motsvarande scala-version och du kan köra följande kodfragment i en Azure Databricks PySpark-anteckningsbok.

Skapa databaser och containrar

Ange först Cosmos DB-autentiseringsuppgifter och Cosmos DB databasnamn och containernamn.

cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
}

Sedan kan du använda det nya katalog-API:et för att skapa Cosmos DB databas och container via Spark.

# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))

# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))

När du skapar containrar med katalog-API:et kan du ange dataflödes- och partitionsnyckelsökvägen för containern som ska skapas.

Mer information finns i den fullständiga katalogens API-dokumentation.

Mata in data

Namnet på datakällan är , och i följande exempel visas hur du kan skriva en minnesdataram som består av två cosmos.oltp objekt som ska Cosmos DB:

spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
  .toDF("id","name","age","isAlive") \
   .write\
   .format("cosmos.oltp")\
   .options(**cfg)\
   .mode("APPEND")\
   .save()

Observera att id är ett obligatoriskt fält för Cosmos DB.

Mer information om inmatning av data finns i den fullständiga dokumentationen för skrivkonfiguration.

Söka i data

Med hjälp av cosmos.oltp samma datakälla kan vi köra frågor mot data och använda för filter att skicka ned filter:

from pyspark.sql.functions import col

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.filter(col("isAlive") == True)\
 .show()

Mer information om hur du frågar efter data finns i den fullständiga dokumentationen för frågekonfiguration.

Schemaferens

När du frågar efter data kan Spark-anslutningsappen dra slutsatser om schemat baserat på sampling av befintliga objekt genom att ange spark.cosmos.read.inferSchema.enabled till true .

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()
 
df.printSchema()

Du kan också skicka det anpassade schema som du vill ska användas för att läsa data:

customSchema = StructType([
      StructField("id", StringType()),
      StructField("name", StringType()),
      StructField("type", StringType()),
      StructField("age", IntegerType()),
      StructField("isAlive", BooleanType())
    ])

df = spark.read.schema(schema).format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

Om inget anpassat schema har angetts och schemaferens har inaktiverats returnerar resulterande data Json-rådata för objekten:

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

Mer information om schemaferens finns i den fullständiga dokumentationen för schemareferenskonfiguration.

Konfigurationsreferens

Allmän konfiguration

Konfigurationsegenskapsnamn Standardvärde Beskrivning
spark.cosmos.accountEndpoint Ingen Cosmos DB slutpunkts-URI för konto
spark.cosmos.accountKey Ingen Cosmos DB-kontonyckel
spark.cosmos.database Ingen Cosmos DB databasnamn
spark.cosmos.container Ingen Cosmos DB containernamn

Extra justering

Konfigurationsegenskapsnamn Standardvärde Beskrivning
spark.cosmos.useGatewayMode false Använda gatewayläge för klientåtgärderna
spark.cosmos.read.forceEventualConsistency true Gör att klienten använder slutlig konsekvens för läsåtgärder i stället för att använda standardkonsekvens på kontonivå
spark.cosmos.applicationName Ingen Programnamn
spark.cosmos.preferredRegionsList Ingen Lista över önskade regioner som ska användas för ett konto Cosmos DB flera regioner. Det här är ett kommaavgränsat värde (till exempel [East US, West US] eller ) som prioriterade regioner används som East US, West US tips. Du bör använda ett samplacerat Spark-kluster med ditt Cosmos DB konto och skicka Spark-klusterregionen som önskad region. Se listan över Azure-regioner här. Du kan också använda spark.cosmos.preferredRegions som alias
spark.cosmos.diagnostics Ingen Kan användas för att aktivera mer utförlig diagnostik. För närvarande är det enda alternativet som stöds att ange den här egenskapen som – vilket resulterar i att extra loggar genereras som loggar i drivrutins- och simple INFO körningsloggarna.

Skrivkonfiguration

Konfigurationsegenskapsnamn Standardvärde Beskrivning
spark.cosmos.write.strategy ItemOverwrite Cosmos DB objekt skrivstrategi: (med upsert), (med hjälp av skapa, ignorera befintliga objekt ItemOverwrite som är, Konflikter), (ta bort alla dokument) (ta bort alla dokument som etag inte har ItemAppend ItemDelete ItemDeleteIfNotModified ändrats för)
spark.cosmos.write.maxRetryCount 10 Cosmos DB Skriv maximalt antal återförsök vid återförsöksbara fel (till exempel anslutningsfel)
spark.cosmos.write.point.maxConcurrency Ingen Cosmos DB högsta samtidighet för objektskrivning. Om det inte anges bestäms det baserat på vm-storleken för Spark-utföraren
spark.cosmos.write.bulk.maxPendingOperations Ingen Cosmos DB objekt , maximalt antal väntande åtgärder i massläge. Definierar en gräns för massåtgärder som bearbetas samtidigt. Om det inte anges bestäms det baserat på vm-storleken för Spark-utföraren. Om datavolymen är stor för det etablerade dataflödet i målcontainern kan den här inställningen justeras genom att följa uppskattningen av 1000 x Cores
spark.cosmos.write.bulk.enabled true Cosmos DB massskrivning av objekt aktiverat

Frågekonfiguration

Konfigurationsegenskapsnamn Standardvärde Beskrivning
spark.cosmos.read.customQuery Ingen När den anpassade frågan anges bearbetas den mot Cosmos-slutpunkten i stället för att dynamiskt generera frågan via predikat-push nedåt. Vanligtvis rekommenderar vi att du förlitar dig på Spark-predikat-push-teknik eftersom det gör att du kan generera den mest effektiva uppsättningen filter baserat på frågeplanen. Men det finns ett par predikat som aggregeringar (antal, gruppera efter, medelvärde, summa osv.) som inte kan pushas ned ännu (åtminstone i Spark 3.1) – så den anpassade frågan är en återställning så att de kan pushas till frågan som skickas till Cosmos. Om den här inställningen anges med schema inferens aktiverad används även den anpassade frågan för att dra slutsatsen av schemat.
spark.cosmos.read.maxItemCount 1000 Åsidosätter det maximala antalet dokument som kan returneras för en enskild fråge- eller ändringsflödesbegäran. Standardvärdet är – överväg att bara öka detta för genomsnittliga dokumentstorlekar som är mindre än 1 kB eller när projektion minskar antalet egenskaper som valts i frågor avsevärt (till exempel när du bara väljer 1000 "ID" för dokument osv.).

Konfiguration av schemareferens

När du utför läsåtgärder kan användarna ange ett anpassat schema eller tillåta att anslutningsappen härar det. Schemainferens är aktiverat som standard.

Konfigurationsegenskapsnamn Standardvärde Beskrivning
spark.cosmos.read.inferSchema.enabled true När schemaferens är inaktiverat och användaren inte tillhandahåller något schema returneras rå-json.
spark.cosmos.read.inferSchema.query SELECT * FROM r När schemaferens har aktiverats används som anpassad fråga för att härledning av den. Om du till exempel lagrar flera entiteter med olika scheman i en container och du vill se till att inferens endast tittar på vissa dokumenttyper eller om du bara vill projicera vissa kolumner.
spark.cosmos.read.inferSchema.samplingSize 1000 Samplingsstorlek som ska användas när du härar schema och inte använder en fråga.
spark.cosmos.read.inferSchema.includeSystemProperties false När schemaferens har aktiverats gäller om det resulterande schemat innehåller alla Cosmos DB av systemegenskaperna.
spark.cosmos.read.inferSchema.includeTimestamp false När schema inferens är aktiverat, huruvida det resulterande schemat kommer att innehålla dokumentet Tidsstämpel ( _ts ). Krävs inte om spark.cosmos.read.inferSchema.includeSystemProperties är aktiverat, eftersom det redan innehåller alla systemegenskaper.
spark.cosmos.read.inferSchema.forceNullableProperties true När schema inferens är aktiverat, om det resulterande schemat gör alla kolumner nullbara. Som standard behandlas alla kolumner (utom Cosmos-systemegenskaper) som nullbara även om alla rader i exempeluppsättningen har värden som inte är null. När de här kolumnerna är inaktiverade behandlas de som nullbara eller inte beroende på om någon post i exempeluppsättningen har null-värden i en kolumn.

Serialiseringskonfiguration

Används för att påverka json-serialisering/deserialiseringsbeteende

Konfigurationsegenskapsnamn Standardvärde Beskrivning
spark.cosmos.serialization.inclusionMode Always Anger om null-/standardvärden serialiseras till json eller om egenskaper med null-/standardvärde hoppas över. Beteendet följer samma idéer som Michaels JsonInclude.Include. Always innebär att json-egenskaper skapas även för null- och standardvärden. NonNull innebär att inga json-egenskaper skapas för explicita null-värden. NonEmpty innebär att json-egenskaper inte skapas för tomma strängvärden eller tomma matriser/mpas. NonDefault innebär att json-egenskaper hoppas över inte bara för null/tom, utan även när värdet är identiskt med standardvärdet för 0 numeriska egenskaper till exempel.

Ändringsflöde (endast för Spark-Streaming med cosmos.oltp.changeFeed hjälp av datakälla, som är skrivskyddade) konfiguration

Konfigurationsegenskapsnamn Standardvärde Beskrivning
spark.cosmos.changeFeed.startFrom Beginning ChangeFeed Start från inställningar ( Now , eller en viss tidpunkt (UTC) till exempel Beginning ) – 2020-02-10T14:15:03 standardvärdet är Beginning . Om skrivkonfigurationen innehåller en och eventuella kontrollpunkter finns fortsätter dataströmmen alltid oberoende av inställningarna – du måste ändra eller ta bort kontrollpunkter för att starta om dataströmmen om det är checkpointLocation spark.cosmos.changeFeed.startFrom checkpointLocation avsikten.
spark.cosmos.changeFeed.mode Incremental ChangeFeed-läge ( Incremental FullFidelity eller ) – Obs! är i FullFidelity experimentellt tillstånd just nu. Det kräver att prenumerationen/kontot har aktiverats för den privata förhandsversionen och att det finns kända större ändringar FullFidelity (schemat för de returnerade dokumenten). Vi rekommenderar att du endast använder FullFidelity för icke-produktionsscenarier i det här läget.
spark.cosmos.changeFeed.itemCountPerTriggerHint Ingen Ungefärligt maximalt antal objekt som läses från ändringsflödet för varje mikrobatch/utlösare

Konfiguration av Json-konvertering

Konfigurationsegenskapsnamn Standardvärde Beskrivning
spark.cosmos.read.schemaConversionMode Relaxed Schemakonverteringsbeteendet ( Relaxed , Strict ). Om ett dokument innehåller ett attribut som inte mappas till schematypen när json-dokument läses kan användaren bestämma om ett värde ska användas (avslappnad) eller ett null undantag (strikt).

Konfiguration av partitioneringsstrategi

Konfigurationsegenskapsnamn Standardvärde Beskrivning
spark.cosmos.read.partitioning.strategy Default Partitioneringsstrategin som används (standard, anpassad, begränsande eller aggressiv)
spark.cosmos.partitioning.targetedCount Ingen Antal partitioner som är mål. Den här parametern är valfri och ignoreras om inte strategy==Custom används. I det här fallet beräknar Inte Spark-anslutningsappen dynamiskt antalet partitioner utan håller sig till det här värdet.

Konfiguration av dataflödeskontroll

Konfigurationsegenskapsnamn Standardvärde Beskrivning
spark.cosmos.throughputControl.enabled false Om dataflödeskontroll är aktiverat
spark.cosmos.throughputControl.name Ingen Namn på kontrollgrupp för dataflöde
spark.cosmos.throughputControl.targetThroughput Ingen Dataflödeskontrollgruppens målflöde
spark.cosmos.throughputControl.targetThroughputThreshold Ingen Målflödeströskel för dataflödeskontrollgrupp
spark.cosmos.throughputControl.globalControl.database Ingen Databas, som ska användas för global kontroll av dataflöde
spark.cosmos.throughputControl.globalControl.container Ingen Container, som ska användas för global kontroll av dataflöde
spark.cosmos.throughputControl.globalControl.renewIntervalInMS 5s Hur ofta klienten ska uppdatera dataflödesanvändningen för sig själv
spark.cosmos.throughputControl.globalControl.expireIntervalInMS 11s Hur snabbt en offlineklient identifieras

Nästa steg