Menyalin data dari Azure Cosmos DB ke dalam kumpulan SQL khusus menggunakan Apache Spark
Azure Synapse Link for Azure Cosmos DB memungkinkan pengguna menjalankan analitik mendekati real-time melalui data operasional di Azure Cosmos DB. Namun, ada kalanya beberapa data perlu diagregasikan dan diperkaya untuk melayani pengguna gudang data. Mengkurasi dan mengekspor data Link Azure Synapse dapat dilakukan hanya dengan beberapa sel di buku catatan.
Prasyarat
- Memprovisikan ruang kerja Synapse dengan:
- Memprovisikan akun Azure Cosmos DB dengan kontainer HTAP dengan data
- Menyambungkan kontainer Azure Cosmos DB HTAP ke ruang kerja
- Memiliki pengaturan yang tepat untuk mengimpor data ke dalam kumpulan SQL khusus dari Spark
Langkah-langkah
Dalam tutorial ini, Anda akan tersambung ke penyimpanan analitis sehingga tidak ada dampak pada penyimpanan transaksional (tidak akan mengkonsumsi Unit Permintaan apa pun). Kita akan melalui langkah-langkah berikut:
- Membaca kontainer HTAP Azure Cosmos DB ke dalam dataframe Spark
- Agregasi hasil dalam dataframe baru
- Serap data ke dalam kumpulan SQL khusus
Data
Dalam contoh tersebut, kita menggunakan kontainer HTAP yang disebut RetailSales. Ini adalah bagian dari layanan yang ditautkan yang disebut ConnectedData, dan memiliki skema berikut:
- _rid: string (nullable = true)
- _ts: long (nullable = true)
- logQuantity: double (nullable = true)
- productCode: string (nullable = true)
- quantity: long (nullable = true)
- price: long (nullable = true)
- id: string (nullable = true)
- advertising: long (nullable = true)
- storeId: long (nullable = true)
- weekStarting: long (nullable = true)
- _etag: string (nullable = true)
Kita akan mengagregasi penjualan (kuantitas, pendapatan (harga x kuantitas) dengan productCode dan weekStarting untuk tujuan pelaporan. Terakhir, kita akan mengekspor data tersebut ke dalam tabel kumpulan SQL khusus yang disebut dbo.productsales
.
Mengonfigurasi Spark Notebook
Buat Spark Notebook dengan Scala sebagai Spark (Scala) sebagai bahasa utama. Kita menggunakan pengaturan default buku catatan untuk sesi tersebut.
Membaca data di Spark
Baca kontainer HTAP Azure Cosmos DB dengan Spark ke dalam kerangka data di sel pertama.
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
Mengagregasi hasil dalam dataframe baru
Di sel kedua, kita menjalankan transformasi dan agregat yang diperlukan untuk dataframe baru sebelum memuatnya ke dalam database kumpulan SQL khusus.
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
Memuat hasilnya ke dalam kumpulan SQL khusus
Di sel ketiga, kita memuat data ke dalam kumpulan SQL khusus. Ini secara otomatis akan membuat tabel eksternal sementara, sumber data eksternal, dan format file eksternal yang akan dihapus setelah pekerjaan selesai.
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
Meminta hasil dengan SQL
Anda dapat meminta hasil menggunakan kueri SQL sederhana seperti skrip SQL berikut:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
Kueri Anda akan menyajikan hasil berikut dalam mode bagan: