Adatok másolása az Azure Cosmos DB-ből egy dedikált SQL-készletbe az Apache Spark használatával
Azure Synapse Link az Azure Cosmos DB-hez lehetővé teszi a felhasználók számára, hogy közel valós idejű elemzéseket futtassanak az Azure Cosmos DB-ben lévő operatív adatokon. Vannak azonban olyan esetek, amikor egyes adatokat összesíteni és bővíteni kell az adattárház felhasználóinak kiszolgálásához. A csatolási adatok Azure Synapse összefűzésével és exportálásával a jegyzetfüzet néhány celláját használhatja.
Előfeltételek
- Synapse-munkaterület kiépítése a következőkkel:
- Azure Cosmos DB-fiók kiépítése HTAP-tárolóval adatokkal
- Az Azure Cosmos DB HTAP-tároló csatlakoztatása a munkaterülethez
- A megfelelő beállítással importálhat adatokat egy dedikált SQL-készletbe a Sparkból
Lépések
Ebben az oktatóanyagban az elemzési tárhoz fog kapcsolódni, így nincs hatással a tranzakciós tárolóra (a kérelemegységek nem lesznek felhasználva). A következő lépéseket fogjuk elvégezni:
- Az Azure Cosmos DB HTAP-tároló beolvasása Spark-adatkeretbe
- Az eredmények összesítése új adatkeretben
- Adatok betöltése dedikált SQL-készletbe
Adatok
Ebben a példában egy RetailSales nevű HTAP-tárolót használunk. Ez egy ConnectedData nevű társított szolgáltatás része, és a következő sémával rendelkezik:
- _rid: sztring (nullable = true)
- _ts: long (nullable = true)
- logQuantity: double (nullable = true)
- productCode: sztring (nullable = true)
- quantity: long (nullable = true)
- price: long (nullable = true)
- id: sztring (nullable = true)
- reklám: long (nullable = true)
- storeId: long (nullable = true)
- weekStarting: long (nullable = true)
- _etag: sztring (nullable = true)
Az értékesítéseket (mennyiség, bevétel (ár x mennyiség) összesítjük productCode és weekStarting jelentéskészítési célokra. Végül exportáljuk az adatokat egy nevű dedikált SQL-készlettáblába dbo.productsales
.
Spark-jegyzetfüzet konfigurálása
Hozzon létre egy Spark-jegyzetfüzetet a Scala mint Spark (Scala) fő nyelvvel. A munkamenethez a jegyzetfüzet alapértelmezett beállítását használjuk.
Adatok olvasása a Sparkban
Olvassa be az Azure Cosmos DB HTAP-tárolót a Sparkkal egy adatkeretbe az első cellában.
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
Az eredmények összesítése új adatkeretben
A második cellában futtatjuk az új adatkerethez szükséges átalakítást és összesítéseket, mielőtt betöltenénk egy dedikált SQL-készlet adatbázisába.
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
Az eredmények betöltése egy dedikált SQL-készletbe
A harmadik cellában egy dedikált SQL-készletbe töltjük be az adatokat. Automatikusan létrehoz egy ideiglenes külső táblát, külső adatforrást és külső fájlformátumot, amely a feladat befejezése után törlődik.
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
Az eredmények lekérdezése SQL-lel
Az eredményt lekérdezheti egy egyszerű SQL-lekérdezéssel, például a következő SQL-szkripttel:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
A lekérdezés a következő eredményeket jeleníti meg diagram módban: