Adatok másolása az Azure Cosmos DB-ből egy dedikált SQL-készletbe az Apache Spark használatával

Azure Synapse Link az Azure Cosmos DB-hez lehetővé teszi a felhasználók számára, hogy közel valós idejű elemzéseket futtassanak az Azure Cosmos DB-ben lévő operatív adatokon. Vannak azonban olyan esetek, amikor egyes adatokat összesíteni és bővíteni kell az adattárház felhasználóinak kiszolgálásához. A csatolási adatok Azure Synapse összefűzésével és exportálásával a jegyzetfüzet néhány celláját használhatja.

Előfeltételek

Lépések

Ebben az oktatóanyagban az elemzési tárhoz fog kapcsolódni, így nincs hatással a tranzakciós tárolóra (a kérelemegységek nem lesznek felhasználva). A következő lépéseket fogjuk elvégezni:

  1. Az Azure Cosmos DB HTAP-tároló beolvasása Spark-adatkeretbe
  2. Az eredmények összesítése új adatkeretben
  3. Adatok betöltése dedikált SQL-készletbe

Spark–SQL– 1. lépés

Adatok

Ebben a példában egy RetailSales nevű HTAP-tárolót használunk. Ez egy ConnectedData nevű társított szolgáltatás része, és a következő sémával rendelkezik:

  • _rid: sztring (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: sztring (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: sztring (nullable = true)
  • reklám: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: sztring (nullable = true)

Az értékesítéseket (mennyiség, bevétel (ár x mennyiség) összesítjük productCode és weekStarting jelentéskészítési célokra. Végül exportáljuk az adatokat egy nevű dedikált SQL-készlettáblába dbo.productsales.

Spark-jegyzetfüzet konfigurálása

Hozzon létre egy Spark-jegyzetfüzetet a Scala mint Spark (Scala) fő nyelvvel. A munkamenethez a jegyzetfüzet alapértelmezett beállítását használjuk.

Adatok olvasása a Sparkban

Olvassa be az Azure Cosmos DB HTAP-tárolót a Sparkkal egy adatkeretbe az első cellában.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Az eredmények összesítése új adatkeretben

A második cellában futtatjuk az új adatkerethez szükséges átalakítást és összesítéseket, mielőtt betöltenénk egy dedikált SQL-készlet adatbázisába.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Az eredmények betöltése egy dedikált SQL-készletbe

A harmadik cellában egy dedikált SQL-készletbe töltjük be az adatokat. Automatikusan létrehoz egy ideiglenes külső táblát, külső adatforrást és külső fájlformátumot, amely a feladat befejezése után törlődik.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Az eredmények lekérdezése SQL-lel

Az eredményt lekérdezheti egy egyszerű SQL-lekérdezéssel, például a következő SQL-szkripttel:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

A lekérdezés a következő eredményeket jeleníti meg diagram módban: Spark–SQL 2. lépés

Következő lépések