Share via


Adaptieve queryuitvoering

Adaptieve queryuitvoering (AQE) is heroptimalisatie van query's die wordt gedaan tijdens het uitvoeren van query's.

De motivatie voor het opnieuw optimaliseren van runtimes is dat Azure Databricks de meest actuele nauwkeurige statistieken heeft aan het einde van een shuffle- en broadcast-uitwisseling (ook wel een queryfase in AQE genoemd). Als gevolg hiervan kan Azure Databricks kiezen voor een betere fysieke strategie, een optimale partitiegrootte en -nummer na willekeurige volgorde kiezen of optimalisaties uitvoeren die worden gebruikt om hints te vereisen, bijvoorbeeld scheeftrekken van joinafhandeling.

Dit kan zeer nuttig zijn wanneer het verzamelen van statistieken niet is ingeschakeld of wanneer statistieken verouderd zijn. Het is ook handig op plaatsen waar statisch afgeleide statistieken onnauwkeurig zijn, zoals in het midden van een gecompliceerde query of na het voorkomen van scheeftrekken van gegevens.

Functies

AQE is standaard ingeschakeld. Het heeft vier belangrijke functies:

  • Hiermee wijzigt u de samenvoegbewerking dynamisch in broadcast-hash-join.
  • Hiermee worden partities dynamisch samengevoegd (kleine partities combineren tot redelijk grote partities) na een willekeurige uitwisseling. Zeer kleine taken hebben slechtere I/O-doorvoer en hebben meestal meer last van planningsoverhead en overhead voor het instellen van taken. Door kleine taken te combineren, worden resources opgeslagen en wordt de clusterdoorvoer verbeterd.
  • Verwerkt dynamisch scheefheid in samenvoeging en hash-join in willekeurige volgorde door scheeftrekkende taken te splitsen (en indien nodig te repliceren) in ongeveer gelijkmatige taken.
  • Automatisch worden lege relaties gedetecteerd en doorgegeven.

Aanvraag

AQE is van toepassing op alle query's die:

  • Niet-streaming
  • Bevatten ten minste één uitwisseling (meestal wanneer er een join, aggregaat of venster is), één subquery of beide.

Niet alle AQE-toegepaste query's zijn noodzakelijkerwijs opnieuw geoptimaliseerd. De heroptimalisatie kan al dan niet een ander queryplan opleveren dan het plan dat statisch is gecompileerd. Als u wilt bepalen of het plan van een query is gewijzigd door AQE, raadpleegt u de volgende sectie, Queryplannen.

Queryplannen

In deze sectie wordt beschreven hoe u queryplannen op verschillende manieren kunt onderzoeken.

In deze sectie:

Apache Spark-gebruikersinterface

AdaptiveSparkPlan-knooppunt

AQE-toegepaste query's bevatten een of meer AdaptiveSparkPlan knooppunten, meestal als hoofdknooppunt van elke hoofdquery of subquery. Voordat de query wordt uitgevoerd of wanneer deze wordt uitgevoerd, wordt de isFinalPlan vlag van het bijbehorende AdaptiveSparkPlan knooppunt weergegeven als false; nadat de uitvoering van de query is voltooid, wordt de isFinalPlan vlag gewijzigd in true.

Ontwikkelend plan

Het queryplandiagram ontwikkelt zich naarmate de uitvoering vordert en weerspiegelt het meest recente plan dat wordt uitgevoerd. Knooppunten die al zijn uitgevoerd (waarin metrische gegevens beschikbaar zijn) worden niet gewijzigd, maar knooppunten die na verloop van tijd niet kunnen worden gewijzigd als gevolg van heroptimalisaties.

Hier volgt een voorbeeld van een queryplandiagram:

Query plan diagram

DataFrame.explain()

AdaptiveSparkPlan-knooppunt

AQE-toegepaste query's bevatten een of meer AdaptiveSparkPlan knooppunten, meestal als hoofdknooppunt van elke hoofdquery of subquery. Voordat de query wordt uitgevoerd of wanneer deze wordt uitgevoerd, wordt de isFinalPlan vlag van het bijbehorende AdaptiveSparkPlan knooppunt weergegeven als false; nadat de uitvoering van de query is voltooid, wordt de isFinalPlan vlag gewijzigd in true.

Huidig en eerste plan

Onder elk AdaptiveSparkPlan knooppunt zijn zowel het eerste plan (het plan voor het toepassen van AQE-optimalisaties) als het huidige of het uiteindelijke plan, afhankelijk van of de uitvoering is voltooid. Het huidige plan zal zich ontwikkelen naarmate de uitvoering vordert.

Runtimestatistieken

Elke shuffle- en broadcastfase bevat gegevensstatistieken.

Voordat de fase wordt uitgevoerd of wanneer de fase wordt uitgevoerd, zijn de statistieken schattingen van de compilatietijd en de vlag isRuntime , falsebijvoorbeeld: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

Nadat de uitvoering van de fase is voltooid, zijn de statistieken die tijdens runtime worden verzameld en wordt truede vlag isRuntime bijvoorbeeld:Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

Hier volgt een DataFrame.explain voorbeeld:

  • Vóór de uitvoering

    Before execution

  • Tijdens de uitvoering

    During execution

  • Na de uitvoering

    After execution

SQL EXPLAIN

AdaptiveSparkPlan-knooppunt

AQE-toegepaste query's bevatten een of meer AdaptiveSparkPlan-knooppunten, meestal als het hoofdknooppunt van elke hoofdquery of subquery.

Geen huidig abonnement

Omdat SQL EXPLAIN de query niet wordt uitgevoerd, is het huidige plan altijd hetzelfde als het oorspronkelijke plan en wordt niet weergegeven wat uiteindelijk door AQE wordt uitgevoerd.

Hier volgt een voorbeeld van een SQL-uitleg:

SQL explain

Doeltreffendheid

Het queryplan wordt gewijzigd als een of meer AQE-optimalisaties van kracht worden. Het effect van deze AQE-optimalisaties wordt gedemonstreerd door het verschil tussen de huidige en definitieve plannen en het eerste plan en specifieke planknooppunten in de huidige en definitieve plannen.

  • De samenvoegbewerking dynamisch wijzigen in broadcast-hash-join: verschillende fysieke join-knooppunten tussen het huidige/definitieve plan en het eerste plan

    Join strategy string

  • Dynamisch samenvoegingspartities: knooppunt CustomShuffleReader met eigenschap Coalesced

    Custom shuffle reader

    Custom shuffle reader string

  • Dynamisch afhandelen van scheefheidsdeelname: knooppunt SortMergeJoin met veld isSkew als waar.

    Skew join plan

    Skew join string

  • Lege relaties dynamisch detecteren en doorgeven: een deel van (of geheel) het plan wordt vervangen door localTableScan van het knooppunt door het relationele veld als leeg.

    Local table scan

    Local table scan string

Configuration

In deze sectie:

Adaptieve queryuitvoering in- en uitschakelen

Eigenschappen
spark.databricks.optimizer.adaptive.enabled

Type: Boolean

Of u adaptieve query's wilt in- of uitschakelen.

Standaardwaarde: true

Automatisch geoptimaliseerde shuffle inschakelen

Eigenschappen
spark.sql.shuffle.partitions

Type: Integer

Het standaardaantal partities dat moet worden gebruikt bij het opsnipperen van gegevens voor joins of aggregaties. Als u de waarde auto instelt, wordt automatisch geoptimaliseerde willekeurige volgorde ingeschakeld, waardoor dit aantal automatisch wordt bepaald op basis van het queryplan en de grootte van de queryinvoergegevens.

Opmerking: Voor gestructureerd streamen kan deze configuratie niet worden gewijzigd tussen opnieuw opstarten van query's vanaf dezelfde controlepuntlocatie.

Standaardwaarde: 200

Samenvoegen samenvoegen dynamisch wijzigen in broadcast-hash-join

Eigenschappen
spark.databricks.adaptive.autoBroadcastJoinThreshold

Type: Byte String

De drempelwaarde voor het activeren van overschakelen naar broadcast-join tijdens runtime.

Standaardwaarde: 30MB

Dynamisch samenvoegingspartities

Eigenschappen
spark.sql.adaptive.coalescePartitions.enabled

Type: Boolean

Of partities samenvoegen moeten worden in- of uitgeschakeld.

Standaardwaarde: true
spark.sql.adaptive.advisoryPartitionSizeInBytes

Type: Byte String

De doelgrootte na het samenvoegen. De grootten van de gesamenceerde partities liggen dicht bij, maar niet groter dan deze doelgrootte.

Standaardwaarde: 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize

Type: Byte String

De minimale grootte van partities na het samenvoegen. De grootten van de samengesamende partities zijn niet kleiner dan deze grootte.

Standaardwaarde: 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum

Type: Integer

Het minimale aantal partities na het samenvoegen. Niet aanbevolen, omdat het instellen expliciet overschrijft
spark.sql.adaptive.coalescePartitions.minPartitionSize.

Standaardwaarde: 2x nee. van clusterkernen

Scheefheidsdeelname dynamisch verwerken

Eigenschappen
spark.sql.adaptive.skewJoin.enabled

Type: Boolean

Of u de verwerking van scheefheidsdeelname wilt in- of uitschakelen.

Standaardwaarde: true
spark.sql.adaptive.skewJoin.skewEdPartitionFactor

Type: Integer

Een factor die bij vermenigvuldiging met de mediaanpartitiegrootte bijdraagt aan het bepalen of een partitie scheef is.

Standaardwaarde: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

Type: Byte String

Een drempelwaarde die bijdraagt aan het bepalen of een partitie scheef is.

Standaardwaarde: 256MB

Een partitie wordt beschouwd als scheeftrekken wanneer beide (partition size > skewedPartitionFactor * median partition size) en (partition size > skewedPartitionThresholdInBytes) zijn true.

Lege relaties dynamisch detecteren en doorgeven

Eigenschappen
spark.databricks.adaptive.emptyRelationPropagation.enabled

Type: Boolean

Hiermee wordt aangegeven of dynamische, lege relationele doorgifte moet worden ingeschakeld of uitgeschakeld.

Standaardwaarde: true

Veelgestelde vragen

In deze sectie:

Waarom heeft AQE geen kleine jointabel uitgezonden?

Als de grootte van de verwachte relatie onder deze drempelwaarde valt, maar nog steeds niet wordt uitgezonden:

  • Controleer het jointype. Broadcast wordt niet ondersteund voor bepaalde jointypen, bijvoorbeeld de linkerrelatie van een LEFT OUTER JOIN kan niet worden uitgezonden.
  • Het kan ook zijn dat de relatie veel lege partities bevat. In dat geval kan het merendeel van de taken snel worden voltooid met sort merge join of kan deze worden geoptimaliseerd met de verwerking van scheve joins. AQE vermijdt het wijzigen van dergelijke samenvoegingsdeelnames om hash-joins uit te zenden als het percentage niet-lege partities lager is dan spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

Moet ik nog steeds een hint voor een broadcast-joinstrategie gebruiken waarvoor AQE is ingeschakeld?

Ja. Een statisch geplande broadcast-join is meestal beter presterend dan een dynamisch geplande join door AQE, omdat AQE mogelijk pas overschakelt naar broadcast-join nadat een willekeurige volgorde voor beide zijden van de join is uitgevoerd (waarbij de werkelijke relatiegrootten worden verkregen). Het gebruik van een broadcast-hint kan dus nog steeds een goede keuze zijn als u uw query goed kent. AQE respecteert queryhints op dezelfde manier als statische optimalisatie, maar kan dynamische optimalisaties toepassen die niet worden beïnvloed door de hints.

Wat is het verschil tussen scheefheidsdeelnamehint en optimalisatie van AQE-joins? Welke moet ik gebruiken?

Het wordt aanbevolen om te vertrouwen op AQE scheefheidsdeelnameverwerking in plaats van de scheefheidshint te gebruiken, omdat AQE-scheefheidsdeelname volledig automatisch is en in het algemeen beter presteert dan de hint-tegenhanger.

Waarom heeft AQE mijn joinvolgorde niet automatisch aangepast?

Dynamische joins opnieuw ordenen maakt geen deel uit van AQE.

Waarom heeft AQE mijn gegevens scheeftrekken niet gedetecteerd?

Er zijn twee groottevoorwaarden waaraan moet worden voldaan voor AQE om een partitie te detecteren als een scheve partitie:

  • De partitiegrootte is groter dan de spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (standaard 256 MB)
  • De partitiegrootte is groter dan de mediaangrootte van alle partities en de scheve partitiefactor spark.sql.adaptive.skewJoin.skewedPartitionFactor (standaard 5)

Bovendien is de ondersteuning voor scheefheidsafhandeling beperkt voor bepaalde jointypen, bijvoorbeeld in LEFT OUTER JOIN, kunnen alleen scheeftrekken aan de linkerkant worden geoptimaliseerd.

Verouderd

De term Adaptieve uitvoering bestaat sinds Spark 1.6, maar de nieuwe AQE in Spark 3.0 is fundamenteel anders. Wat de functionaliteit betreft, voert Spark 1.6 alleen het gedeelte 'dynamisch samenvoegingspartities' uit. Wat de technische architectuur betreft, is de nieuwe AQE een framework van dynamische planning en herplanning van query's op basis van runtimestatistieken, die ondersteuning biedt voor diverse optimalisaties, zoals de optimalisaties die we in dit artikel hebben beschreven en kunnen worden uitgebreid om meer mogelijke optimalisaties mogelijk te maken.