Oktatóanyag: Csatlakozás az Azure Cosmos DB for NoSQL-be a Spark használatával

A KÖVETKEZŐRE VONATKOZIK: NoSQL

Ebben az oktatóanyagban az Azure Cosmos DB Spark-összekötő használatával olvashat vagy írhat adatokat egy Azure Cosmos DB for NoSQL-fiókból. Ez az oktatóanyag az Azure Databricks és egy Jupyter notebook használatával mutatja be, hogyan integrálható a NoSQL-hez készült API-val a Sparkból. Ez az oktatóanyag a Pythonra és a Scalára összpontosít, annak ellenére, hogy a Spark által támogatott nyelveket vagy felületeket használhatja.

Ebben az oktatóanyagban az alábbiakkal fog megismerkedni:

  • Csatlakozás egy Api for NoSQL-fiókba Spark és Jupyter notebook használatával
  • Adatbázis- és tárolóerőforrások létrehozása
  • Adatok betöltése a tárolóba
  • Adatok lekérdezése a tárolóban
  • Gyakori műveletek végrehajtása a tároló elemein

Előfeltételek

  • Egy meglévő Azure Cosmos DB for NoSQL-fiók.
  • Egy meglévő Azure Databricks-munkaterület.

Csatlakozás a Spark és a Jupyter használatával

A meglévő Azure Databricks-munkaterület használatával hozzon létre egy számítási fürtöt, amely készen áll az Apache Spark 3.4.x használatára az Azure Cosmos DB for NoSQL-fiókhoz való csatlakozáshoz.

  1. Nyissa meg az Azure Databricks-munkaterületet.

  2. A munkaterület felületén hozzon létre egy új fürtöt. Konfigurálja a fürtöt az alábbi beállításokkal, legalább:

    Érték
    Futtatókörnyezet verziója 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. A munkaterületi felületen maven-csomagokat kereshet a Maven Centralbóla csoportazonosítóvalcom.azure.cosmos.spark. Telepítse a Spark 3.4-hez tartozó csomagot a fürthöz előtaggal ellátott Összetevő-azonosítóvalazure-cosmos-spark_3-4.

  4. Végül hozzon létre egy új jegyzetfüzetet.

    Tipp.

    Alapértelmezés szerint a jegyzetfüzet a nemrég létrehozott fürthöz lesz csatolva.

  5. A jegyzetfüzeten belül állítsa be az OLTP konfigurációs beállításait a NoSQL-fiókvégponthoz, az adatbázis nevéhez és a tároló nevéhez.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Adatbázis és tároló létrehozása

A Catalog API használatával kezelheti a fiókerőforrásokat, például az adatbázisokat és a tárolókat. Ezután az OLTP használatával kezelheti a tárolóerőforrás[k] adatait.

  1. Konfigurálja a Catalog API-t a NoSQL-erőforrások API-jának a Spark használatával történő kezelésére.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Hozzon létre egy új, a következővel elnevezett adatbázist cosmicworksCREATE DATABASE IF NOT EXISTS: .

    # Create a database using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Hozzon létre egy új, a következővel elnevezett tárolót productsCREATE TABLE IF NOT EXISTS: . Győződjön meg arról, hogy a partíciókulcs elérési útját /category úgy állítja be és engedélyezi az automatikus skálázási átviteli sebességet, hogy a kérelemegységek maximális átviteli sebessége 1000 másodpercenként (RU/s) legyen.

    # Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Hozzon létre egy másik, hierarchikus partíciókulcs-konfigurációval /organizationelnevezett employees tárolót a , /departmentés /team a partíciókulcs elérési útjainak készleteként ebben a sorrendben. Emellett állítsa az átviteli sebességet manuális ru/s értékre 400

    # Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Futtassa a jegyzetfüzetcellát[k] annak ellenőrzéséhez, hogy az adatbázis és a tárolók létre lettek-e hozva a NoSQL-fiók api-jában.

Adatok betöltése

Hozzon létre egy mintaadatkészletet, majd az OLTP használatával töltse be az adatokat a NoSQL-tároló API-ba.

  1. Hozzon létre egy mintaadatkészletet.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. A korábban mentett OLTP-konfigurációval spark.createDataFrame mintaadatokat adhat hozzá a céltárolóhoz.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Adatok lekérdezése

Töltse be az OLTP-adatokat egy adatkeretbe, hogy gyakori lekérdezéseket hajtson végre az adatokon. A különböző szintaxisok szűrő- vagy lekérdezési adatokat is használhatnak.

  1. Az OLTP-adatok adatkeret-objektumba való betöltésére használható spark.read . Használja ugyanazt a konfigurációt, amelyet az oktatóanyag korábbi részében használt. Emellett állítsa spark.cosmos.read.inferSchema.enabled igaz értékre, hogy a Spark-összekötő a meglévő elemek mintavételezésével következtethesse a sémát.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Renderelje az adatkeretbe betöltött adatok sémáját a következő használatával printSchema: .

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Adatsorok megjelenítése, ahol az quantity oszlop kisebb, mint 20. A lekérdezés végrehajtásához használja az where és show a függvényeket.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Az első adatsor megjelenítése, ahol az clearance oszlop igaz. A lekérdezés végrehajtásához használja a filter függvényt.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Öt adatsor megjelenítése szűrés vagy csonkolás nélkül. A függvény használatával show testre szabhatja a megjelenített sorok megjelenését és számát.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Adatok lekérdezése ezzel a nyers NoSQL-lekérdezési sztringgel: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Gyakori műveletek végrehajtása

Ha a Sparkban az API for NoSQL-adatokkal dolgozik, részleges frissítéseket hajthat végre, vagy nyers JSON-ként dolgozhat az adatokkal.

  1. Egy elem részleges frissítésének végrehajtásához hajtsa végre az alábbi lépéseket:

    1. Másolja ki a meglévő config konfigurációs változót, és módosítsa a tulajdonságokat az új példányban. Különösen; konfigurálja az írási stratégiát úgy, hogy ItemPatchletiltsa a tömeges támogatást, állítsa be az oszlopokat és a leképezett műveleteket, és végül állítsa be az alapértelmezett művelettípust a következőre Set: .

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Hozzon létre változókat a javításművelet részeként megcélozni kívánt elem partíciókulcsához és egyedi azonosítójához.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Hozzon létre egy javításobjektum-készletet a célelem megadásához, és adja meg a módosítani kívánt mezőket.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Hozzon létre egy adatkeretet a javításobjektumok készletével, és használja write a javításművelet végrehajtásához.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Futtasson egy lekérdezést a javításművelet eredményeinek áttekintéséhez. Az elemet mostantól más módosítás nélkül el kell nevezni Yamba New Surfboard .

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. A nyers JSON-adatok használatához hajtsa végre az alábbi lépéseket:

    1. Másolja ki a meglévő config konfigurációs változót, és módosítsa a tulajdonságokat az új példányban. Különösen; módosítsa a céltárolót employees a nyers JSON-adatok használatára, és konfigurálja az contacts oszlopot/mezőt.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Hozzon létre egy alkalmazottkészletet, amely betölti a tárolót.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Hozzon létre egy adatkeretet, és használja write az alkalmazotti adatok betöltésére.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Renderelje az adatokat az adatkeretből a következő használatával show: . Figyelje meg, hogy az contacts oszlop nyers JSON a kimenetben.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Következő lépés