How to improve performance of Delta Lake MERGE INTO queries using partition pruning

This article explains how to trigger partition pruning in Delta Lake MERGE INTO queries from Azure Databricks.

Partition pruning is an optimization technique that limits the number of partitions a query inspects.
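For example, on a table partitioned by a column such as par, a filter on that column lets Spark skip every partition that cannot match. A minimal sketch (the table name and value here are illustrative, not part of this article's benchmark):

// Reading a partitioned table with a filter on the partition column:
// Spark records the predicate under PartitionFilters and reduces PartitionCount.
import org.apache.spark.sql.functions.col

spark.table("some_partitioned_table")   // hypothetical partitioned Delta table
  .filter(col("par") === 42)            // predicate on the partition column
  .explain()                            // inspect PartitionFilters / PartitionCount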

Discussion

MERGE INTO is an expensive operation when used with Delta tables. If you don't partition the underlying data and use the partitions appropriately, query performance can be severely impacted.

The main lesson is this: if you know which partitions a MERGE INTO query needs to inspect, you should specify them in the query so that partition pruning is performed.

Demonstration: no partition pruning

Here is an example of a poorly performing MERGE INTO query without partition pruning.

Start by creating the following Delta table, called delta_merge_into:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import spark.implicits._

// Create a Delta table with 30 million rows, partitioned by "par" (1000 distinct values)
spark.range(30000000)
  .withColumn("par", ($"id" % 1000).cast(IntegerType))
  .withColumn("ts", current_timestamp())
  .write
    .format("delta")
    .mode("overwrite")
    .partitionBy("par")
    .saveAsTable("delta_merge_into")
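If you want to confirm the table layout before merging (an optional check, not part of the original walkthrough), Delta's DESCRIBE DETAIL command reports the partition columns, and a distinct count on par shows the 1000 partition values:

// Optional sanity checks on the newly created table
spark.sql("DESCRIBE DETAIL delta_merge_into").select("partitionColumns", "numFiles").show(false)
spark.table("delta_merge_into").select("par").distinct().count()   // expect 1000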

Then create a DataFrame of updates and register it as a temporary view called update:

val updatesTableName = "update"
val targetTableName = "delta_merge_into"

// Build 100 random update rows; "par" only takes the values 0 and 1
val updates = spark.range(100)
  .withColumn("id", (rand() * 30000000 * 2).cast(IntegerType))
  .withColumn("par", ($"id" % 2).cast(IntegerType))
  .withColumn("ts", current_timestamp())
  .dropDuplicates("id")
updates.createOrReplaceTempView(updatesTableName)

The update table has 100 rows with three columns: id, par, and ts. The value of par is always either 1 or 0.
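You can confirm this with a quick check on the view (optional, not in the original article):

// The update data only spans two partition values, 0 and 1
spark.table("update").select("par").distinct().show()
spark.table("update").count()   // 100 rows (fewer only if the random ids happened to collide)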

Let's say you run the following simple MERGE INTO query:

spark.sql(s"""
     |MERGE INTO $targetTableName
     |USING $updatesTableName
     |ON $targetTableName.id = $updatesTableName.id
     |WHEN MATCHED THEN
     |  UPDATE SET $targetTableName.ts = $updatesTableName.ts
     |WHEN NOT MATCHED THEN
     |  INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)
 """.stripMargin)

The query takes 13.16 minutes to complete:


The physical plan for this query contains PartitionCount: 1000, as shown below. This means Apache Spark is scanning all 1000 partitions in order to execute the query. This is not an efficient query, because the update data only has partition values of 1 and 0:

== Physical Plan ==
*(5) HashAggregate(keys=[], functions=[finalmerge_count(merge count#8452L) AS count(1)#8448L], output=[count#8449L])
+- Exchange SinglePartition
   +- *(4) HashAggregate(keys=[], functions=[partial_count(1) AS count#8452L], output=[count#8452L])
    +- *(4) Project
       +- *(4) Filter (isnotnull(count#8440L) && (count#8440L > 1))
          +- *(4) HashAggregate(keys=[_row_id_#8399L], functions=[finalmerge_sum(merge sum#8454L) AS sum(cast(one#8434 as bigint))#8439L], output=[count#8440L])
             +- Exchange hashpartitioning(_row_id_#8399L, 200)
                +- *(3) HashAggregate(keys=[_row_id_#8399L], functions=[partial_sum(cast(one#8434 as bigint)) AS sum#8454L], output=[_row_id_#8399L, sum#8454L])
                   +- *(3) Project [_row_id_#8399L, UDF(_file_name_#8404) AS one#8434]
                      +- *(3) BroadcastHashJoin [cast(id#7514 as bigint)], [id#8390L], Inner, BuildLeft, false
                         :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
                         :  +- *(2) HashAggregate(keys=[id#7514], functions=[], output=[id#7514])
                         :     +- Exchange hashpartitioning(id#7514, 200)
                         :        +- *(1) HashAggregate(keys=[id#7514], functions=[], output=[id#7514])
                         :           +- *(1) Filter isnotnull(id#7514)
                         :              +- *(1) Project [cast(((rand(8188829649009385616) * 3.0E7) * 2.0) as int) AS id#7514]
                         :                 +- *(1) Range (0, 100, step=1, splits=36)
                         +- *(3) Filter isnotnull(id#8390L)
                            +- *(3) Project [id#8390L, _row_id_#8399L, input_file_name() AS _file_name_#8404]
                               +- *(3) Project [id#8390L, monotonically_increasing_id() AS _row_id_#8399L]
                                  +- *(3) Project [id#8390L, par#8391, ts#8392]
                                     +- *(3) FileScan parquet [id#8390L,ts#8392,par#8391] Batched: true, DataFilters: [], Format: Parquet, Location: TahoeBatchFileIndex[dbfs:/user/hive/warehouse/delta_merge_into], PartitionCount: 1000, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,ts:timestamp>
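One quick way to check whether a predicate prunes at all (a sketch, separate from the MERGE itself) is to apply the same predicate to a plain read of the target table and look for PartitionFilters and a reduced PartitionCount in the plan:

// The partition predicate alone, on a plain scan, already shows pruning in explain()
import org.apache.spark.sql.functions.col

spark.table("delta_merge_into")
  .filter(col("par").isin(1, 0))
  .explain()   // PartitionCount should drop from 1000 to 2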

Solution

Rewrite the query to specify the partitions.

This MERGE INTO query specifies the partitions directly:

spark.sql(s"""
     |MERGE INTO $targetTableName
     |USING $updatesTableName
     |ON $targetTableName.par IN (1,0) AND $targetTableName.id = $updatesTableName.id
     |WHEN MATCHED THEN
     |  UPDATE SET $targetTableName.ts = $updatesTableName.ts
     |WHEN NOT MATCHED THEN
     |  INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)
 """.stripMargin)

Now the query takes just 20.54 seconds to complete on the same cluster.


The physical plan for this query contains PartitionCount: 2, as shown below. With only minor changes, the query is now nearly 40X faster:

== Physical Plan ==
*(5) HashAggregate(keys=[], functions=[finalmerge_count(merge count#7892L) AS count(1)#7888L], output=[count#7889L])
+- Exchange SinglePartition
   +- *(4) HashAggregate(keys=[], functions=[partial_count(1) AS count#7892L], output=[count#7892L])
    +- *(4) Project
       +- *(4) Filter (isnotnull(count#7880L) && (count#7880L > 1))
          +- *(4) HashAggregate(keys=[_row_id_#7839L], functions=[finalmerge_sum(merge sum#7894L) AS sum(cast(one#7874 as bigint))#7879L], output=[count#7880L])
             +- Exchange hashpartitioning(_row_id_#7839L, 200)
                +- *(3) HashAggregate(keys=[_row_id_#7839L], functions=[partial_sum(cast(one#7874 as bigint)) AS sum#7894L], output=[_row_id_#7839L, sum#7894L])
                   +- *(3) Project [_row_id_#7839L, UDF(_file_name_#7844) AS one#7874]
                      +- *(3) BroadcastHashJoin [cast(id#7514 as bigint)], [id#7830L], Inner, BuildLeft, false
                         :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
                         :  +- *(2) HashAggregate(keys=[id#7514], functions=[], output=[id#7514])
                         :     +- Exchange hashpartitioning(id#7514, 200)
                         :        +- *(1) HashAggregate(keys=[id#7514], functions=[], output=[id#7514])
                         :           +- *(1) Filter isnotnull(id#7514)
                         :              +- *(1) Project [cast(((rand(8188829649009385616) * 3.0E7) * 2.0) as int) AS id#7514]
                         :                 +- *(1) Range (0, 100, step=1, splits=36)
                         +- *(3) Project [id#7830L, _row_id_#7839L, _file_name_#7844]
                            +- *(3) Filter (par#7831 IN (1,0) && isnotnull(id#7830L))
                               +- *(3) Project [id#7830L, par#7831, _row_id_#7839L, input_file_name() AS _file_name_#7844]
                                  +- *(3) Project [id#7830L, par#7831, monotonically_increasing_id() AS _row_id_#7839L]
                                     +- *(3) Project [id#7830L, par#7831, ts#7832]
                                        +- *(3) FileScan parquet [id#7830L,ts#7832,par#7831] Batched: true, DataFilters: [], Format: Parquet, Location: TahoeBatchFileIndex[dbfs:/user/hive/warehouse/delta_merge_into], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,ts:timestamp>
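The same partition-aware merge can also be written with the Delta Lake Scala API. The sketch below is an equivalent (untimed) formulation using the io.delta.tables.DeltaTable builder, with the partition predicate included in the merge condition so that pruning can still kick in:

import io.delta.tables.DeltaTable

// Equivalent merge through the DeltaTable builder API (sketch, not benchmarked here)
DeltaTable.forName(spark, targetTableName).as("t")
  .merge(updates.as("u"), "t.par IN (1, 0) AND t.id = u.id")
  .whenMatched()
  .updateExpr(Map("ts" -> "u.ts"))
  .whenNotMatched()
  .insertExpr(Map("id" -> "u.id", "par" -> "u.par", "ts" -> "u.ts"))
  .execute()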