クエリ プランに BroadcastNestedLoopJoin
がある場合にブロードキャストを無効にするDisable broadcast when query plan has BroadcastNestedLoopJoin
この記事では、クエリプランが物理計画に含まれている場合にブロードキャストを無効にする方法について説明 BroadcastNestedLoopJoin
します。This article explains how to disable broadcast when the query plan has BroadcastNestedLoopJoin
in the physical plan.
ブロードキャストのしきい値を無効にした後、ブロードキャストを停止することを想定して spark.sql.autoBroadcastJoinThreshold
います。-1 に設定します。ただし Apache Spark は、大きいテーブルをブロードキャストしようとし、ブロードキャストエラーが発生して失敗します。You expect the broadcast to stop after you disable the broadcast threshold, by setting spark.sql.autoBroadcastJoinThreshold
to -1, but Apache Spark tries to broadcast the bigger table and fails with a broadcast error.
この動作はバグではありませんが、予期しないものである可能性があります。This behavior is NOT a bug, however it can be unexpected. ここでは、想定される動作を確認し、この問題の軽減策を提供します。We are going to review the expected behavior and provide a mitigation option for this issue.
テーブルの作成Create tables
まず、null 値を持つテーブルと null 値を持たないテーブルを2つ作成し table_withNull
tblA_NoNull
ます。Start by creating two tables, one with null values table_withNull
and the other without null values tblA_NoNull
.
sql("SELECT id FROM RANGE(10)").write.mode("overwrite").saveAsTable("tblA_NoNull")
sql("SELECT id FROM RANGE(50) UNION SELECT NULL").write.mode("overwrite").saveAsTable("table_withNull")
ブロードキャストを無効にしようとしましたAttempt to disable broadcast
句を使用したサブクエリを含むクエリの設定によって、ブロードキャストを無効にしようとして spark.sql.autoBroadcastJoinThreshold
in
います。We attempt to disable broadcast by setting spark.sql.autoBroadcastJoinThreshold
for the query, which has a sub-query with an in
clause.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
sql("select * from table_withNull where id not in (select id from tblA_NoNull)").explain(true)
クエリプランを確認すると、 BroadcastNestedLoopJoin
この状況では最後のフォールバックが可能になります。If you review the query plan, BroadcastNestedLoopJoin
is the last possible fallback in this situation. ブロードキャストを無効にした後でも表示されます。It appears even after attempting to disable the broadcast.
== Physical Plan ==
*(2) BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#2482L = id#2483L) || isnull((id#2482L = id#2483L)))
:- *(2) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- BroadcastExchange IdentityBroadcastMode, [id=#2586]
+- *(1) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
処理中のデータのサイズが十分に大きい場合、Spark がテーブルをブロードキャストしようとしたときにブロードキャストエラーが発生します。If the data being processed is large enough, this results in broadcast errors when Spark attempts to broadcast the table.
の代わりにを使用してクエリを書き直し not exists
ます。 in
Rewrite query using not exists
instead of in
この問題を解決するには、ではなくを使用してクエリを書き直し not exists
in
ます。You can resolve the issue by rewriting the query with not exists
instead of in
.
// It can be rewritten into a NOT EXISTS, which will become a regular join:
sql("select * from table_withNull where not exists (select 1 from tblA_NoNull where table_withNull.id = tblA_NoNull.id)").explain(true)
を使用する not exists
と、クエリはで実行され SortMergeJoin
ます。By using not exists
, the query runs with SortMergeJoin
.
== Physical Plan ==
SortMergeJoin [id#2482L], [id#2483L], LeftAnti
:- Sort [id#2482L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#2482L, 200), [id=#2653]
: +- *(1) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- Sort [id#2483L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#2483L, 200), [id=#2656]
+- *(2) Project [id#2483L]
+- *(2) Filter isnotnull(id#2483L)
+- *(2) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [isnotnull(id#2483L)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
説明Explanation
Spark と SQL は null 処理のセマンティクスが若干異なるため、spark はこれを自動的に実行しません。Spark doesn’t do this automatically, because Spark and SQL have slightly different semantics for null handling.
SQL では、 not in
値に null 値がある場合、結果は空になり not in
ます。In SQL, not in
means that if there is any null value in the not in
values, the result is empty. これは、でのみ実行できる理由です BroadcastNestedLoopJoin
。This is why it can only be executed with BroadcastNestedLoopJoin
. not in
Set に null 値がないことを保証するために、すべての値がわかっている必要があります。All not in
values must be known in order to ensure there is no null value in the set.
ノートブックの例Example notebook
この notebook には、Spark が自動的に切り替えられない理由を示す完全な例があり BroadcastNestedLoopJoin
SortMergeJoin
ます。This notebook has a complete example, showing why Spark does not automatically switch BroadcastNestedLoopJoin
to SortMergeJoin
.