Processar consultas grandes em fluxos de trabalho interativos

Um desafio com fluxos de trabalho de dados interativos é lidar com grandes consultas. Tal inclui consultas que geram muitas linhas de saída, obtêm muitas partições externas ou realizam computação em conjuntos de dados extremamente grandes. Essas consultas podem ser extremamente lentas, saturar recursos de computação e dificultar que outras pessoas compartilhem a mesma computação.

O Query Watchdog é um processo que impede que as consultas monopolizem os recursos de computação, examinando as causas mais comuns de consultas grandes e encerrando consultas que ultrapassam um limite. Este artigo descreve como habilitar e configurar o Query Watchdog.

Importante

O Query Watchdog está habilitado para todos os cálculos para todos os fins criados usando a interface do usuário.

Exemplo de uma consulta com interrupções

Um analista está realizando algumas consultas ad hoc em um data warehouse just-in-time. O analista usa uma computação de dimensionamento automático compartilhada que torna mais fácil para vários usuários usarem uma única computação ao mesmo tempo. Suponha que há duas tabelas com um milhão de linhas cada.

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)

spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_x")
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_y")

Esses tamanhos de tabela são gerenciáveis no Apache Spark. No entanto, cada um deles inclui uma join_key coluna com uma cadeia de caracteres vazia em cada linha. Isso pode acontecer se os dados não estiverem perfeitamente limpos ou se houver distorção de dados significativa em que algumas chaves são mais prevalentes do que outras. Essas chaves de junção vazias são muito mais prevalentes do que qualquer outro valor.

No código a seguir, o analista está unindo essas duas tabelas em suas chaves, o que produz resultados de um trilhão, e todos eles são produzidos em um único executor (o executor que obtém a " " chave):

SELECT
  id, count(id)
FROM
  (SELECT
    x.id
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY id

Esta consulta parece estar em execução. Mas, sem conhecer os dados, o analista vê que "apenas" resta uma única tarefa ao longo da execução do trabalho. A consulta nunca termina, deixando o analista frustrado e confuso sobre o motivo pelo qual não funcionou.

Neste caso, há apenas uma chave de junção problemática. Outras vezes pode haver muito mais.

Habilitar e configurar o Query Watchdog

Para habilitar e configurar o Query Watchdog, as etapas a seguir são necessárias.

  • Habilite o Watchdog com spark.databricks.queryWatchdog.enabledo .
  • Configure o tempo de execução da tarefa com spark.databricks.queryWatchdog.minTimeSecso .
  • Exibir saída com spark.databricks.queryWatchdog.minOutputRows.
  • Configure a taxa de saída com spark.databricks.queryWatchdog.outputRatioThreshold.

Para evitar que uma consulta crie muitas linhas de saída para o número de linhas de entrada, você pode habilitar o Query Watchdog e configurar o número máximo de linhas de saída como um múltiplo do número de linhas de entrada. Neste exemplo, usamos uma proporção de 1000 (o padrão).

spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)

A última configuração declara que qualquer tarefa nunca deve produzir mais de 1000 vezes o número de linhas de entrada.

Gorjeta

A relação de saída é completamente personalizável. Recomendamos começar mais baixo e ver qual limite funciona bem para você e sua equipe. Um intervalo de 1.000 a 10.000 é um bom ponto de partida.

O Query Watchdog não só impede que os usuários monopolizem recursos de computação para trabalhos que nunca serão concluídos, mas também economiza tempo ao falhar rapidamente em uma consulta que nunca teria sido concluída. Por exemplo, a consulta a seguir falhará após vários minutos porque excede a proporção.

SELECT
  z.id
  join_key,
  sum(z.id),
  count(z.id)
FROM
  (SELECT
    x.id,
    y.join_key
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key) z
GROUP BY join_key, z.id

Aqui está o que você veria:

Cão de guarda de consultas

Normalmente, é suficiente ativar o Query Watchdog e definir a relação de limite de saída/entrada, mas você também tem a opção de definir duas propriedades adicionais: spark.databricks.queryWatchdog.minTimeSecs e spark.databricks.queryWatchdog.minOutputRows. Essas propriedades especificam o tempo mínimo que uma determinada tarefa em uma consulta deve ser executada antes de cancelá-la e o número mínimo de linhas de saída para uma tarefa nessa consulta.

Por exemplo, você pode definir minTimeSecs como um valor mais alto se quiser dar a ele a chance de produzir um grande número de linhas por tarefa. Da mesma forma, você pode definir spark.databricks.queryWatchdog.minOutputRows como dez milhões se quiser parar uma consulta somente depois que uma tarefa nessa consulta tiver produzido dez milhões de linhas. Qualquer coisa menos e a consulta é bem-sucedida, mesmo que a relação saída/entrada tenha sido excedida.

spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)

Gorjeta

Se você configurar o Query Watchdog em um bloco de anotações, a configuração não persistirá nas reinicializações de computação. Se você quiser configurar o Query Watchdog para todos os usuários de uma computação, recomendamos que você use uma configuração de computação.

Detetar consulta em conjunto de dados extremamente grande

Outra consulta grande típica pode digitalizar uma grande quantidade de dados de grandes tabelas/conjuntos de dados. A operação de verificação pode durar muito tempo e saturar os recursos de computação (até mesmo a leitura de metadados de uma grande tabela do Hive pode levar uma quantidade significativa de tempo). Você pode definir maxHivePartitions para evitar buscar muitas partições de uma grande tabela do Hive. Da mesma forma, você também pode definir maxQueryTasks para limitar consultas em um conjunto de dados extremamente grande.

spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)

Quando você deve ativar o Query Watchdog?

O Query Watchdog deve ser habilitado para computação de análise ad hoc, onde analistas SQL e cientistas de dados estão compartilhando uma determinada computação e um administrador precisa se certificar de que as consultas "funcionam bem" umas com as outras.

Quando você deve desativar o Query Watchdog?

Em geral, não aconselhamos o cancelamento ansioso de consultas usadas em um cenário de ETL porque normalmente não há um humano no loop para corrigir o erro. Recomendamos que você desative o Query Watchdog para todos os cálculos, exceto os de análise ad hoc.