Обработка больших запросов в интерактивных рабочих процессах

Задача с интерактивными рабочими процессами данных — обработка больших запросов. Сюда входят запросы, которые создают слишком много выходных строк, получают множество внешних секций или вычисляют очень большие наборы данных. Эти запросы могут быть чрезвычайно медленными, насыщенными вычислительными ресурсами и затруднять совместное использование ресурсов вычислений другими пользователями.

Query Watchdog — это процесс, который предотвращает монополизацию вычислительных ресурсов запросов путем изучения наиболее распространенных причин больших запросов и завершения запросов, которые проходят пороговое значение. В этой статье описывается, как включить и настроить модуль Query Watchdog.

Внимание

Функция "Наблюдатель за запросом" включена для всех вычислительных ресурсов, созданных с помощью пользовательского интерфейса.

Пример деструктивного запроса

Аналитик выполняет несколько специальных запросов в оперативном хранилище данных. Аналитик использует общий вычислительный ресурс автомасштабирования, что упрощает использование нескольких пользователей одного вычисления одновременно. Предположим, что существует две таблицы, каждая из которых содержит миллионы строк.

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)

spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_x")
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_y")

Размеры этих таблиц могут контролироваться в Apache Spark. Однако каждая из них содержит столбец join_key с пустой строкой в каждой строке. Это может произойти, если данные не являются идеально чистыми, или если существует значительный перекос данных, когда некоторые ключи преобладают над другими. Эти пустые ключи соединений гораздо более распространены, чем любые другие значения.

В следующем коде аналитик соединяет эти две таблицы по их ключам, что дает результат 1 000 000 000 000 результатов, и все они создаются в одном исполнителе (исполнитель, который получает ключ " "):

SELECT
  id, count(id)
FROM
  (SELECT
    x.id
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY id

Вероятно, этот запрос работает. Но не зная о данных, аналитик видит, что в ходе выполнения задания осталась "всего" одна задача. Запрос никогда не завершается, оставляя аналитика разочарованным и сбитым с толку тем, почему он не работает.

В этом случае имеется только один проблемный ключ объединения. В других случаях может быть много других.

Включение и настройка модуля Query Watchdog

Чтобы включить и настроить контрольный запрос, необходимо выполнить следующие действия.

  • Включите контрольную функцию spark.databricks.queryWatchdog.enabled.
  • Настройте среду выполнения задач с spark.databricks.queryWatchdog.minTimeSecsпомощью .
  • Отображение выходных данных с spark.databricks.queryWatchdog.minOutputRowsпомощью .
  • Настройка коэффициента вывода с spark.databricks.queryWatchdog.outputRatioThresholdпомощью .

Чтобы запрос не создавал слишком много выходных строк для количества входных строк, можно включить Query Watchdog и настроить максимальное количество выходных строк как кратное количеству входных строк. В этом примере используется соотношение 1 000 (значение по умолчанию).

spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)

В последней конфигурации объявлено, что любая выполняемая задача никогда не должна производить входных строк больше, чем в 1 000 раз.

Совет

Коэффициент вывода полностью настраиваемый. Мы рекомендуем начать с понижения и увидеть, какой порог подходит для вас и вашей команды. Хорошей отправной точкой является диапазон от 1 000 до 10 000.

Не только запросы наблюдателя не позволяют пользователям монополизизировать вычислительные ресурсы для заданий, которые никогда не завершаются, а также экономит время путем быстрого сбоя запроса, который никогда не завершился. Например, следующий запрос завершится ошибкой через несколько минут, так как она превысит соотношение.

SELECT
  z.id
  join_key,
  sum(z.id),
  count(z.id)
FROM
  (SELECT
    x.id,
    y.join_key
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key) z
GROUP BY join_key, z.id

Вот что вы видите:

Запрос наблюдателя

Обычно этого достаточно для включения наблюдения за запросами и установки соотношения пороговых значений вывода и ввода, но у вас также есть возможность задать два дополнительных свойства: spark.databricks.queryWatchdog.minTimeSecs и spark.databricks.queryWatchdog.minOutputRows. Эти свойства указывают минимальное время, в течение которого должна выполняться данная задача в запросе, перед ее отменой и минимальным числом выходных строк для задачи в этом запросе.

Например, можно задать более высокое значение для minTimeSecs, если вы хотите дать ему возможность создавать большое количество строк для каждой задачи. Аналогичным образом можно задать значение 10 000 000 для spark.databricks.queryWatchdog.minOutputRows, если требуется прерывать запрос только после того, как задача в этом запросе выдала 10 000 000 строк. Если что-то меньшее, запрос выполняется успешно, даже при превышении коэффициента вывода и ввода.

spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)

Совет

Если вы настраиваете контрольную запись запросов в записной книжке, конфигурация не сохраняется во время перезапусков вычислений. Если вы хотите настроить контрольный запрос для всех пользователей вычислений, рекомендуется использовать конфигурацию вычислений.

Обнаружение запроса к очень большому набору данных

Другой типичный большой запрос может сканировать большой объем данных из больших таблиц / наборов данных. Операция сканирования может длиться долго и насыщенными вычислительными ресурсами (даже чтение метаданных большой таблицы Hive может занять значительное время). Можно настроить maxHivePartitions для предотвращения выборки слишком большого количества секций из большой таблицы Hive. Аналогичным образом можно также задать maxQueryTasks для ограничения запросов для очень большого набора данных.

spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)

Когда следует включать модуль Query Watchdog?

Запросы Watchdog должны быть включены для нерегламентированных аналитических вычислений, где аналитики SQL и специалисты по обработке и анализу данных совместно используют заданные вычислительные ресурсы, и администратору необходимо убедиться, что запросы "хорошо играют" друг с другом.

Когда следует отключать модуль Query Watchdog?

В целом мы не советуем заранее отменять запросы, используемые в сценарии ETL, так как обычно нет человека в цикле для исправления ошибки. Рекомендуется отключить контрольный запрос для всех, но нерегламентированных вычислений аналитики.