Disable broadcast when query plan has BroadcastNestedLoopJoin

This article explains how to disable broadcast when the query plan has BroadcastNestedLoopJoin in the physical plan.
You expect the broadcast to stop after you disable the broadcast threshold by setting spark.sql.autoBroadcastJoinThreshold to -1, but Apache Spark still tries to broadcast the bigger table and fails with a broadcast error.

This behavior is NOT a bug; however, it can be unexpected. We review the expected behavior and provide a mitigation option for this issue.
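As a quick sanity check (illustrative only, not part of the original walkthrough), you can set the threshold and read it back with the standard spark.conf API to confirm the session-level value:

// Disable automatic broadcast joins for this session, then read the value
// back to confirm the setting took effect.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))  // prints -1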
Create tables

Start by creating two tables, one with null values (table_withNull) and the other without null values (tblA_NoNull).
// tblA_NoNull: ids 0-9, no null values.
sql("SELECT id FROM RANGE(10)").write.mode("overwrite").saveAsTable("tblA_NoNull")
// table_withNull: ids 0-49 plus one NULL row.
sql("SELECT id FROM RANGE(50) UNION SELECT NULL").write.mode("overwrite").saveAsTable("table_withNull")
Attempt to disable broadcast

We attempt to disable the broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 for the query, which has a sub-query with an in clause.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
sql("select * from table_withNull where id not in (select id from tblA_NoNull)").explain(true)
If you review the query plan, BroadcastNestedLoopJoin is the last possible fallback in this situation. It appears even after attempting to disable the broadcast.
== Physical Plan ==
*(2) BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#2482L = id#2483L) || isnull((id#2482L = id#2483L)))
:- *(2) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- BroadcastExchange IdentityBroadcastMode, [id=#2586]
+- *(1) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
If the data being processed is large enough, this results in broadcast errors when Spark attempts to broadcast the table.
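As an illustrative pre-flight check (not part of the original article), you can inspect the string form of the executed plan via the Dataset's queryExecution field and look for BroadcastNestedLoopJoin before running the query on large tables:

// Illustrative check: look for BroadcastNestedLoopJoin in the physical plan
// string before executing the query against large tables.
val df = sql("select * from table_withNull where id not in (select id from tblA_NoNull)")
val planString = df.queryExecution.executedPlan.toString
if (planString.contains("BroadcastNestedLoopJoin")) {
  println("Plan still uses BroadcastNestedLoopJoin; consider rewriting with NOT EXISTS")
}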
Rewrite query using not exists instead of in

You can resolve the issue by rewriting the query with not exists instead of in.
// It can be rewritten into a NOT EXISTS, which will become a regular join:
sql("select * from table_withNull where not exists (select 1 from tblA_NoNull where table_withNull.id = tblA_NoNull.id)").explain(true)
By using not exists, the query runs with SortMergeJoin.
== Physical Plan ==
SortMergeJoin [id#2482L], [id#2483L], LeftAnti
:- Sort [id#2482L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#2482L, 200), [id=#2653]
: +- *(1) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- Sort [id#2483L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#2483L, 200), [id=#2656]
+- *(2) Project [id#2483L]
+- *(2) Filter isnotnull(id#2483L)
+- *(2) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [isnotnull(id#2483L)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
Explanation
Spark doesn't do this automatically, because Spark and SQL have slightly different semantics for null handling.

In SQL, not in means that if there is any null value in the not in values, the result is empty. This is why it can only be executed with BroadcastNestedLoopJoin. All not in values must be known in order to ensure there is no null value in the set.
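To make the null semantics concrete, here is a small illustrative example (not from the original article) that reuses tblA_NoNull with an inline in list:

// With no NULL in the list, NOT IN behaves as expected: 9 of the 10 ids remain.
sql("select id from tblA_NoNull where id not in (1)").count()        // returns 9

// A single NULL in the list makes every comparison evaluate to UNKNOWN,
// so NOT IN filters out every row and the result is empty.
sql("select id from tblA_NoNull where id not in (1, null)").count()  // returns 0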
Example notebook

This notebook has a complete example, showing why Spark does not automatically switch BroadcastNestedLoopJoin to SortMergeJoin.