Inaktivera sändning när en frågeplan har BroadcastNestedLoopJoin

Den här artikeln förklarar hur du inaktiverar sändning när en frågeplan har BroadcastNestedLoopJoin i den fysiska planen.

Du förväntar dig att sändningen ska stoppas när du har inaktiverat tröskelvärdet för sändning, genom spark.sql.autoBroadcastJoinThreshold att ange till-1, men Apache Spark försöker sända den större tabellen och Miss lyckas med ett broadcast-fel.

Det här beteendet är inte ett fel, men det kan vara oväntat. Vi ska granska det förväntade beteendet och tillhandahålla ett alternativ för att undvika problemet.

Skapa tabeller

Börja med att skapa två tabeller, en med null-värden table_withNull och den andra utan null-värden tblA_NoNull .

sql("SELECT id FROM RANGE(10)").write.mode("overwrite").saveAsTable("tblA_NoNull")
sql("SELECT id FROM RANGE(50) UNION SELECT NULL").write.mode("overwrite").saveAsTable("table_withNull")

Försök att inaktivera sändning

Vi försöker inaktivera sändning genom att ställa in spark.sql.autoBroadcastJoinThreshold frågan, som har en under fråga med en in sats.

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
sql("select * from table_withNull where id not in (select id from tblA_NoNull)").explain(true)

Om du granskar frågeplan BroadcastNestedLoopJoin är det sista möjliga återställningen i den här situationen. Den visas även när du försöker inaktivera sändningen.

== Physical Plan ==
*(2) BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#2482L = id#2483L) || isnull((id#2482L = id#2483L)))
:- *(2) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- BroadcastExchange IdentityBroadcastMode, [id=#2586]
   +- *(1) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

Om data som bearbetas är tillräckligt stora resulterar det i broadcast-fel när Spark försöker sända tabellen.

Skriv om frågan med not exists i stället för in

Du kan lösa problemet genom att skriva om frågan med not exists i stället för in .

// It can be rewritten into a NOT EXISTS, which will become a regular join:
sql("select * from table_withNull where not exists (select 1 from tblA_NoNull where table_withNull.id = tblA_NoNull.id)").explain(true)

Med hjälp av not exists körs frågan med SortMergeJoin .

== Physical Plan ==
SortMergeJoin [id#2482L], [id#2483L], LeftAnti
:- Sort [id#2482L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#2482L, 200), [id=#2653]
:     +- *(1) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- Sort [id#2483L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#2483L, 200), [id=#2656]
      +- *(2) Project [id#2483L]
         +- *(2) Filter isnotnull(id#2483L)
            +- *(2) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [isnotnull(id#2483L)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>

Förklaring

Spark gör inte detta automatiskt, eftersom Spark och SQL har något annorlunda semantik för null-hantering.

I SQL not in innebär det att om det finns något null-värde i not in värdena är resultatet tomt. Det är därför det bara kan köras med BroadcastNestedLoopJoin . Alla not in värden måste vara kända för att se till att det inte finns något null-värde i uppsättningen.

Exempelnotebook-fil

Den här antecknings boken har ett komplett exempel som visar varför Spark inte automatiskt växlar BroadcastNestedLoopJoin till SortMergeJoin .

BroadcastNestedLoopJoin exempel antecknings bok

Hämta notebook-fil