Rychlý start: Správa dat pomocí konektoru OLTP pro Azure Cosmos DB Spark 3 pro SQL API

PLATÍ PRO: SQL API

Tento kurz je úvodní příručkou, která ukazuje, jak pomocí konektoru Spark pro Cosmos DB číst nebo zapisovat do Cosmos DB. Cosmos DB Spark Connector je založený na Sparku 3.1.x.

V tomto rychlém kurzu se spoléháme na Azure Databricks Runtime 8.0 se Sparkem 3.1.1 a Jupyter Notebook a ukážeme si, jak používat konektor Spark pro Cosmos DB.

Můžete také použít jakoukoli jinou nabídku Sparku 3.1.1 Sparku. Také byste měli být schopni používat libovolný jazyk podporovaný Sparkem (PySpark, Scala, Java atd.) nebo jakékoli rozhraní Sparku, které už znáte (Jupyter Notebook, Livy atd.).

Požadavky

  • Aktivní účet Azure. Pokud žádný nemáte, můžete si zaregistrovat bezplatný účet. Alternativně můžete použít Azure Cosmos DB Emulator pro vývoj a testování.

  • Azure Databricks runtime 8.0 se Sparkem 3.1.1.

  • (Volitelné) Vazba SLF4J slouží k přidružení konkrétního protokolovací architektury k SLF4J.

SLF4J je potřeba jenom v případě, že máte v plánu používat protokolování, stáhněte si také vazbu SLF4J, která propoji s rozhraním API SLF4J s implementací protokolování podle vašeho výběru. Další informace najdete v uživatelské příručce SLF4J.

Nainstalujte Cosmos DB Spark Connector do clusteru spark azure-cosmos-spark_3-1_2-12-4.3.1.jar.

Příručka Začínáme je založená na PySparku, ale můžete použít také ekvivalentní verzi Scala a následující fragment kódu můžete spustit v poznámkovém bloku Azure Databricks PySpark.

Vytváření databází a kontejnerů

Nejprve nastavte Cosmos účtu databáze a název a název kontejneru Cosmos DB.

cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
}

V dalším kroku můžete pomocí nového rozhraní API katalogu vytvořit databázi Cosmos DB a kontejner prostřednictvím Sparku.

# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))

# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))

Při vytváření kontejnerů pomocí rozhraní API katalogu můžete nastavit propustnost a cestu ke klíči oddílu pro kontejner, který se má vytvořit.

Další informace najdete v úplné dokumentaci k rozhraní API katalogu.

Ingestace dat

Název zdroje dat je a následující příklad ukazuje, jak můžete napsat datový rámec paměti sestávající ze dvou položek do cosmos.oltp Cosmos DB:

spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
  .toDF("id","name","age","isAlive") \
   .write\
   .format("cosmos.oltp")\
   .options(**cfg)\
   .mode("APPEND")\
   .save()

Všimněte id si, že je povinné pole pro Cosmos DB.

Další informace týkající se ingestování dat najdete v úplné dokumentaci ke konfiguraci zápisu.

Dotazování dat

Pomocí stejného cosmos.oltp zdroje dat se můžeme dotazovat na data a používat k filter nabízení filtrů:

from pyspark.sql.functions import col

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.filter(col("isAlive") == True)\
 .show()

Další informace týkající se dotazování dat najdete v úplné dokumentaci ke konfiguraci dotazů.

Odvození schématu

Při dotazování dat může konektor Spark odvodit schéma na základě vzorkování existujících položek nastavením spark.cosmos.read.inferSchema.enabled na true .

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()
 
df.printSchema()

Alternativně můžete předat vlastní schéma, které chcete použít ke čtení dat:

customSchema = StructType([
      StructField("id", StringType()),
      StructField("name", StringType()),
      StructField("type", StringType()),
      StructField("age", IntegerType()),
      StructField("isAlive", BooleanType())
    ])

df = spark.read.schema(schema).format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

Pokud není zadané žádné vlastní schéma a odvození schématu je zakázané, výsledná data budou vracet nezpracovaný obsah JSON položek:

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

Další informace týkající se odvozování schématu najdete v úplné dokumentaci ke konfiguraci odvozování schémat.

Referenční informace ke konfiguraci

Obecná konfigurace

Název konfigurační vlastnosti Výchozí Description
spark.cosmos.accountEndpoint Žádná Cosmos URI koncového bodu účtu databáze
spark.cosmos.accountKey Žádné Cosmos klíč účtu databáze
spark.cosmos.database Žádné Cosmos NÁZEV DATABÁZE
spark.cosmos.container Žádné Cosmos DB – název kontejneru

Dodatečné ladění

Název konfigurační vlastnosti Výchozí Description
spark.cosmos.useGatewayMode false Použití režimu brány pro operace klienta
spark.cosmos.read.forceEventualConsistency true Umožňuje klientovi používat konzistenci Typu Případné pro operace čtení místo použití výchozí konzistence na úrovni účtu.
spark.cosmos.applicationName Žádné Název aplikace
spark.cosmos.preferredRegionsList Žádné Seznam preferovaných oblastí, který se má použít pro účet Cosmos DB. Jedná se o hodnotu oddělenou čárkami (například nebo ), pokud se jako nápověda použije [East US, West US] East US, West US upřednostňované oblasti. Měli byste použít kolokovaný cluster Spark s vaším účtem Cosmos DB a předat oblast clusteru Spark jako upřednostňovanou oblast. Seznam oblastí Azure najdete tady. Můžete také použít jako spark.cosmos.preferredRegions alias.
spark.cosmos.diagnostics Žádné Lze použít k povolení podrobnější diagnostiky. V současné době je jedinou podporovanou možností nastavit tuto vlastnost na , což bude mít za následek vysílání dalších protokolů jako protokolů v protokolech simple INFO Driver and Executor.

Zápis konfigurace

Název konfigurační vlastnosti Výchozí Description
spark.cosmos.write.strategy ItemOverwrite Cosmos strategie zápisu položek databáze: ItemOverwrite (pomocí upsert), (pomocí příkazu ItemAppend create, ignore pre-existing items that are, Conflicts), (delete all documents) (odstraňte všechny dokumenty, u kterých se nezměnila ItemDelete ItemDeleteIfNotModified ztag)
spark.cosmos.write.maxRetryCount 10 Cosmos opakování pokusů o zápis do databáze při neúspěšných pokusech o opakování (například chyba připojení)
spark.cosmos.write.point.maxConcurrency Žádné Cosmos souběžnosti zápisu položky databáze Pokud není zadaný, určí se na základě velikosti virtuálního počítače exekutoru Sparku.
spark.cosmos.write.bulk.maxPendingOperations Žádné Cosmos maximální počet nevyřízených operací hromadného zápisu do položky databáze. Definuje omezení souběžně zpracovávaných hromadných operací. Pokud není zadaný, určí se na základě velikosti virtuálního počítače exekutoru Sparku. Pokud je objem dat pro zřízenou propustnost v cílovém kontejneru velký, můžete toto nastavení upravit podle odhadu 1000 x Cores
spark.cosmos.write.bulk.enabled true Cosmos hromadného zápisu položky databáze

Konfigurace dotazů

Název konfigurační vlastnosti Výchozí Description
spark.cosmos.read.customQuery Žádná Pokud se zadáte, vlastní dotaz se zpracuje s koncovým bodem Cosmos místo dynamického generování dotazu prostřednictvím predikátového nabízení. Obvykle se doporučuje spoléhat na predikát Sparku push down, protože to umožní vygenerovat nejefektivnější sadu filtrů na základě plánu dotazu. Existuje však několik predikátů, jako jsou agregace (počet, seskupení, průměr, součet atd.), které se zatím neschytá (alespoň ve Sparku 3.1), takže vlastní dotaz je záložní možností, aby je bylo možné odeslat do dotazu odeslaného do Cosmos. Pokud je zadané, s povoleným odvozováním schématu se k odvození schématu použije také vlastní dotaz.
spark.cosmos.read.maxItemCount 1000 Přepíše maximální počet dokumentů, které je možné vrátit pro jeden požadavek dotazu nebo kanálu změn. Výchozí hodnota je – zvažte zvýšení pouze pro průměrné velikosti dokumentů menší než 1 kB nebo když projekce výrazně snižuje počet vlastností vybraných v dotazech (například při výběru 1000 "ID" dokumentů atd.).

Konfigurace odvozování schémat

Při provádění operací čtení mohou uživatelé zadat vlastní schéma nebo povolit konektoru, aby ho odvodit. Odvozování schématu je ve výchozím nastavení povolené.

Název vlastnosti konfigurace Výchozí Description
spark.cosmos.read.inferSchema.enabled true Když je odvození schématu zakázané a uživatel neposkytuje schéma, vrátí se nezpracovaný kód JSON.
spark.cosmos.read.inferSchema.query SELECT * FROM r Když je povoleno odvození schématu, slouží jako vlastní dotaz k odvození. Například pokud ukládáte více entit s různými schématy v rámci kontejneru a chcete zajistit, aby odvození bylo pouze u určitých typů dokumentů nebo chcete projektovat pouze konkrétní sloupce.
spark.cosmos.read.inferSchema.samplingSize 1000 Velikost vzorkování, která se má použít při odvození schématu a nepoužití dotazu
spark.cosmos.read.inferSchema.includeSystemProperties false když je povoleno odvození schématu, zda výsledné schéma bude zahrnovat všechny vlastnosti Cosmos DB systému.
spark.cosmos.read.inferSchema.includeTimestamp false Když je povoleno odvození schématu, zda výsledné schéma bude zahrnovat časové razítko dokumentu ( _ts ). Není vyžadováno spark.cosmos.read.inferSchema.includeSystemProperties , pokud je povoleno, protože již obsahuje všechny systémové vlastnosti.
spark.cosmos.read.inferSchema.forceNullableProperties true Když je povoleno odvození schématu, zda výsledné schéma nastaví všechny sloupce s možnou hodnotou null. Ve výchozím nastavení se všechny sloupce (s výjimkou Cosmos systémových vlastností) považují za Nullable, i když všechny řádky v ukázkové sadě mají hodnoty, které nejsou null. Je-li tato možnost zakázána, odvozené sloupce jsou považovány za hodnotu null nebo nejsou v závislosti na tom, zda některý záznam v ukázkové sadě má hodnoty null ve sloupci.

Konfigurace serializace

Slouží k ovlivnění chování serializace a deserializace JSON.

Název vlastnosti konfigurace Výchozí Description
spark.cosmos.serialization.inclusionMode Always Určuje, zda budou hodnoty null/výchozí hodnoty serializovány do formátu JSON nebo zda budou vlastnosti s hodnotou null/výchozí hodnotou vynechány. Chování se řídí stejnými nápady jako JsonInclude. include. Always znamená, že vlastnosti JSON jsou vytvořeny i pro hodnoty null a výchozí hodnoty. NonNull znamená, že pro explicitní hodnoty null nebudou vytvořeny žádné vlastnosti JSON. NonEmpty znamená, že se nevytvoří vlastnosti JSON pro prázdné řetězcové hodnoty nebo prázdná pole/Mpas. NonDefault znamená, že vlastnosti JSON budou vynechány nejen pro hodnotu null nebo prázdné, ale také v případě, že je hodnota shodná s výchozí hodnotou 0 pro číselné vlastnosti.

Změnit kanál (pouze pro Spark-Streaming používající cosmos.oltp.changeFeed zdroj dat, který je jen pro čtení)

Název vlastnosti konfigurace Výchozí Description
Spark. Cosmos. changeFeed. startFrom Beginning ChangeFeed Start z nastavení ( Now Beginning nebo určitého bodu v čase (UTC) 2020-02-10T14:15:03 ) – výchozí hodnota je Beginning . Pokud konfigurace zápisu obsahuje checkpointLocation a všechny kontrolní body existují, datový proud vždy pokračuje nezávisle na spark.cosmos.changeFeed.startFrom nastavení – checkpointLocation Pokud je to záměr, je třeba změnit nebo odstranit kontrolní body pro restartování datového proudu.
Spark. Cosmos. changeFeed. Mode Incremental ChangeFeed režim ( Incremental nebo FullFidelity )-Poznámka: FullFidelity je v experimentálním stavu hned teď. Vyžaduje, aby byl odběr nebo účet povolen pro soukromou verzi Preview a byly známy zásadní změny, ke kterým dojde FullFidelity (schéma vrácených dokumentů). V tuto chvíli se doporučuje používat jenom FullFidelity pro scénáře, které nejsou v produkčním prostředí.
Spark. Cosmos. changeFeed. itemCountPerTriggerHint Žádné Přibližný maximální počet položek načtených z kanálu změn pro každou mikrodávku nebo Trigger

Konfigurace převodu JSON

Název vlastnosti konfigurace Výchozí Description
spark.cosmos.read.schemaConversionMode Relaxed Chování převodu schématu ( Relaxed , Strict ). Při čtení dokumentů JSON, pokud dokument obsahuje atribut, který není namapován na typ schématu, uživatel se může rozhodnout, jestli má použít null hodnotu (odlehčenou) nebo výjimku (Strict).

Konfigurace strategie dělení

Název vlastnosti konfigurace Výchozí Description
spark.cosmos.read.partitioning.strategy Default Použitá strategie dělení (výchozí, vlastní, omezující nebo agresivní)
spark.cosmos.partitioning.targetedCount Žádné Počet cílových oddílů. Tento parametr je nepovinný a ignoruje se, pokud se nepoužívá strategie = = vlastní. V tomto případě konektor Spark nebude dynamicky počítat počet oddílů, ale s touto hodnotou.

Konfigurace řízení propustnosti

Název vlastnosti konfigurace Výchozí Description
spark.cosmos.throughputControl.enabled false Zda je povoleno řízení propustnosti
spark.cosmos.throughputControl.name Žádné Název skupiny řízení propustnosti
spark.cosmos.throughputControl.targetThroughput Žádné Propustnost cíle skupiny řízení propustnosti
spark.cosmos.throughputControl.targetThroughputThreshold Žádné Prahová hodnota propustnosti cíle skupiny řízení propustnosti
spark.cosmos.throughputControl.globalControl.database Žádné Databáze, která se bude používat pro globální řízení propustnosti
spark.cosmos.throughputControl.globalControl.container Žádné Kontejner, který bude použit pro globální řízení propustnosti
spark.cosmos.throughputControl.globalControl.renewIntervalInMS 5s Jak často klient bude aktualizovat využití propustnosti sebe sama
spark.cosmos.throughputControl.globalControl.expireIntervalInMS 11s Jak rychle se zjistí offline klient

Další kroky