Disable broadcast when query plan has BroadcastNestedLoopJoin

This article explains how to disable broadcast when the query plan has BroadcastNestedLoopJoin in the physical plan.

You expect the broadcast to stop after you disable the broadcast threshold by setting spark.sql.autoBroadcastJoinThreshold to -1, but Apache Spark tries to broadcast the bigger table anyway and fails with a broadcast error.

This behavior is not a bug, but it can be unexpected. We are going to review the expected behavior and provide a mitigation option for this issue.

Create tables

Start by creating two tables, one with null values (table_withNull) and the other without null values (tblA_NoNull).

sql("SELECT id FROM RANGE(10)").write.mode("overwrite").saveAsTable("tblA_NoNull")
sql("SELECT id FROM RANGE(50) UNION SELECT NULL").write.mode("overwrite").saveAsTable("table_withNull")

Attempt to disable broadcast

We attempt to disable broadcast by setting spark.sql.autoBroadcastJoinThreshold for the query, which has a sub-query with an in clause.

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
sql("select * from table_withNull where id not in (select id from tblA_NoNull)").explain(true)

If you review the query plan, BroadcastNestedLoopJoin is the last possible fallback in this situation. It appears even after attempting to disable the broadcast.

== Physical Plan ==
*(2) BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#2482L = id#2483L) || isnull((id#2482L = id#2483L)))
:- *(2) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- BroadcastExchange IdentityBroadcastMode, [id=#2586]
   +- *(1) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

If the data being processed is large enough, this results in broadcast errors when Spark attempts to broadcast the table.

Rewrite query using not exists instead of in

You can resolve the issue by rewriting the query with not exists instead of in.

// It can be rewritten into a NOT EXISTS, which will become a regular join:
sql("select * from table_withNull where not exists (select 1 from tblA_NoNull where table_withNull.id = tblA_NoNull.id)").explain(true)

By using not exists, the query runs with SortMergeJoin.

== Physical Plan ==
SortMergeJoin [id#2482L], [id#2483L], LeftAnti
:- Sort [id#2482L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#2482L, 200), [id=#2653]
:     +- *(1) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- Sort [id#2483L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#2483L, 200), [id=#2656]
      +- *(2) Project [id#2483L]
         +- *(2) Filter isnotnull(id#2483L)
            +- *(2) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [isnotnull(id#2483L)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>

Explanation

Spark does not do this rewrite automatically, because Spark and SQL have slightly different semantics for null handling.

In SQL, not in means that if there is any null value among the not in values, the result is empty. This is why the query can only be executed with BroadcastNestedLoopJoin: all of the not in values must be known in order to ensure there is no null value in the set.
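These null semantics are standard SQL, not Spark-specific, so they can be demonstrated without a cluster. The sketch below uses Python's built-in sqlite3 module as a stand-in for Spark SQL (the table and column names are illustrative, not from the article's example): a not in subquery whose result contains a null returns no rows at all, while the equivalent not exists query behaves as you would intuitively expect.

```python
import sqlite3

# Two toy tables mirroring the article's setup: one contains a NULL id.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE t_with_null (id INTEGER)")
cur.executemany("INSERT INTO t_with_null VALUES (?)", [(1,), (2,), (None,)])
cur.execute("CREATE TABLE t_no_null (id INTEGER)")
cur.executemany("INSERT INTO t_no_null VALUES (?)", [(2,), (3,)])

# NOT IN over a set containing NULL: every comparison with NULL is unknown,
# so no row can be proven absent from the set -- the result is empty.
empty = cur.execute(
    "SELECT id FROM t_no_null "
    "WHERE id NOT IN (SELECT id FROM t_with_null)"
).fetchall()
print(empty)  # []

# NOT EXISTS only checks for a matching row, so the NULL does not poison it.
rows = cur.execute(
    "SELECT id FROM t_no_null WHERE NOT EXISTS "
    "(SELECT 1 FROM t_with_null w WHERE w.id = t_no_null.id)"
).fetchall()
print(rows)  # [(3,)]
```

This is exactly why Spark cannot silently rewrite not in as not exists: the two queries are only equivalent when the subquery is guaranteed to produce no nulls.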

Example notebook

This notebook has a complete example, showing why Spark does not automatically switch BroadcastNestedLoopJoin to SortMergeJoin.

BroadcastNestedLoopJoin example notebook

Get notebook