Migrasikan satu hingga beberapa data relasional ke akun Azure Cosmos DB SQL API
BERLAKU UNTUK:
SQL API
Untuk bermigrasi dari basis data relasional ke Azure Cosmos DB SQL API, anda perlu membuat perubahan pada model data untuk pengoptimalan.
Salah satu transformasi umum adalah denormalisasi data dengan menyematkan subitem terkait dalam satu dokumen JSON. Di sini kita melihat beberapa opsi untuk ini menggunakan Azure Data Factory atau Azure Databricks. Untuk panduan umum tentang pemodelan data untuk Cosmos DB, silahkan tinjau Pemodelan data di Azure Cosmos DB.
Contoh Skenario
Asumsikan kita memiliki dua tabel berikut dalam basis data SQL, Pesanan, dan OrderDetails kami.
Kami ingin menggabungkan satu hingga beberapa hubungan ini ke dalam satu dokumen JSON selama migrasi. Untuk melakukan ini, kita dapat membuat kueri T-SQL menggunakan "FOR JSON" seperti di bawah ini:
SELECT
o.OrderID,
o.OrderDate,
o.FirstName,
o.LastName,
o.Address,
o.City,
o.State,
o.PostalCode,
o.Country,
o.Phone,
o.Total,
(select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;
Hasil kueri ini akan terlihat seperti di bawah ini:
Idealnya, Anda ingin menggunakan satu aktivitas salinan Azure Data Factory (ADF) untuk mengkueri data SQL sebagai sumber dan menulis output langsung ke Azure Cosmos DB tenggelam sebagai objek JSON yang tepat. Saat ini, tidak mungkin untuk melakukan transformasi JSON yang diperlukan dalam satu aktivitas salinan. Jika kita mencoba menyalin hasil kueri di atas ke dalam kontainer Azure Cosmos DB SQL API, kita akan melihat bidang OrderDetails sebagai properti string dokumen kami, bukan array JSON yang diharapkan.
Kita dapat mengatasi batasan saat ini dengan salah satu cara berikut:
Gunakan Azure Data Factory dengan dua aktivitas salin:
- Dapatkan data berformat JSON dari SQL ke file teks di lokasi penyimpanan gumpalan perantara, dan
- Muat data dari file teks JSON ke kontainer di Azure Cosmos DB.
Gunakan Azure Databricks untuk membaca dari SQL dan menulis ke Azure Cosmos DB - kami akan menyajikan dua opsi di sini.
Mari kita lihat pendekatan ini secara lebih rinci:
Azure Data Factory
Meskipun kita tidak dapat menanamkan OrderDetails sebagai JSON-array di dokumen Cosmos DB tujuan, kita dapat mengatasi masalah dengan menggunakan dua Aktivitas Salinan terpisah.
Salin Aktivitas #1: SqlJsonToBlobText
Untuk data sumber, kami menggunakan kueri SQL untuk mendapatkan kumpulan hasil sebagai kolom tunggal dengan satu objek JSON (mewakili Pesanan) per baris menggunakan kemampuan SQL Server OPENJSON dan FOR JSON PATH:
SELECT [value] FROM OPENJSON(
(SELECT
id = o.OrderID,
o.OrderDate,
o.FirstName,
o.LastName,
o.Address,
o.City,
o.State,
o.PostalCode,
o.Country,
o.Phone,
o.Total,
(select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o FOR JSON PATH)
)
Untuk wastafel aktivitas salinan SqlJsonToBlobText, kami memilih "Teks Dibatasi" dan mengarahkannya ke folder tertentu di Penyimpanan Blob Azure dengan nama file unik yang dihasilkan secara dinamis (misalnya, '@concat(pipeline(). RunId,'.json'). Karena file teks kami tidak benar-benar "dibatasi" dan kami tidak ingin diuraikan ke dalam kolom terpisah menggunakan koma dan ingin mempertahankan tanda kutip ganda ("), kami mengatur "Pembatas kolom" ke Tab ("\t") - atau karakter lain yang tidak muncul dalam data - dan "Karakter kutipan" ke "Tidak ada karakter kutipan".
Salin Aktivitas #2: BlobJsonToCosmos
Selanjutnya, kami memodifikasi pipeline ADF kami dengan menambahkan Copy Activity kedua yang terlihat di Penyimpanan Blob Azure untuk file teks yang dibuat oleh aktivitas pertama. Ini memprosesnya sebagai sumber "JSON" untuk disisipkan ke Cosmos DB tenggelam sebagai satu dokumen per baris JSON yang ditemukan dalam file teks.
Secara opsional, kami juga menambahkan aktivitas "Hapus" ke pipeline sehingga menghapus semua file sebelumnya yang tersisa di folder /Orders/ sebelum setiap run. Saluran pipa ADF kami sekarang terlihat seperti ini:
Setelah kami memicu pipeline di atas, kami melihat file yang dibuat di lokasi Penyimpanan Blob Azure perantara kami yang berisi satu objek JSON per baris:
Kami juga melihat dokumen Pesanan dengan OrderDetail yang disematkan dengan benar dimasukkan ke dalam koleksi Cosmos DB kami:
Azure Databricks
Kami juga dapat menggunakan Spark di Azure Databricks untuk menyalin data dari sumber Database SQL kami ke tujuan Azure Cosmos DB tanpa membuat teks perantara / file JSON di Penyimpanan Azure Blob.
Catatan
Untuk kejelasan dan kesederhanaan, cuplikan kode di bawah ini menyertakan kata sandi database dummy secara eksplisit sebaris, tetapi Anda harus selalu menggunakan rahasia Azure Databricks.
Pertama, kami membuat dan melampirkan konektor SQL yang diperlukandan pustaka konektor Azure Cosmos DB ke klaster Azure Databricks kami. Mulai ulang kluster untuk memastikan pustaka dimuat.
Selanjutnya, kami menyajikan dua sampel, untuk Scala dan Python.
Scala
Di sini, kita mendapatkan hasil kueri SQL dengan output "FOR JSON" ke dalam DataFrame:
// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
"url" -> "xxxx.database.windows.net",
"databaseName" -> "xxxx",
"queryCustom" -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
"user" -> "xxxx",
"password" -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)
Selanjutnya, kami terhubung ke database dan koleksi Cosmos DB kami:
// Connect to Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
"Endpoint" -> "https://xxxx.documents.azure.com:443/",
// NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
"Masterkey" -> "xxxx",
"Database" -> "StoreDatabase",
"Collection" -> "Orders")
val configCosmos = Config(configMap)
Akhirnya, kami mendefinisikan skema kami dan from_json untuk menerapkan DataFrame sebelum menyimpannya ke koleksi CosmosDB.
// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
StructField("OrderDetailId",IntegerType,true),
StructField("ProductId",IntegerType,true),
StructField("UnitPrice",DoubleType,true),
StructField("Quantity",IntegerType,true)
)))
val ordersWithSchema = orders.select(
col("OrderId").cast("string").as("id"),
col("OrderDate").cast("string"),
col("FirstName").cast("string"),
col("LastName").cast("string"),
col("Address").cast("string"),
col("City").cast("string"),
col("State").cast("string"),
col("PostalCode").cast("string"),
col("Country").cast("string"),
col("Phone").cast("string"),
col("Total").cast("double"),
from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)
Python
Sebagai pendekatan alternatif, Anda mungkin perlu menjalankan transformasi JSON di Spark (jika database sumber tidak mendukung "UNTUK JSON" atau operasi serupa), atau Anda mungkin ingin menggunakan operasi paralel untuk kumpulan data yang sangat besar. Di sini kami menyajikan sampel PySpark. Mulai dengan mengonfigurasi koneksi basis data sumber dan target di sel pertama:
import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
connectionProperties = {
"user" : jdbcUsername,
"password" : jdbcPassword,
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
#Connect details for Target Azure Cosmos DB SQL API account
writeConfig = {
"Endpoint": "Endpoint",
"Masterkey": "Masterkey",
"Database": "OrdersDB",
"Collection": "Orders",
"Upsert": "true"
}
Kemudian, kami akan meminta Database sumber (dalam hal ini SQL Server) untuk rekaman detail pesanan dan pesanan, memasukkan hasilnya ke dalam Spark Dataframes. Kami juga akan membuat daftar yang berisi semua dokumen pesanan, dan kumpulan Thread untuk operasi paralel:
import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
#get all OrderId values to pass to map function
orderids = orders.select('OrderId').collect()
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)
Kemudian, buat fungsi untuk menulis Pesanan ke dalam koleksi SQL API target. Fungsi ini akan memfilter semua detail pesanan untuk ID pesanan yang diberikan, mengonversinya menjadi array JSON, dan memasukkan array ke dalam dokumen JSON yang akan kami tulis ke dalam target SQL API Collection untuk pesanan tersebut:
def writeOrder(orderid):
#filter the order on current value passed from map function
order = orders.filter(orders['OrderId'] == orderid[0])
#set id to be a uuid
order = order.withColumn("id", lit(str(uuid.uuid1())))
#add details field to order dataframe
order = order.withColumn("details", lit(''))
#filter order details dataframe to get details we want to merge into the order document
orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
#convert dataframe to pandas
orderpandas = order.toPandas()
#convert the order dataframe to json and remove enclosing brackets
orderjson = orderpandas.to_json(orient='records', force_ascii=False)
orderjson = orderjson[1:-1]
#convert orderjson to a dictionaory so we can set the details element with order details later
orderjsondata = json.loads(orderjson)
#convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
if (orderdetailsgroup.count() !=0):
#convert orderdetailsgroup to pandas dataframe to work better with json
orderdetailsgroup = orderdetailsgroup.toPandas()
#convert orderdetailsgroup to json string
jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
#convert jsonstring to dictionary to ensure correct encoding and no corrupt records
jsonstring = json.loads(jsonstring)
#set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order
orderjsondata['details'] = jsonstring
#convert dictionary to json
orderjsondata = json.dumps(orderjsondata)
#read the json into spark dataframe
df = spark.read.json(sc.parallelize([orderjsondata]))
#write the dataframe (this will be a single order record with merged many-to-one order details) to cosmos db using spark the connector
#https://docs.microsoft.com/azure/cosmos-db/spark-connector
df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()
Akhirnya, kita akan memanggil hal di atas menggunakan fungsi peta pada kumpulan benang, untuk mengeksekusi secara paralel, melewati daftar kartu pesanan yang kami buat sebelumnya:
#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)
Dalam kedua pendekatan, pada akhirnya, kita harus mendapatkan OrderDetails tertanam yang disimpan dengan benar dalam setiap dokumen Order dalam koleksi Cosmos DB:
Langkah berikutnya
- Pelajari tentang pemodelan data dalam Azure Cosmos DB
- Pelajari cara memodelkan dan mempartisi data di Azure Cosmos DB
