Rychlý start: Správa dat pomocí konektoru OLTP Sparku 3 služby Azure Cosmos DB pro rozhraní API SQL

PLATÍ PRO: SQL API

Tento kurz je rychlý úvodní průvodce, který ukazuje, jak pomocí konektoru Spark pro Cosmos DB číst nebo zapisovat do Cosmos DB. Cosmos DB Spark Connector podporuje Spark 3.1.x a 3.2.x.

V tomto rychlém kurzu spoléháme na Azure Databricks Runtime 8.0 se Sparkem 3.1.1 a Jupyter Notebook, abychom ukázali, jak používat konektor Spark pro Cosmos DB, ale můžete také použít Azure Databricks Runtime 10.3 se Sparkem 3.2.1.

Můžete také použít jakoukoli jinou nabídku Sparku 3.1.1 nebo 3.2.1, také byste měli být schopni používat libovolný jazyk podporovaný Sparkem (PySpark, Scala, Java atd.) nebo libovolné rozhraní Sparku, které znáte (Jupyter Notebook, Livy atd.).

Požadavky

SLF4J je potřeba jenom v případě, že plánujete používat protokolování, stáhněte si také vazbu SLF4J, která propoje rozhraní API SLF4J s implementací protokolování podle vašeho výběru. Další informace najdete v uživatelské příručce SLF4J .

Nainstalujte do clusteru Sparku konektor Cosmos DB pomocí nejnovější verze Sparku 3.1.x nebo použijte nejnovější verzi Sparku 3.2.x.

Příručka Začínáme je založená na PySparku, ale můžete použít i ekvivalentní verzi scaly a v poznámkovém bloku Azure Databricks PySpark můžete spustit následující fragment kódu.

Vytváření databází a kontejnerů

Nejprve nastavte přihlašovací údaje účtu databáze Cosmos a název databáze Cosmos DATABÁZE a název kontejneru.

cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
}

V dalším kroku můžete pomocí nového rozhraní API katalogu vytvořit databázi a kontejner databáze Cosmos prostřednictvím Sparku.

# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))

# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))

Při vytváření kontejnerů pomocí rozhraní API katalogu můžete nastavit propustnost a cestu klíče oddílu pro vytvoření kontejneru.

Další informace najdete v úplné dokumentaci k rozhraní API katalogu .

Ingestace dat

Název zdroje dat je cosmos.oltpa následující příklad ukazuje, jak můžete napsat datový rámec paměti skládající se ze dvou položek do Cosmos DATABÁZE:

spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
  .toDF("id","name","age","isAlive") \
   .write\
   .format("cosmos.oltp")\
   .options(**cfg)\
   .mode("APPEND")\
   .save()

Všimněte si, že id je povinné pole pro databázi Cosmos.

Další informace související s ingestováním dat najdete v úplné dokumentaci ke konfiguraci zápisu .

Dotazování dat

Pomocí stejného cosmos.oltp zdroje dat můžeme dotazovat data a použít filter k nasdílení filtrů:

from pyspark.sql.functions import col

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.filter(col("isAlive") == True)\
 .show()

Další informace související s dotazováním dat najdete v úplné dokumentaci ke konfiguraci dotazů .

Odvozování schémat

Při dotazování na data může konektor Spark odvodit schéma na základě vzorkování existujících položek nastavením spark.cosmos.read.inferSchema.enabled na true.

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()
 
df.printSchema()

Případně můžete předat vlastní schéma, které chcete použít ke čtení dat:

customSchema = StructType([
      StructField("id", StringType()),
      StructField("name", StringType()),
      StructField("type", StringType()),
      StructField("age", IntegerType()),
      StructField("isAlive", BooleanType())
    ])

df = spark.read.schema(customSchema).format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

Pokud není zadáno žádné vlastní schéma a odvozování schématu je zakázané, výsledná data vrátí nezpracovaný obsah Json položek:

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

Další informace týkající se odvozování schématu najdete v úplné dokumentaci k odvozování schématu .

Referenční informace o konfiguraci

Konektor Azure Cosmos DB Spark 3 OLTP pro rozhraní API pro SQL obsahuje kompletní referenční informace k konfiguraci, která poskytuje další a upřesňující nastavení zápisu a dotazování dat, serializace, streamování pomocí kanálu změn, dělení na oddíly a správu propustnosti a další. Úplný výpis s podrobnostmi najdete v našich referenčních informacích ke konfiguraci konektoru Sparku na GitHub.

Další kroky