Snabbstart: Hantera data med Azure Cosmos DB Spark 3 OLTP-anslutningsapp för SQL API
GÄLLER FÖR:
SQL API
Den här självstudien är en snabbstartsguide som visar hur du använder Cosmos DB Spark-anslutningsapp för att läsa från eller skriva till Cosmos DB. Cosmos DB Spark-anslutningsappen baseras på Spark 3.1.x.
I den här snabbskursen förlitar vi oss på Azure Databricks Runtime 8.0 med Spark 3.1.1 och en Jupyter Notebook som visar hur du använder Cosmos DB Spark Connector.
Du kan även använda andra Spark 3.1.1 Spark-erbjudanden. Du bör också kunna använda alla språk som stöds av Spark (PySpark, Scala, Java osv.) eller alla Spark-gränssnitt som du är bekant med (Jupyter Notebook, Livy osv.).
Förutsättningar
Ett aktivt Azure-konto. Om du inte har ett kan du registrera dig för en kostnadsfri utvärderingsversion. Du kan också använda -Azure Cosmos DB Emulator för utveckling och testning.
Azure Databricks runtime 8.0 med Spark 3.1.1.
(Valfritt) SLF4J-bindning används för att associera ett specifikt loggningsramverk med SLF4J.
SLF4J behövs bara om du planerar att använda loggning. Du kan även ladda ned en SLF4J-bindning som länkar SLF4J-API:et med valfri loggningsimplementering. Mer information finns i användarhandboken för SLF4J.
Installera Cosmos DB Spark Connector i spark-klustret azure-cosmos-spark_3-1_2-12-4.3.1.jar
Kom igång-guiden baseras på PySpark, men du kan även använda motsvarande scala-version och du kan köra följande kodfragment i en Azure Databricks PySpark-anteckningsbok.
Skapa databaser och containrar
Ange först Cosmos DB-autentiseringsuppgifter och Cosmos DB databasnamn och containernamn.
cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"
cfg = {
"spark.cosmos.accountEndpoint" : cosmosEndpoint,
"spark.cosmos.accountKey" : cosmosMasterKey,
"spark.cosmos.database" : cosmosDatabaseName,
"spark.cosmos.container" : cosmosContainerName,
}
Sedan kan du använda det nya katalog-API:et för att skapa Cosmos DB databas och container via Spark.
# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)
# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))
# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))
När du skapar containrar med katalog-API:et kan du ange dataflödes- och partitionsnyckelsökvägen för containern som ska skapas.
Mer information finns i den fullständiga katalogens API-dokumentation.
Mata in data
Namnet på datakällan är , och i följande exempel visas hur du kan skriva en minnesdataram som består av två cosmos.oltp objekt som ska Cosmos DB:
spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
.toDF("id","name","age","isAlive") \
.write\
.format("cosmos.oltp")\
.options(**cfg)\
.mode("APPEND")\
.save()
Observera att id är ett obligatoriskt fält för Cosmos DB.
Mer information om inmatning av data finns i den fullständiga dokumentationen för skrivkonfiguration.
Söka i data
Med hjälp av cosmos.oltp samma datakälla kan vi köra frågor mot data och använda för filter att skicka ned filter:
from pyspark.sql.functions import col
df = spark.read.format("cosmos.oltp").options(**cfg)\
.option("spark.cosmos.read.inferSchema.enabled", "true")\
.load()
df.filter(col("isAlive") == True)\
.show()
Mer information om hur du frågar efter data finns i den fullständiga dokumentationen för frågekonfiguration.
Schemaferens
När du frågar efter data kan Spark-anslutningsappen dra slutsatser om schemat baserat på sampling av befintliga objekt genom att ange spark.cosmos.read.inferSchema.enabled till true .
df = spark.read.format("cosmos.oltp").options(**cfg)\
.option("spark.cosmos.read.inferSchema.enabled", "true")\
.load()
df.printSchema()
Du kan också skicka det anpassade schema som du vill ska användas för att läsa data:
customSchema = StructType([
StructField("id", StringType()),
StructField("name", StringType()),
StructField("type", StringType()),
StructField("age", IntegerType()),
StructField("isAlive", BooleanType())
])
df = spark.read.schema(schema).format("cosmos.oltp").options(**cfg)\
.load()
df.printSchema()
Om inget anpassat schema har angetts och schemaferens har inaktiverats returnerar resulterande data Json-rådata för objekten:
df = spark.read.format("cosmos.oltp").options(**cfg)\
.load()
df.printSchema()
Mer information om schemaferens finns i den fullständiga dokumentationen för schemareferenskonfiguration.
Konfigurationsreferens
Allmän konfiguration
| Konfigurationsegenskapsnamn | Standardvärde | Beskrivning |
|---|---|---|
spark.cosmos.accountEndpoint |
Ingen | Cosmos DB slutpunkts-URI för konto |
spark.cosmos.accountKey |
Ingen | Cosmos DB-kontonyckel |
spark.cosmos.database |
Ingen | Cosmos DB databasnamn |
spark.cosmos.container |
Ingen | Cosmos DB containernamn |
Extra justering
| Konfigurationsegenskapsnamn | Standardvärde | Beskrivning |
|---|---|---|
spark.cosmos.useGatewayMode |
false |
Använda gatewayläge för klientåtgärderna |
spark.cosmos.read.forceEventualConsistency |
true |
Gör att klienten använder slutlig konsekvens för läsåtgärder i stället för att använda standardkonsekvens på kontonivå |
spark.cosmos.applicationName |
Ingen | Programnamn |
spark.cosmos.preferredRegionsList |
Ingen | Lista över önskade regioner som ska användas för ett konto Cosmos DB flera regioner. Det här är ett kommaavgränsat värde (till exempel [East US, West US] eller ) som prioriterade regioner används som East US, West US tips. Du bör använda ett samplacerat Spark-kluster med ditt Cosmos DB konto och skicka Spark-klusterregionen som önskad region. Se listan över Azure-regioner här. Du kan också använda spark.cosmos.preferredRegions som alias |
spark.cosmos.diagnostics |
Ingen | Kan användas för att aktivera mer utförlig diagnostik. För närvarande är det enda alternativet som stöds att ange den här egenskapen som – vilket resulterar i att extra loggar genereras som loggar i drivrutins- och simple INFO körningsloggarna. |
Skrivkonfiguration
| Konfigurationsegenskapsnamn | Standardvärde | Beskrivning |
|---|---|---|
spark.cosmos.write.strategy |
ItemOverwrite |
Cosmos DB objekt skrivstrategi: (med upsert), (med hjälp av skapa, ignorera befintliga objekt ItemOverwrite som är, Konflikter), (ta bort alla dokument) (ta bort alla dokument som etag inte har ItemAppend ItemDelete ItemDeleteIfNotModified ändrats för) |
spark.cosmos.write.maxRetryCount |
10 |
Cosmos DB Skriv maximalt antal återförsök vid återförsöksbara fel (till exempel anslutningsfel) |
spark.cosmos.write.point.maxConcurrency |
Ingen | Cosmos DB högsta samtidighet för objektskrivning. Om det inte anges bestäms det baserat på vm-storleken för Spark-utföraren |
spark.cosmos.write.bulk.maxPendingOperations |
Ingen | Cosmos DB objekt , maximalt antal väntande åtgärder i massläge. Definierar en gräns för massåtgärder som bearbetas samtidigt. Om det inte anges bestäms det baserat på vm-storleken för Spark-utföraren. Om datavolymen är stor för det etablerade dataflödet i målcontainern kan den här inställningen justeras genom att följa uppskattningen av 1000 x Cores |
spark.cosmos.write.bulk.enabled |
true |
Cosmos DB massskrivning av objekt aktiverat |
Frågekonfiguration
| Konfigurationsegenskapsnamn | Standardvärde | Beskrivning |
|---|---|---|
spark.cosmos.read.customQuery |
Ingen | När den anpassade frågan anges bearbetas den mot Cosmos-slutpunkten i stället för att dynamiskt generera frågan via predikat-push nedåt. Vanligtvis rekommenderar vi att du förlitar dig på Spark-predikat-push-teknik eftersom det gör att du kan generera den mest effektiva uppsättningen filter baserat på frågeplanen. Men det finns ett par predikat som aggregeringar (antal, gruppera efter, medelvärde, summa osv.) som inte kan pushas ned ännu (åtminstone i Spark 3.1) – så den anpassade frågan är en återställning så att de kan pushas till frågan som skickas till Cosmos. Om den här inställningen anges med schema inferens aktiverad används även den anpassade frågan för att dra slutsatsen av schemat. |
spark.cosmos.read.maxItemCount |
1000 |
Åsidosätter det maximala antalet dokument som kan returneras för en enskild fråge- eller ändringsflödesbegäran. Standardvärdet är – överväg att bara öka detta för genomsnittliga dokumentstorlekar som är mindre än 1 kB eller när projektion minskar antalet egenskaper som valts i frågor avsevärt (till exempel när du bara väljer 1000 "ID" för dokument osv.). |
Konfiguration av schemareferens
När du utför läsåtgärder kan användarna ange ett anpassat schema eller tillåta att anslutningsappen härar det. Schemainferens är aktiverat som standard.
| Konfigurationsegenskapsnamn | Standardvärde | Beskrivning |
|---|---|---|
spark.cosmos.read.inferSchema.enabled |
true |
När schemaferens är inaktiverat och användaren inte tillhandahåller något schema returneras rå-json. |
spark.cosmos.read.inferSchema.query |
SELECT * FROM r |
När schemaferens har aktiverats används som anpassad fråga för att härledning av den. Om du till exempel lagrar flera entiteter med olika scheman i en container och du vill se till att inferens endast tittar på vissa dokumenttyper eller om du bara vill projicera vissa kolumner. |
spark.cosmos.read.inferSchema.samplingSize |
1000 |
Samplingsstorlek som ska användas när du härar schema och inte använder en fråga. |
spark.cosmos.read.inferSchema.includeSystemProperties |
false |
När schemaferens har aktiverats gäller om det resulterande schemat innehåller alla Cosmos DB av systemegenskaperna. |
spark.cosmos.read.inferSchema.includeTimestamp |
false |
När schema inferens är aktiverat, huruvida det resulterande schemat kommer att innehålla dokumentet Tidsstämpel ( _ts ). Krävs inte om spark.cosmos.read.inferSchema.includeSystemProperties är aktiverat, eftersom det redan innehåller alla systemegenskaper. |
spark.cosmos.read.inferSchema.forceNullableProperties |
true |
När schema inferens är aktiverat, om det resulterande schemat gör alla kolumner nullbara. Som standard behandlas alla kolumner (utom Cosmos-systemegenskaper) som nullbara även om alla rader i exempeluppsättningen har värden som inte är null. När de här kolumnerna är inaktiverade behandlas de som nullbara eller inte beroende på om någon post i exempeluppsättningen har null-värden i en kolumn. |
Serialiseringskonfiguration
Används för att påverka json-serialisering/deserialiseringsbeteende
| Konfigurationsegenskapsnamn | Standardvärde | Beskrivning |
|---|---|---|
spark.cosmos.serialization.inclusionMode |
Always |
Anger om null-/standardvärden serialiseras till json eller om egenskaper med null-/standardvärde hoppas över. Beteendet följer samma idéer som Michaels JsonInclude.Include. Always innebär att json-egenskaper skapas även för null- och standardvärden. NonNull innebär att inga json-egenskaper skapas för explicita null-värden. NonEmpty innebär att json-egenskaper inte skapas för tomma strängvärden eller tomma matriser/mpas. NonDefault innebär att json-egenskaper hoppas över inte bara för null/tom, utan även när värdet är identiskt med standardvärdet för 0 numeriska egenskaper till exempel. |
Ändringsflöde (endast för Spark-Streaming med cosmos.oltp.changeFeed hjälp av datakälla, som är skrivskyddade) konfiguration
| Konfigurationsegenskapsnamn | Standardvärde | Beskrivning |
|---|---|---|
| spark.cosmos.changeFeed.startFrom | Beginning |
ChangeFeed Start från inställningar ( Now , eller en viss tidpunkt (UTC) till exempel Beginning ) – 2020-02-10T14:15:03 standardvärdet är Beginning . Om skrivkonfigurationen innehåller en och eventuella kontrollpunkter finns fortsätter dataströmmen alltid oberoende av inställningarna – du måste ändra eller ta bort kontrollpunkter för att starta om dataströmmen om det är checkpointLocation spark.cosmos.changeFeed.startFrom checkpointLocation avsikten. |
| spark.cosmos.changeFeed.mode | Incremental |
ChangeFeed-läge ( Incremental FullFidelity eller ) – Obs! är i FullFidelity experimentellt tillstånd just nu. Det kräver att prenumerationen/kontot har aktiverats för den privata förhandsversionen och att det finns kända större ändringar FullFidelity (schemat för de returnerade dokumenten). Vi rekommenderar att du endast använder FullFidelity för icke-produktionsscenarier i det här läget. |
| spark.cosmos.changeFeed.itemCountPerTriggerHint | Ingen | Ungefärligt maximalt antal objekt som läses från ändringsflödet för varje mikrobatch/utlösare |
Konfiguration av Json-konvertering
| Konfigurationsegenskapsnamn | Standardvärde | Beskrivning |
|---|---|---|
spark.cosmos.read.schemaConversionMode |
Relaxed |
Schemakonverteringsbeteendet ( Relaxed , Strict ). Om ett dokument innehåller ett attribut som inte mappas till schematypen när json-dokument läses kan användaren bestämma om ett värde ska användas (avslappnad) eller ett null undantag (strikt). |
Konfiguration av partitioneringsstrategi
| Konfigurationsegenskapsnamn | Standardvärde | Beskrivning |
|---|---|---|
spark.cosmos.read.partitioning.strategy |
Default |
Partitioneringsstrategin som används (standard, anpassad, begränsande eller aggressiv) |
spark.cosmos.partitioning.targetedCount |
Ingen | Antal partitioner som är mål. Den här parametern är valfri och ignoreras om inte strategy==Custom används. I det här fallet beräknar Inte Spark-anslutningsappen dynamiskt antalet partitioner utan håller sig till det här värdet. |
Konfiguration av dataflödeskontroll
| Konfigurationsegenskapsnamn | Standardvärde | Beskrivning |
|---|---|---|
spark.cosmos.throughputControl.enabled |
false |
Om dataflödeskontroll är aktiverat |
spark.cosmos.throughputControl.name |
Ingen | Namn på kontrollgrupp för dataflöde |
spark.cosmos.throughputControl.targetThroughput |
Ingen | Dataflödeskontrollgruppens målflöde |
spark.cosmos.throughputControl.targetThroughputThreshold |
Ingen | Målflödeströskel för dataflödeskontrollgrupp |
spark.cosmos.throughputControl.globalControl.database |
Ingen | Databas, som ska användas för global kontroll av dataflöde |
spark.cosmos.throughputControl.globalControl.container |
Ingen | Container, som ska användas för global kontroll av dataflöde |
spark.cosmos.throughputControl.globalControl.renewIntervalInMS |
5s |
Hur ofta klienten ska uppdatera dataflödesanvändningen för sig själv |
spark.cosmos.throughputControl.globalControl.expireIntervalInMS |
11s |
Hur snabbt en offlineklient identifieras |
Nästa steg
- Azure Cosmos DB Apache Spark 3 OLTP Connector for Core (SQL) API: Viktig information och resurser
- Läs mer om Apache Spark.