Share via


Azure Cosmos DB-gegevens kopiëren naar een toegewezen SQL-pool, met behulp van Apache Spark

Met Azure Synapse Link voor Azure Cosmos DB kunnen gebruikers in bijna realtime analyses uitvoeren via operationele gegevens in Azure Cosmos DB. Er zijn echter momenten waarop sommige gegevens moeten worden geaggregeerd en verrijkt zodat datawarehouse-gebruikers ze kunnen gebruiken. Het cureren en exporteren van gegevens Azure Synapse Link kan worden uitgevoerd met slechts een paar cellen in een notebook.

Vereisten

Stappen

In deze zelfstudie gaat u verbinding maken met de analytische opslag, zodat de transactionele opslag niet wordt beïnvloed (er worden geen aanvraageenheden gebruikt). De volgende stappen worden uitgevoerd:

  1. De Azure Cosmos DB HTAP-container in een Spark-gegevensframe lezen
  2. De resultaten aggregeren in een nieuwe dataframe
  3. De gegevens opnemen in een toegewezen SQL-pool

Apache Spark naar SQL: stappen 1

Gegevens

In dit voorbeeld wordt een HTAP-container met de naam RetailSales gebruikt. Deze maakt deel uit van een gekoppelde service met de naam ConnectedData en heeft het volgende schema:

  • _rid: string (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: string (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: string (nullable = true)
  • advertising: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: string (nullable = true)

U gaat de verkoop (quantity, revenue (price x quantity)) aggregeren met productCode en weekStarting voor rapportagedoeleinden. Ten slotte exporteren we die gegevens naar een toegewezen SQL-pooltabel met de naam dbo.productsales.

Een Apache Spark-notebook configureren

Maak een Apache Spark-notebook met Scala as Spark (Scala) als hoofdtaal. U gebruikt de standaardinstelling van de notebook voor deze sessie.

De gegevens in Apache Spark uitlezen

Lees de Azure Cosmos DB HTAP-container met Spark in een dataframe in de eerste cel.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

De resultaten aggregeren in een nieuwe dataframe

In de tweede cel voert u de transformatie en aggregaties uit die nodig zijn voor de nieuwe dataframe, vóórdat u deze in een toegewezen SQL-pooldatabase laadt.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

De resultaten laden in een toegewezen SQL-pool

In de derde cel laadt u de gegevens in een toegewezen SQL-pool. Er worden automatisch een tijdelijke externe tabel, externe gegevensbron en externe bestandsindeling gemaakt, die worden verwijderd zodra de taak is uitgevoerd.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Query's uitvoeren op de resultaten met SQL

U kunt query's uitvoeren op het resultaat met een eenvoudige SQL-query, zoals het volgende SQL-script:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

De query geeft de volgende resultaten weer in een grafiekmodus: Spark naar SQL Stappen 2

Volgende stappen