How to improve performance of Delta Lake MERGE INTO queries using partition pruning
This article explains how to trigger partition pruning in Delta Lake MERGE INTO queries from Azure Databricks.
Partition pruning is an optimization technique that limits the number of partitions a query has to inspect.
Discussion
MERGE INTO is an expensive operation when used with Delta tables. If you don't partition the underlying data and use it appropriately, query performance can be severely impacted.

The main lesson is this: if you know which partitions a MERGE INTO query needs to inspect, you should specify them in the query so that partition pruning is performed.
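As a minimal sketch of that pattern (using hypothetical table and column names rather than the tables built later in this article), the idea is to add a literal predicate on the partition column to the ON clause of the merge:

// Hypothetical example: "target" is partitioned by "region", and the updates are
// known to touch only the 'US' and 'EU' partitions. Listing those values in the
// ON clause lets Delta Lake skip every other partition during the merge.
spark.sql("""
|MERGE INTO target
|USING updates
|ON target.region IN ('US', 'EU') AND target.id = updates.id
|WHEN MATCHED THEN
|  UPDATE SET target.value = updates.value
|WHEN NOT MATCHED THEN
|  INSERT (id, region, value) VALUES (updates.id, updates.region, updates.value)
""".stripMargin)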
Demonstration: no partition pruning
Here is an example of a poorly performing MERGE INTO query without partition pruning.

Start by creating the following Delta table, called delta_merge_into:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import spark.implicits._

// Create a Delta table with 30 million rows, partitioned on "par" into 1000 partitions.
spark.range(30000000)
  .withColumn("par", ($"id" % 1000).cast(IntegerType))
  .withColumn("ts", current_timestamp())
  .write
  .format("delta")
  .mode("overwrite")
  .partitionBy("par")
  .saveAsTable("delta_merge_into")
Next, create the DataFrame of updates that you will merge into the Delta table, and register it as a temp view called update:
val updatesTableName = "update"
val targetTableName = "delta_merge_into"

// 100 rows of updates; the partition column "par" only takes the values 0 and 1.
val updates = spark.range(100)
  .withColumn("id", (rand() * 30000000 * 2).cast(IntegerType))
  .withColumn("par", ($"id" % 2).cast(IntegerType))
  .withColumn("ts", current_timestamp())
  .dropDuplicates("id")

updates.createOrReplaceTempView(updatesTableName)
The update table has 100 rows with three columns: id, par, and ts. The value of par is always either 1 or 0.
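You can verify this directly (another optional check, not part of the original walkthrough) by listing the distinct partition values in the update data:

// The updates only touch two partition values, 0 and 1.
updates.select("par").distinct().show()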
Let's say you run the following simple MERGE INTO query:
spark.sql(s"""
|MERGE INTO $targetTableName
|USING $updatesTableName
|ON $targetTableName.id = $updatesTableName.id
|WHEN MATCHED THEN
| UPDATE SET $targetTableName.ts = $updatesTableName.ts
|WHEN NOT MATCHED THEN
| INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)
""".stripMargin)
The query takes 13.16 minutes to complete.
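If you want to reproduce this kind of timing yourself, one option (a suggestion, not something the original walkthrough shows) is to wrap the statement in spark.time, which runs the block and prints the elapsed time:

// spark.time executes the block and prints "Time taken: ... ms".
spark.time {
  spark.sql(s"""
  |MERGE INTO $targetTableName
  |USING $updatesTableName
  |ON $targetTableName.id = $updatesTableName.id
  |WHEN MATCHED THEN
  |  UPDATE SET $targetTableName.ts = $updatesTableName.ts
  |WHEN NOT MATCHED THEN
  |  INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)
  """.stripMargin)
}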
The physical plan for this query contains PartitionCount: 1000, as shown below. This means Apache Spark is scanning all 1000 partitions in order to execute the query. This is not an efficient query, because the update data only has partition values of 1 and 0:
== Physical Plan ==
*(5) HashAggregate(keys=[], functions=[finalmerge_count(merge count#8452L) AS count(1)#8448L], output=[count#8449L])
+- Exchange SinglePartition
+- *(4) HashAggregate(keys=[], functions=[partial_count(1) AS count#8452L], output=[count#8452L])
+- *(4) Project
+- *(4) Filter (isnotnull(count#8440L) && (count#8440L > 1))
+- *(4) HashAggregate(keys=[_row_id_#8399L], functions=[finalmerge_sum(merge sum#8454L) AS sum(cast(one#8434 as bigint))#8439L], output=[count#8440L])
+- Exchange hashpartitioning(_row_id_#8399L, 200)
+- *(3) HashAggregate(keys=[_row_id_#8399L], functions=[partial_sum(cast(one#8434 as bigint)) AS sum#8454L], output=[_row_id_#8399L, sum#8454L])
+- *(3) Project [_row_id_#8399L, UDF(_file_name_#8404) AS one#8434]
+- *(3) BroadcastHashJoin [cast(id#7514 as bigint)], [id#8390L], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
: +- *(2) HashAggregate(keys=[id#7514], functions=[], output=[id#7514])
: +- Exchange hashpartitioning(id#7514, 200)
: +- *(1) HashAggregate(keys=[id#7514], functions=[], output=[id#7514])
: +- *(1) Filter isnotnull(id#7514)
: +- *(1) Project [cast(((rand(8188829649009385616) * 3.0E7) * 2.0) as int) AS id#7514]
: +- *(1) Range (0, 100, step=1, splits=36)
+- *(3) Filter isnotnull(id#8390L)
+- *(3) Project [id#8390L, _row_id_#8399L, input_file_name() AS _file_name_#8404]
+- *(3) Project [id#8390L, monotonically_increasing_id() AS _row_id_#8399L]
+- *(3) Project [id#8390L, par#8391, ts#8392]
+- *(3) FileScan parquet [id#8390L,ts#8392,par#8391] Batched: true, DataFilters: [], Format: Parquet, Location: TahoeBatchFileIndex[dbfs:/user/hive/warehouse/delta_merge_into], PartitionCount: 1000, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,ts:timestamp>
Solution
Rewrite the query to specify the partitions.

This MERGE INTO query specifies the partitions directly:
spark.sql(s"""
|MERGE INTO $targetTableName
|USING $updatesTableName
|ON $targetTableName.par IN (1,0) AND $targetTableName.id = $updatesTableName.id
|WHEN MATCHED THEN
| UPDATE SET $targetTableName.ts = $updatesTableName.ts
|WHEN NOT MATCHED THEN
| INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)
""".stripMargin)
Now the query takes just 20.54 seconds to complete on the same cluster.
The physical plan for this query contains PartitionCount: 2, as shown below. With only minor changes, the query is now roughly 40X faster:
== Physical Plan ==
*(5) HashAggregate(keys=[], functions=[finalmerge_count(merge count#7892L) AS count(1)#7888L], output=[count#7889L])
+- Exchange SinglePartition
+- *(4) HashAggregate(keys=[], functions=[partial_count(1) AS count#7892L], output=[count#7892L])
+- *(4) Project
+- *(4) Filter (isnotnull(count#7880L) && (count#7880L > 1))
+- *(4) HashAggregate(keys=[_row_id_#7839L], functions=[finalmerge_sum(merge sum#7894L) AS sum(cast(one#7874 as bigint))#7879L], output=[count#7880L])
+- Exchange hashpartitioning(_row_id_#7839L, 200)
+- *(3) HashAggregate(keys=[_row_id_#7839L], functions=[partial_sum(cast(one#7874 as bigint)) AS sum#7894L], output=[_row_id_#7839L, sum#7894L])
+- *(3) Project [_row_id_#7839L, UDF(_file_name_#7844) AS one#7874]
+- *(3) BroadcastHashJoin [cast(id#7514 as bigint)], [id#7830L], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
: +- *(2) HashAggregate(keys=[id#7514], functions=[], output=[id#7514])
: +- Exchange hashpartitioning(id#7514, 200)
: +- *(1) HashAggregate(keys=[id#7514], functions=[], output=[id#7514])
: +- *(1) Filter isnotnull(id#7514)
: +- *(1) Project [cast(((rand(8188829649009385616) * 3.0E7) * 2.0) as int) AS id#7514]
: +- *(1) Range (0, 100, step=1, splits=36)
+- *(3) Project [id#7830L, _row_id_#7839L, _file_name_#7844]
+- *(3) Filter (par#7831 IN (1,0) && isnotnull(id#7830L))
+- *(3) Project [id#7830L, par#7831, _row_id_#7839L, input_file_name() AS _file_name_#7844]
+- *(3) Project [id#7830L, par#7831, monotonically_increasing_id() AS _row_id_#7839L]
+- *(3) Project [id#7830L, par#7831, ts#7832]
+- *(3) FileScan parquet [id#7830L,ts#7832,par#7831] Batched: true, DataFilters: [], Format: Parquet, Location: TahoeBatchFileIndex[dbfs:/user/hive/warehouse/delta_merge_into], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,ts:timestamp>
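The same partition-pruned merge can also be expressed with the Scala DeltaTable API instead of SQL. The sketch below assumes the io.delta.tables API available with Delta Lake on Databricks; it is equivalent to the SQL statement above, not an additional step from the walkthrough:

import io.delta.tables.DeltaTable

// The partition predicate on "par" in the merge condition is what allows
// Delta Lake to prune the scan down to two partitions, as in the SQL version.
DeltaTable.forName(spark, "delta_merge_into").as("t")
  .merge(updates.as("u"), "t.par IN (1, 0) AND t.id = u.id")
  .whenMatched()
  .updateExpr(Map("ts" -> "u.ts"))
  .whenNotMatched()
  .insertExpr(Map("id" -> "u.id", "par" -> "u.par", "ts" -> "u.ts"))
  .execute()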