Rychlý start: Správa dat pomocí konektoru OLTP Azure Cosmos DB Spark 3 pro SQL API

PLATÍ PRO: SQL API

Tento kurz je stručným úvodním průvodcem, který vám ukáže, jak používat Cosmos DB Spark Connector ke čtení nebo zápisu do Cosmos DB. Cosmos Spark Connector db je založený na Sparku 3.1.x.

V tomto rychlém kurzu se spoléháme na Azure Databricks Runtime 8.0 s Sparkem 3.1.1 a poznámkovým blokem Jupyter, kde ukážeme, jak používat Cosmos DB Spark Connector.

Můžete také použít jakoukoli jinou sparkovou nabídku Sparku 3.1.1, ale taky byste měli používat jakýkoliv jazyk podporovaný Sparkem (PySpark, Scala, Java atd.) nebo jakékoli rozhraní Spark, které znáte (Poznámkový blok Jupyteru, Livy atd.).

Předpoklady

SLF4J je potřeba jenom v případě, že máte v plánu používat protokolování, ale taky si stáhnete vazbu SLF4J, která propoji rozhraní SLF4J API s implementací protokolování podle vašeho výběru. Další informace najdete v uživatelské příručce SLF4J.

Nainstalujte Cosmos DB Spark Connector do spark Cluster azure-cosmos-spark_3-1_2-12-4.3.1.jar

Příručka začínáme je založená na PySparku, ale můžete použít i ekvivalentní verzi scaly a v poznámkovém bloku Azure Databricks PySpark můžete spustit následující fragment kódu.

Vytváření databází a kontejnerů

Nejdřív nastavte Cosmos účtu DB a název Cosmos databáze databáze a kontejneru.

cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
}

Potom můžete pomocí nového rozhraní Api katalogu vytvořit databázi Cosmos databáze a kontejneru prostřednictvím Sparku.

# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))

# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))

Při vytváření kontejnerů pomocí rozhraní Catalog API můžete nastavit cestu propustnosti a klíče oddílu pro kontejner, který se má vytvořit.

Další informace najdete v úplné dokumentaci k rozhraní CATALOG API.

Ingest data

Název zdroje dat je a následující příklad ukazuje, jak můžete napsat datový rámec paměti, který se skládá ze dvou položek do cosmos.oltp Cosmos DB:

spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
  .toDF("id","name","age","isAlive") \
   .write\
   .format("cosmos.oltp")\
   .options(**cfg)\
   .mode("APPEND")\
   .save()

Všimněte id si, že je povinné pole pro Cosmos DB.

Další informace týkající se ingestingu dat najdete v úplné dokumentaci k konfiguraci zápisu.

Data dotazu

Pomocí stejného cosmos.oltp zdroje dat můžeme zadat dotaz na data a použít filter k nabízení filtrů:

from pyspark.sql.functions import col

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.filter(col("isAlive") == True)\
 .show()

Další informace týkající se dotazů na data najdete v úplné dokumentaci ke konfiguraci dotazu.

Odvození schématu

Při dotazování na data může Spark Connector odvodit schéma na základě vzorkování existujících položek nastavením spark.cosmos.read.inferSchema.enabled na true .

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()
 
df.printSchema()

Můžete také předat vlastní schéma, které chcete použít ke čtení dat:

customSchema = StructType([
      StructField("id", StringType()),
      StructField("name", StringType()),
      StructField("type", StringType()),
      StructField("age", IntegerType()),
      StructField("isAlive", BooleanType())
    ])

df = spark.read.schema(customSchema).format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

Pokud není zadané žádné vlastní schéma a odvození schématu je zakázané, výsledná data vrátí nezpracovaný obsah json položek:

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

Další informace týkající se odvození schématu najdete v úplné dokumentaci k konfiguraci odvození schématu.

Odkaz na konfiguraci

Obecná konfigurace

Název vlastnosti konfigurace Výchozí Popis
spark.cosmos.accountEndpoint Žádné Cosmos identifikátor URI koncového bodu účtu databáze
spark.cosmos.accountKey Žádné Cosmos DB Account Key
spark.cosmos.database Žádné Cosmos db název databáze
spark.cosmos.container Žádné Cosmos db název kontejneru

Extra tuning

Název vlastnosti konfigurace Výchozí Popis
spark.cosmos.useGatewayMode false Použití režimu brány pro klientské operace
spark.cosmos.read.forceEventualConsistency true Umožňuje klientovi používat možnost Eventual consistency pro operace čtení místo použití výchozí konzistence na úrovni účtu.
spark.cosmos.applicationName Žádné Název aplikace
spark.cosmos.preferredRegionsList Žádné Seznam upřednostňovaných oblastí, který se má použít pro více oblastí Cosmos DB. Jedná se o hodnotu oddělenou čárkami (například nebo ) za předpokladu, že preferované oblasti [East US, West US]East US, West US budou použity jako nápověda. Měli byste použít kolokovaný spark cluster s účtem Cosmos DB a předat oblast spark clusterů jako upřednostňovanou oblast. Seznam oblastí Azure najdete tady. Můžete také použít spark.cosmos.preferredRegions jako alias
spark.cosmos.diagnostics Žádné Umožňuje povolit podrobnější diagnostiku. V současné době je jedinou podporovanou možností nastavení této vlastnosti na – což bude mít za následek, že se jako protokoly v protokolech ovladače a prováděcího programu vydávají další simpleINFO protokoly.

Konfigurace zápisu

Název vlastnosti konfigurace Výchozí Popis
spark.cosmos.write.strategy ItemOverwrite Cosmos DB Items write Strategy: ItemOverwrite (using upsert), ItemAppend (using create, ignore pre-existing items that are, Conflicts), (delete all documents), (delete all documents), (delete all documents for the ItemDeleteItemDeleteIfNotModified etag't changed)
spark.cosmos.write.maxRetryCount 10 Cosmos DB Write Max Retry Attempts on retryable failures (například connection error)
spark.cosmos.write.point.maxConcurrency Žádné Cosmos souběžnost zápisu maximálního zápisu položky db Pokud není zadáno, určí se na základě velikosti virtuálního počítače prováděcího správce Sparku.
spark.cosmos.write.bulk.maxPendingOperations Žádné Cosmos počet nevyřízených operací hromadného režimu zápisu položky databáze Definuje omezení souběžných zpracování hromadných operací. Pokud není zadaný, určí se na základě velikosti virtuálního počítače prováděcího správce Sparku. Pokud je objem dat velký pro zřízenou propustnost v cílovém kontejneru, můžete toto nastavení upravit podle odhadu 1000 x Cores
spark.cosmos.write.bulk.enabled true Cosmos hromadného zápisu položky db

Konfigurace dotazu

Název vlastnosti konfigurace Výchozí Popis
spark.cosmos.read.customQuery Žádné Pokud je za předpokladu, že se vlastní dotaz zpracuje Cosmos koncový bod, místo dynamického generování dotazu pomocí predikátového posunu dolů. Obvykle se doporučuje spoléhat se na Sparkův predikát, protože to umožní vygenerovat nejefektivnější sadu filtrů na základě plánu dotazů. Existuje ale několik predikátů, jako jsou agregace (počet, seskupit podle, průměr, součet atd.) , které zatím není možné posunout dolů (aspoň ve Sparku 3.1) – takže vlastní dotaz je záložní řešení, které jim umožní posunout je do dotazu odeslaného Cosmos. Pokud je toto nastavení zadané, s povoleným odvozením schématu, použije se k odvodení schématu také vlastní dotaz.
spark.cosmos.read.maxItemCount 1000 Přepíše maximální počet dokumentů, které lze vrátit pro jeden dotaz nebo změnit žádost o informační kanál. Výchozí hodnota je – zvažte zvýšení pouze u průměrných velikostí dokumentů menších než 1 kB nebo když projekce výrazně sníží počet vlastností vybraných v dotazech (třeba když vyberete jenom 1000 ID dokumentů atd.).

Konfigurace odvození schématu

Při operacích čtení mohou uživatelé zadat vlastní schéma nebo povolit spojnici, aby ji odvodit. Odvození schématu je ve výchozím nastavení povolené.

Název vlastnosti konfigurace Výchozí Popis
spark.cosmos.read.inferSchema.enabled true Pokud je odvození schématu zakázané a uživatel nenasdílí schéma, vrátí se nezpracovaný json.
spark.cosmos.read.inferSchema.query SELECT * FROM r Pokud je odvození schématu povolené, použije se jako vlastní dotaz k odvození. Pokud například v kontejneru ukládáte více entit s různými schématy a chcete zajistit, aby odvození vypadalo jenom na určité typy dokumentů nebo chcete promítovat jenom určité sloupce.
spark.cosmos.read.inferSchema.samplingSize 1000 Velikost vzorkování, která se má použít při odvodu schématu a při použití dotazu
spark.cosmos.read.inferSchema.includeSystemProperties false Pokud je povoleno odvození schématu, zda výsledné schéma bude zahrnovat všechny Cosmos db.
spark.cosmos.read.inferSchema.includeTimestamp false Pokud je povolené odvození schématu, zda výsledné schéma bude zahrnovat časové razítko dokumentu ( _ts ). Není povinné, spark.cosmos.read.inferSchema.includeSystemProperties pokud je povoleno, protože už bude obsahovat všechny vlastnosti systému.
spark.cosmos.read.inferSchema.forceNullableProperties true Pokud je povoleno odvození schématu, zda výsledné schéma umožní mít všechny sloupce s možnou hodnotou Null. Ve výchozím nastavení budou všechny sloupce (kromě vlastností systému Kosmos) považovány za s možnou hodnotou Null, i když všechny řádky v ukázkové sadě mají jiné hodnoty než null. Pokud je tato možnost zakázaná, budou odvozené sloupce považovány za sloupce s možnou hodnotou Null nebo ne v závislosti na tom, jestli některý záznam v ukázkové sadě obsahuje hodnoty null ve sloupci.

Konfigurace pro serializaci

Používá se k ovlivnění chování json serialization/deserializace.

Název vlastnosti konfigurace Výchozí Popis
spark.cosmos.serialization.inclusionMode Always Určuje, jestli budou hodnoty null/default serializovány do formátu json nebo zda se přeskočí vlastnosti s hodnotou null/default. Chování se řídí stejnými nápady jako Jacksonův JsonInclude.Include. Always znamená, že se vlastnosti json vytvářejí i pro hodnoty null a výchozí hodnoty. NonNull znamená, že se pro explicitní hodnoty Null nebudou vytvářet žádné vlastnosti json. NonEmpty znamená, že vlastnosti json nebudou vytvořeny pro prázdné řetězcové hodnoty nebo prázdná pole/mpas. NonDefault znamená, že vlastnosti json se přeskočí nejen pro hodnotu null/empty, ale také v případě, že je hodnota shodná s výchozí hodnotou například pro 0 číselné vlastnosti.

Změna informačního kanálu (jenom Spark-Streaming pomocí cosmos.oltp.changeFeed zdroje dat, což je konfigurace jen pro čtení)

Název vlastnosti konfigurace Výchozí Popis
spark.cosmos.changeFeed.startFrom Beginning ChangeFeed Start from settings ( Now , or a certain point in time Beginning (UTC) for example 2020-02-10T14:15:03 ) - the default value is Beginning . Pokud konfigurační soubor pro zápis obsahuje a všechny kontrolní body existují, bude datový proud vždy pokračovat nezávisle na nastaveních – pokud je to záměr, musíte změnit nebo odstranit kontrolní body, aby se stream checkpointLocationspark.cosmos.changeFeed.startFromcheckpointLocation restartoval.
spark.cosmos.changeFeed.mode Incremental Režim ChangeFeed ( Incremental nebo FullFidelity ) – POZNÁMKA: je v FullFidelity experimentálním stavu. Vyžaduje, aby bylo předplatné nebo účet povolené pro privátní náhled a existují známé přerušované změny, ke které dojde FullFidelity (schéma vrácených dokumentů). V tomto okamžiku se doporučuje používat jenom pro scénáře mimo FullFidelity produkční prostředí.
spark.cosmos.changeFeed.itemCountPerTriggerHint Žádné Přibližný maximální počet položek přečtených z informačního kanálu pro každou mikrosoučástku/aktivační událost

Konfigurace převodu Json

Název vlastnosti konfigurace Výchozí Popis
spark.cosmos.read.schemaConversionMode Relaxed Chování převodu schématu ( Relaxed , Strict ). Pokud dokument při čtení dokumentů json obsahuje atribut, který se nenamapuje na typ schématu, může se uživatel rozhodnout, jestli má použít hodnotu null (Relaxed) nebo výjimku (Strict).

Konfigurace strategie dělení oddílů

Název vlastnosti konfigurace Výchozí Popis
spark.cosmos.read.partitioning.strategy Default Použitá strategie dělení oddílů (výchozí, vlastní, omezující nebo agresivní)
spark.cosmos.partitioning.targetedCount Žádné Počet cílových oddílů Tento parametr je nepovinný a ignorovaný, pokud se nepoužít strategie==Vlastní. V takovém případě Minigraf nebude dynamicky počítat počet oddílů, ale bude s touto hodnotou držet.

Konfigurace řízení propustnosti

Název vlastnosti konfigurace Výchozí Popis
spark.cosmos.throughputControl.enabled false Jestli je řízení propustnosti povolené
spark.cosmos.throughputControl.name Žádné Název skupiny řízení propustnosti
spark.cosmos.throughputControl.targetThroughput Žádné Propustnost cílové skupiny řízení propustnosti
spark.cosmos.throughputControl.targetThroughputThreshold Žádné Mezní hodnota propustnosti cílové skupiny řízení propustnosti
spark.cosmos.throughputControl.globalControl.database Žádné Databáze, která se použije pro globální řízení propustnosti
spark.cosmos.throughputControl.globalControl.container Žádné Kontejner, který se použije pro globální řízení propustnosti
spark.cosmos.throughputControl.globalControl.renewIntervalInMS 5s Jak často bude klient aktualizovat využití propustnosti samotného klienta
spark.cosmos.throughputControl.globalControl.expireIntervalInMS 11s Jak rychle bude detekován offline klient

Další kroky

  • Azure Cosmos DB Apache Spark 3 OLTP Connector for Core (SQL) API: Poznámky k verzi a materiály
  • Přečtěte si další informace o Apache Sparku.