Zákaz vysílání, pokud plán dotazu má BroadcastNestedLoopJoin

Tento článek vysvětluje, jak zakázat všesměrové vysílání, když má plán dotazů BroadcastNestedLoopJoin ve fyzickém plánu.

Očekáváte, že se vysílání zastaví po zakázání prahové hodnoty všesměrového vysílání nastavením spark.sql.autoBroadcastJoinThreshold na hodnotu-1, ale Apache Spark se pokusí vysílat větší tabulku a v případě chyby vysílání dojde k chybě.

Toto chování není chybou, ale může to být neočekávané. Zkontrolujeme očekávané chování a zajistěte pro tento problém možnost zmírnění.

Vytváření tabulek

Začněte vytvořením dvou tabulek, jedna s hodnotami null table_withNull a druhou bez hodnot null tblA_NoNull .

sql("SELECT id FROM RANGE(10)").write.mode("overwrite").saveAsTable("tblA_NoNull")
sql("SELECT id FROM RANGE(50) UNION SELECT NULL").write.mode("overwrite").saveAsTable("table_withNull")

Pokus o zakázání vysílání

Snažíme se zakázat všesměrové vysílání nastavením spark.sql.autoBroadcastJoinThreshold dotazu, který obsahuje poddotaz s in klauzulí.

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
sql("select * from table_withNull where id not in (select id from tblA_NoNull)").explain(true)

Pokud si prohlédnete plán dotazu, BroadcastNestedLoopJoin je poslední možnou zálohou v této situaci. Zobrazuje se i po pokusu o zakázání vysílání.

== Physical Plan ==
*(2) BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#2482L = id#2483L) || isnull((id#2482L = id#2483L)))
:- *(2) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- BroadcastExchange IdentityBroadcastMode, [id=#2586]
   +- *(1) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

Pokud jsou zpracovaná data dostatečně velká, výsledkem je chyba všesměrového vysílání, když se Spark pokusí vysílat tabulku.

Přepsat dotaz pomocí not exists místo in

Problém můžete vyřešit tak, že znovu zapíšete dotaz not exists místo in .

// It can be rewritten into a NOT EXISTS, which will become a regular join:
sql("select * from table_withNull where not exists (select 1 from tblA_NoNull where table_withNull.id = tblA_NoNull.id)").explain(true)

Pomocí příkazu not exists se dotaz spouští s SortMergeJoin .

== Physical Plan ==
SortMergeJoin [id#2482L], [id#2483L], LeftAnti
:- Sort [id#2482L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#2482L, 200), [id=#2653]
:     +- *(1) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- Sort [id#2483L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#2483L, 200), [id=#2656]
      +- *(2) Project [id#2483L]
         +- *(2) Filter isnotnull(id#2483L)
            +- *(2) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [isnotnull(id#2483L)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>

Vysvětlení

Spark tento postup neprovede automaticky, protože Spark a SQL mají mírně odlišnou sémantiku pro zpracování hodnoty null.

V jazyce SQL not in to znamená, že pokud jsou v not in hodnotách hodnoty null, výsledek je prázdný. To je důvod, proč se dá provést jenom s BroadcastNestedLoopJoin . Aby se not in zajistilo, že v sadě není žádná hodnota null, musí být známé všechny hodnoty.

Příklad poznámkového bloku

Tento Poznámkový blok má úplný příklad, který ukazuje, proč Spark nepřepne automaticky BroadcastNestedLoopJoin na SortMergeJoin .

BroadcastNestedLoopJoin Příklad poznámkového bloku

Získat poznámkový blok