
Migrate one-to-few relational data into Azure Cosmos DB SQL API account

APPLIES TO: SQL API

To migrate from a relational database to Azure Cosmos DB SQL API, it can be necessary to make changes to the data model for optimization.

One common transformation is denormalizing data by embedding related subitems within a single JSON document. This article looks at a few options for doing this using Azure Data Factory or Azure Databricks. For general guidance on data modeling for Cosmos DB, see Data modeling in Azure Cosmos DB.

Example Scenario

Assume we have the following two tables in our SQL database, Orders and OrderDetails.

Screenshot that shows the Orders and OrderDetails tables in the SQL database.

We want to combine this one-to-few relationship into one JSON document during migration. To do this, we can create a T-SQL query using "FOR JSON" as below:

SELECT
  o.OrderID,
  o.OrderDate,
  o.FirstName,
  o.LastName,
  o.Address,
  o.City,
  o.State,
  o.PostalCode,
  o.Country,
  o.Phone,
  o.Total,
  (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;

The results of this query would look like this:

Order details

Ideally, you want to use a single Azure Data Factory (ADF) copy activity to query SQL data as the source and write the output directly to the Azure Cosmos DB sink as proper JSON objects. Currently, it is not possible to perform the needed JSON transformation in one copy activity. If we try to copy the results of the above query into an Azure Cosmos DB SQL API container, we will see the OrderDetails field as a string property of the document, instead of the expected JSON array.
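To make the difference concrete, here is a minimal Python sketch (with made-up field values, not taken from the sample database) contrasting what the direct copy produces with what we actually want:

import json

# What a direct copy of the query results produces: OrderDetails is a JSON *string*
# (values below are illustrative only)
copied_doc = {
    "OrderID": 1,
    "FirstName": "Alice",
    "OrderDetails": "[{\"OrderDetailId\":1,\"ProductId\":5,\"UnitPrice\":9.99,\"Quantity\":2}]"
}

# What we actually want: OrderDetails embedded as a nested JSON array
desired_doc = dict(copied_doc)
desired_doc["OrderDetails"] = json.loads(copied_doc["OrderDetails"])

print(type(copied_doc["OrderDetails"]))   # <class 'str'>
print(type(desired_doc["OrderDetails"]))  # <class 'list'>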

We can work around this current limitation in one of the following ways:

  • Use Azure Data Factory with two copy activities:

    1. Get JSON-formatted data from SQL to a text file in an intermediary blob storage location, and
    2. Load data from the JSON text file to a container in Azure Cosmos DB.
  • Use Azure Databricks to read from SQL and write to Azure Cosmos DB - we will present two options here.

Let's look at these approaches in more detail:

Azure Data Factory

Although we cannot embed OrderDetails as a JSON array in the destination Cosmos DB document, we can work around the issue by using two separate Copy Activities.

Copy Activity #1: SqlJsonToBlobText

For the source data, we use a SQL query to get the result set as a single column with one JSON object (representing the Order) per row, using the SQL Server OPENJSON and FOR JSON PATH capabilities:

SELECT [value] FROM OPENJSON(
  (SELECT
    id = o.OrderID,
    o.OrderDate,
    o.FirstName,
    o.LastName,
    o.Address,
    o.City,
    o.State,
    o.PostalCode,
    o.Country,
    o.Phone,
    o.Total,
    (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
   FROM Orders o FOR JSON PATH)
)

ADF copy

For the sink of the SqlJsonToBlobText copy activity, we choose "Delimited Text" and point it to a specific folder in Azure Blob Storage with a dynamically generated unique file name (for example, @concat(pipeline().RunId,'.json')). Since our text file is not really "delimited" - we do not want it parsed into separate columns on commas, and we want to preserve the double quotes (") - we set "Column delimiter" to a Tab ("\t"), or another character that does not occur in the data, and "Quote character" to "No quote character".

Screenshot that highlights the Column delimiter and Quote character settings.
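As a quick sanity check on these settings, the small Python sketch below (with an illustrative JSON row) shows why splitting on commas would shred the JSON text, while a tab delimiter - a character that never appears in the data - leaves each row as one intact, parseable JSON string:

import json

# One row of the intermediary file: a complete JSON object on a single line
# (illustrative content - the real rows come from the FOR JSON PATH query)
row = '{"id":1,"City":"Seattle","OrderDetails":[{"ProductId":5,"Quantity":2}]}'

# Parsing on commas (a normal CSV interpretation) breaks the JSON into fragments
print(row.split(","))

# Parsing on tab keeps the whole row as a single column...
print(row.split("\t"))

# ...so it still round-trips back into the expected nested structure
print(json.loads(row.split("\t")[0])["OrderDetails"])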

Copy Activity #2: BlobJsonToCosmos

Next, we modify our ADF pipeline by adding a second Copy Activity that looks in Azure Blob Storage for the text file created by the first activity. It processes that file as a "JSON" source and inserts one document into the Cosmos DB sink for each JSON row found in the file.

Screenshot that highlights the JSON source file and File path fields.

Optionally, we also add a "Delete" activity to the pipeline so that it deletes all of the previous files remaining in the /Orders/ folder prior to each run. Our ADF pipeline now looks something like this:

Screenshot that highlights the Delete activity.

After we trigger the pipeline above, we see a file created in our intermediary Azure Blob Storage location containing one JSON object per row:

Screenshot that shows the created file containing the JSON objects.

We also see Orders documents with properly embedded OrderDetails inserted into our Cosmos DB collection:

Screenshot that shows order details as part of a Cosmos DB document.

Azure Databricks

We can also use Spark in Azure Databricks to copy the data from our SQL Database source to the Azure Cosmos DB destination without creating the intermediary text/JSON files in Azure Blob Storage.

Note

For clarity and simplicity, the code snippets below include dummy database passwords explicitly inline, but you should always use Azure Databricks secrets.
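For reference, a minimal sketch of what a secret lookup could look like in a Databricks notebook; the scope name "my-scope" and the key names are hypothetical, so create your own with the Databricks CLI or REST API first:

# Retrieve credentials from a Databricks secret scope instead of hard-coding them
# ("my-scope", "sql-password", and "cosmos-key" are placeholder names)
sql_password = dbutils.secrets.get(scope="my-scope", key="sql-password")
cosmos_master_key = dbutils.secrets.get(scope="my-scope", key="cosmos-key")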

First, we create and attach the required SQL connector and Azure Cosmos DB connector libraries to our Azure Databricks cluster. Restart the cluster to make sure the libraries are loaded.

Screenshot that shows where to create and attach the required SQL connector and Azure Cosmos DB connector libraries to an Azure Databricks cluster.

Next, we present two samples, one in Scala and one in Python.

Scala

Here, we get the results of the SQL query with "FOR JSON" output into a DataFrame:

// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
  "url"          -> "xxxx.database.windows.net",
  "databaseName" -> "xxxx",
  "queryCustom"  -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
  "user"         -> "xxxx", 
  "password"     -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)

Screenshot that shows the SQL query output in a DataFrame.

Next, we connect to our Cosmos DB database and collection:

// Connect to Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
  "Endpoint" -> "https://xxxx.documents.azure.com:443/",
  // NOTE: For clarity and simplicity, this example includes secrets explicitly as a string, but you should always use Databricks secrets
  "Masterkey" -> "xxxx",
  "Database" -> "StoreDatabase",
  "Collection" -> "Orders")
val configCosmos = Config(configMap)

Finally, we define our schema and use from_json to convert the OrderDetails string column into a nested array before saving the DataFrame to the Cosmos DB collection.

// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
    StructField("OrderDetailId",IntegerType,true),
    StructField("ProductId",IntegerType,true),
    StructField("UnitPrice",DoubleType,true),
    StructField("Quantity",IntegerType,true)
  )))
val ordersWithSchema = orders.select(
  col("OrderId").cast("string").as("id"),
  col("OrderDate").cast("string"),
  col("FirstName").cast("string"),
  col("LastName").cast("string"),
  col("Address").cast("string"),
  col("City").cast("string"),
  col("State").cast("string"),
  col("PostalCode").cast("string"),
  col("Country").cast("string"),
  col("Phone").cast("string"),
  col("Total").cast("double"),
  from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)

Screenshot that highlights the proper array saved to the Cosmos DB collection.

Python

As an alternative approach, you may need to execute JSON transformations in Spark (if the source database does not support "FOR JSON" or a similar operation), or you may wish to use parallel operations for a very large data set. Here we present a PySpark sample. Start by configuring the source and target database connections in the first cell:

import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
 
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
 
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
 
#Connect details for Target Azure Cosmos DB SQL API account
writeConfig = {
    "Endpoint": "Endpoint",
    "Masterkey": "Masterkey",
    "Database": "OrdersDB",
    "Collection": "Orders",
    "Upsert": "true"
}

Then, we query the source database (in this case, SQL Server) for both the order and order detail records, putting the results into Spark DataFrames. We also create a list containing all the order IDs, and a thread pool for the parallel operations:

import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from multiprocessing.pool import ThreadPool
 
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
 
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
 
#get all OrderId values to pass to map function 
orderids = orders.select('OrderId').collect()
 
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)

Then, create a function for writing Orders into the target SQL API collection. This function will filter all order details for the given order ID, convert them into a JSON array, and insert the array into a JSON document that we will write into the target SQL API collection for that order:

def writeOrder(orderid):
  #filter the order on current value passed from map function
  order = orders.filter(orders['OrderId'] == orderid[0])
  
  #set id to be a uuid
  order = order.withColumn("id", lit(str(uuid.uuid1())))
  
  #add details field to order dataframe
  order = order.withColumn("details", lit(''))
  
  #filter order details dataframe to get details we want to merge into the order document
  orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
  
  #convert dataframe to pandas
  orderpandas = order.toPandas()
  
  #convert the order dataframe to json and remove enclosing brackets
  orderjson = orderpandas.to_json(orient='records', force_ascii=False)
  orderjson = orderjson[1:-1] 
  
  #convert orderjson to a dictionary so we can set the details element with the order details later
  orderjsondata = json.loads(orderjson)
  
  
  #convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
  if (orderdetailsgroup.count() !=0):
    #convert orderdetailsgroup to pandas dataframe to work better with json
    orderdetailsgroup = orderdetailsgroup.toPandas()
    
    #convert orderdetailsgroup to json string
    jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
    
    #convert jsonstring to dictionary to ensure correct encoding and no corrupt records
    jsonstring = json.loads(jsonstring)
    
    #set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order 
    orderjsondata['details'] = jsonstring
 
  #convert dictionary to json
  orderjsondata = json.dumps(orderjsondata)
 
  #read the json into spark dataframe
  df = spark.read.json(sc.parallelize([orderjsondata]))
  
  #write the dataframe (a single order record with its order details merged in) to cosmos db using the spark connector
  #https://docs.microsoft.com/azure/cosmos-db/spark-connector
  df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

Finally, we call the above function using map on the thread pool to execute it in parallel, passing in the list of order IDs we created earlier:

#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)

With either approach, we should end up with properly embedded OrderDetails saved within each Order document in the Cosmos DB collection:

Databricks
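If you want to verify the result programmatically rather than through the portal, a sketch like the following (using the azure-cosmos Python SDK, with placeholder endpoint, key, database, and container names - adjust them to whichever account and collection you wrote to) reads back a few Order documents and checks that OrderDetails is a real array:

from azure.cosmos import CosmosClient

# Placeholder endpoint and key - substitute your own values (ideally from a secret store)
client = CosmosClient("https://xxxx.documents.azure.com:443/", credential="xxxx")
container = client.get_database_client("StoreDatabase").get_container_client("Orders")

# Read a few migrated documents and confirm OrderDetails is embedded as an array
for doc in container.query_items(
        query="SELECT TOP 5 c.id, c.OrderDetails FROM c",
        enable_cross_partition_query=True):
    assert isinstance(doc.get("OrderDetails", []), list)
    print(doc["id"], "->", len(doc.get("OrderDetails", [])), "detail rows")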

Next steps