Migrasikan satu hingga beberapa data relasional ke akun Azure Cosmos DB SQL API

BERLAKU UNTUK: SQL API

Untuk bermigrasi dari basis data relasional ke Azure Cosmos DB SQL API, anda perlu membuat perubahan pada model data untuk pengoptimalan.

Salah satu transformasi umum adalah denormalisasi data dengan menyematkan subitem terkait dalam satu dokumen JSON. Di sini kita melihat beberapa opsi untuk ini menggunakan Azure Data Factory atau Azure Databricks. Untuk panduan umum tentang pemodelan data untuk Cosmos DB, silahkan tinjau Pemodelan data di Azure Cosmos DB.

Contoh Skenario

Asumsikan kita memiliki dua tabel berikut dalam basis data SQL, Pesanan, dan OrderDetails kami.

Screenshot that shows the Orders and OrderDetails tables in the SQL database.

Kami ingin menggabungkan satu hingga beberapa hubungan ini ke dalam satu dokumen JSON selama migrasi. Untuk melakukan ini, kita dapat membuat kueri T-SQL menggunakan "FOR JSON" seperti di bawah ini:

SELECT
  o.OrderID,
  o.OrderDate,
  o.FirstName,
  o.LastName,
  o.Address,
  o.City,
  o.State,
  o.PostalCode,
  o.Country,
  o.Phone,
  o.Total,
  (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;

Hasil kueri ini akan terlihat seperti di bawah ini:

Order Details

Idealnya, Anda ingin menggunakan satu aktivitas salinan Azure Data Factory (ADF) untuk mengkueri data SQL sebagai sumber dan menulis output langsung ke Azure Cosmos DB tenggelam sebagai objek JSON yang tepat. Saat ini, tidak mungkin untuk melakukan transformasi JSON yang diperlukan dalam satu aktivitas salinan. Jika kita mencoba menyalin hasil kueri di atas ke dalam kontainer Azure Cosmos DB SQL API, kita akan melihat bidang OrderDetails sebagai properti string dokumen kami, bukan array JSON yang diharapkan.

Kita dapat mengatasi batasan saat ini dengan salah satu cara berikut:

  • Gunakan Azure Data Factory dengan dua aktivitas salin:

    1. Dapatkan data berformat JSON dari SQL ke file teks di lokasi penyimpanan gumpalan perantara, dan
    2. Muat data dari file teks JSON ke kontainer di Azure Cosmos DB.
  • Gunakan Azure Databricks untuk membaca dari SQL dan menulis ke Azure Cosmos DB - kami akan menyajikan dua opsi di sini.

Mari kita lihat pendekatan ini secara lebih rinci:

Azure Data Factory

Meskipun kita tidak dapat menanamkan OrderDetails sebagai JSON-array di dokumen Cosmos DB tujuan, kita dapat mengatasi masalah dengan menggunakan dua Aktivitas Salinan terpisah.

Salin Aktivitas #1: SqlJsonToBlobText

Untuk data sumber, kami menggunakan kueri SQL untuk mendapatkan kumpulan hasil sebagai kolom tunggal dengan satu objek JSON (mewakili Pesanan) per baris menggunakan kemampuan SQL Server OPENJSON dan FOR JSON PATH:

SELECT [value] FROM OPENJSON(
  (SELECT
    id = o.OrderID,
    o.OrderDate,
    o.FirstName,
    o.LastName,
    o.Address,
    o.City,
    o.State,
    o.PostalCode,
    o.Country,
    o.Phone,
    o.Total,
    (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
   FROM Orders o FOR JSON PATH)
)

ADF copy

Untuk wastafel aktivitas salinan SqlJsonToBlobText, kami memilih "Teks Dibatasi" dan mengarahkannya ke folder tertentu di Penyimpanan Blob Azure dengan nama file unik yang dihasilkan secara dinamis (misalnya, '@concat(pipeline(). RunId,'.json'). Karena file teks kami tidak benar-benar "dibatasi" dan kami tidak ingin diuraikan ke dalam kolom terpisah menggunakan koma dan ingin mempertahankan tanda kutip ganda ("), kami mengatur "Pembatas kolom" ke Tab ("\t") - atau karakter lain yang tidak muncul dalam data - dan "Karakter kutipan" ke "Tidak ada karakter kutipan".

Screenshot that highlights the Column delimiter and Quote character settings.

Salin Aktivitas #2: BlobJsonToCosmos

Selanjutnya, kami memodifikasi pipeline ADF kami dengan menambahkan Copy Activity kedua yang terlihat di Penyimpanan Blob Azure untuk file teks yang dibuat oleh aktivitas pertama. Ini memprosesnya sebagai sumber "JSON" untuk disisipkan ke Cosmos DB tenggelam sebagai satu dokumen per baris JSON yang ditemukan dalam file teks.

Screenshot that highlights the JSON source file and the File path fields.

Secara opsional, kami juga menambahkan aktivitas "Hapus" ke pipeline sehingga menghapus semua file sebelumnya yang tersisa di folder /Orders/ sebelum setiap run. Saluran pipa ADF kami sekarang terlihat seperti ini:

Screenshot that highlights the Delete activity.

Setelah kami memicu pipeline di atas, kami melihat file yang dibuat di lokasi Penyimpanan Blob Azure perantara kami yang berisi satu objek JSON per baris:

Screenshot that shows the created file that contains the JSON objects.

Kami juga melihat dokumen Pesanan dengan OrderDetail yang disematkan dengan benar dimasukkan ke dalam koleksi Cosmos DB kami:

Screenshot that shows the order details as a part of the Cosmos DB document

Azure Databricks

Kami juga dapat menggunakan Spark di Azure Databricks untuk menyalin data dari sumber Database SQL kami ke tujuan Azure Cosmos DB tanpa membuat teks perantara / file JSON di Penyimpanan Azure Blob.

Catatan

Untuk kejelasan dan kesederhanaan, cuplikan kode di bawah ini menyertakan kata sandi database dummy secara eksplisit sebaris, tetapi Anda harus selalu menggunakan rahasia Azure Databricks.

Pertama, kami membuat dan melampirkan konektor SQL yang diperlukandan pustaka konektor Azure Cosmos DB ke klaster Azure Databricks kami. Mulai ulang kluster untuk memastikan pustaka dimuat.

Screenshot that shows where to create and attach the required SQL connector and Azure Cosmos DB connector libraries to our Azure Databricks cluster.

Selanjutnya, kami menyajikan dua sampel, untuk Scala dan Python.

Scala

Di sini, kita mendapatkan hasil kueri SQL dengan output "FOR JSON" ke dalam DataFrame:

// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
  "url"          -> "xxxx.database.windows.net",
  "databaseName" -> "xxxx",
  "queryCustom"  -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
  "user"         -> "xxxx", 
  "password"     -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)

Screenshot that shows the SQL query output in a DataFrame.

Selanjutnya, kami terhubung ke database dan koleksi Cosmos DB kami:

// Connect to Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
  "Endpoint" -> "https://xxxx.documents.azure.com:443/",
  // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
  "Masterkey" -> "xxxx",
  "Database" -> "StoreDatabase",
  "Collection" -> "Orders")
val configCosmos = Config(configMap)

Akhirnya, kami mendefinisikan skema kami dan from_json untuk menerapkan DataFrame sebelum menyimpannya ke koleksi CosmosDB.

// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
    StructField("OrderDetailId",IntegerType,true),
    StructField("ProductId",IntegerType,true),
    StructField("UnitPrice",DoubleType,true),
    StructField("Quantity",IntegerType,true)
  )))
val ordersWithSchema = orders.select(
  col("OrderId").cast("string").as("id"),
  col("OrderDate").cast("string"),
  col("FirstName").cast("string"),
  col("LastName").cast("string"),
  col("Address").cast("string"),
  col("City").cast("string"),
  col("State").cast("string"),
  col("PostalCode").cast("string"),
  col("Country").cast("string"),
  col("Phone").cast("string"),
  col("Total").cast("double"),
  from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)

Screenshot that highlights the proper array for saving to a Cosmos DB collection.

Python

Sebagai pendekatan alternatif, Anda mungkin perlu menjalankan transformasi JSON di Spark (jika database sumber tidak mendukung "UNTUK JSON" atau operasi serupa), atau Anda mungkin ingin menggunakan operasi paralel untuk kumpulan data yang sangat besar. Di sini kami menyajikan sampel PySpark. Mulai dengan mengonfigurasi koneksi basis data sumber dan target di sel pertama:

import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
 
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
 
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
 
#Connect details for Target Azure Cosmos DB SQL API account
writeConfig = {
    "Endpoint": "Endpoint",
    "Masterkey": "Masterkey",
    "Database": "OrdersDB",
    "Collection": "Orders",
    "Upsert": "true"
}

Kemudian, kami akan meminta Database sumber (dalam hal ini SQL Server) untuk rekaman detail pesanan dan pesanan, memasukkan hasilnya ke dalam Spark Dataframes. Kami juga akan membuat daftar yang berisi semua dokumen pesanan, dan kumpulan Thread untuk operasi paralel:

import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
 
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
 
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
 
#get all OrderId values to pass to map function 
orderids = orders.select('OrderId').collect()
 
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)

Kemudian, buat fungsi untuk menulis Pesanan ke dalam koleksi SQL API target. Fungsi ini akan memfilter semua detail pesanan untuk ID pesanan yang diberikan, mengonversinya menjadi array JSON, dan memasukkan array ke dalam dokumen JSON yang akan kami tulis ke dalam target SQL API Collection untuk pesanan tersebut:

def writeOrder(orderid):
  #filter the order on current value passed from map function
  order = orders.filter(orders['OrderId'] == orderid[0])
  
  #set id to be a uuid
  order = order.withColumn("id", lit(str(uuid.uuid1())))
  
  #add details field to order dataframe
  order = order.withColumn("details", lit(''))
  
  #filter order details dataframe to get details we want to merge into the order document
  orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
  
  #convert dataframe to pandas
  orderpandas = order.toPandas()
  
  #convert the order dataframe to json and remove enclosing brackets
  orderjson = orderpandas.to_json(orient='records', force_ascii=False)
  orderjson = orderjson[1:-1] 
  
  #convert orderjson to a dictionaory so we can set the details element with order details later
  orderjsondata = json.loads(orderjson)
  
  
  #convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
  if (orderdetailsgroup.count() !=0):
    #convert orderdetailsgroup to pandas dataframe to work better with json
    orderdetailsgroup = orderdetailsgroup.toPandas()
    
    #convert orderdetailsgroup to json string
    jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
    
    #convert jsonstring to dictionary to ensure correct encoding and no corrupt records
    jsonstring = json.loads(jsonstring)
    
    #set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order 
    orderjsondata['details'] = jsonstring
 
  #convert dictionary to json
  orderjsondata = json.dumps(orderjsondata)
 
  #read the json into spark dataframe
  df = spark.read.json(sc.parallelize([orderjsondata]))
  
  #write the dataframe (this will be a single order record with merged many-to-one order details) to cosmos db using spark the connector
  #https://docs.microsoft.com/azure/cosmos-db/spark-connector
  df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

Akhirnya, kita akan memanggil hal di atas menggunakan fungsi peta pada kumpulan benang, untuk mengeksekusi secara paralel, melewati daftar kartu pesanan yang kami buat sebelumnya:

#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)

Dalam kedua pendekatan, pada akhirnya, kita harus mendapatkan OrderDetails tertanam yang disimpan dengan benar dalam setiap dokumen Order dalam koleksi Cosmos DB:

Databricks

Langkah berikutnya