クエリ プランに BroadcastNestedLoopJoin がある場合にブロードキャストを無効にするDisable broadcast when query plan has BroadcastNestedLoopJoin

この記事では、クエリプランが物理計画に含まれている場合にブロードキャストを無効にする方法について説明 BroadcastNestedLoopJoin します。This article explains how to disable broadcast when the query plan has BroadcastNestedLoopJoin in the physical plan.

ブロードキャストのしきい値を無効にした後、ブロードキャストを停止することを想定して spark.sql.autoBroadcastJoinThreshold います。-1 に設定します。ただし Apache Spark は、大きいテーブルをブロードキャストしようとし、ブロードキャストエラーが発生して失敗します。You expect the broadcast to stop after you disable the broadcast threshold, by setting spark.sql.autoBroadcastJoinThreshold to -1, but Apache Spark tries to broadcast the bigger table and fails with a broadcast error.

この動作はバグではありませんが、予期しないものである可能性があります。This behavior is NOT a bug, however it can be unexpected. ここでは、想定される動作を確認し、この問題の軽減策を提供します。We are going to review the expected behavior and provide a mitigation option for this issue.

テーブルの作成Create tables

まず、null 値を持つテーブルと null 値を持たないテーブルを2つ作成し table_withNull tblA_NoNull ます。Start by creating two tables, one with null values table_withNull and the other without null values tblA_NoNull.

sql("SELECT id FROM RANGE(10)").write.mode("overwrite").saveAsTable("tblA_NoNull")
sql("SELECT id FROM RANGE(50) UNION SELECT NULL").write.mode("overwrite").saveAsTable("table_withNull")

ブロードキャストを無効にしようとしましたAttempt to disable broadcast

句を使用したサブクエリを含むクエリの設定によって、ブロードキャストを無効にしようとして spark.sql.autoBroadcastJoinThreshold in います。We attempt to disable broadcast by setting spark.sql.autoBroadcastJoinThreshold for the query, which has a sub-query with an in clause.

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
sql("select * from table_withNull where id not in (select id from tblA_NoNull)").explain(true)

クエリプランを確認すると、 BroadcastNestedLoopJoin この状況では最後のフォールバックが可能になります。If you review the query plan, BroadcastNestedLoopJoin is the last possible fallback in this situation. ブロードキャストを無効にした後でも表示されます。It appears even after attempting to disable the broadcast.

== Physical Plan ==
*(2) BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#2482L = id#2483L) || isnull((id#2482L = id#2483L)))
:- *(2) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- BroadcastExchange IdentityBroadcastMode, [id=#2586]
   +- *(1) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

処理中のデータのサイズが十分に大きい場合、Spark がテーブルをブロードキャストしようとしたときにブロードキャストエラーが発生します。If the data being processed is large enough, this results in broadcast errors when Spark attempts to broadcast the table.

の代わりにを使用してクエリを書き直し not exists ます。 inRewrite query using not exists instead of in

この問題を解決するには、ではなくを使用してクエリを書き直し not exists in ます。You can resolve the issue by rewriting the query with not exists instead of in.

// It can be rewritten into a NOT EXISTS, which will become a regular join:
sql("select * from table_withNull where not exists (select 1 from tblA_NoNull where table_withNull.id = tblA_NoNull.id)").explain(true)

を使用する not exists と、クエリはで実行され SortMergeJoin ます。By using not exists, the query runs with SortMergeJoin.

== Physical Plan ==
SortMergeJoin [id#2482L], [id#2483L], LeftAnti
:- Sort [id#2482L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#2482L, 200), [id=#2653]
:     +- *(1) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- Sort [id#2483L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#2483L, 200), [id=#2656]
      +- *(2) Project [id#2483L]
         +- *(2) Filter isnotnull(id#2483L)
            +- *(2) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [isnotnull(id#2483L)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>

説明Explanation

Spark と SQL は null 処理のセマンティクスが若干異なるため、spark はこれを自動的に実行しません。Spark doesn’t do this automatically, because Spark and SQL have slightly different semantics for null handling.

SQL では、 not in 値に null 値がある場合、結果は空になり not in ます。In SQL, not in means that if there is any null value in the not in values, the result is empty. これは、でのみ実行できる理由です BroadcastNestedLoopJoinThis is why it can only be executed with BroadcastNestedLoopJoin. not inSet に null 値がないことを保証するために、すべての値がわかっている必要があります。All not in values must be known in order to ensure there is no null value in the set.

ノートブックの例Example notebook

この notebook には、Spark が自動的に切り替えられない理由を示す完全な例があり BroadcastNestedLoopJoin SortMergeJoin ます。This notebook has a complete example, showing why Spark does not automatically switch BroadcastNestedLoopJoin to SortMergeJoin.

BroadcastNestedLoopJoin notebook の例BroadcastNestedLoopJoin example notebook

ノートブックを入手Get notebook