Ausführung von adaptiven Abfragen

Die adaptive Abfrageausführung (Adaptive Query Execution, AQE) ist eine erneute Optimierung von Abfragen, die während der Abfrageausführung erfolgt.

Der Beweggrund für die erneute Optimierung der Laufzeit ist, dass Azure Databricks am Ende eines Shuffle- und Broadcastaustauschs (in AQE als Abfragephase bezeichnet) über die aktuellsten genauen Statistiken verfügt. Daher kann Azure Databricks eine bessere physische Strategie wählen, eine optimale Partitionsgröße und -anzahl nach dem Shuffle auswählen oder Optimierungen durchführen, für die früher Hinweise erforderlich waren, z. B. die Behandlung von Skew-Joins.

Dies kann sehr nützlich sein, wenn die Statistiksammlung nicht aktiviert ist oder Statistiken veraltet sind. Dies ist auch dann nützlich, wenn statisch abgeleitete Statistiken ungenau sind, z. B. in der Mitte einer komplizierten Abfrage oder nach dem Auftreten von Datenschiefe.

Funktionen

AQE ist standardmäßig aktiviert. AQE umfasst vier Hauptfunktionen:

  • Dynamische Änderung von Sort-Merge-Join in Broadcast-Hash-Join.
  • Dynamische Zusammenführung von Partitionen (Kombinieren kleiner Partitionen zu Partitionen mit angemessener Größe) nach dem Shuffle-Austausch. Sehr kleine Aufgaben haben einen schlechteren E/A-Durchsatz und unterliegen tendenziell einem höheren Planungsaufwand und einem Mehraufwand bei der Aufgabeneinrichtung. Durch die Kombination kleiner Aufgaben können Sie Ressourcen sparen und den Clusterdurchsatz verbessern.
  • Dynamische Behandlung von Datenschiefe bei Sort-Merge-Join und Shuffle-Hash-Join durch Aufteilung (und ggf. Replikation) von Aufgaben mit Schiefe in ungefähr gleichgroße Aufgaben.
  • Dynamische Erkennung und Weitergabe von leeren Beziehungen.

Anwendung

AQE wird auf alle Abfragen angewendet, für die Folgendes gilt:

  • Nicht-Streaming
  • Sie enthalten mindestens einen Austausch (in der Regel, wenn ein Join, ein Aggregat oder ein Fenster vorhanden ist), eine Unterabfrage oder beides.

Nicht alle Abfragen, auf die AQE angewendet wird, werden zwangsläufig erneut optimiert. Bei der erneuten Optimierung kann sich ein anderer als der statisch kompilierte Abfrageplan ergeben. Informationen dazu, wie Sie feststellen können, ob der Plan einer Abfrage von AQE geändert wurde, finden Sie im folgenden Abschnitt: Abfragepläne.

Abfragepläne

In diesem Abschnitt wird erläutert, wie Sie Abfragepläne auf unterschiedliche Weise untersuchen können.

Inhalt dieses Abschnitts:

Spark-Benutzeroberfläche

AdaptiveSparkPlan-Knoten

Abfragen, auf die AQE angewendet wird, enthalten mindestens einen AdaptiveSparkPlan-Knoten, in der Regel als Stammknoten jeder Hauptabfrage oder Unterabfrage. Vor oder während der Ausführung der Abfrage wird das isFinalPlan-Flag des entsprechenden AdaptiveSparkPlan-Knotens als false angezeigt. Nach Abschluss der Abfrageausführung ändert sich das isFinalPlan-Flag in true..

Der sich entwickelnde Plan

Das Abfrageplandiagramm wird im Laufe der Ausführung weiterentwickelt und spiegelt den aktuellen Plan wider, der ausgeführt wird. Bereits ausgeführte Knoten (für die Metriken verfügbar sind) werden nicht geändert, aber noch nicht ausgeführte Knoten können sich im Laufe der Zeit aufgrund von erneuten Optimierungen ändern.

Im Folgenden finden Sie ein Beispiel für ein Abfrageplandiagramm:

Query plan diagram

DataFrame.explain()

AdaptiveSparkPlan-Knoten

Abfragen, auf die AQE angewendet wird, enthalten mindestens einen AdaptiveSparkPlan-Knoten, in der Regel als Stammknoten jeder Hauptabfrage oder Unterabfrage. Vor oder während der Ausführung der Abfrage wird das isFinalPlan-Flag des entsprechenden AdaptiveSparkPlan-Knotens als false angezeigt. Nach Abschluss der Abfrageausführung ändert sich das isFinalPlan-Flag in true.

Aktueller Plan und Anfangsplan

Unter jedem AdaptiveSparkPlan-Knoten befinden sich sowohl der Anfangsplan (der Plan vor dem Anwenden von AQE-Optimierungen) als auch der aktuelle oder der endgültige Plan, je nachdem, ob die Ausführung abgeschlossen ist. Der aktuelle Plan wird im Laufe der Ausführung weiterentwickelt.

Laufzeitstatistiken

Jede Shuffle- und Broadcastphase enthält Datenstatistiken.

Vor oder während der Ausführung der Phase sind die Statistiken Schätzungen zur Kompilierzeit, und das Flag isRuntime ist auf false festgelegt. Beispiel: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

Nach Abschluss der Phasenausführung werden die Statistiken zur Laufzeit erfasst, und das Flag isRuntime wird auf true festgelegt. Beispiel: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

Nachfolgend finden Sie ein Beispiel für DataFrame.explain:

  • Vor der Ausführung

    Before execution

  • Während der Ausführung

    During execution

  • Nach der Ausführung

    After execution

SQL EXPLAIN

AdaptiveSparkPlan-Knoten

Abfragen, auf die AQE angewendet wird, enthalten mindestens einen AdaptiveSparkPlan-Knoten, in der Regel als Stammknoten jeder Hauptabfrage oder Unterabfrage.

Kein aktueller Plan

Da SQL EXPLAIN die Abfrage nicht ausführt, entspricht der aktuelle Plan immer dem Anfangsplan und spiegelt nicht wider, was letztendlich von AQE ausgeführt würde.

Nachfolgend finden Sie ein Beispiel für SQL EXPLAIN:

SQL explain

Wirksamkeit

Der Abfrageplan wird geändert, wenn mindestens eine AQE-Optimierung wirksam wird. Die Auswirkung dieser AQE-Optimierungen wird durch den Unterschied zwischen dem aktuellen und dem endgültigen Plan und dem Anfangsplan und bestimmten Planknoten im aktuellen und endgültigen Plan veranschaulicht.

  • Dynamische Änderung von Sort-Merge-Join in Broadcast-Hash-Join: unterschiedliche physische Join-Knoten zwischen dem aktuellen/endgültigen Plan und dem Anfangsplan

    Join strategy string

  • Dynamische Zusammenführung von Partitionen: Knoten CustomShuffleReader mit Eigenschaft Coalesced

    Custom shuffle reader

    Custom shuffle reader string

  • Dynamische Behandlung von Skew-Joins: Knoten SortMergeJoin mit Feld isSkew als „true“.

    Skew join plan

    Skew join string

  • Dynamische Erkennung und Weitergabe von leeren Beziehungen: Ein Teil des Plans (oder der gesamte Plan) wird durch den Knoten „LocalTableScan“ ersetzt, wobei das Beziehungsfeld leer ist.

    Local table scan

    Local table scan string

Konfiguration

Inhalt dieses Abschnitts:

Aktivieren und Deaktivieren der adaptiven Abfrageausführung

Eigenschaft
spark.databricks.optimizer.adaptive.enabled

Geben Sie Folgendes ein: Boolean

Gibt an, ob die adaptive Abfrageausführung aktiviert oder deaktiviert werden soll.

Standardwert: true

Aktivieren des automatisch optimierten Mischens

Eigenschaft
spark.sql.shuffle.partitions

Geben Sie Folgendes ein: Integer

Die Standardanzahl von Partitionen, die beim Mischen von Daten für Verknüpfungen oder Aggregationen verwendet werden sollen. Durch Festlegen des Werts auto wird automatisch optimiertes Mischen aktiviert, was diese Zahl basierend auf dem Abfrageplan und der Größe der Abfrageeingabedaten automatisch bestimmt.

Hinweis: Für strukturiertes Streaming kann diese Konfiguration nicht zwischen Abfrageneustarts vom gleichen Prüfpunktstandort geändert werden.

Standardwert: 200

Dynamische Änderung von Sort-Merge-Join in Broadcast-Hash-Join

Eigenschaft
spark.databricks.adaptive.autoBroadcastJoinThreshold

Geben Sie Folgendes ein: Byte String

Der Schwellenwert, der den Wechsel zum Broadcast-Join zur Laufzeit auslöst.

Standardwert: 30MB

Dynamische Zusammenführung von Partitionen

Eigenschaft
spark.sql.adaptive.coalescePartitions.enabled

Geben Sie Folgendes ein: Boolean

Gibt an, ob die Zusammenführung von Partitionen aktiviert oder deaktiviert werden soll.

Standardwert: true
spark.sql.adaptive.advisoryPartitionSizeInBytes

Geben Sie Folgendes ein: Byte String

Die Zielgröße nach der Zusammenführung. Die Größe der zusammengeführten Partitionen entspricht in etwa dieser Zielgröße, liegt aber nicht darüber.

Standardwert: 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize

Geben Sie Folgendes ein: Byte String

Die Mindestgröße der Partitionen nach der Zusammenführung. Die Größe der zusammengeführten Partitionen liegt nicht unter dieser Größe.

Standardwert: 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum

Geben Sie Folgendes ein: Integer

Die Mindestanzahl von Partitionen nach der Zusammenführung. Nicht empfohlen, weil durch das Festlegen dieser Eigenschaft die folgende Eigenschaft explizit außer Kraft gesetzt wird:
spark.sql.adaptive.coalescePartitions.minPartitionSize.

Standardwert: 2-fache Anzahl von Clusterkernen

Dynamische Behandlung von Skew-Joins

Eigenschaft
spark.sql.adaptive.skewJoin.enabled

Geben Sie Folgendes ein: Boolean

Gibt an, ob die Behandlung von Skew-Joins aktiviert oder deaktiviert werden soll.

Standardwert: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor

Geben Sie Folgendes ein: Integer

Ein Faktor, der bei Multiplizierung mit der mittleren Partitionsgröße dazu beiträgt, zu bestimmen, ob bei einer Partition Datenschiefe vorliegt.

Standardwert: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

Geben Sie Folgendes ein: Byte String

Ein Schwellenwert, der dazu beiträgt, zu bestimmen, ob bei einer Partition Datenschiefe vorliegt.

Standardwert: 256MB

Eine Partition gilt als ungleichmäßig, wenn sowohl (partition size > skewedPartitionFactor * median partition size) als auch (partition size > skewedPartitionThresholdInBytes) den Wert true aufweisen.

Dynamische Erkennung und Weitergabe von leeren Beziehungen

Eigenschaft
spark.databricks.adaptive.emptyRelationPropagation.enabled

Geben Sie Folgendes ein: Boolean

Gibt an, ob die dynamische Weitergabe von leeren Beziehungen aktiviert oder deaktiviert werden soll.

Standardwert: true

Häufig gestellte Fragen (FAQ)

Inhalt dieses Abschnitts:

Warum hat AQE eine kleine Join-Tabelle nicht übertragen?

Beachten Sie die folgenden Punkte, wenn die Größe der Beziehung, die übertragen werden soll, unter diesem Schwellenwert liegt, aber trotzdem nicht übertragen wird:

  • Überprüfen Sie den Join-Typ. Broadcast wird für bestimmte Join-Typen nicht unterstützt, z. B. kann die linke Beziehung eines LEFT OUTER JOIN nicht übertragen werden.
  • Es kann auch sein, dass die Beziehung viele leere Partitionen enthält. In diesem Fall können die meisten Aufgaben mit einem Sort-Merge-Join schnell abgeschlossen werden, oder es kann möglicherweise eine Optimierung mit der Behandlung von Skew-Joins vorgenommen werden. AQE vermeidet die Änderung solcher Sort-Merge-Joins in Broadcast-Hash-Joins, wenn der Prozentsatz der nicht leeren Partitionen niedriger ist als spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

Sollte ich bei aktivierter AQE trotzdem einen Hinweis auf eine Broadcast-Join-Strategie verwenden?

Ja. Ein statisch geplanter Broadcast-Join ist in der Regel leistungsfähiger als ein von AQE dynamisch geplanter, da AQE möglicherweise erst nach dem Ausführen von Shuffle für beide Seiten des Joins zum Broadcast-Join wechselt (bis zu diesem Zeitpunkt werden die tatsächlichen Beziehungsgrößen ermittelt). Daher kann die Verwendung eines Broadcast-Hinweises eine gute Wahl darstellen, wenn Sie Ihre Abfrage gut kennen. AQE beachtet Abfragehinweise auf die gleiche Weise wie die statische Optimierung, kann aber auch dynamische Optimierungen anwenden, die von den Hinweisen nicht beeinflusst werden.

Was ist der Unterschied zwischen einem Skew-Join-Hinweis und der AQE-Skew-Join-Optimierung? Was sollte ich verwenden?

Es wird empfohlen, sich auf die AQE-Behandlung von Skew-Joins zu verlassen, statt den Skew-Join-Hinweis zu verwenden, da die AQE-Behandlung von Skew-Joins vollständig automatisch erfolgt und im Allgemeinen eine bessere Leistung bietet als das Gegenstück des Hinweises.

Warum hat AQE meine Join-Reihenfolge nicht automatisch angepasst?

Die dynamische Neuanordnung von Joins ist kein Bestandteil von AQE.

Warum hat AQE die Datenschiefe nicht erkannt?

Es gibt zwei Größenbedingungen, die erfüllt sein müssen, damit AQE erkennt, dass bei einer Partition Datenschiefe vorliegt:

  • Die Partitionsgröße liegt über dem spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (Standardwert: 256 MB).
  • Die Partitionsgröße liegt über der mittleren Größe aller Partitionen mal dem Faktor für Partitionen mit Datenschiefe spark.sql.adaptive.skewJoin.skewedPartitionFactor (Standardwert: 5).

Darüber hinaus ist die Unterstützung für die Behandlung von Datenschiefe bei bestimmten Join-Typen eingeschränkt, z. B. kann bei LEFT OUTER JOIN nur die Schiefe auf der linken Seite optimiert werden.

Vorversion

Der Begriff „Adaptive Ausführung“ wird seit Spark 1.6 verwendet, aber die neue AQE in Spark 3.0 ist grundlegend anders. In Bezug auf die Funktionalität übernimmt Spark 1.6 nur den Teil „dynamische Zusammenführung von Partitionen“. In Bezug auf die technische Architektur ist die neue AQE ein Framework der dynamischen Planung und Neuplanung von Abfragen auf der Grundlage von Laufzeitstatistiken, das eine Vielzahl von Optimierungen (wie z. B. die in diesem Artikel beschriebenen) unterstützt und erweitert werden kann, um weitere potenzielle Optimierungen zu ermöglichen.