Migrowanie danych relacyjnych jeden do kilku do konta usługi Azure Cosmos DB for NoSQL

DOTYCZY: NoSQL

Aby przeprowadzić migrację z relacyjnej bazy danych do usługi Azure Cosmos DB for NoSQL, konieczne może być wprowadzenie zmian w modelu danych na potrzeby optymalizacji.

Jedną z typowych przekształceń jest denormalizowanie danych przez osadzanie powiązanych podwitami w jednym dokumencie JSON. W tym miejscu przyjrzymy się kilku opcjom korzystania z usługi Azure Data Factory lub Azure Databricks. Aby uzyskać więcej informacji na temat modelowania danych dla usługi Azure Cosmos DB, zobacz Modelowanie danych w usłudze Azure Cosmos DB.

Przykładowy scenariusz

Załóżmy, że mamy dwie poniższe tabele w bazie danych SQL Database, Orders i OrderDetails.

Screenshot that shows the Orders and OrderDetails tables in the SQL database.

Chcemy połączyć tę relację jeden do kilku w jeden dokument JSON podczas migracji. Aby utworzyć pojedynczy dokument, utwórz zapytanie T-SQL przy użyciu polecenia FOR JSON:

SELECT
  o.OrderID,
  o.OrderDate,
  o.FirstName,
  o.LastName,
  o.Address,
  o.City,
  o.State,
  o.PostalCode,
  o.Country,
  o.Phone,
  o.Total,
  (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
FROM Orders o;

Wyniki tego zapytania obejmują dane z tabeli Orders ( Zamówienia):

Screenshot of a query that results in details of various orders.

W idealnym przypadku chcesz użyć pojedynczego działania kopiowania usługi Azure Data Factory (ADF), aby wysyłać zapytania do danych SQL jako źródła i zapisywać dane wyjściowe bezpośrednio w ujściu usługi Azure Cosmos DB jako odpowiednie obiekty JSON. Obecnie nie można wykonać wymaganej transformacji JSON w jednym działaniu kopiowania. Jeśli spróbujemy skopiować wyniki powyższego zapytania do kontenera usługi Azure Cosmos DB for NoSQL, zobaczymy pole OrderDetails jako właściwość string naszego dokumentu zamiast oczekiwanej tablicy JSON.

Możemy obejść to bieżące ograniczenie w jeden z następujących sposobów:

  • Użyj usługi Azure Data Factory z dwoma działaniami kopiowania:
    1. Pobieranie danych sformatowanych w formacie JSON z bazy danych SQL do pliku tekstowego w pośredniej lokalizacji magazynu obiektów blob
    2. Ładowanie danych z pliku tekstowego JSON do kontenera w usłudze Azure Cosmos DB.
  • Użyj usługi Azure Databricks do odczytu z bazy danych SQL i zapisu w usłudze Azure Cosmos DB — przedstawimy tutaj dwie opcje.

Przyjrzyjmy się tym metodom bardziej szczegółowo:

Azure Data Factory

Chociaż nie możemy osadzić elementu OrderDetails jako tablicy JSON w docelowym dokumencie usługi Azure Cosmos DB, możemy obejść ten problem przy użyciu dwóch oddzielnych działań kopiowania.

Działanie kopiowania nr 1: SqlJsonToBlobText

W przypadku danych źródłowych użyjemy zapytania SQL, aby uzyskać zestaw wyników jako pojedynczą kolumnę z jednym obiektem JSON (reprezentującym kolejność) na wiersz przy użyciu funkcji SQL Server OPENJSON i FOR JSON PATH:

SELECT [value] FROM OPENJSON(
  (SELECT
    id = o.OrderID,
    o.OrderDate,
    o.FirstName,
    o.LastName,
    o.Address,
    o.City,
    o.State,
    o.PostalCode,
    o.Country,
    o.Phone,
    o.Total,
    (select OrderDetailId, ProductId, UnitPrice, Quantity from OrderDetails od where od.OrderId = o.OrderId for json auto) as OrderDetails
   FROM Orders o FOR JSON PATH)
)

Screenshot of the preview values in the ADF copy operation.

W przypadku ujścia SqlJsonToBlobText działania kopiowania wybieramy pozycję "Rozdzielany tekst" i wskazujemy go na określony folder w usłudze Azure Blob Storage. Ten ujście zawiera dynamicznie wygenerowaną unikatową nazwę pliku (na przykład @concat(pipeline().RunId,'.json'). Ponieważ nasz plik tekstowy nie jest naprawdę "rozdzielany" i nie chcemy, aby był analizowany w osobnych kolumnach przy użyciu przecinków. Chcemy również zachować podwójne cudzysłowy ("), ustawić ogranicznik kolumny na tabulator ("\t") — lub inny znak, który nie występuje w danych, a następnie ustaw wartość "Znak cudzysłowu" na "Brak znaku cudzysłowu".

Screenshot that highlights the Column delimiter and Quote character settings.

Działanie kopiowania nr 2: BlobJsonToCosmos

Następnie zmodyfikujemy potok usługi ADF, dodając drugie działanie kopiowania, które wyszukuje w usłudze Azure Blob Storage dla pliku tekstowego utworzonego przez pierwsze działanie. Przetwarza go jako źródło "JSON", aby wstawić go do ujścia usługi Azure Cosmos DB jako jeden dokument na wiersz JSON znaleziony w pliku tekstowym.

Screenshot that highlights the JSON source file and the File path fields.

Opcjonalnie do potoku dodamy również działanie "Usuń", aby usuwać wszystkie poprzednie pliki pozostałe w folderze /Orders/ przed każdym uruchomieniem. Potok usługi ADF wygląda teraz mniej więcej tak:

Screenshot that highlights the Delete activity.

Po wyzwoleniu wspomnianego wcześniej potoku zobaczymy plik utworzony w lokalizacji pośredniej usługi Azure Blob Storage zawierający jeden obiekt JSON na wiersz:

Screenshot that shows the created file that contains the JSON objects.

Widzimy również dokumenty Orders z poprawnie osadzonymi elementami OrderDetails wstawianymi do kolekcji usługi Azure Cosmos DB:

Screenshot that shows the order details as a part of the Azure Cosmos DB document

Azure Databricks

Możemy również użyć platformy Spark w usłudze Azure Databricks , aby skopiować dane ze źródła usługi SQL Database do miejsca docelowego usługi Azure Cosmos DB bez tworzenia pośredniczących plików tekstowych/JSON w usłudze Azure Blob Storage.

Uwaga

Aby uzyskać czytelność i prostotę, fragmenty kodu zawierają jawnie wbudowane fikcyjne hasła bazy danych, ale najlepiej używać wpisów tajnych usługi Azure Databricks.

Najpierw utworzymy i dołączymy wymagane biblioteki łącznika SQL i łącznika usługi Azure Cosmos DB do klastra usługi Azure Databricks. Uruchom ponownie klaster, aby upewnić się, że biblioteki zostały załadowane.

Screenshot that shows where to create and attach the required SQL connector and Azure Cosmos DB connector libraries to our Azure Databricks cluster.

Następnie przedstawimy dwa przykłady dla języków Scala i Python.

Scala

W tym miejscu uzyskujemy wyniki zapytania SQL z danymi wyjściowymi "FOR JSON" do ramki danych:

// Connect to Azure SQL /connectors/sql/
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val configSql = Config(Map(
  "url"          -> "xxxx.database.windows.net",
  "databaseName" -> "xxxx",
  "queryCustom"  -> "SELECT o.OrderID, o.OrderDate, o.FirstName, o.LastName, o.Address, o.City, o.State, o.PostalCode, o.Country, o.Phone, o.Total, (SELECT OrderDetailId, ProductId, UnitPrice, Quantity FROM OrderDetails od WHERE od.OrderId = o.OrderId FOR JSON AUTO) as OrderDetails FROM Orders o",
  "user"         -> "xxxx", 
  "password"     -> "xxxx" // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
))
// Create DataFrame from Azure SQL query
val orders = sqlContext.read.sqlDB(configSql)
display(orders)

Screenshot that shows the SQL query output in a DataFrame.

Następnie nawiążmy połączenie z naszą bazą danych i kolekcją usługi Azure Cosmos DB:

// Connect to Azure Cosmos DB https://docs.databricks.com/data/data-sources/azure/cosmosdb-connector.html
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.functions._
val configMap = Map(
  "Endpoint" -> "https://xxxx.documents.azure.com:443/",
  // NOTE: For clarity and simplicity, this example includes secrets explicitely as a string, but you should always use Databricks secrets
  "Masterkey" -> "xxxx",
  "Database" -> "StoreDatabase",
  "Collection" -> "Orders")
val configAzure Cosmos DB= Config(configMap)

Na koniec definiujemy nasz schemat i używamy from_json do zastosowania ramki danych przed zapisaniem go w kolekcji usługi Cosmos DB.

// Convert DataFrame to proper nested schema
import org.apache.spark.sql.types._
val orderDetailsSchema = ArrayType(StructType(Array(
    StructField("OrderDetailId",IntegerType,true),
    StructField("ProductId",IntegerType,true),
    StructField("UnitPrice",DoubleType,true),
    StructField("Quantity",IntegerType,true)
  )))
val ordersWithSchema = orders.select(
  col("OrderId").cast("string").as("id"),
  col("OrderDate").cast("string"),
  col("FirstName").cast("string"),
  col("LastName").cast("string"),
  col("Address").cast("string"),
  col("City").cast("string"),
  col("State").cast("string"),
  col("PostalCode").cast("string"),
  col("Country").cast("string"),
  col("Phone").cast("string"),
  col("Total").cast("double"),
  from_json(col("OrderDetails"), orderDetailsSchema).as("OrderDetails")
)
display(ordersWithSchema)
// Save nested data to Azure Cosmos DB
CosmosDBSpark.save(ordersWithSchema, configCosmos)

Screenshot that highlights the proper array for saving to an Azure Cosmos DB collection.

Python

Alternatywną metodą może być wykonanie przekształceń JSON na platformie Spark, jeśli źródłowa baza danych nie obsługuje FOR JSON lub podobna operacja. Alternatywnie można użyć operacji równoległych dla dużego zestawu danych. W tym miejscu przedstawiamy przykład PySpark. Zacznij od skonfigurowania źródłowych i docelowych połączeń bazy danych w pierwszej komórce:

import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType
 
#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"
 
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)
 
#Connect details for Target Azure Cosmos DB for NoSQL account
writeConfig = {
    "Endpoint": "Endpoint",
    "Masterkey": "Masterkey",
    "Database": "OrdersDB",
    "Collection": "Orders",
    "Upsert": "true"
}

Następnie wysyłamy zapytanie do źródłowej bazy danych (w tym przypadku programu SQL Server) zarówno dla rekordów szczegółów zamówienia, jak i zamówienia, umieszczając wyniki w ramkach danych platformy Spark. Tworzymy również listę zawierającą wszystkie identyfikatory zamówień i pulę wątków dla operacji równoległych:

import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool
 
#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)
 
#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)
 
#get all OrderId values to pass to map function 
orderids = orders.select('OrderId').collect()
 
#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)

Następnie utwórz funkcję do zapisywania zamówień w docelowym interfejsie API dla kolekcji NoSQL. Ta funkcja filtruje wszystkie szczegóły zamówienia dla danego identyfikatora zamówienia, konwertuje je na tablicę JSON i wstawia tablicę do dokumentu JSON. Dokument JSON jest następnie zapisywany w docelowym interfejsie API dla kontenera NoSQL dla tej kolejności:

def writeOrder(orderid):
  #filter the order on current value passed from map function
  order = orders.filter(orders['OrderId'] == orderid[0])
  
  #set id to be a uuid
  order = order.withColumn("id", lit(str(uuid.uuid1())))
  
  #add details field to order dataframe
  order = order.withColumn("details", lit(''))
  
  #filter order details dataframe to get details we want to merge into the order document
  orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
  
  #convert dataframe to pandas
  orderpandas = order.toPandas()
  
  #convert the order dataframe to json and remove enclosing brackets
  orderjson = orderpandas.to_json(orient='records', force_ascii=False)
  orderjson = orderjson[1:-1] 
  
  #convert orderjson to a dictionaory so we can set the details element with order details later
  orderjsondata = json.loads(orderjson)
  
  
  #convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
  if (orderdetailsgroup.count() !=0):
    #convert orderdetailsgroup to pandas dataframe to work better with json
    orderdetailsgroup = orderdetailsgroup.toPandas()
    
    #convert orderdetailsgroup to json string
    jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
    
    #convert jsonstring to dictionary to ensure correct encoding and no corrupt records
    jsonstring = json.loads(jsonstring)
    
    #set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order 
    orderjsondata['details'] = jsonstring
 
  #convert dictionary to json
  orderjsondata = json.dumps(orderjsondata)
 
  #read the json into spark dataframe
  df = spark.read.json(sc.parallelize([orderjsondata]))
  
  #write the dataframe (this will be a single order record with merged many-to-one order details) to Azure Cosmos DB db using spark the connector
  #https://learn.microsoft.com/azure/cosmos-db/spark-connector
  df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

Na koniec wywołujemy funkcję języka Python writeOrder przy użyciu funkcji mapy w puli wątków, aby wykonać równolegle, przekazując listę utworzonych wcześniej identyfikatorów zamówień:

#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)

W obu metodach na końcu powinniśmy prawidłowo zapisać osadzony element OrderDetails w ramach każdego dokumentu Order w kolekcji usługi Azure Cosmos DB:

Screenshot of the resulting data after migration.

Następne kroki