How to Improve Performance of Delta Lake MERGE INTO Queries Using Partition Pruning

This article explains how to trigger partition pruning in Delta Lake MERGE INTO queries from Azure Databricks.

Partition pruning is an optimization technique that limits the number of partitions a query inspects.
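The idea can be sketched in a few lines of plain Scala. This is a hypothetical model, not Delta Lake's actual internals: each partition holds a set of data files, and pruning keeps only the partitions whose partition-column value satisfies the query's predicate, so files in every other partition are never read.

```scala
// A minimal sketch (hypothetical types and names) of partition pruning:
// keep only the partitions whose partition-column value satisfies the
// query's predicate; files in all other partitions are never scanned.
object PruningSketch {
  case class Partition(par: Int, files: Seq[String])

  // Return only the partitions the query actually needs to scan.
  def prune(partitions: Seq[Partition], wanted: Set[Int]): Seq[Partition] =
    partitions.filter(p => wanted.contains(p.par))

  def main(args: Array[String]): Unit = {
    val all  = (0 until 1000).map(i => Partition(i, Seq(s"part-$i.parquet")))
    val kept = prune(all, Set(0, 1)) // e.g. a predicate like par IN (0, 1)
    println(s"scanning ${kept.size} of ${all.size} partitions")
    // prints: scanning 2 of 1000 partitions
  }
}
```

With a selective predicate, the scan cost drops from the total number of partitions to just the ones that can contain matching rows.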

Note

This operation applies to Databricks Runtime 5.0 and above.

Discussion

MERGE INTO is an expensive operation when used with Delta tables. If you don't partition the underlying data and use it appropriately, query performance can be severely impacted.

The main lesson is this: if you know which partitions a MERGE INTO query needs to inspect, you should specify them in the query so that partition pruning is performed.

Demonstration: no partition pruning

Here is an example of a poorly performing MERGE INTO query without partition pruning.

Start by creating the following Delta table, called delta_merge_into:

import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.types.IntegerType
import spark.implicits._

spark.range(30000000)
    .withColumn("par", ($"id" % 1000).cast(IntegerType))
    .withColumn("ts", current_timestamp())
    .write
      .format("delta")
      .mode("overwrite")
      .partitionBy("par")
      .saveAsTable("delta_merge_into")

Then create a DataFrame called update and register it as a temporary view; this is the data that will be merged into the Delta table:

import org.apache.spark.sql.functions.{current_timestamp, rand}
import org.apache.spark.sql.types.IntegerType
import spark.implicits._

val updatesTableName = "update"
val targetTableName = "delta_merge_into"
val updates = spark.range(100)
    .withColumn("id", (rand() * 30000000 * 2).cast(IntegerType))
    .withColumn("par", ($"id" % 2).cast(IntegerType))
    .withColumn("ts", current_timestamp())
    .dropDuplicates("id")
updates.createOrReplaceTempView(updatesTableName)

The update table has 100 rows with three columns: id, par, and ts. The value of par is always either 1 or 0.
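A quick plain-Scala check (no Spark needed) confirms why this matters: id % 2 can only produce 0 or 1, so however the random ids fall, the update rows can land in at most two of the target table's 1000 partitions.

```scala
// id % 2 can only be 0 or 1, so the update rows touch at most
// two of the target table's 1000 partitions.
object ParCheck {
  def main(args: Array[String]): Unit = {
    val sampleIds = Seq(3, 10, 57, 42, 29999999)
    val parValues = sampleIds.map(_ % 2).toSet
    println(parValues) // at most two distinct values: 0 and 1
  }
}
```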

Let's say you run the following simple MERGE INTO query:

spark.sql(s"""
     |MERGE INTO $targetTableName
     |USING $updatesTableName
     |ON $targetTableName.id = $updatesTableName.id
     |WHEN MATCHED THEN
     |  UPDATE SET $targetTableName.ts = $updatesTableName.ts
     |WHEN NOT MATCHED THEN
     |  INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)
 """.stripMargin)

The query takes 13.16 minutes to complete:

![no-alternative-text](../_static/images/delta/without-partition-filters.png)

The physical plan for this query contains PartitionCount: 1000, as shown below. This means Apache Spark is scanning all 1000 partitions in order to execute the query. This is not an efficient query, because the update data only has partition values of 1 and 0:

== Physical Plan ==
*(5) HashAggregate(keys=[], functions=[finalmerge_count(merge count#8452L) AS count(1)#8448L], output=[count#8449L])
+- Exchange SinglePartition
   +- *(4) HashAggregate(keys=[], functions=[partial_count(1) AS count#8452L], output=[count#8452L])
    +- *(4) Project
       +- *(4) Filter (isnotnull(count#8440L) && (count#8440L > 1))
          +- *(4) HashAggregate(keys=[_row_id_#8399L], functions=[finalmerge_sum(merge sum#8454L) AS sum(cast(one#8434 as bigint))#8439L], output=[count#8440L])
             +- Exchange hashpartitioning(_row_id_#8399L, 200)
                +- *(3) HashAggregate(keys=[_row_id_#8399L], functions=[partial_sum(cast(one#8434 as bigint)) AS sum#8454L], output=[_row_id_#8399L, sum#8454L])
                   +- *(3) Project [_row_id_#8399L, UDF(_file_name_#8404) AS one#8434]
                      +- *(3) BroadcastHashJoin [cast(id#7514 as bigint)], [id#8390L], Inner, BuildLeft, false
                         :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
                         :  +- *(2) HashAggregate(keys=[id#7514], functions=[], output=[id#7514])
                         :     +- Exchange hashpartitioning(id#7514, 200)
                         :        +- *(1) HashAggregate(keys=[id#7514], functions=[], output=[id#7514])
                         :           +- *(1) Filter isnotnull(id#7514)
                         :              +- *(1) Project [cast(((rand(8188829649009385616) * 3.0E7) * 2.0) as int) AS id#7514]
                         :                 +- *(1) Range (0, 100, step=1, splits=36)
                         +- *(3) Filter isnotnull(id#8390L)
                            +- *(3) Project [id#8390L, _row_id_#8399L, input_file_name() AS _file_name_#8404]
                               +- *(3) Project [id#8390L, monotonically_increasing_id() AS _row_id_#8399L]
                                  +- *(3) Project [id#8390L, par#8391, ts#8392]
                                     +- *(3) FileScan parquet [id#8390L,ts#8392,par#8391] Batched: true, DataFilters: [], Format: Parquet, Location: TahoeBatchFileIndex[dbfs:/user/hive/warehouse/delta_merge_into], PartitionCount: 1000, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,ts:timestamp>

Solution

There are two solutions:

  1. Rewrite the query to specify the partitions manually.
  2. Enable dynamic partition pruning.

Method 1

This MERGE INTO query specifies the partitions directly:

spark.sql(s"""
     |MERGE INTO $targetTableName
     |USING $updatesTableName
     |ON $targetTableName.par IN (1,0) AND $targetTableName.id = $updatesTableName.id
     |WHEN MATCHED THEN
     |  UPDATE SET $targetTableName.ts = $updatesTableName.ts
     |WHEN NOT MATCHED THEN
     |  INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)
 """.stripMargin)

Now the query takes just 20.54 seconds to complete on the same cluster.

![no-alternative-text](../_static/images/delta/with-partition-filters.png)

The physical plan for this query contains PartitionCount: 2, as shown below. With only minor changes, the query is now more than 40X faster:

== Physical Plan ==
*(5) HashAggregate(keys=[], functions=[finalmerge_count(merge count#7892L) AS count(1)#7888L], output=[count#7889L])
+- Exchange SinglePartition
   +- *(4) HashAggregate(keys=[], functions=[partial_count(1) AS count#7892L], output=[count#7892L])
    +- *(4) Project
       +- *(4) Filter (isnotnull(count#7880L) && (count#7880L > 1))
          +- *(4) HashAggregate(keys=[_row_id_#7839L], functions=[finalmerge_sum(merge sum#7894L) AS sum(cast(one#7874 as bigint))#7879L], output=[count#7880L])
             +- Exchange hashpartitioning(_row_id_#7839L, 200)
                +- *(3) HashAggregate(keys=[_row_id_#7839L], functions=[partial_sum(cast(one#7874 as bigint)) AS sum#7894L], output=[_row_id_#7839L, sum#7894L])
                   +- *(3) Project [_row_id_#7839L, UDF(_file_name_#7844) AS one#7874]
                      +- *(3) BroadcastHashJoin [cast(id#7514 as bigint)], [id#7830L], Inner, BuildLeft, false
                         :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
                         :  +- *(2) HashAggregate(keys=[id#7514], functions=[], output=[id#7514])
                         :     +- Exchange hashpartitioning(id#7514, 200)
                         :        +- *(1) HashAggregate(keys=[id#7514], functions=[], output=[id#7514])
                         :           +- *(1) Filter isnotnull(id#7514)
                         :              +- *(1) Project [cast(((rand(8188829649009385616) * 3.0E7) * 2.0) as int) AS id#7514]
                         :                 +- *(1) Range (0, 100, step=1, splits=36)
                         +- *(3) Project [id#7830L, _row_id_#7839L, _file_name_#7844]
                            +- *(3) Filter (par#7831 IN (1,0) && isnotnull(id#7830L))
                               +- *(3) Project [id#7830L, par#7831, _row_id_#7839L, input_file_name() AS _file_name_#7844]
                                  +- *(3) Project [id#7830L, par#7831, monotonically_increasing_id() AS _row_id_#7839L]
                                     +- *(3) Project [id#7830L, par#7831, ts#7832]
                                        +- *(3) FileScan parquet [id#7830L,ts#7832,par#7831] Batched: true, DataFilters: [], Format: Parquet, Location: TahoeBatchFileIndex[dbfs:/user/hive/warehouse/delta_merge_into], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,ts:timestamp>

Method 2

Enable dynamic partition pruning by setting the following configuration in a notebook cell:

spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning", "true")
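With this configuration set, the first MERGE INTO query from the demonstration can be rerun unchanged; the optimizer derives the partition filter from the update data at runtime instead of requiring a manual predicate. A sketch, assuming the same targetTableName and updatesTableName values defined earlier (this is a notebook fragment that needs a running Spark session with the tables above):

```scala
// With dynamic partition pruning enabled, no manual partition predicate
// is needed in the ON clause; the optimizer derives one at runtime from
// the update data.
spark.sql(s"""
     |MERGE INTO $targetTableName
     |USING $updatesTableName
     |ON $targetTableName.id = $updatesTableName.id
     |WHEN MATCHED THEN
     |  UPDATE SET $targetTableName.ts = $updatesTableName.ts
     |WHEN NOT MATCHED THEN
     |  INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)
 """.stripMargin)
```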