Execução de consultas adaptável

Execução de consulta adaptativa (AQE) é a re-otimização de consulta que ocorre durante a execução de consultas.

A motivação para a rein otimização do tempo de execução é que o Azure Databricks tem as estatísticas precisas mais atualizadas no final de uma troca de mensagens e de transmissão (referida como uma fase de consulta no AQE). Como resultado, a Azure Databricks pode optar por uma melhor estratégia física, escolher um tamanho e número de partição pós-baralhar ideal, ou fazer otimizações que costumavam exigir sugestões, por exemplo, manipulação de junta de skew.

Isto pode ser muito útil quando a recolha de estatísticas não é ligada ou quando as estatísticas estão em mau estado. Também é útil em locais onde as estatísticas derivadas estáticas são imprecisas, como no meio de uma consulta complicada, ou após a ocorrência de distorção de dados.

Capacidades

No Databricks Runtime 7.3 LTS, o AQE é ativado por padrão. Tem 4 características principais:

  • Mudanças dinâmicas tipo fusão juntar-se em junção de haxixe de difusão.
  • Dinâmicamente coalesces divisições (combinar pequenas divisórias em divisórias razoavelmente dimensionadas) após a troca de baralhar. Tarefas muito pequenas têm pior rendimento de E/S e tendem a sofrer mais com o agendamento de sobrecargas e sobrecargas de configuração de tarefas. Combinar pequenas tarefas poupa recursos e melhora a produção do cluster.
  • Os manuseamentos dinâmicos de fusões de tipo juntam-se e baralham a unção de haxixe dividindo (e replicando se necessário) tarefas distorcidas em tarefas aproximadamente uniformes.
  • Deteta e propaga relações vazias.

Aplicação

O AQE aplica-se a todas as consultas que sejam:

  • Não-streaming
  • Conter pelo menos uma troca (geralmente quando há uma junção, agregado ou janela), uma sub-consulta, ou ambas.

Nem todas as consultas aplicadas pelo AQE são necessariamente reotimizadas. A re-otimização pode ou não surgir com um plano de consulta diferente do que o estático compilado. Consulte a secção seguinte sobre como determinar se o plano de uma consulta foi alterado pelo AQE.

Planos de consulta

Esta secção discute como pode examinar planos de consulta de diferentes maneiras.

Nesta secção:

UI faísca

AdaptiveSparkPlan

As consultas aplicadas a AQE contêm um ou mais AdaptiveSparkPlan nós, geralmente como nó raiz de cada consulta principal ou sub-consulta. Antes da consulta decorrer ou quando está em execução, a isFinalPlan bandeira do nó correspondente mostra como ; após a AdaptiveSparkPlan false execução da consulta, a isFinalPlan bandeira muda para true.

Plano de evolução

O diagrama do plano de consulta evolui à medida que a execução progride e reflete o plano mais atual que está a ser executado. Os nós que já foram executados (em que as métricas estão disponíveis) não vão mudar, mas aqueles que não podem mudar ao longo do tempo como resultado de re-otimizações.

Segue-se um exemplo de diagrama de plano de consulta:

Diagrama do plano de consulta

DataFrame.explain()

AdaptiveSparkPlan

As consultas aplicadas a AQE contêm um ou mais AdaptiveSparkPlan nós, geralmente como nó raiz de cada consulta principal ou sub-consulta. Antes da consulta decorrer ou quando está em execução, a isFinalPlan bandeira do nó correspondente mostra como ; após a AdaptiveSparkPlan false execução da consulta, a isFinalPlan bandeira muda para true .

Plano atual e inicial

Em cada AdaptiveSparkPlan nó haverá tanto o plano inicial (o plano antes de aplicar quaisquer otimizações AQE) como o plano atual ou final, dependendo se a execução foi concluída. O plano atual evoluirá à medida que a execução progride.

Estatísticas de tempo de execução

Cada fase de baralhar e transmitir contém estatísticas de dados.

Antes da fase de decorrer ou quando o estágio está a decorrer, as estatísticas são estimativas de tempo de compilação, e a bandeira isRuntime false é, por exemplo: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

Após a execução da fase concluída, as estatísticas são as recolhidas em tempo de execução, e a bandeira isRuntime tornar-se-á, true por exemplo: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

Segue-se um DataFrame.explain exemplo:

  • Antes da execução

    Antes da execução

  • Durante a execução

    Durante a execução

  • Após a execução

    Após a execução

SQL EXPLAIN

AdaptiveSparkPlan

As consultas aplicadas a AQE contêm um ou mais nós AdaptiveSparkPlan, geralmente como nó raiz de cada consulta principal ou sub-consulta.

Nenhum plano atual

Como SQL EXPLAIN não executa a consulta, o plano atual é sempre o mesmo que o plano inicial e não reflete o que seria eventualmente executado pela AQE.

Segue-se um exemplo de explicação do SQL:

SQL explicar

Eficácia

O plano de consulta mudará se uma ou mais otimizações AQE produzirem efeito. O efeito destas otimizações do AQE é demonstrado pela diferença entre os planos atuais e finais e o plano inicial e os nós de plano específico nos planos atuais e finais.

  • Dinâmica alterar a fusão do tipo de fusão em junção de haxixe de difusão: diferentes nós de união física entre o plano atual/final e o plano inicial

    Junte-se à cadeia de estratégia

  • Divisórias dinamicamente coalesce: nó CustomShuffleReader com propriedade Coalesced

    Leitor de baralhar personalizado

    Cadeia de leitor de baralhar personalizado

  • Adere dinamicamente ao skew: nó SortMergeJoin com o campo como isSkew verdadeiro.

    Skew aderir plano

    Skew juntar-se corda

  • Detetar e propagar dinamicamente as relações vazias: parte do (ou inteiro) o plano é substituído por node LocalTableScan com o campo de relação como vazio.

    Digitalização de mesa local

    Cadeia de digitalização de mesa local

Configuração

Nas propriedades desta secção, <prefix> substitua spark.sql.adaptive por, e <db_prefix> por spark.databricks.adaptive .

Nesta secção:

Ativar e desativar a execução de consultas adaptativas

Propriedade Predefinição Description
<prefix>.enabled true Quer ativar ou desativar a execução de consultas adaptativas.

Fusão de tipo de mudança dinâmica junta-se à junção de haxixe de transmissão

Propriedade Predefinição Description
<db_prefix>.autoBroadcastJoinThreshold 30MB O limiar para acionar a mudança para a transmissão junta-se no tempo de execução.

Divisórias de coalesce dinâmicas

Propriedade Predefinição Description
<prefix>.coalescePartitions.enabled true Seja para ativar ou desativar a coligação de partição.
<prefix>.advisoryPartitionSizeInBytes 64MB O tamanho do alvo depois da coligação. Os tamanhos de partição coalesced serão próximos, mas não maiores do que este tamanho alvo.
<prefix>.coalescePartitions.minPartitionSize 1 MB O tamanho mínimo das divisórias após a coligação. Os tamanhos de partição coalesced não serão menores do que este tamanho.
<prefix>.coalescePartitions.minPartitionNum 2x não. de núcleos de cluster O número mínimo de divisórias após a coligação. Não recomendado, porque a definição se sobrepõe explicitamente <prefix>.coalescePartitions.minPartitionSize .

Aderir dinamicamente de mane-mandes

Propriedade Predefinição Description
<prefix>.skewJoin.enabled true Desaperte o manuseamento de unidos verdadeiro/falso para ativar/desativar o manuseamento da junta.
<prefix>.skewJoin.skewedPartitionFactor 5 Um fator que, quando multiplicado pelo tamanho mediano da partição, contribui para determinar se uma partição é distorcida.
<prefix>.skewJoin.skewedPartitionThresholdInBytes 256MB Um limiar que contribui para determinar se uma partição é distorcida.

Uma divisória é considerada distorcida quando ambas (partition size > skewedPartitionFactor * median partition size) e (partition size > skewedPartitionThresholdInBytes) são true .

Detetar e propagar dinamicamente as relações vazias

Propriedade Predefinição Description
<db_prefix>.emptyRelationPropagation.enabled true Quer ativar ou desativar a propagação dinâmica da relação vazia.

Perguntas frequentes (PERGUNTAS Frequentes)

Nesta secção:

Porque é que o AQE não alterou o número de partição baralhada, apesar da coligação de partição já estar ativada?

O AQE não altera o número inicial de partição. Recomenda-se que estabeleça um valor razoavelmente elevado para o número de partição de shuffle e deixe o AQE coalesce pequenas divisórias com base no tamanho dos dados de saída em cada fase da consulta.

Se virem derramar nos vossos empregos, podem tentar:

  • Aumentando o número de divisórias shuffle config: spark.sql.shuffle.partitions
  • Permitir a baralhar auto otimizado definindo <db_prefix>.autoOptimizeShuffle.enabled para true

Porque é que o AQE não transmitiu uma pequena mesa de aderir?

Se a dimensão da relação prevista para ser transmitida se enquadrar neste limiar, mas ainda não for transmitida:

  • Verifique o tipo de junção. A difusão não é suportada para certos tipos de junção, por exemplo, a relação à esquerda de um LEFT OUTER JOIN não pode ser transmitida.
  • Também pode ser que a relação contenha muitas divisórias vazias, caso em que a maioria das tarefas pode terminar rapidamente com a fusão de tipo ou pode potencialmente ser otimizada com manipulação de junta de skew. O AQE evita alterar este tipo de fusão junta-se à difusão de haxixe se a percentagem de divisórias não vazias for inferior <prefix>.nonEmptyPartitionRatioForBroadcastJoin a .

Devo ainda usar uma dica de estratégia de junção de transmissão com AQE ativada?

Sim. Uma junção de emissão estática planeada é geralmente mais performante do que uma dinâmica planeada pelo AQE, uma vez que o AQE pode não mudar para a difusão até depois de realizar a baralhar para ambos os lados da junta (altura em que os tamanhos reais da relação são obtidos). Assim, usar uma dica de transmissão ainda pode ser uma boa escolha se você conhece bem a sua consulta. O AQE respeitará as sugestões de consulta da mesma forma que a otimização estática, mas ainda pode aplicar otimizações dinâmicas que não são afetadas pelas dicas.

Qual é a diferença entre a sugestão de união de skew e a otimização de adere ao AQE? Qual devo usar?

Recomenda-se confiar no manuseamento de junta de aderência a AQE em vez de usar a sugestão de unção de skew, porque a unção de inclinação AQE é completamente automática e, em geral, tem um desempenho melhor do que a contrapartida de sugestões.

Porque é que o AQE não ajustou automaticamente a minha encomenda de aderiá?

O reordenamento de junta dinâmica não faz parte do AQE a partir do Databricks Runtime 7.3 LTS.

Porque é que o AQE não detetou os meus dados?

Existem duas condições de tamanho que devem ser satisfeitas para que o AQE detete uma partição como uma divisória distorcida:

  • O tamanho da partição é maior do que o <prefix>.skewJoin.skewedPartitionThresholdInBytes (padrão 256MB)
  • O tamanho da partição é maior do que o tamanho mediano de todas as divisórias vezes o fator de partição distorcido <prefix>.skewJoin.skewedPartitionFactor (padrão 5)

Além disso, o suporte de manuseamento de skew é limitado para certos tipos de junção, por exemplo, em LEFT OUTER JOIN , apenas o desvio no lado esquerdo pode ser otimizado.

Legado

O termo "Execução Adaptativa" existe desde a Faísca 1.6, mas o novo AQE em Spark 3.0 é fundamentalmente diferente. Em termos de funcionalidade, a Spark 1.6 faz apenas a parte "dinâmicas de divisórias" Em termos de arquitetura técnica, o novo AQE é um quadro de planeamento dinâmico e replaneamento de consultas baseadas em estatísticas de tempo de execução, que suporta uma variedade de otimizações como as que descrevemos neste artigo e que podem ser alargadas para permitir mais otimizações potenciais.