Rychlý start: Správa dat pomocí konektoru OLTP pro Azure Cosmos DB Spark 3 pro SQL API
PLATÍ PRO:
SQL API
Tento kurz je úvodní příručkou, která ukazuje, jak pomocí konektoru Spark pro Cosmos DB číst nebo zapisovat do Cosmos DB. Cosmos DB Spark Connector je založený na Sparku 3.1.x.
V tomto rychlém kurzu se spoléháme na Azure Databricks Runtime 8.0 se Sparkem 3.1.1 a Jupyter Notebook a ukážeme si, jak používat konektor Spark pro Cosmos DB.
Můžete také použít jakoukoli jinou nabídku Sparku 3.1.1 Sparku. Také byste měli být schopni používat libovolný jazyk podporovaný Sparkem (PySpark, Scala, Java atd.) nebo jakékoli rozhraní Sparku, které už znáte (Jupyter Notebook, Livy atd.).
Požadavky
Aktivní účet Azure. Pokud žádný nemáte, můžete si zaregistrovat bezplatný účet. Alternativně můžete použít Azure Cosmos DB Emulator pro vývoj a testování.
Azure Databricks runtime 8.0 se Sparkem 3.1.1.
(Volitelné) Vazba SLF4J slouží k přidružení konkrétního protokolovací architektury k SLF4J.
SLF4J je potřeba jenom v případě, že máte v plánu používat protokolování, stáhněte si také vazbu SLF4J, která propoji s rozhraním API SLF4J s implementací protokolování podle vašeho výběru. Další informace najdete v uživatelské příručce SLF4J.
Nainstalujte Cosmos DB Spark Connector do clusteru spark azure-cosmos-spark_3-1_2-12-4.3.1.jar.
Příručka Začínáme je založená na PySparku, ale můžete použít také ekvivalentní verzi Scala a následující fragment kódu můžete spustit v poznámkovém bloku Azure Databricks PySpark.
Vytváření databází a kontejnerů
Nejprve nastavte Cosmos účtu databáze a název a název kontejneru Cosmos DB.
cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"
cfg = {
"spark.cosmos.accountEndpoint" : cosmosEndpoint,
"spark.cosmos.accountKey" : cosmosMasterKey,
"spark.cosmos.database" : cosmosDatabaseName,
"spark.cosmos.container" : cosmosContainerName,
}
V dalším kroku můžete pomocí nového rozhraní API katalogu vytvořit databázi Cosmos DB a kontejner prostřednictvím Sparku.
# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)
# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))
# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))
Při vytváření kontejnerů pomocí rozhraní API katalogu můžete nastavit propustnost a cestu ke klíči oddílu pro kontejner, který se má vytvořit.
Další informace najdete v úplné dokumentaci k rozhraní API katalogu.
Ingestace dat
Název zdroje dat je a následující příklad ukazuje, jak můžete napsat datový rámec paměti sestávající ze dvou položek do cosmos.oltp Cosmos DB:
spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
.toDF("id","name","age","isAlive") \
.write\
.format("cosmos.oltp")\
.options(**cfg)\
.mode("APPEND")\
.save()
Všimněte id si, že je povinné pole pro Cosmos DB.
Další informace týkající se ingestování dat najdete v úplné dokumentaci ke konfiguraci zápisu.
Dotazování dat
Pomocí stejného cosmos.oltp zdroje dat se můžeme dotazovat na data a používat k filter nabízení filtrů:
from pyspark.sql.functions import col
df = spark.read.format("cosmos.oltp").options(**cfg)\
.option("spark.cosmos.read.inferSchema.enabled", "true")\
.load()
df.filter(col("isAlive") == True)\
.show()
Další informace týkající se dotazování dat najdete v úplné dokumentaci ke konfiguraci dotazů.
Odvození schématu
Při dotazování dat může konektor Spark odvodit schéma na základě vzorkování existujících položek nastavením spark.cosmos.read.inferSchema.enabled na true .
df = spark.read.format("cosmos.oltp").options(**cfg)\
.option("spark.cosmos.read.inferSchema.enabled", "true")\
.load()
df.printSchema()
Alternativně můžete předat vlastní schéma, které chcete použít ke čtení dat:
customSchema = StructType([
StructField("id", StringType()),
StructField("name", StringType()),
StructField("type", StringType()),
StructField("age", IntegerType()),
StructField("isAlive", BooleanType())
])
df = spark.read.schema(schema).format("cosmos.oltp").options(**cfg)\
.load()
df.printSchema()
Pokud není zadané žádné vlastní schéma a odvození schématu je zakázané, výsledná data budou vracet nezpracovaný obsah JSON položek:
df = spark.read.format("cosmos.oltp").options(**cfg)\
.load()
df.printSchema()
Další informace týkající se odvozování schématu najdete v úplné dokumentaci ke konfiguraci odvozování schémat.
Referenční informace ke konfiguraci
Obecná konfigurace
| Název konfigurační vlastnosti | Výchozí | Description |
|---|---|---|
spark.cosmos.accountEndpoint |
Žádná | Cosmos URI koncového bodu účtu databáze |
spark.cosmos.accountKey |
Žádné | Cosmos klíč účtu databáze |
spark.cosmos.database |
Žádné | Cosmos NÁZEV DATABÁZE |
spark.cosmos.container |
Žádné | Cosmos DB – název kontejneru |
Dodatečné ladění
| Název konfigurační vlastnosti | Výchozí | Description |
|---|---|---|
spark.cosmos.useGatewayMode |
false |
Použití režimu brány pro operace klienta |
spark.cosmos.read.forceEventualConsistency |
true |
Umožňuje klientovi používat konzistenci Typu Případné pro operace čtení místo použití výchozí konzistence na úrovni účtu. |
spark.cosmos.applicationName |
Žádné | Název aplikace |
spark.cosmos.preferredRegionsList |
Žádné | Seznam preferovaných oblastí, který se má použít pro účet Cosmos DB. Jedná se o hodnotu oddělenou čárkami (například nebo ), pokud se jako nápověda použije [East US, West US] East US, West US upřednostňované oblasti. Měli byste použít kolokovaný cluster Spark s vaším účtem Cosmos DB a předat oblast clusteru Spark jako upřednostňovanou oblast. Seznam oblastí Azure najdete tady. Můžete také použít jako spark.cosmos.preferredRegions alias. |
spark.cosmos.diagnostics |
Žádné | Lze použít k povolení podrobnější diagnostiky. V současné době je jedinou podporovanou možností nastavit tuto vlastnost na , což bude mít za následek vysílání dalších protokolů jako protokolů v protokolech simple INFO Driver and Executor. |
Zápis konfigurace
| Název konfigurační vlastnosti | Výchozí | Description |
|---|---|---|
spark.cosmos.write.strategy |
ItemOverwrite |
Cosmos strategie zápisu položek databáze: ItemOverwrite (pomocí upsert), (pomocí příkazu ItemAppend create, ignore pre-existing items that are, Conflicts), (delete all documents) (odstraňte všechny dokumenty, u kterých se nezměnila ItemDelete ItemDeleteIfNotModified ztag) |
spark.cosmos.write.maxRetryCount |
10 |
Cosmos opakování pokusů o zápis do databáze při neúspěšných pokusech o opakování (například chyba připojení) |
spark.cosmos.write.point.maxConcurrency |
Žádné | Cosmos souběžnosti zápisu položky databáze Pokud není zadaný, určí se na základě velikosti virtuálního počítače exekutoru Sparku. |
spark.cosmos.write.bulk.maxPendingOperations |
Žádné | Cosmos maximální počet nevyřízených operací hromadného zápisu do položky databáze. Definuje omezení souběžně zpracovávaných hromadných operací. Pokud není zadaný, určí se na základě velikosti virtuálního počítače exekutoru Sparku. Pokud je objem dat pro zřízenou propustnost v cílovém kontejneru velký, můžete toto nastavení upravit podle odhadu 1000 x Cores |
spark.cosmos.write.bulk.enabled |
true |
Cosmos hromadného zápisu položky databáze |
Konfigurace dotazů
| Název konfigurační vlastnosti | Výchozí | Description |
|---|---|---|
spark.cosmos.read.customQuery |
Žádná | Pokud se zadáte, vlastní dotaz se zpracuje s koncovým bodem Cosmos místo dynamického generování dotazu prostřednictvím predikátového nabízení. Obvykle se doporučuje spoléhat na predikát Sparku push down, protože to umožní vygenerovat nejefektivnější sadu filtrů na základě plánu dotazu. Existuje však několik predikátů, jako jsou agregace (počet, seskupení, průměr, součet atd.), které se zatím neschytá (alespoň ve Sparku 3.1), takže vlastní dotaz je záložní možností, aby je bylo možné odeslat do dotazu odeslaného do Cosmos. Pokud je zadané, s povoleným odvozováním schématu se k odvození schématu použije také vlastní dotaz. |
spark.cosmos.read.maxItemCount |
1000 |
Přepíše maximální počet dokumentů, které je možné vrátit pro jeden požadavek dotazu nebo kanálu změn. Výchozí hodnota je – zvažte zvýšení pouze pro průměrné velikosti dokumentů menší než 1 kB nebo když projekce výrazně snižuje počet vlastností vybraných v dotazech (například při výběru 1000 "ID" dokumentů atd.). |
Konfigurace odvozování schémat
Při provádění operací čtení mohou uživatelé zadat vlastní schéma nebo povolit konektoru, aby ho odvodit. Odvozování schématu je ve výchozím nastavení povolené.
| Název vlastnosti konfigurace | Výchozí | Description |
|---|---|---|
spark.cosmos.read.inferSchema.enabled |
true |
Když je odvození schématu zakázané a uživatel neposkytuje schéma, vrátí se nezpracovaný kód JSON. |
spark.cosmos.read.inferSchema.query |
SELECT * FROM r |
Když je povoleno odvození schématu, slouží jako vlastní dotaz k odvození. Například pokud ukládáte více entit s různými schématy v rámci kontejneru a chcete zajistit, aby odvození bylo pouze u určitých typů dokumentů nebo chcete projektovat pouze konkrétní sloupce. |
spark.cosmos.read.inferSchema.samplingSize |
1000 |
Velikost vzorkování, která se má použít při odvození schématu a nepoužití dotazu |
spark.cosmos.read.inferSchema.includeSystemProperties |
false |
když je povoleno odvození schématu, zda výsledné schéma bude zahrnovat všechny vlastnosti Cosmos DB systému. |
spark.cosmos.read.inferSchema.includeTimestamp |
false |
Když je povoleno odvození schématu, zda výsledné schéma bude zahrnovat časové razítko dokumentu ( _ts ). Není vyžadováno spark.cosmos.read.inferSchema.includeSystemProperties , pokud je povoleno, protože již obsahuje všechny systémové vlastnosti. |
spark.cosmos.read.inferSchema.forceNullableProperties |
true |
Když je povoleno odvození schématu, zda výsledné schéma nastaví všechny sloupce s možnou hodnotou null. Ve výchozím nastavení se všechny sloupce (s výjimkou Cosmos systémových vlastností) považují za Nullable, i když všechny řádky v ukázkové sadě mají hodnoty, které nejsou null. Je-li tato možnost zakázána, odvozené sloupce jsou považovány za hodnotu null nebo nejsou v závislosti na tom, zda některý záznam v ukázkové sadě má hodnoty null ve sloupci. |
Konfigurace serializace
Slouží k ovlivnění chování serializace a deserializace JSON.
| Název vlastnosti konfigurace | Výchozí | Description |
|---|---|---|
spark.cosmos.serialization.inclusionMode |
Always |
Určuje, zda budou hodnoty null/výchozí hodnoty serializovány do formátu JSON nebo zda budou vlastnosti s hodnotou null/výchozí hodnotou vynechány. Chování se řídí stejnými nápady jako JsonInclude. include. Always znamená, že vlastnosti JSON jsou vytvořeny i pro hodnoty null a výchozí hodnoty. NonNull znamená, že pro explicitní hodnoty null nebudou vytvořeny žádné vlastnosti JSON. NonEmpty znamená, že se nevytvoří vlastnosti JSON pro prázdné řetězcové hodnoty nebo prázdná pole/Mpas. NonDefault znamená, že vlastnosti JSON budou vynechány nejen pro hodnotu null nebo prázdné, ale také v případě, že je hodnota shodná s výchozí hodnotou 0 pro číselné vlastnosti. |
Změnit kanál (pouze pro Spark-Streaming používající cosmos.oltp.changeFeed zdroj dat, který je jen pro čtení)
| Název vlastnosti konfigurace | Výchozí | Description |
|---|---|---|
| Spark. Cosmos. changeFeed. startFrom | Beginning |
ChangeFeed Start z nastavení ( Now Beginning nebo určitého bodu v čase (UTC) 2020-02-10T14:15:03 ) – výchozí hodnota je Beginning . Pokud konfigurace zápisu obsahuje checkpointLocation a všechny kontrolní body existují, datový proud vždy pokračuje nezávisle na spark.cosmos.changeFeed.startFrom nastavení – checkpointLocation Pokud je to záměr, je třeba změnit nebo odstranit kontrolní body pro restartování datového proudu. |
| Spark. Cosmos. changeFeed. Mode | Incremental |
ChangeFeed režim ( Incremental nebo FullFidelity )-Poznámka: FullFidelity je v experimentálním stavu hned teď. Vyžaduje, aby byl odběr nebo účet povolen pro soukromou verzi Preview a byly známy zásadní změny, ke kterým dojde FullFidelity (schéma vrácených dokumentů). V tuto chvíli se doporučuje používat jenom FullFidelity pro scénáře, které nejsou v produkčním prostředí. |
| Spark. Cosmos. changeFeed. itemCountPerTriggerHint | Žádné | Přibližný maximální počet položek načtených z kanálu změn pro každou mikrodávku nebo Trigger |
Konfigurace převodu JSON
| Název vlastnosti konfigurace | Výchozí | Description |
|---|---|---|
spark.cosmos.read.schemaConversionMode |
Relaxed |
Chování převodu schématu ( Relaxed , Strict ). Při čtení dokumentů JSON, pokud dokument obsahuje atribut, který není namapován na typ schématu, uživatel se může rozhodnout, jestli má použít null hodnotu (odlehčenou) nebo výjimku (Strict). |
Konfigurace strategie dělení
| Název vlastnosti konfigurace | Výchozí | Description |
|---|---|---|
spark.cosmos.read.partitioning.strategy |
Default |
Použitá strategie dělení (výchozí, vlastní, omezující nebo agresivní) |
spark.cosmos.partitioning.targetedCount |
Žádné | Počet cílových oddílů. Tento parametr je nepovinný a ignoruje se, pokud se nepoužívá strategie = = vlastní. V tomto případě konektor Spark nebude dynamicky počítat počet oddílů, ale s touto hodnotou. |
Konfigurace řízení propustnosti
| Název vlastnosti konfigurace | Výchozí | Description |
|---|---|---|
spark.cosmos.throughputControl.enabled |
false |
Zda je povoleno řízení propustnosti |
spark.cosmos.throughputControl.name |
Žádné | Název skupiny řízení propustnosti |
spark.cosmos.throughputControl.targetThroughput |
Žádné | Propustnost cíle skupiny řízení propustnosti |
spark.cosmos.throughputControl.targetThroughputThreshold |
Žádné | Prahová hodnota propustnosti cíle skupiny řízení propustnosti |
spark.cosmos.throughputControl.globalControl.database |
Žádné | Databáze, která se bude používat pro globální řízení propustnosti |
spark.cosmos.throughputControl.globalControl.container |
Žádné | Kontejner, který bude použit pro globální řízení propustnosti |
spark.cosmos.throughputControl.globalControl.renewIntervalInMS |
5s |
Jak často klient bude aktualizovat využití propustnosti sebe sama |
spark.cosmos.throughputControl.globalControl.expireIntervalInMS |
11s |
Jak rychle se zjistí offline klient |
Další kroky
- Azure Cosmos DB Apache Spark 3 OLTP konektor pro jádro (SQL) API: poznámky k verzi a prostředky
- Přečtěte si další informace o Apache Sparku.