Execução de consultas adaptável

A execução de consultas adaptável (AQE) é a reotimização da consulta que ocorre durante a execução de consultas.

A motivação para a reotimização do tempo de execução é que o Azure Databricks tem as estatísticas precisas mais atualizadas no final de uma troca aleatória e de transmissão (referida como um estágio de consulta no AQE). Como resultado, o Azure Databricks pode optar por uma estratégia física melhor, escolher um tamanho e número de partição pós-embaralhamento ideais ou fazer otimizações que costumavam exigir dicas, por exemplo, manipulação de junção inclinada.

Isto pode ser muito útil quando a recolha de estatísticas não está ativada ou quando as estatísticas estão obsoletas. Também é útil em locais onde as estatísticas derivadas estaticamente são imprecisas, como no meio de uma consulta complicada ou após a ocorrência de distorção de dados.

Capacidades

O AQE está habilitado por padrão. Tem 4 características principais:

  • Altera dinamicamente a associação de intercalação de ordenação para associação hash de difusão.
  • Aglutina dinamicamente partições (combine pequenas partições em partições de tamanho razoável) após a troca aleatória. Tarefas muito pequenas têm pior taxa de transferência de E/S e tendem a sofrer mais com a sobrecarga de agendamento e a sobrecarga de configuração de tarefas. A combinação de pequenas tarefas economiza recursos e melhora a taxa de transferência do cluster.
  • Processa dinamicamente a distorção na associação de intercalação de ordenação e associação hash aleatória ao dividir (e substituir, se necessário) tarefas distorcidas em tarefas com um tamanho aproximadamente uniforme.
  • Deteta e propaga dinamicamente relações vazias.

Aplicação

A AQE aplica-se a todas as consultas que sejam:

  • Não transmissão em fluxo
  • Contêm pelo menos uma troca (geralmente quando há uma junção, agregação ou janela), uma subconsulta ou ambas.

Nem todas as consultas aplicadas ao AQE são necessariamente reotimizadas. A reotimização pode ou não criar um plano de consulta diferente daquele compilado estaticamente. Para determinar se o plano de uma consulta foi alterado pela AQE, consulte a seção a seguir, Planos de consulta.

Planos de consulta

Esta seção discute como você pode examinar planos de consulta de diferentes maneiras.

Nesta secção:

IU do Apache Spark

AdaptiveSparkPlan

As consultas aplicadas ao AQE contêm um ou mais AdaptiveSparkPlan nós, geralmente como o nó raiz de cada consulta ou subconsulta principal. Antes de a consulta ser executada ou quando estiver em execução, o isFinalPlan sinalizador do nó correspondente AdaptiveSparkPlan aparece como false; após a conclusão da execução da consulta, o isFinalPlan sinalizador muda para true.

Plano evolutivo

O diagrama de plano de consulta evolui à medida que a execução progride e reflete o plano mais atual que está sendo executado. Os nós que já foram executados (nos quais as métricas estão disponíveis) não serão alterados, mas aqueles que não foram executados podem mudar ao longo do tempo como resultado de reotimizações.

Segue-se um exemplo de diagrama de plano de consulta:

Query plan diagram

DataFrame.explain()

AdaptiveSparkPlan

As consultas aplicadas ao AQE contêm um ou mais AdaptiveSparkPlan nós, geralmente como o nó raiz de cada consulta ou subconsulta principal. Antes da execução da consulta ou quando ela está em execução, o isFinalPlan sinalizador do nó correspondente AdaptiveSparkPlan é exibido como false; após a conclusão da execução da consulta, o isFinalPlan sinalizador muda para true.

Plano atual e plano inicial

Em cada AdaptiveSparkPlan nó haverá o plano inicial (o plano antes de aplicar quaisquer otimizações de AKE) e o plano atual ou final, dependendo se a execução foi concluída. O plano atual evoluirá à medida que a execução progride.

Estatísticas de tempo de execução

Cada etapa de shuffle e broadcast contém estatísticas de dados.

Antes da execução do estágio ou quando o estágio está em execução, as estatísticas são estimativas de tempo de compilação, e o sinalizador isRuntime é false, por exemplo: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

Após a conclusão da execução do estágio, as estatísticas são aquelas coletadas em tempo de execução, e o sinalizador isRuntime se tornará true, por exemplo: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

Segue-se um DataFrame.explain exemplo:

  • Antes da execução

    Before execution

  • Durante a execução

    During execution

  • Após a execução

    After execution

SQL EXPLAIN

AdaptiveSparkPlan

As consultas aplicadas ao AQE contêm um ou mais nós AdaptiveSparkPlan, geralmente como o nó raiz de cada consulta ou subconsulta principal.

Nenhum plano atual

Como SQL EXPLAIN não executa a consulta, o plano atual é sempre o mesmo que o plano inicial e não reflete o que acabaria por ser executado pela AQE.

Segue-se um exemplo de explicação SQL:

SQL explain

Eficácia

O plano de consulta será alterado se uma ou mais otimizações AQE entrarem em vigor. O efeito dessas otimizações de AQE é demonstrado pela diferença entre os planos atual e final e os nós do plano inicial e do plano específico nos planos atual e final.

  • Alterar dinamicamente a associação de intercalação de ordenação para associação hash de difusão: nós de associação físicos diferentes entre o plano atual/final e o plano inicial

    Join strategy string

  • Aglutinar partições dinamicamente: nó CustomShuffleReader com propriedade Coalesced

    Custom shuffle reader

    Custom shuffle reader string

  • Manipule dinamicamente a junção de inclinação: nó SortMergeJoin com campo isSkew como verdadeiro.

    Skew join plan

    Skew join string

  • Detetar e propagar dinamicamente relações vazias: parte (ou inteira) do plano é substituída pelo nó LocalTableScan com o campo de relação como vazio.

    Local table scan

    Local table scan string

Configuração

Nesta secção:

Habilitar e desabilitar a execução de consultas adaptáveis

Propriedade
spark.databricks.optimizer.adaptive.enabled

Tipo: Boolean

Se a execução de consulta adaptável deve ser habilitada ou desabilitada.

Valor predefinido: true

Habilite o shuffle otimizado automaticamente

Propriedade
faísca.sql.shuffle.partições

Tipo: Integer

O número padrão de partições a serem usadas ao embaralhar dados para junções ou agregações. A definição do valor auto permite a otimização automática do shuffle, que determina automaticamente esse número com base no plano de consulta e no tamanho dos dados de entrada da consulta.

Nota: Para Streaming Estruturado, essa configuração não pode ser alterada entre reinicializações de consulta do mesmo local de ponto de verificação.

Valor padrão: 200

Alterar dinamicamente a associação de intercalação de ordenação para associação hash de difusão

Propriedade
spark.databricks.adaptive.autoBroadcastJoinThreshold

Tipo: Byte String

O limite para acionar a mudança para a junção de transmissão em tempo de execução.

Valor predefinido: 30MB

Aglutinar partições dinamicamente

Propriedade
spark.sql.adaptive.coalescePartitions.enabled

Tipo: Boolean

Se é necessário ativar ou desativar a coalescência de partições.

Valor predefinido: true
spark.sql.adaptive.advisoryPartitionSizeInBytes

Tipo: Byte String

O tamanho do alvo após a coalescência. Os tamanhos das partições coalesced serão próximos, mas não maiores do que esse tamanho de destino.

Valor predefinido: 64MB
faísca.sql.adaptive.coalescePartitions.minPartitionSize

Tipo: Byte String

O tamanho mínimo das partições após a coalescência. Os tamanhos das partições coalesced não serão menores do que este tamanho.

Valor predefinido: 1MB
faísca.sql.adaptive.coalescePartitions.minPartitionNum

Tipo: Integer

O número mínimo de partições após a coalescência. Não recomendado, porque a configuração substitui explicitamente
spark.sql.adaptive.coalescePartitions.minPartitionSize.

Valor padrão: 2x não. de núcleos de cluster

Manipule dinamicamente a junção inclinada

Propriedade
spark.sql.adaptive.skewJoin.enabled

Tipo: Boolean

Se é necessário ativar ou desativar o tratamento de junção inclinada.

Valor predefinido: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor

Tipo: Integer

Um fator que, quando multiplicado pelo tamanho mediano da partição, contribui para determinar se uma partição está distorcida.

Valor predefinido: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

Tipo: Byte String

Um limite que contribui para determinar se uma partição está distorcida.

Valor predefinido: 256MB

Uma partição é considerada enviesada quando ambos (partition size > skewedPartitionFactor * median partition size) e (partition size > skewedPartitionThresholdInBytes) são true.

Detetar e propagar dinamicamente relações vazias

Propriedade
spark.databricks.adaptive.emptyRelationPropagation.enabled

Tipo: Boolean

Se deseja habilitar ou desabilitar a propagação dinâmica de relações vazias.

Valor predefinido: true

Perguntas mais frequentes (FAQ)

Nesta secção:

Por que a AQE não transmitiu uma pequena mesa de junção?

Se a dimensão da relação que se espera que seja difundida for inferior a este limiar, mas continuar a não ser difundida:

  • Verifique o tipo de junção. A transmissão não é suportada para certos tipos de junção, por exemplo, a relação esquerda de um LEFT OUTER JOIN não pode ser transmitida.
  • Também pode ser que a relação contenha muitas partições vazias, caso em que a maioria das tarefas pode terminar rapidamente com junção de mesclagem de classificação ou pode ser potencialmente otimizada com manipulação de junção inclinada. O AQE evitará a alteração destas associações de intercalação de ordenação para associações hash de difusão se a percentagem de partições não vazias for inferior a spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

Ainda devo usar uma dica de estratégia de ingresso de transmissão com o AQE ativado?

Sim. Uma junção de transmissão planejada estaticamente é geralmente mais performante do que uma unida planejada dinamicamente pela AKE, pois a AQE pode não mudar para a junção de transmissão até depois de executar o shuffle para ambos os lados da junção (quando os tamanhos de relação reais são obtidos). Portanto, usar uma dica de transmissão ainda pode ser uma boa escolha se você conhecer bem sua consulta. O AQE respeitará as dicas de consulta da mesma forma que a otimização estática, mas ainda poderá aplicar otimizações dinâmicas que não são afetadas pelas dicas.

Qual é a diferença entre a dica de junção inclinada e a otimização de junção de distorção AKE? Qual devo usar?

Recomenda-se confiar no manuseio de junção de inclinação AQE em vez de usar a dica de junção inclinada, porque a junção de inclinação AQE é completamente automática e, em geral, tem um desempenho melhor do que a contraparte de dica.

Porque é que a AQE não ajustou a minha encomenda de adesão automaticamente?

A reordenação dinâmica da junção não faz parte da AQE.

Por que a AQE não detetou minha distorção de dados?

Há duas condições de tamanho que devem ser satisfeitas para que o AQE detete uma partição como uma partição enviesada:

  • O tamanho da partição é maior do que o spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (padrão 256MB)
  • O tamanho da partição é maior do que o tamanho médio de todas as partições vezes o fator spark.sql.adaptive.skewJoin.skewedPartitionFactor de partição distorcida (padrão 5)

Além disso, o suporte de manipulação de distorção é limitado para certos tipos de junção, por exemplo, no LEFT OUTER JOIN, apenas a inclinação no lado esquerdo pode ser otimizada.

Legado

O termo "Execução Adaptativa" existe desde o Spark 1.6, mas o novo AQE no Spark 3.0 é fundamentalmente diferente. Em termos de funcionalidade, o Spark 1.6 faz apenas a parte de "coalescência dinâmica de partições". Em termos de arquitetura técnica, o novo AQE é uma estrutura de planejamento dinâmico e replanejamento de consultas com base em estatísticas de tempo de execução, que suporta uma variedade de otimizações como as que descrevemos neste artigo e pode ser estendida para permitir mais otimizações potenciais.