Ejecución de consultas adaptables

La ejecución de consultas adaptables (AQE) es la re-optimización de consultas que se produce durante la ejecución de la consulta.

La motivación para la re-optimización en tiempo de ejecución es que Azure Databricks tiene las estadísticas más precisas actualizadas al final de un intercambio aleatorio y de difusión (denominado fase de consulta en AQE). Como resultado, Azure Databricks puede optar por una mejor estrategia física, elegir un número y un tamaño de partición posteriores al orden aleatorio óptimos, o realizar optimizaciones que solían requerir sugerencias, por ejemplo, el control de la combinación de sesgo.

Esto puede ser muy útil cuando la recopilación de estadísticas no está activada o cuando las estadísticas están obsoletas. También es útil en lugares donde las estadísticas derivadas estáticamente son inexactas, como en medio de una consulta complicada, o después de la aparición de asimetría de datos.

Funcionalidades

En Databricks Runtime 7.3 LTS, AQE está habilitado de forma predeterminada. Tiene 4 características principales:

  • Cambia dinámicamente la combinación de combinación de combinación de ordenación en combinación hash de difusión.
  • Combina dinámicamente las particiones (combinar particiones pequeñas en particiones de tamaño razonable) después del intercambio aleatorio. Las tareas muy pequeñas tienen peor rendimiento de E/S y tienden a sufrir más por la sobrecarga de programación y la sobrecarga de configuración de tareas. La combinación de tareas pequeñas ahorra recursos y mejora el rendimiento del clúster.
  • Controla dinámicamente el sesgo en la combinación de ordenación y combinación hash aleatoria dividiendo (y replicando si es necesario) las tareas sesgadas en tareas de tamaño aproximado uniforme.
  • Detecta y propaga dinámicamente relaciones vacías.

Aplicación

AQE se aplica a todas las consultas que son:

  • Sin streaming
  • Contiene al menos un intercambio (normalmente cuando hay una combinación, un agregado o una ventana), una subconsjecución o ambas.

No todas las consultas aplicadas a AQE se optimizan necesariamente. La re-optimización podría o no tener un plan de consulta diferente al compilado estáticamente. Consulte la siguiente sección sobre cómo determinar si AQE ha cambiado el plan de una consulta.

Planes de consulta

En esta sección se describe cómo puede examinar los planes de consulta de maneras diferentes.

En esta sección:

Interfaz de usuario de Spark

AdaptiveSparkPlan Nodo

Las consultas aplicadas a AQE contienen uno o varios nodos, normalmente como nodo AdaptiveSparkPlan raíz de cada consulta principal o subconscución. Antes de que se ejecute la consulta o cuando se esté ejecutando, la marca del nodo correspondiente se muestra como ; una vez completada la ejecución de la consulta, la marca isFinalPlan AdaptiveSparkPlan false cambia isFinalPlan a . true.

Plan en evolución

El diagrama del plan de consulta evoluciona a medida que avanza la ejecución y refleja el plan más actual que se ejecuta. Los nodos que ya se han ejecutado (en los que hay métricas disponibles) no cambiarán, pero los que no lo han hecho pueden cambiar con el tiempo como resultado de las optimizaciones.

A continuación se muestra un ejemplo de diagrama de plan de consulta:

Diagrama del plan de consulta

DataFrame.explain()

AdaptiveSparkPlan Nodo

Las consultas aplicadas a AQE contienen uno o varios nodos, normalmente como nodo AdaptiveSparkPlan raíz de cada consulta principal o subconscución. Antes de que se ejecute la consulta o cuando se esté ejecutando, la marca del nodo correspondiente se muestra como ; una vez completada la ejecución de la consulta, la marca isFinalPlan AdaptiveSparkPlan cambia a false isFinalPlan true .

Plan actual e inicial

En cada nodo habrá tanto el plan inicial (el plan antes de aplicar las optimizaciones de AQE) como el plan actual o final, dependiendo de si la ejecución se ha AdaptiveSparkPlan completado. El plan actual evolucionará a medida que avance la ejecución.

Estadísticas en tiempo de ejecución

Cada fase de orden aleatorio y difusión contiene estadísticas de datos.

Antes de que se ejecute la fase o cuando la fase se esté ejecutando, las estadísticas son estimaciones en tiempo de compilación y la marca isRuntime es false , por ejemplo: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

Una vez completada la ejecución de la fase, las estadísticas son las recopiladas en tiempo de ejecución y la marca isRuntime se convertirá en , por true ejemplo: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

A continuación se muestra un DataFrame.explain ejemplo:

  • Antes de la ejecución

    Antes de la ejecución

  • Durante la ejecución

    Durante la ejecución

  • Después de la ejecución

    Después de la ejecución

SQL EXPLAIN

AdaptiveSparkPlan Nodo

Las consultas aplicadas a AQE contienen uno o varios nodos AdaptiveSparkPlan, normalmente como nodo raíz de cada consulta principal o subconscución.

No hay ningún plan actual

Como no ejecuta la consulta, el plan actual siempre es el mismo que el plan inicial y no refleja lo que SQL EXPLAIN AQE ejecutaría finalmente.

A continuación se muestra un ejemplo de explicación de SQL:

Explicación de SQL

Eficacia

El plan de consulta cambiará si una o varias optimizaciones de AQE tienen efecto. El efecto de estas optimizaciones de AQE se muestra en la diferencia entre los planes actual y final y el plan inicial y los nodos de plan específicos en los planes actuales y finales.

  • Cambiar dinámicamente la combinación de combinación de ordenación en combinación hash de difusión: nodos de combinación físicos diferentes entre el plan actual o final y el plan inicial

    Cadena de estrategia de combinación

  • Coalesce dinámicamente particiones: nodo CustomShuffleReader con propiedad Coalesced

    Lector aleatorio personalizado

    Cadena de lector aleatorio personalizada

  • Controlar dinámicamente la combinación de sesgo: nodo SortMergeJoin con campo isSkew como true.

    Plan de combinación de sesgo

    Cadena de combinación sesgada

  • Detectar y propagar dinámicamente relaciones vacías: parte de (o todo) el plan se reemplaza por el nodo LocalTableScan por el campo de relación como vacío.

    Examen de tabla local

    Cadena de examen de tabla local

Configuración

En las propiedades de esta sección, reemplace <prefix> por spark.sql.adaptive y por <db_prefix> spark.databricks.adaptive .

En esta sección:

Habilitación y deshabilitación de la ejecución de consultas adaptables

Propiedad Valor predeterminado Descripción
spark.databricks.optimizer.adaptive.enabled true Si se habilita o deshabilita la ejecución de consultas adaptables.

Cambiar dinámicamente la combinación de combinación de ordenación en combinación hash de difusión

Propiedad Valor predeterminado Descripción
<db_prefix>.autoBroadcastJoinThreshold 30 MB Umbral para desencadenar el cambio a la combinación de difusión en tiempo de ejecución.

Coalesce dinámicamente las particiones

Propiedad Valor predeterminado Descripción
<prefix>.coalescePartitions.enabled true Si se habilita o deshabilita la conjunción de particiones.
<prefix>.advisoryPartitionSizeInBytes 64 MB El tamaño de destino después de la conjunción. Los tamaños de partición coalesced estarán cerca, pero no más grandes que este tamaño de destino.
<prefix>.coalescePartitions.minPartitionSize 1MB El tamaño mínimo de las particiones después de la conjunción. Los tamaños de partición coalesced no serán menores que este tamaño.
<prefix>.coalescePartitions.minPartitionNum 2x no. de núcleos de clúster Número mínimo de particiones después de la conjunción. No se recomienda, porque la configuración invalida explícitamente <prefix>.coalescePartitions.minPartitionSize .

Controlar dinámicamente la combinación de sesgo

Propiedad Valor predeterminado Descripción
<prefix>.skewJoin.enabled true Establezca true/false para habilitar o deshabilitar el control de combinaciones sesgadas.
<prefix>.skewJoin.skewedPartitionFactor 5 Un factor que, cuando se multiplica por el tamaño medio de la partición, contribuye a determinar si una partición está sesgada.
<prefix>.skewJoin.skewedPartitionThresholdInBytes 256 MB Umbral que contribuye a determinar si una partición está sesgada.

Una partición se considera sesgada cuando (partition size > skewedPartitionFactor * median partition size) y (partition size > skewedPartitionThresholdInBytes) son true .

Detectar y propagar dinámicamente relaciones vacías

Propiedad Valor predeterminado Descripción
<db_prefix>.emptyRelationPropagation.enabled true Si se habilita o deshabilita la propagación dinámica de relaciones vacías.

Preguntas más frecuentes (P+F)

En esta sección:

¿Por qué AQE no cambió el número de partición aleatorio a pesar de que la mezcla de particiones ya está habilitada?

AQE no cambia el número de partición inicial. Se recomienda establecer un valor razonablemente alto para el número de partición aleatorio y permitir que AQE conste particiones pequeñas en función del tamaño de los datos de salida en cada fase de la consulta.

Si ve un desbordamiento en los trabajos, puede probar lo siguiente:

  • Aumentar la configuración del número de partición aleatoria: spark.sql.shuffle.partitions
  • Habilitación del orden aleatorio optimizado automáticamente estableciendo <db_prefix>.autoOptimizeShuffle.enabled en true

¿Por qué AQE no difundió una tabla de combinación pequeña?

Si el tamaño de la relación que se espera que se difunda se encuentra por debajo de este umbral, pero todavía no se difunde:

  • Compruebe el tipo de combinación. La difusión no se admite para determinados tipos de combinación, por ejemplo, la relación izquierda de LEFT OUTER JOIN un no se puede difundir.
  • También puede ser que la relación contiene una gran cantidad de particiones vacías, en cuyo caso la mayoría de las tareas pueden finalizar rápidamente con la combinación de combinación de ordenación o se puede optimizar potencialmente con el control de la combinación de sesgo. AQE evita cambiar estas combinaciones de combinación de ordenación para difundir combinaciones hash si el porcentaje de particiones no vacías es inferior a <prefix>.nonEmptyPartitionRatioForBroadcastJoin .

¿Debo seguir utilizando una sugerencia de estrategia de combinación de difusión con AQE habilitado?

Sí. Una combinación de difusión planeada estáticamente suele ser más efectiva que una planeada dinámicamente por AQE, ya que AQE podría no cambiar a la combinación de difusión hasta después de realizar el orden aleatorio para ambos lados de la combinación (momento en que se obtienen los tamaños de relación reales). Por lo tanto, el uso de una sugerencia de difusión puede seguir siendo una buena opción si conoce bien la consulta. AQE respetará las sugerencias de consulta de la misma manera que lo hace la optimización estática, pero todavía puede aplicar optimizaciones dinámicas que no se ven afectadas por las sugerencias.

¿Cuál es la diferencia entre la sugerencia de combinación de sesgo y la optimización de combinación de sesgo de AQE? ¿Cuál debo usar?

Se recomienda confiar en el control de la combinación de sesgo de AQE en lugar de usar la sugerencia de combinación de sesgo, ya que la combinación de sesgo de AQE es completamente automática y, en general, funciona mejor que el equivalente de la sugerencia.

¿Por qué AQE no ajustó mi orden de combinación automáticamente?

La reordenación de combinación dinámica no forma parte de AQE a partir Databricks Runtime 7.3 LTS.

¿Por qué AQE no detectó mi asimetría de datos?

Hay dos condiciones de tamaño que se deben cumplir para que AQE detecte una partición como una partición sesgada:

  • El tamaño de la partición es mayor que <prefix>.skewJoin.skewedPartitionThresholdInBytes (el valor predeterminado es 256 MB).
  • El tamaño de partición es mayor que el tamaño medio de todas las particiones veces el factor de partición sesgado <prefix>.skewJoin.skewedPartitionFactor (valor predeterminado 5)

Además, la compatibilidad con el control de sesgos está limitada para determinados tipos de combinación, por ejemplo, en , solo se puede optimizar el sesgo del LEFT OUTER JOIN lado izquierdo.

Heredado

El término "Ejecución adaptable" existe desde Spark 1.6, pero el nuevo AQE en Spark 3.0 es fundamentalmente diferente. En términos de funcionalidad, Spark 1.6 solo realiza la parte "coalesce dinámicamente particiones". En términos de arquitectura técnica, el nuevo AQE es un marco de planeamiento dinámico y replanteado de consultas basado en estadísticas en tiempo de ejecución, que admite una variedad de optimizaciones como las que hemos descrito en este artículo y se puede ampliar para habilitar más optimizaciones potenciales.