Adaptive query execution
Adaptive query execution (AQE) is query re-optimization that occurs during query execution.
A futtatókörnyezet újraoptimalizálásának az a motivációja, hogy az Azure Databricks a legfrissebb pontos statisztikákkal rendelkezik egy váltó- és szóráscsere végén (az AQE lekérdezési szakasza). Ennek eredményeképpen az Azure Databricks jobb fizikai stratégiát választhat, kiválaszthatja az optimális utószűrési partícióméretet és -számot, vagy optimalizálhatja azokat az optimalizálásokat, amelyekhez tippekre van szükség, például az illesztés kezelésének elmozdulásához.
Ez nagyon hasznos lehet, ha a statisztikai adatgyűjtés nincs bekapcsolva, vagy ha a statisztikák elavultak. Olyan helyeken is hasznos, ahol a statikusan származtatott statisztikák pontatlanok, például egy bonyolult lekérdezés közepén vagy az adateltérés előfordulása után.
Capabilities
Az AQE alapértelmezés szerint engedélyezve van. 4 fő funkcióval rendelkezik:
- Dinamikusan módosítja a rendezési egyesítés illesztést a szórásos kivonat illesztéské.
- Dinamikusan alakítja ki a partíciókat (a kis partíciókat ésszerűen méretezhető partíciókat kombinálja) a váltás után. A nagyon kis feladatok nagyobb I/O-átviteli sebességgel rendelkeznek, és általában jobban szenvednek az ütemezési többletterheléstől és a tevékenységbeállítási többletterheléstől. A kis feladatok egyesítése erőforrásokat takarít meg, és javítja a fürt átviteli sebességét.
- Dinamikusan kezeli a rendezési egyesítési illesztésben a ferdeségeket és a kivonat illesztését úgy, hogy az elvarrott tevékenységeket nagyjából egyenlő méretű tevékenységekre osztja (és szükség esetén replikálja).
- Dinamikusan észleli és propagálja az üres kapcsolatokat.
Application
Az AQE az összes olyan lekérdezésre vonatkozik, amely a következő:
- Nem streamelés
- Legalább egy cserét tartalmaz (általában összekapcsolás, összesítés vagy ablak esetén), egy al-lekérdezést vagy mindkettőt.
Nem minden AQE által alkalmazott lekérdezés van szükség újraoptimalizáltra. Előfordulhat, hogy az újraoptimalizálás más lekérdezési tervet hoz létre, mint a statikusan lefordított. Annak megállapításához, hogy az AQE módosította-e egy lekérdezés tervét, tekintse meg a következő, Lekérdezéstervek című szakaszt.
Lekérdezési csomagok
Ez a szakasz azt ismerteti, hogyan vizsgálhatja meg a lekérdezési terveket különböző módokon.
Ebben a szakaszban:
Spark UI
AdaptiveSparkPlan
csomópont
Az AQE által alkalmazott lekérdezések egy vagy több AdaptiveSparkPlan
csomópontot tartalmaznak, általában az egyes fő lekérdezések vagy allekérdezések gyökércsomópontjaként.
A lekérdezés futtatása vagy futtatása előtt a isFinalPlan
megfelelő csomópont jelzője a következőképpen AdaptiveSparkPlan
jelenik meg false
: a lekérdezés végrehajtása után a jelölő a isFinalPlan
következőre változik: true.
Fejlődő terv
A lekérdezésterv diagramja a végrehajtás előrehaladtával fejlődik, és a végrehajtás alatt álló legújabb tervet tükrözi. A már végrehajtott csomópontok (amelyekben a metrikák elérhetők) nem változnak, de azok, amelyek az újraoptimalizálások eredményeként idővel nem változnak.
Az alábbiakban egy lekérdezésterv-diagramot láthat:
DataFrame.explain()
AdaptiveSparkPlan
csomópont
Az AQE által alkalmazott lekérdezések egy vagy több AdaptiveSparkPlan
csomópontot tartalmaznak, általában az egyes fő lekérdezések vagy allekérdezések gyökércsomópontjaként. A lekérdezés futtatása vagy futtatása előtt a isFinalPlan
megfelelő csomópont jelzője a következőként false
AdaptiveSparkPlan
jelenik meg: a lekérdezés végrehajtása után a jelölő a isFinalPlan
következőre true
változik: .
Aktuális és kezdeti terv
Az egyes AdaptiveSparkPlan
csomópontok alatt a kezdeti terv (az AQE-optimalizálás alkalmazása előtti terv) és az aktuális vagy a végleges terv is megjelenik attól függően, hogy a végrehajtás befejeződött-e. A jelenlegi terv a végrehajtás előrehaladtával fejlődik.
Futtatókörnyezeti statisztikák
Az egyes shuffle és broadcast fázisok adatstatisztikákat tartalmaznak.
A szakasz futtatása vagy a szakasz futtatása előtt a statisztikák fordítási időbecslések, a jelző isRuntime
pedig például a következő false
: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);
A szakasz végrehajtása után a statisztikák futásidőben lesznek összegyűjtve, és a jelző isRuntime
például a következő lesz true
: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)
Az alábbiakban egy DataFrame.explain
példa látható:
A végrehajtás előtt
A végrehajtás során
A végrehajtás után
SQL EXPLAIN
AdaptiveSparkPlan
csomópont
Az AQE által alkalmazott lekérdezések egy vagy több AdaptiveSparkPlan csomópontot tartalmaznak, általában az egyes fő lekérdezések vagy allekérdezések gyökércsomópontjaként.
Nincs aktuális terv
Mivel SQL EXPLAIN
nem hajtja végre a lekérdezést, az aktuális terv mindig ugyanaz, mint a kezdeti terv, és nem tükrözi azt, amit az AQE végül végrehajtana.
Az alábbiakban egy SQL-magyarázó példát mutatunk be:
Hatékonyságát
A lekérdezési terv megváltozik, ha egy vagy több AQE-optimalizálás érvénybe lép. Ezeknek az AQE-optimalizálásoknak a hatását az aktuális és a végleges tervek, valamint a kezdeti terv és az aktuális és a végleges tervek konkrét tervcsomópontjai közötti különbség mutatja.
Rendezési egyesítési illesztés dinamikus módosítása szórásos kivonat illesztésre: különböző fizikai illesztési csomópontok az aktuális/végleges terv és a kezdeti terv között
Partíciók dinamikus bányászata: csomópont
CustomShuffleReader
tulajdonsággalCoalesced
Dinamikusan kezeli a ferde illesztéseket: a csomópont
SortMergeJoin
isSkew
mezője igaz.Az üres kapcsolatok dinamikus észlelése és propagálása: a terv egy részét (vagy egészét) a LocalTableScan csomópont váltja fel üresként a relációs mezővel.
Konfiguráció
Ebben a szakaszban:
- Adaptív lekérdezés végrehajtásának engedélyezése és letiltása
- Automatikusan optimalizált shuffle engedélyezése
- Rendezési egyesítési illesztés dinamikus módosítása szórásos kivonat illesztéské
- Partíciók dinamikus szennyezése
- Ferde illesztés dinamikus kezelése
- Üres kapcsolatok dinamikus észlelése és propagálása
Adaptív lekérdezés végrehajtásának engedélyezése és letiltása
Property |
---|
spark.databricks.optimizer.adaptive.enabled Típus: Boolean Az adaptív lekérdezések végrehajtásának engedélyezése vagy letiltása. Alapértelmezett érték: true |
Automatikusan optimalizált shuffle engedélyezése
Property |
---|
spark.sql.shuffle.partitions Típus: Integer Az illesztések vagy aggregációk adatainak összevonásakor használandó partíciók alapértelmezett száma. Az érték auto beállítása lehetővé teszi az automatikusan optimalizált shuffle-t, amely automatikusan meghatározza ezt a számot a lekérdezésterv és a lekérdezés bemeneti adatmérete alapján.Megjegyzés: Strukturált streamelés esetén ez a konfiguráció nem módosítható az azonos ellenőrzőpont-helyről származó lekérdezés-újraindítások között. Alapértelmezett érték: 200 |
Rendezési egyesítési illesztés dinamikus módosítása szórásos kivonat illesztéské
Property |
---|
spark.databricks.adaptive.autoBroadcastJoinThreshold Típus: Byte String Az a küszöbérték, amely futásidőben a közvetítési csatlakozásra való váltást aktiválja. Alapértelmezett érték: 30MB |
Partíciók dinamikus szennyezése
Property |
---|
spark.sql.adaptive.coalescePartitions.enabled Típus: Boolean Engedélyezze vagy tiltsa le a partíciók szenesítését. Alapértelmezett érték: true |
spark.sql.adaptive.advisoryPartitionSizeInBytes Típus: Byte String A célméret a szenesítés után. A szenesített partícióméretek közel lesznek a célmérethez, de nem lesznek nagyobbak ennél a célméretnél. Alapértelmezett érték: 64MB |
spark.sql.adaptive.coalescePartitions.minPartitionSize Típus: Byte String A partíciók minimális mérete a szenesítés után. A szénerősített partícióméretek nem lesznek kisebbek ennél a méretnél. Alapértelmezett érték: 1MB |
spark.sql.adaptive.coalescePartitions.minPartitionNum Típus: Integer A partíciók minimális száma a szenesítés után. Nem ajánlott, mert explicit felülbírálások beállítása spark.sql.adaptive.coalescePartitions.minPartitionSize .Alapértelmezett érték: 2x nem. fürtmagok |
Ferde illesztés dinamikus kezelése
Property |
---|
spark.sql.adaptive.skewJoin.enabled Típus: Boolean Az illesztés kezelésének engedélyezése vagy letiltása. Alapértelmezett érték: true |
spark.sql.adaptive.skewJoin.skewedPartitionFactor Típus: Integer Az a tényező, amely a medián partícióméret szorzatával hozzájárul annak meghatározásához, hogy egy partíció ferde-e. Alapértelmezett érték: 5 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes Típus: Byte String Egy küszöbérték, amely hozzájárul annak meghatározásához, hogy egy partíció ferde-e. Alapértelmezett érték: 256MB |
A partíciók akkor tekinthetők ferdenek, ha mindkettő (partition size > skewedPartitionFactor * median partition size)
és (partition size > skewedPartitionThresholdInBytes)
azok is.true
Üres kapcsolatok dinamikus észlelése és propagálása
Property |
---|
spark.databricks.adaptive.emptyRelationPropagation.enabled Típus: Boolean A dinamikus üres reláció propagálásának engedélyezése vagy letiltása. Alapértelmezett érték: true |
Frequently asked questions (FAQ)
Ebben a szakaszban:
- Miért nem közvetített az AQE egy kis illesztőtáblát?
- Továbbra is használnom kell egy adásbeléptetési stratégia tippet az AQE engedélyezésével?
- Mi a különbség a ferde illesztés és az AQE illesztés optimalizálása között? Melyiket használjam?
- Miért nem módosította automatikusan az AQE az illesztés sorrendjét?
- Miért nem észlelte az AQE az adateltérést?
Miért nem közvetített az AQE egy kis illesztőtáblát?
Ha a szórásra váró kapcsolat mérete nem éri el ezt a küszöbértéket, de még mindig nem közvetíti:
- Ellenőrizze az illesztés típusát. A közvetítés bizonyos illesztéstípusok esetében nem támogatott, például a bal oldali reláció
LEFT OUTER JOIN
nem közvetíthető. - Az is előfordulhat, hogy a reláció sok üres partíciót tartalmaz, ebben az esetben a feladatok többsége gyorsan befejezheti a rendezési egyesítést, vagy esetleg ferde illesztéskezeléssel optimalizálható. Az AQE nem módosítja az ilyen rendezési egyesítési illesztéseket kivonatos illesztésekre, ha a nem üres partíciók százalékos aránya alacsonyabb, mint
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
a .
Továbbra is használnom kell egy adásbeléptetési stratégia tippet az AQE engedélyezésével?
Igen. A statikusan tervezett szórási illesztések általában nagyobb teljesítményűek, mint az AQE által dinamikusan tervezettek, mivel az AQE nem válthat szórásos illesztésre, amíg el nem végzi az illesztés mindkét oldalán az elegyítést (addigra a tényleges relációs méretek lekérése). Így a közvetítési tipp használata továbbra is jó választás lehet, ha jól ismeri a lekérdezést. Az AQE ugyanúgy fogja figyelembe venni a lekérdezési tippeket, mint a statikus optimalizálást, de továbbra is alkalmazhat dinamikus optimalizálásokat, amelyeket a tippek nem érintenek.
Mi a különbség a ferde illesztés és az AQE illesztés optimalizálása között? Melyiket használjam?
Javasoljuk, hogy az AQE ferde illesztés kezelésére támaszkodjon a ferde illesztés tipp használata helyett, mert az AQE ferde illesztés teljesen automatikus, és általában jobban teljesít, mint a tipp megfelelője.
Miért nem módosította automatikusan az AQE az illesztés sorrendjét?
A dinamikus illesztés átrendezése nem része az AQE-nek.
Miért nem észlelte az AQE az adateltérést?
Két méretfeltételnek kell teljesülnie ahhoz, hogy az AQE ferde partícióként észleljen egy partíciót:
- A partíció mérete nagyobb, mint a
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
(alapértelmezett 256 MB) - A partíció mérete nagyobb, mint az összes partíció mediánmérete az ferde partíciótényező
spark.sql.adaptive.skewJoin.skewedPartitionFactor
(alapértelmezett 5)
Emellett bizonyos illesztéstípusok esetében korlátozott a ferdeségkezelés támogatása, LEFT OUTER JOIN
például a bal oldalon csak a ferdeség optimalizálható.
Örökölt
Az "Adaptív végrehajtás" kifejezés a Spark 1.6 óta létezik, de a Spark 3.0 új AQE-jének alapjaiban eltér. A funkció szempontjából a Spark 1.6 csak a "dinamikusan szikkadt partíciók" részt használja. A műszaki architektúra szempontjából az új AQE a lekérdezések futásidejű statisztikákon alapuló dinamikus tervezésének és újratervezésének keretrendszere, amely számos optimalizálást támogat, például a cikkben ismertetetteket, és kiterjeszthető a további lehetséges optimalizálás érdekében.