Menyalin data dari Azure Cosmos DB ke dalam kumpulan SQL khusus menggunakan Apache Spark

Azure Synapse Link for Azure Cosmos DB memungkinkan pengguna menjalankan analitik mendekati real-time melalui data operasional di Azure Cosmos DB. Namun, ada kalanya beberapa data perlu diagregasikan dan diperkaya untuk melayani pengguna gudang data. Mengkurasi dan mengekspor data Link Azure Synapse dapat dilakukan hanya dengan beberapa sel di buku catatan.

Prasyarat

Langkah-langkah

Dalam tutorial ini, Anda akan tersambung ke penyimpanan analitis sehingga tidak ada dampak pada penyimpanan transaksional (tidak akan mengkonsumsi Unit Permintaan apa pun). Kita akan melalui langkah-langkah berikut:

  1. Membaca kontainer HTAP Azure Cosmos DB ke dalam dataframe Spark
  2. Agregasi hasil dalam dataframe baru
  3. Serap data ke dalam kumpulan SQL khusus

Spark ke SQL Langkah 1

Data

Dalam contoh tersebut, kita menggunakan kontainer HTAP yang disebut RetailSales. Ini adalah bagian dari layanan yang ditautkan yang disebut ConnectedData, dan memiliki skema berikut:

  • _rid: string (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: string (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: string (nullable = true)
  • advertising: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: string (nullable = true)

Kita akan mengagregasi penjualan (kuantitas, pendapatan (harga x kuantitas) dengan productCode dan weekStarting untuk tujuan pelaporan. Terakhir, kita akan mengekspor data tersebut ke dalam tabel kumpulan SQL khusus yang disebut dbo.productsales.

Mengonfigurasi Spark Notebook

Buat Spark Notebook dengan Scala sebagai Spark (Scala) sebagai bahasa utama. Kita menggunakan pengaturan default buku catatan untuk sesi tersebut.

Membaca data di Spark

Baca kontainer HTAP Azure Cosmos DB dengan Spark ke dalam kerangka data di sel pertama.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Mengagregasi hasil dalam dataframe baru

Di sel kedua, kita menjalankan transformasi dan agregat yang diperlukan untuk dataframe baru sebelum memuatnya ke dalam database kumpulan SQL khusus.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Memuat hasilnya ke dalam kumpulan SQL khusus

Di sel ketiga, kita memuat data ke dalam kumpulan SQL khusus. Ini secara otomatis akan membuat tabel eksternal sementara, sumber data eksternal, dan format file eksternal yang akan dihapus setelah pekerjaan selesai.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Meminta hasil dengan SQL

Anda dapat meminta hasil menggunakan kueri SQL sederhana seperti skrip SQL berikut:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

Kueri Anda akan menyajikan hasil berikut dalam mode bagan: Spark ke Langkah SQL 2

Langkah berikutnya