Verarbeiten großer Abfragen in interaktiven Workflows

Eine Herausforderung bei interaktiven Datenworkflows ist die Verarbeitung großer Abfragen. Dazu gehören Abfragen, die zu viele Ausgabezeilen generieren, viele externe Partitionen abrufen oder den Compute für extrem große Datasets durchführen. Diese Abfragen können extrem langsam sein, die Compute-Ressourcen vollständig auslasten und es anderen erschweren, denselben Rechner gemeinsam zu nutzen.

Query Watchdog ist ein Prozess, der verhindert, dass Abfragen Compute-Ressourcen monopolisieren, indem er die häufigsten Ursachen für große Abfragen untersucht und Abfragen, die einen Schwellenwert überschreiten, abbricht. In diesem Artikel wird beschrieben, wie Sie Query Watchdog aktivieren und konfigurieren.

Wichtig

Query Watchdog ist für alle All-Purpose-Compute aktiviert, die über die Benutzeroberfläche erstellt wurden.

Beispiel für eine disruptive Abfrage

Ein Analyst erstellt einige Ad-hoc-Abfragen in einem Just-In-Time-Data Warehouse. Der Analyst verwendet einen freigegebenen Rechner mit automatischer Skalierung, der es mehreren Benutzern erleichtert, einen einzelnen Rechner gleichzeitig zu verwenden. Angenommen, es gibt zwei Tabellen, die jeweils eine Million Zeilen enthalten.

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)

spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_x")
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_y")

Diese Tabellengrößen sind in Apache Spark verwaltbar. Sie enthalten jedoch jeweils eine Spalte join_key mit einer leeren Zeichenfolge in jeder Zeile. Dies kann passieren, wenn die Daten nicht perfekt bereinigt sind oder wenn es eine erhebliche Datenschiefe gibt, bei der einige Schlüssel häufiger als andere Schlüssel vorkommen. Diese leeren Join-Schlüssel sind weitaus häufiger als jeder andere Wert.

Im folgenden Code verknüpft der Analytiker diese beiden Tabellen anhand ihrer Schlüssel, was zu einer Ausgabe von einer Billion Ergebnissen führt, die alle in einem einzigen Executor (dem Executor, der den " "-Schlüssel erhält) erzeugt werden:

SELECT
  id, count(id)
FROM
  (SELECT
    x.id
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY id

Diese Abfrage wird anscheinend ausgeführt. Aber ohne die Daten zu kennen, sieht der Analytiker, dass im Laufe der Ausführung des Auftrags „nur“ eine einzige Aufgabe übrig bleibt. Die Abfrage wird nie abgeschlossen, weshalb der Analyst frustriert und verärgert ist, warum sie nicht funktioniert hat.

In diesem Fall gibt es nur einen problematischen Join-Schlüssel. In anderen Fällen gibt es möglicherweise noch viele weitere.

Aktivieren und Konfigurieren von Query Watchdog

Zum Aktivieren und Konfigurieren von Query Watchdog sind die folgenden Schritte erforderlich:

  • Aktivieren von Watchdog mit spark.databricks.queryWatchdog.enabled
  • Konfigurieren der Aufgabenlaufzeit mit spark.databricks.queryWatchdog.minTimeSecs
  • Anzeigen der Ausgabe mit spark.databricks.queryWatchdog.minOutputRows
  • Konfigurieren des Ausgabeverhältnisses mit spark.databricks.queryWatchdog.outputRatioThreshold

Um zu verhindern, dass eine Abfrage zu viele Ausgabezeilen für die Anzahl der Eingabezeilen erstellt, können Sie Query Watchdog aktivieren und die maximale Anzahl von Ausgabezeilen als Vielfaches der Anzahl von Eingabezeilen konfigurieren. In diesem Beispiel wird ein Verhältnis von 1000 (Standard) verwendet.

spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)

Die letztgenannte Konfiguration legt fest, dass eine bestimmte Aufgabe niemals mehr als das 1000-fache der Anzahl der Eingabezeilen erzeugen sollte.

Tipp

Das Ausgabeverhältnis ist vollständig anpassbar. Es wird empfohlen, mit einem niedrigeren Wert zu beginnen und zu sehen, welcher Schwellenwert für Sie und Ihr Team gut funktioniert. Ein Bereich von 1.000 bis 10.000 ist ein guter Startpunkt.

Query Watchdog verhindert nicht nur, dass Benutzer Compute-Ressourcen für Aufträge monopolisieren, die nie abgeschlossen werden, sondern spart auch Zeit, da eine Abfrage, die nie abgeschlossen worden wäre, schnell fehlschlägt. Die folgende Abfrage schlägt zum Beispiel nach einigen Minuten fehl, weil sie das Verhältnis überschreitet.

SELECT
  z.id
  join_key,
  sum(z.id),
  count(z.id)
FROM
  (SELECT
    x.id,
    y.join_key
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key) z
GROUP BY join_key, z.id

Hier sehen Sie, was Sie sehen würden:

Query Watchdog

In der Regel reicht es aus, Query Watchdog zu aktivieren und das Verhältnis von Ausgabe-/Eingabeschwellenwert festzulegen. Sie haben jedoch auch die Möglichkeit, zwei zusätzliche Eigenschaften festzulegen: spark.databricks.queryWatchdog.minTimeSecs und spark.databricks.queryWatchdog.minOutputRows. Diese Eigenschaften geben die Mindestzeit, die eine bestimmte Aufgabe in einer Abfrage ausgeführt werden muss, bevor sie abgebrochen wird, sowie die Mindestanzahl von Ausgabezeilen für eine Aufgabe in dieser Abfrage an.

Sie können zum Beispiel minTimeSecs auf einen höheren Wert setzen, wenn die Möglichkeit gegeben werden soll, eine große Anzahl von Zeilen pro Aufgabe zu erzeugen. Ebenso können Sie spark.databricks.queryWatchdog.minOutputRows auf zehn Millionen festlegen, wenn Sie eine Abfrage nur beenden möchten, nachdem eine Aufgabe in dieser Abfrage zehn Millionen Zeilen erzeugt hat. Bei einem geringeren Wert ist die Abfrage erfolgreich, auch wenn das Ausgabe/Eingabe-Verhältnis überschritten wurde.

spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)

Tipp

Wenn Sie Query Watchdog in einem Notebook konfigurieren, wird die Konfiguration nicht über Compute-Neustarts hinweg beibehalten. Wenn Sie Query Watchdog für alle Benutzer eines Rechners konfigurieren möchten, empfiehlt es sich, eine Compute-Konfiguration zu verwenden.

Erkennen einer Abfrage für ein extrem großes Dataset

Eine andere typische große Abfrage kann eine große Menge von Daten aus großen Tabellen/Datasets scannen. Der Scanvorgang kann eine lange Zeit dauern und Compute-Ressourcen auslasten (selbst das Lesen von Metadaten einer großen Hive-Tabelle kann sehr lange dauern). Sie können maxHivePartitions festlegen, um zu verhindern, dass zu viele Partitionen aus einer großen Hive-Tabelle abgerufen werden. Auf ähnliche Weise können Sie auch maxQueryTasks festlegen, um Abfragen für ein extrem großes Dataset zu beschränken.

spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)

Wann sollten Sie Query Watchdog aktivieren?

Query Watchdog sollte für Ad-hoc-Analyserechner aktiviert werden, bei denen SQL-Analysten und Datenwissenschaftler einen bestimmten Rechner gemeinsam nutzen und ein Administrator sicherstellen muss, dass Abfragen „gut zusammenarbeiten“.

Wann sollten Sie Query Watchdog aktivieren?

Im Allgemeinen wird davon abgeraten, Abfragen, die in einem ETL-Szenario verwendet werden, vorschnell abzubrechen, da in der Regel keine Person in der Schleife steht, um den Fehler zu korrigieren. Wir empfehlen, Query Watchdog für alle Rechner außer Ad-hoc-Analysecluster zu deaktivieren.