Adaptive query execution

Adaptive query execution (AQE) is query re-optimization that occurs during query execution.

The motivation for runtime re-optimization is that Azure Databricks has the most up-to-date, accurate statistics at the end of a shuffle or broadcast exchange (referred to as a query stage in AQE). As a result, Azure Databricks can opt for a better physical strategy, pick an optimal post-shuffle partition size and number, or perform optimizations that used to require hints, such as skew join handling.

This can be very useful when statistics collection is not turned on or when statistics are stale. It is also useful where statically derived statistics are inaccurate, such as in the middle of a complicated query or after data skew has occurred.

Capabilities

In Databricks Runtime 7.3 LTS, AQE is enabled by default. It has four major features:

  • Dynamically changes sort merge join into broadcast hash join.
  • Dynamically coalesces partitions (combines small partitions into reasonably sized partitions) after a shuffle exchange. Very small tasks have worse I/O throughput and tend to suffer more from scheduling and task setup overhead. Combining small tasks saves resources and improves cluster throughput.
  • Dynamically handles skew in sort merge join and shuffle hash join by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks.
  • Dynamically detects and propagates empty relations.

Application

AQE applies to all queries that meet both of these conditions:

  • They are non-streaming.
  • They contain at least one exchange (usually when there is a join, aggregate, or window), at least one subquery, or both.
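To make the two conditions concrete, here is a minimal Python sketch that checks them against a toy plan tree. PlanNode, count_nodes, and aqe_applies are hypothetical names invented for this illustration; real Spark plans are not Python objects, and matching on the substrings "Exchange" and "Subquery" only approximates how operators are named in explain output.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    """Hypothetical, simplified physical-plan node (not a Spark class)."""
    name: str
    children: List["PlanNode"] = field(default_factory=list)


def count_nodes(node: PlanNode, keyword: str) -> int:
    """Count nodes in the tree whose name contains `keyword`."""
    own = 1 if keyword in node.name else 0
    return own + sum(count_nodes(child, keyword) for child in node.children)


def aqe_applies(root: PlanNode, is_streaming: bool) -> bool:
    """Non-streaming, and at least one exchange, at least one subquery, or both."""
    if is_streaming:
        return False
    return count_nodes(root, "Exchange") > 0 or count_nodes(root, "Subquery") > 0


# A sort merge join typically has an Exchange below each side; a filtered scan has none.
join_plan = PlanNode("SortMergeJoin", [
    PlanNode("Exchange", [PlanNode("Scan a")]),
    PlanNode("Exchange", [PlanNode("Scan b")]),
])
scan_plan = PlanNode("Filter", [PlanNode("Scan a")])

print(aqe_applies(join_plan, is_streaming=False))  # True
print(aqe_applies(scan_plan, is_streaming=False))  # False
print(aqe_applies(join_plan, is_streaming=True))   # False
```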

Not all AQE-applied queries are necessarily re-optimized. The re-optimization might or might not produce a query plan that differs from the statically compiled one. To determine whether AQE has changed a query's plan, see the next section.

Query plans

This section discusses how you can examine query plans in different ways.

Spark UI

AdaptiveSparkPlan node

AQE-applied queries contain one or more AdaptiveSparkPlan nodes, usually as the root node of each main query or sub-query. Before the query runs or while it is running, the isFinalPlan flag of the corresponding AdaptiveSparkPlan node shows as false; after the query execution completes, the isFinalPlan flag changes to true.

Evolving plan

The query plan diagram evolves as the execution progresses and reflects the most current plan that is being executed. Nodes that have already been executed (for which metrics are available) will not change, but nodes that have not yet run can change over time as the result of re-optimizations.

The following is a query plan diagram example:

[Image: Query plan diagram]

DataFrame.explain()

AdaptiveSparkPlan node

AQE-applied queries contain one or more AdaptiveSparkPlan nodes, usually as the root node of each main query or sub-query. Before the query runs or while it is running, the isFinalPlan flag of the corresponding AdaptiveSparkPlan node shows as false; after the query execution completes, the isFinalPlan flag changes to true.
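One way to check the flag outside the UI is to look at the text that DataFrame.explain() prints (for example, by capturing its printed output). The sketch below assumes the header appears as AdaptiveSparkPlan isFinalPlan=false or AdaptiveSparkPlan isFinalPlan=true, matching the flag described above; final_plan_flags and the sample plan text are hypothetical, and the exact explain layout can differ between runtime versions.

```python
import re
from typing import List

# Assumed header format, e.g. "AdaptiveSparkPlan isFinalPlan=false".
_FINAL_FLAG = re.compile(r"AdaptiveSparkPlan\s+isFinalPlan=(true|false)")


def final_plan_flags(plan_text: str) -> List[bool]:
    """Return the isFinalPlan flag of each AdaptiveSparkPlan node, in order."""
    return [m.group(1) == "true" for m in _FINAL_FLAG.finditer(plan_text)]


# Abbreviated, hypothetical explain text for a query that has not finished yet.
before = """== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#0L], [id#4L], Inner
"""
print(final_plan_flags(before))                                   # [False]
print(all(final_plan_flags(before.replace("=false", "=true"))))  # True
```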

Current and initial plan

Under each AdaptiveSparkPlan node there is both the initial plan (the plan before any AQE optimizations are applied) and the current or final plan, depending on whether the execution has completed. The current plan evolves as the execution progresses.
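The following plain-Python sketch splits the explain text of a single AdaptiveSparkPlan node into its sub-plans. It assumes the sub-plans are introduced by header lines such as == Current Plan ==, == Final Plan ==, and == Initial Plan ==; those header strings, the function name split_adaptive_plan, and the sample text are assumptions for illustration. A query with several AdaptiveSparkPlan nodes (for example, one per subquery) would need to be split per node first.

```python
import re
from typing import Dict, List, Optional

# Assumed section headers printed under an AdaptiveSparkPlan node.
_SECTION = re.compile(r"== (Current|Final|Initial) Plan ==")


def split_adaptive_plan(plan_text: str) -> Dict[str, str]:
    """Split the explain text of ONE AdaptiveSparkPlan node into its sub-plans.

    Returns a dict keyed by "Current", "Final", or "Initial". Lines before the
    first section header (such as the AdaptiveSparkPlan line) are ignored.
    """
    sections: Dict[str, List[str]] = {}
    current: Optional[str] = None
    for line in plan_text.splitlines():
        match = _SECTION.search(line)
        if match:
            current = match.group(1)
            sections[current] = []
        elif current is not None:
            sections[current].append(line.rstrip())
    return {name: "\n".join(lines).strip() for name, lines in sections.items()}


sample = """AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   BroadcastHashJoin [id#0L], [id#4L], Inner, BuildRight
+- == Initial Plan ==
   SortMergeJoin [id#0L], [id#4L], Inner
"""
plans = split_adaptive_plan(sample)
print(sorted(plans))     # ['Final', 'Initial']
print(plans["Initial"])  # SortMergeJoin [id#0L], [id#4L], Inner
```

Comparing plans["Initial"] with plans["Final"] (or plans["Current"] while the query is still running) is what reveals whether AQE changed anything.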

Runtime statistics

Each shuffle and broadcast stage contains data statistics.

Before the stage runs or while it is running, the statistics are compile-time estimates, and the flag isRuntime is false, for example: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false)

After the stage execution completes, the statistics are those collected at runtime, and the flag isRuntime becomes true, for example: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)
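Because both examples above follow the same Statistics(...) pattern, a small parser can tell estimates from runtime values. This is a sketch under the assumption that sizes use binary units (B, KiB, MiB, and so on) exactly as shown above and that rowCount may be absent; StageStats and parse_stats are hypothetical names, not part of any Spark API.

```python
import re
from dataclasses import dataclass
from typing import Optional

# Binary units as printed in the examples above (e.g. "1024.0 KiB").
_UNITS = {"B": 1, "KiB": 1024, "MiB": 1024 ** 2, "GiB": 1024 ** 3,
          "TiB": 1024 ** 4, "PiB": 1024 ** 5}
_STATS = re.compile(
    r"Statistics\(sizeInBytes=([0-9.Ee+-]+)\s*(B|KiB|MiB|GiB|TiB|PiB)"
    r"(?:,\s*rowCount=([0-9.Ee+-]+))?"
    r",\s*isRuntime=(true|false)\)"
)


@dataclass
class StageStats:
    size_bytes: float
    row_count: Optional[float]  # None if the string has no rowCount
    is_runtime: bool            # False: compile-time estimate; True: measured


def parse_stats(text: str) -> StageStats:
    match = _STATS.search(text)
    if match is None:
        raise ValueError(f"no Statistics(...) found in {text!r}")
    size, unit, rows, runtime = match.groups()
    return StageStats(
        size_bytes=float(size) * _UNITS[unit],
        row_count=float(rows) if rows is not None else None,
        is_runtime=(runtime == "true"),
    )


estimate = parse_stats("Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false)")
actual = parse_stats("Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)")
print(estimate.size_bytes, estimate.is_runtime)  # 1048576.0 False
print(actual.row_count, actual.is_runtime)       # 28100.0 True
```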

The following is a DataFrame.explain example:

  • Before the execution

    [Image: DataFrame.explain output before execution]

  • During the execution

    [Image: DataFrame.explain output during execution]

  • After the execution

    [Image: DataFrame.explain output after execution]

SQL EXPLAIN

AdaptiveSparkPlan node

AQE-applied queries contain one or more AdaptiveSparkPlan nodes, usually as the root node of each main query or sub-query.

No current plan

As SQL EXPLAIN does not execute the query, the current plan is always the same as the initial plan and does not reflect what would eventually get executed by AQE.

The following is a SQL EXPLAIN example:

[Image: SQL EXPLAIN output]

Effectiveness

The query plan will change if one or more AQE optimizations take effect. The effect of these optimizations is shown by the differences between the current/final plan and the initial plan, and by specific plan nodes in the current/final plan:

  • Dynamically change sort merge join into broadcast hash join: different physical join nodes between the current/final plan and the initial plan

    [Image: Join strategy string]

  • Dynamically coalesce partitions: node CustomShuffleReader with property Coalesced

    [Image: Custom shuffle reader]

    [Image: Custom shuffle reader string]

  • Dynamically handle skew join: node SortMergeJoin with field isSkew as true.

    [Image: Skew join plan]

    [Image: Skew join string]

  • Dynamically detect and propagate empty relations: part of the plan (or the entire plan) is replaced by a LocalTableScan node with an empty relation field.

    [Image: Local table scan]

    [Image: Local table scan string]
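The checks listed above can be approximated with a text scan of the initial and current/final plans. The sketch below is a heuristic, not an official API: the marker strings (join operator names, CustomShuffleReader together with coalesced, isSkew=true or skew=true on a SortMergeJoin line, and LocalTableScan <empty>) are assumptions based on the node descriptions above, and the exact wording varies between versions. detect_aqe_effects is a hypothetical name.

```python
import re
from collections import Counter
from typing import Dict

# Physical join operator names to compare between the initial and final plans.
_JOIN = re.compile(r"\b(SortMergeJoin|BroadcastHashJoin|ShuffledHashJoin)\b")
# The text above calls the field isSkew; some versions may print skew=true instead.
_SKEW = re.compile(r"\b(?:isSkew|skew)\s*=\s*true\b")


def join_operators(plan_text: str) -> Counter:
    return Counter(_JOIN.findall(plan_text))


def detect_aqe_effects(initial_plan: str, final_plan: str) -> Dict[str, bool]:
    """Heuristic scan of explain text for the four AQE optimizations."""
    lines = final_plan.splitlines()
    return {
        "join_strategy_changed":
            join_operators(initial_plan) != join_operators(final_plan),
        "partitions_coalesced": any(
            "CustomShuffleReader" in line and "coalesced" in line.lower()
            for line in lines),
        "skew_join_handled": any(
            "SortMergeJoin" in line and _SKEW.search(line) is not None
            for line in lines),
        "empty_relation_propagated": any(
            "LocalTableScan" in line and "<empty>" in line for line in lines),
    }


initial = "SortMergeJoin [id#0L], [id#4L], Inner\n:- Exchange hashpartitioning(id#0L, 200)"
final = ("BroadcastHashJoin [id#0L], [id#4L], Inner, BuildRight\n"
         ":- CustomShuffleReader coalesced\n")
print(detect_aqe_effects(initial, final))
```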

Configuration

In the properties in this section, replace <prefix> with spark.sql.adaptive.

Enable and disable adaptive query execution

Property | Default | Description
<prefix>.enabled | true | Whether to enable or disable adaptive query execution.

Dynamically change sort merge join into broadcast hash join

Property | Default | Description
<prefix>.autoBroadcastJoinThreshold | 30MB | The threshold to trigger switching to broadcast join at runtime.

Dynamically coalesce partitions

Property | Default | Description
<prefix>.coalescePartitions.enabled | true | Whether to enable or disable partition coalescing.
<prefix>.advisoryPartitionSizeInBytes | 64MB | The target size after coalescing. The coalesced partition sizes will be close to, but no bigger than, this target size.
<prefix>.coalescePartitions.minPartitionSize | 1MB | The minimum size of partitions after coalescing. The coalesced partition sizes will be no smaller than this size.
<prefix>.coalescePartitions.minPartitionNum | 2x the number of cluster cores | The minimum number of partitions after coalescing. Not recommended, because setting it explicitly overrides <prefix>.coalescePartitions.minPartitionSize.
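Coalescing can be pictured as greedily merging neighboring small partitions until adding the next one would push the group past the target size. The sketch below is a simplified model under that assumption, not Spark's implementation: it only uses the advisoryPartitionSizeInBytes target (default 64MB above), does not model minPartitionSize or minPartitionNum, and the function name coalesce_partitions is hypothetical.

```python
from typing import List

MB = 1024 * 1024


def coalesce_partitions(sizes: List[int], target_bytes: int = 64 * MB) -> List[List[int]]:
    """Greedily group adjacent shuffle partitions so each group stays <= target.

    Simplified model: a single partition larger than the target stays on its
    own, and minPartitionSize / minPartitionNum are not modeled.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for index, size in enumerate(sizes):
        # Close the current group if adding this partition would overshoot.
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Five post-shuffle partitions become two tasks with a 60 MB target.
print(coalesce_partitions([10 * MB, 20 * MB, 30 * MB, 50 * MB, 5 * MB], 60 * MB))
# [[0, 1, 2], [3, 4]]
```

Note that the model never increases the number of partitions, only merges them, which is consistent with the FAQ answer below that AQE does not change the initial shuffle partition number.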

Dynamically handle skew join

Property | Default | Description
<prefix>.skewJoin.enabled | true | Set to true/false to enable/disable skew join handling.
<prefix>.skewJoin.skewedPartitionFactor | 5 | A factor that, when multiplied by the median partition size, contributes to determining whether a partition is skewed.
<prefix>.skewJoin.skewedPartitionThresholdInBytes | 256MB | A threshold that contributes to determining whether a partition is skewed.

A partition is considered skewed when both (partition size > skewedPartitionFactor * median partition size) and (partition size > skewedPartitionThresholdInBytes) are true.
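The skew rule above translates directly into code. In this sketch the defaults mirror the table (factor 5, threshold 256MB), the partition sizes are made-up numbers, and skewed_partitions is a hypothetical helper rather than a Spark API.

```python
from statistics import median
from typing import List

MB = 1024 * 1024


def skewed_partitions(sizes: List[int], factor: float = 5,
                      threshold_bytes: int = 256 * MB) -> List[int]:
    """Indices of partitions that satisfy BOTH skew conditions."""
    med = median(sizes)
    return [i for i, size in enumerate(sizes)
            if size > factor * med and size > threshold_bytes]


print(skewed_partitions([100 * MB] * 4 + [2000 * MB]))  # [4]
print(skewed_partitions([10 * MB] * 4 + [100 * MB]))    # []
```

The second call shows why a partition can be ten times the median and still not count as skewed: at 100MB it is below the 256MB threshold.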

Dynamically detect and propagate empty relations

Property | Default | Description
<prefix>.emptyRelationPropagation.enabled | true | Whether to enable or disable dynamic empty relation propagation.
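The following sketch collects the properties from the tables above with <prefix> expanded to spark.sql.adaptive, so the full names are easy to read off. AQE_DEFAULTS and aqe_settings are hypothetical names, and the defaults are copied verbatim from the tables; in a notebook, each resulting pair could then be applied with spark.conf.set(key, value).

```python
from typing import Dict, Optional

PREFIX = "spark.sql.adaptive"

# Defaults copied from the tables above. minPartitionNum is omitted on purpose:
# its default depends on the cluster size and setting it is not recommended.
AQE_DEFAULTS: Dict[str, str] = {
    "enabled": "true",
    "autoBroadcastJoinThreshold": "30MB",
    "coalescePartitions.enabled": "true",
    "advisoryPartitionSizeInBytes": "64MB",
    "coalescePartitions.minPartitionSize": "1MB",
    "skewJoin.enabled": "true",
    "skewJoin.skewedPartitionFactor": "5",
    "skewJoin.skewedPartitionThresholdInBytes": "256MB",
    "emptyRelationPropagation.enabled": "true",
}


def aqe_settings(overrides: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Expand <prefix> into full property names and apply any overrides."""
    merged = {**AQE_DEFAULTS, **(overrides or {})}
    return {f"{PREFIX}.{suffix}": value for suffix, value in merged.items()}


settings = aqe_settings({"skewJoin.skewedPartitionFactor": "10"})
print(settings["spark.sql.adaptive.skewJoin.skewedPartitionFactor"])  # 10
```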

Frequently asked questions (FAQs)

Why didn't AQE change the shuffle partition number despite partition coalescing already being enabled?

AQE does not change the initial partition number. It is recommended that you set a reasonably high value for the shuffle partition number and let AQE coalesce small partitions based on the output data size at each stage of the query.

If you see spilling in your jobs, you can try:

  • Increasing the shuffle partition number config: spark.sql.shuffle.partitions
  • Enabling auto optimized shuffle by setting <prefix>.autoOptimizeShuffle.enabled to true

Why didn't AQE broadcast a small join table?

If the size of the relation expected to be broadcast does fall under the <prefix>.autoBroadcastJoinThreshold threshold but the relation is still not broadcast:

  • Check the join type. Broadcast is not supported for certain join types; for example, the left relation of a LEFT OUTER JOIN cannot be broadcast.
  • The relation may also contain a lot of empty partitions, in which case the majority of the tasks can finish quickly with sort merge join, or the join can potentially be optimized with skew join handling. AQE avoids changing such sort merge joins to broadcast hash joins if the percentage of non-empty partitions is lower than <prefix>.nonEmptyPartitionRatioForBroadcastJoin.
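The reasons above amount to a short list of checks. The sketch below models them in plain Python; it is a simplification, not Spark's planner code. The join-type table is an assumption reflecting the general rule that only the non-preserved side of an outer join can be broadcast (for example, only the right side of a LEFT OUTER JOIN), and the 0.2 passed as min_non_empty_ratio in the usage lines is illustrative, not the documented default of <prefix>.nonEmptyPartitionRatioForBroadcastJoin. All names are hypothetical.

```python
MB = 1024 * 1024

# Which side may be broadcast for each join type (simplified assumption).
BROADCASTABLE_SIDES = {
    "inner": {"left", "right"},
    "left_outer": {"right"},
    "right_outer": {"left"},
    "left_semi": {"right"},
    "left_anti": {"right"},
    "full_outer": set(),
}


def should_switch_to_broadcast(join_type: str, build_side: str, build_side_bytes: int,
                               non_empty_ratio: float, min_non_empty_ratio: float,
                               threshold_bytes: int = 30 * MB) -> bool:
    """Simplified model of the checks described above, not Spark's code."""
    if build_side not in BROADCASTABLE_SIDES.get(join_type, set()):
        return False  # e.g. the left side of a LEFT OUTER JOIN
    if build_side_bytes > threshold_bytes:
        return False  # above autoBroadcastJoinThreshold
    # Mostly-empty relations are kept as sort merge joins.
    return non_empty_ratio >= min_non_empty_ratio


print(should_switch_to_broadcast("left_outer", "left", 1 * MB, 1.0, 0.2))   # False
print(should_switch_to_broadcast("left_outer", "right", 1 * MB, 1.0, 0.2))  # True
print(should_switch_to_broadcast("inner", "right", 1 * MB, 0.05, 0.2))      # False
```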

Should I still use a broadcast join strategy hint with AQE enabled?

Yes. A statically planned broadcast join is usually more performant than one planned dynamically by AQE, because AQE might not switch to a broadcast join until after it has performed the shuffle for both sides of the join (by which time the actual relation sizes are known). So using a broadcast hint can still be a good choice if you know your query well. AQE respects query hints the same way static optimization does, but can still apply dynamic optimizations that are not affected by the hints.

What is the difference between a skew join hint and AQE skew join optimization? Which one should I use?

It is recommended to rely on AQE skew join handling rather than the skew join hint, because AQE skew join handling is completely automatic and in general performs better than its hint counterpart.

Why didn't AQE adjust my join ordering automatically?

Dynamic join reordering is not part of AQE as of Databricks Runtime 7.3 LTS.

Why didn't AQE detect my data skew?

There are two size conditions that must be satisfied for AQE to detect a partition as a skewed partition:

  • The partition size is larger than <prefix>.skewJoin.skewedPartitionThresholdInBytes (default 256MB).
  • The partition size is larger than the median size of all partitions times the skewed partition factor <prefix>.skewJoin.skewedPartitionFactor (default 5).

In addition, skew handling support is limited for certain join types; for example, in a LEFT OUTER JOIN, only skew on the left side can be optimized.

Legacy

The term "Adaptive Execution" has existed since Spark 1.6, but the new AQE in Spark 3.0 is fundamentally different. In terms of functionality, Spark 1.6 does only the "dynamically coalesce partitions" part. In terms of technical architecture, the new AQE is a framework for dynamic planning and replanning of queries based on runtime statistics, which supports a variety of optimizations, such as the ones described in this article, and can be extended to enable more potential optimizations.