Control de consultas grandes en flujos de trabajo interactivos

Un desafío de los flujos de trabajo de datos interactivos es el control de las consultas de gran tamaño. Esto incluye las consultas que generan demasiadas filas de salida, capturan muchas particiones externas o realizan procesos sobre conjuntos de datos extremadamente grandes. Estas consultas pueden ser extremadamente lentas, saturar los recursos de proceso y dificultar que otros compartan el mismo proceso.

Query Watchdog es un proceso que impide que las consultas monopolen los recursos de proceso mediante el examen de las causas más comunes de las consultas grandes y las consultas de terminación que superan un umbral. En este artículo se explica cómo habilitar y configurar el control de consultas.

Importante

Query Watchdog está habilitado para todos los procesos de uso creados mediante la interfaz de usuario.

Ejemplo de una consulta disruptiva

Un analista está realizando algunas consultas ad hoc en un almacenamiento de datos Just-In-Time. El analista usa un proceso de escalado automático compartido que facilita que varios usuarios usen un solo proceso al mismo tiempo. Imagine que hay dos tablas con un millón de filas cada una.

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)

spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_x")
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_y")

Estos tamaños de tabla se pueden administrar en Apache Spark. Pero cada una incluye una columna join_key con una cadena vacía en cada fila. Esto puede ocurrir si los datos no están perfectamente limpios o si hay una asimetría de datos considerable donde algunas claves son más frecuentes que otras. Estas claves de combinación vacías son mucho más frecuentes que cualquier otro valor.

En el código siguiente el analista está combinando estas dos tablas por sus claves, lo que genera una salida de un trillón de resultados, y todos ellos se producen conforme a un solo ejecutor (el ejecutor que obtiene la clave " "):

SELECT
  id, count(id)
FROM
  (SELECT
    x.id
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY id

Esta consulta parece estar en ejecución. Pero sin saber sobre los datos, el analista ve que "solo" queda una tarea en el curso de la ejecución del trabajo. La consulta nunca finaliza, lo que deja frustrado y confuso al analista sobre por qué no ha funcionado.

En este caso solo hay una clave de combinación problemática. Otras veces puede haber muchas más.

Habilitación y configuración del control de consultas

Para habilitar y configurar el control de consultas, se requieren los pasos siguientes.

  • Habilite el control de consultas con spark.databricks.queryWatchdog.enabled.
  • Configure el tiempo de ejecución de la tarea con spark.databricks.queryWatchdog.minTimeSecs.
  • Muestre la salida con spark.databricks.queryWatchdog.minOutputRows.
  • Configure la relación de salida con spark.databricks.queryWatchdog.outputRatioThreshold.

Para evitar que una consulta cree demasiadas filas de salida para el número de filas de entrada, puede habilitar el control de consultas y configurar el número máximo de filas de salida como un múltiplo del número de filas de entrada. En este ejemplo se usa una proporción de 1000 (el valor predeterminado).

spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)

La última configuración declara que cualquier tarea determinada nunca debe generar más de 1000 veces el número de filas de entrada.

Sugerencia

La relación de salida es totalmente personalizable. Se recomienda empezar a reducir y ver qué umbral funciona bien en su caso. Un intervalo de 1000 a 10 000 es un buen punto de partida.

No solo Query Watchdog impide que los usuarios monopolícen los recursos de proceso para los trabajos que nunca se completarán, sino que también ahorra tiempo al producir un error rápido en una consulta que nunca se habría completado. Por ejemplo, se va a producir un error en la consulta siguiente después de varios minutos porque supera la proporción.

SELECT
  z.id
  join_key,
  sum(z.id),
  count(z.id)
FROM
  (SELECT
    x.id,
    y.join_key
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key) z
GROUP BY join_key, z.id

Esto es lo que se vería:

Control de consultas

Normalmente basta con habilitar el control de consultas y establecer la proporción del umbral de salida y entrada, pero también tiene la opción de establecer dos propiedades adicionales: spark.databricks.queryWatchdog.minTimeSecs y spark.databricks.queryWatchdog.minOutputRows. Estas propiedades especifican el tiempo mínimo que una tarea determinada de una consulta debe ejecutarse antes de la cancelación y el número mínimo de filas de salida para una tarea de esa consulta.

Por ejemplo, puede establecer minTimeSecs en un valor mayor si quiere darle la oportunidad de generar un gran número de filas por tarea. Del mismo modo, puede establecer spark.databricks.queryWatchdog.minOutputRows en diez millones si quiere detener una consulta solo después de que una tarea de esa consulta haya generado diez millones de filas. Algo menos y la consulta se ejecuta correctamente, incluso si se supera la proporción de salida y entrada.

spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)

Sugerencia

Si configura Query Watchdog en un cuaderno, la configuración no se conserva en los reinicios de proceso. Si quiere configurar Query Watchdog para todos los usuarios de un proceso, se recomienda usar una configuración de proceso.

Detección de consultas en un conjunto de datos extremadamente grande

Otra consulta de gran tamaño típica puede examinar una gran cantidad de datos de grandes tablas o conjuntos de datos. La operación de examen puede durar mucho tiempo y saturar los recursos de proceso (incluso leer metadatos de una tabla de Hive grande puede tardar un período de tiempo significativo). Puede establecer maxHivePartitions de modo que evite la captura de demasiadas particiones de una tabla de Hive grande. Del mismo modo, puede establecer maxQueryTasks de modo que limite las consultas en un conjunto de datos extremadamente grande.

spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)

¿Cuándo se debe habilitar el control de consultas?

Query Watchdog debe estar habilitado para el proceso de análisis ad hoc en el que los analistas de SQL y los científicos de datos comparten un proceso determinado y un administrador debe asegurarse de que las consultas “funcionen bien” entre sí.

¿Cuándo se debe deshabilitar el control de consultas?

En general, no se recomienda cancelar las consultas usadas en un escenario de ETL porque normalmente no hay una persona en el bucle para corregir el error. Se recomienda deshabilitar Query Watchdog para todo el proceso de análisis ad hoc.