Adaptive query execution

Adaptive query execution (AQE) is query re-optimization that occurs during query execution.

A futtatókörnyezet újraoptimalizálásának az a motivációja, hogy az Azure Databricks a legfrissebb pontos statisztikákkal rendelkezik egy váltó- és szóráscsere végén (az AQE lekérdezési szakasza). Ennek eredményeképpen az Azure Databricks jobb fizikai stratégiát választhat, kiválaszthatja az optimális utószűrési partícióméretet és -számot, vagy optimalizálhatja azokat az optimalizálásokat, amelyekhez tippekre van szükség, például az illesztés kezelésének elmozdulásához.

Ez nagyon hasznos lehet, ha a statisztikai adatgyűjtés nincs bekapcsolva, vagy ha a statisztikák elavultak. Olyan helyeken is hasznos, ahol a statikusan származtatott statisztikák pontatlanok, például egy bonyolult lekérdezés közepén vagy az adateltérés előfordulása után.

Capabilities

Az AQE alapértelmezés szerint engedélyezve van. 4 fő funkcióval rendelkezik:

  • Dinamikusan módosítja a rendezési egyesítés illesztést a szórásos kivonat illesztéské.
  • Dinamikusan alakítja ki a partíciókat (a kis partíciókat ésszerűen méretezhető partíciókat kombinálja) a váltás után. A nagyon kis feladatok nagyobb I/O-átviteli sebességgel rendelkeznek, és általában jobban szenvednek az ütemezési többletterheléstől és a tevékenységbeállítási többletterheléstől. A kis feladatok egyesítése erőforrásokat takarít meg, és javítja a fürt átviteli sebességét.
  • Dinamikusan kezeli a rendezési egyesítési illesztésben a ferdeségeket és a kivonat illesztését úgy, hogy az elvarrott tevékenységeket nagyjából egyenlő méretű tevékenységekre osztja (és szükség esetén replikálja).
  • Dinamikusan észleli és propagálja az üres kapcsolatokat.

Application

Az AQE az összes olyan lekérdezésre vonatkozik, amely a következő:

  • Nem streamelés
  • Legalább egy cserét tartalmaz (általában összekapcsolás, összesítés vagy ablak esetén), egy al-lekérdezést vagy mindkettőt.

Nem minden AQE által alkalmazott lekérdezés van szükség újraoptimalizáltra. Előfordulhat, hogy az újraoptimalizálás más lekérdezési tervet hoz létre, mint a statikusan lefordított. Annak megállapításához, hogy az AQE módosította-e egy lekérdezés tervét, tekintse meg a következő, Lekérdezéstervek című szakaszt.

Lekérdezési csomagok

Ez a szakasz azt ismerteti, hogyan vizsgálhatja meg a lekérdezési terveket különböző módokon.

Ebben a szakaszban:

Spark UI

AdaptiveSparkPlan csomópont

Az AQE által alkalmazott lekérdezések egy vagy több AdaptiveSparkPlan csomópontot tartalmaznak, általában az egyes fő lekérdezések vagy allekérdezések gyökércsomópontjaként. A lekérdezés futtatása vagy futtatása előtt a isFinalPlan megfelelő csomópont jelzője a következőképpen AdaptiveSparkPlan jelenik meg false: a lekérdezés végrehajtása után a jelölő a isFinalPlan következőre változik: true.

Fejlődő terv

A lekérdezésterv diagramja a végrehajtás előrehaladtával fejlődik, és a végrehajtás alatt álló legújabb tervet tükrözi. A már végrehajtott csomópontok (amelyekben a metrikák elérhetők) nem változnak, de azok, amelyek az újraoptimalizálások eredményeként idővel nem változnak.

Az alábbiakban egy lekérdezésterv-diagramot láthat:

Query plan diagram

DataFrame.explain()

AdaptiveSparkPlan csomópont

Az AQE által alkalmazott lekérdezések egy vagy több AdaptiveSparkPlan csomópontot tartalmaznak, általában az egyes fő lekérdezések vagy allekérdezések gyökércsomópontjaként. A lekérdezés futtatása vagy futtatása előtt a isFinalPlan megfelelő csomópont jelzője a következőként falseAdaptiveSparkPlan jelenik meg: a lekérdezés végrehajtása után a jelölő a isFinalPlan következőre trueváltozik: .

Aktuális és kezdeti terv

Az egyes AdaptiveSparkPlan csomópontok alatt a kezdeti terv (az AQE-optimalizálás alkalmazása előtti terv) és az aktuális vagy a végleges terv is megjelenik attól függően, hogy a végrehajtás befejeződött-e. A jelenlegi terv a végrehajtás előrehaladtával fejlődik.

Futtatókörnyezeti statisztikák

Az egyes shuffle és broadcast fázisok adatstatisztikákat tartalmaznak.

A szakasz futtatása vagy a szakasz futtatása előtt a statisztikák fordítási időbecslések, a jelző isRuntime pedig például a következő false: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

A szakasz végrehajtása után a statisztikák futásidőben lesznek összegyűjtve, és a jelző isRuntime például a következő lesz true: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

Az alábbiakban egy DataFrame.explain példa látható:

  • A végrehajtás előtt

    Before execution

  • A végrehajtás során

    During execution

  • A végrehajtás után

    After execution

SQL EXPLAIN

AdaptiveSparkPlan csomópont

Az AQE által alkalmazott lekérdezések egy vagy több AdaptiveSparkPlan csomópontot tartalmaznak, általában az egyes fő lekérdezések vagy allekérdezések gyökércsomópontjaként.

Nincs aktuális terv

Mivel SQL EXPLAIN nem hajtja végre a lekérdezést, az aktuális terv mindig ugyanaz, mint a kezdeti terv, és nem tükrözi azt, amit az AQE végül végrehajtana.

Az alábbiakban egy SQL-magyarázó példát mutatunk be:

SQL explain

Hatékonyságát

A lekérdezési terv megváltozik, ha egy vagy több AQE-optimalizálás érvénybe lép. Ezeknek az AQE-optimalizálásoknak a hatását az aktuális és a végleges tervek, valamint a kezdeti terv és az aktuális és a végleges tervek konkrét tervcsomópontjai közötti különbség mutatja.

  • Rendezési egyesítési illesztés dinamikus módosítása szórásos kivonat illesztésre: különböző fizikai illesztési csomópontok az aktuális/végleges terv és a kezdeti terv között

    Join strategy string

  • Partíciók dinamikus bányászata: csomópont CustomShuffleReader tulajdonsággal Coalesced

    Custom shuffle reader

    Custom shuffle reader string

  • Dinamikusan kezeli a ferde illesztéseket: a csomópont SortMergeJoinisSkew mezője igaz.

    Skew join plan

    Skew join string

  • Az üres kapcsolatok dinamikus észlelése és propagálása: a terv egy részét (vagy egészét) a LocalTableScan csomópont váltja fel üresként a relációs mezővel.

    Local table scan

    Local table scan string

Konfiguráció

Ebben a szakaszban:

Adaptív lekérdezés végrehajtásának engedélyezése és letiltása

Property
spark.databricks.optimizer.adaptive.enabled

Típus: Boolean

Az adaptív lekérdezések végrehajtásának engedélyezése vagy letiltása.

Alapértelmezett érték: true

Automatikusan optimalizált shuffle engedélyezése

Property
spark.sql.shuffle.partitions

Típus: Integer

Az illesztések vagy aggregációk adatainak összevonásakor használandó partíciók alapértelmezett száma. Az érték auto beállítása lehetővé teszi az automatikusan optimalizált shuffle-t, amely automatikusan meghatározza ezt a számot a lekérdezésterv és a lekérdezés bemeneti adatmérete alapján.

Megjegyzés: Strukturált streamelés esetén ez a konfiguráció nem módosítható az azonos ellenőrzőpont-helyről származó lekérdezés-újraindítások között.

Alapértelmezett érték: 200

Rendezési egyesítési illesztés dinamikus módosítása szórásos kivonat illesztéské

Property
spark.databricks.adaptive.autoBroadcastJoinThreshold

Típus: Byte String

Az a küszöbérték, amely futásidőben a közvetítési csatlakozásra való váltást aktiválja.

Alapértelmezett érték: 30MB

Partíciók dinamikus szennyezése

Property
spark.sql.adaptive.coalescePartitions.enabled

Típus: Boolean

Engedélyezze vagy tiltsa le a partíciók szenesítését.

Alapértelmezett érték: true
spark.sql.adaptive.advisoryPartitionSizeInBytes

Típus: Byte String

A célméret a szenesítés után. A szenesített partícióméretek közel lesznek a célmérethez, de nem lesznek nagyobbak ennél a célméretnél.

Alapértelmezett érték: 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize

Típus: Byte String

A partíciók minimális mérete a szenesítés után. A szénerősített partícióméretek nem lesznek kisebbek ennél a méretnél.

Alapértelmezett érték: 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum

Típus: Integer

A partíciók minimális száma a szenesítés után. Nem ajánlott, mert explicit felülbírálások beállítása
spark.sql.adaptive.coalescePartitions.minPartitionSize.

Alapértelmezett érték: 2x nem. fürtmagok

Ferde illesztés dinamikus kezelése

Property
spark.sql.adaptive.skewJoin.enabled

Típus: Boolean

Az illesztés kezelésének engedélyezése vagy letiltása.

Alapértelmezett érték: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor

Típus: Integer

Az a tényező, amely a medián partícióméret szorzatával hozzájárul annak meghatározásához, hogy egy partíció ferde-e.

Alapértelmezett érték: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

Típus: Byte String

Egy küszöbérték, amely hozzájárul annak meghatározásához, hogy egy partíció ferde-e.

Alapértelmezett érték: 256MB

A partíciók akkor tekinthetők ferdenek, ha mindkettő (partition size > skewedPartitionFactor * median partition size) és (partition size > skewedPartitionThresholdInBytes) azok is.true

Üres kapcsolatok dinamikus észlelése és propagálása

Property
spark.databricks.adaptive.emptyRelationPropagation.enabled

Típus: Boolean

A dinamikus üres reláció propagálásának engedélyezése vagy letiltása.

Alapértelmezett érték: true

Frequently asked questions (FAQ)

Ebben a szakaszban:

Miért nem közvetített az AQE egy kis illesztőtáblát?

Ha a szórásra váró kapcsolat mérete nem éri el ezt a küszöbértéket, de még mindig nem közvetíti:

  • Ellenőrizze az illesztés típusát. A közvetítés bizonyos illesztéstípusok esetében nem támogatott, például a bal oldali reláció LEFT OUTER JOIN nem közvetíthető.
  • Az is előfordulhat, hogy a reláció sok üres partíciót tartalmaz, ebben az esetben a feladatok többsége gyorsan befejezheti a rendezési egyesítést, vagy esetleg ferde illesztéskezeléssel optimalizálható. Az AQE nem módosítja az ilyen rendezési egyesítési illesztéseket kivonatos illesztésekre, ha a nem üres partíciók százalékos aránya alacsonyabb, mint spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoina .

Továbbra is használnom kell egy adásbeléptetési stratégia tippet az AQE engedélyezésével?

Igen. A statikusan tervezett szórási illesztések általában nagyobb teljesítményűek, mint az AQE által dinamikusan tervezettek, mivel az AQE nem válthat szórásos illesztésre, amíg el nem végzi az illesztés mindkét oldalán az elegyítést (addigra a tényleges relációs méretek lekérése). Így a közvetítési tipp használata továbbra is jó választás lehet, ha jól ismeri a lekérdezést. Az AQE ugyanúgy fogja figyelembe venni a lekérdezési tippeket, mint a statikus optimalizálást, de továbbra is alkalmazhat dinamikus optimalizálásokat, amelyeket a tippek nem érintenek.

Mi a különbség a ferde illesztés és az AQE illesztés optimalizálása között? Melyiket használjam?

Javasoljuk, hogy az AQE ferde illesztés kezelésére támaszkodjon a ferde illesztés tipp használata helyett, mert az AQE ferde illesztés teljesen automatikus, és általában jobban teljesít, mint a tipp megfelelője.

Miért nem módosította automatikusan az AQE az illesztés sorrendjét?

A dinamikus illesztés átrendezése nem része az AQE-nek.

Miért nem észlelte az AQE az adateltérést?

Két méretfeltételnek kell teljesülnie ahhoz, hogy az AQE ferde partícióként észleljen egy partíciót:

  • A partíció mérete nagyobb, mint a spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (alapértelmezett 256 MB)
  • A partíció mérete nagyobb, mint az összes partíció mediánmérete az ferde partíciótényező spark.sql.adaptive.skewJoin.skewedPartitionFactor (alapértelmezett 5)

Emellett bizonyos illesztéstípusok esetében korlátozott a ferdeségkezelés támogatása, LEFT OUTER JOINpéldául a bal oldalon csak a ferdeség optimalizálható.

Örökölt

Az "Adaptív végrehajtás" kifejezés a Spark 1.6 óta létezik, de a Spark 3.0 új AQE-jének alapjaiban eltér. A funkció szempontjából a Spark 1.6 csak a "dinamikusan szikkadt partíciók" részt használja. A műszaki architektúra szempontjából az új AQE a lekérdezések futásidejű statisztikákon alapuló dinamikus tervezésének és újratervezésének keretrendszere, amely számos optimalizálást támogat, például a cikkben ismertetetteket, és kiterjeszthető a további lehetséges optimalizálás érdekében.