Obsługa dużych zapytań w interaktywnych przepływach pracy

Wyzwaniem dla interaktywnych przepływów pracy danych jest obsługa dużych zapytań. Dotyczy to zapytań, które generują zbyt wiele wierszy danych wyjściowych, pobierają wiele partycji zewnętrznych lub wykonują obliczenia na bardzo dużych zestawach danych. Te zapytania mogą być bardzo powolne, saturacji zasobów obliczeniowych i utrudniać innym współużytkowanie tych samych zasobów obliczeniowych.

Usługa Watchdog zapytań to proces, który uniemożliwia wykonywanie zapytań w celu monopolizowania zasobów obliczeniowych przez badanie najczęstszych przyczyn dużych zapytań i kończenie zapytań, które przechodzą próg. W tym artykule opisano sposób włączania i konfigurowania usługi Query Watchdog.

Ważne

Usługa Watchdog zapytań jest włączona dla wszystkich obliczeń ogólnego przeznaczenia utworzonych przy użyciu interfejsu użytkownika.

Przykład zapytania powodującego zakłócenia

Analityk wykonuje zapytania ad hoc w magazynie danych just in time. Analityk korzysta z współużytkowanego obliczeniowego skalowania automatycznego, które ułatwia wielu użytkownikom jednoczesne korzystanie z jednego obliczenia. Załóżmy, że istnieją dwie tabele, z których każda ma milion wierszy.

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)

spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_x")
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_y")

Te rozmiary tabel można zarządzać na platformie Apache Spark. Jednak każda z nich zawiera kolumnę join_key z pustym ciągiem w każdym wierszu. Może się tak zdarzyć, jeśli dane nie są całkowicie czyste lub jeśli istnieją znaczne niesymetryczności danych, w których niektóre klucze są bardziej powszechne niż inne. Te puste klucze sprzężenia są znacznie bardziej powszechne niż jakakolwiek inna wartość.

W poniższym kodzie analityk łączy te dwie tabele na swoich kluczach, co generuje dane wyjściowe o jeden bilion wyników, a wszystkie te elementy są generowane na jednym module wykonawczym (funkcja wykonawcza, która pobiera " " klucz):

SELECT
  id, count(id)
FROM
  (SELECT
    x.id
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY id

To zapytanie wydaje się być uruchomione. Jednak bez znajomości danych analityk widzi, że istnieje "tylko" jedno zadanie pozostawione w trakcie wykonywania zadania. Zapytanie nigdy nie kończy się, pozostawiając analityka sfrustrowany i zdezorientowany, dlaczego nie zadziałał.

W tym przypadku istnieje tylko jeden problematyczny klucz sprzężenia. Innym razem może być o wiele więcej.

Włączanie i konfigurowanie usługi Query Watchdog

Aby włączyć i skonfigurować usługę Query Watchdog, wymagane są następujące kroki.

  • Włącz usługę Watchdog za pomocą polecenia spark.databricks.queryWatchdog.enabled.
  • Skonfiguruj środowisko uruchomieniowe zadania za pomocą polecenia spark.databricks.queryWatchdog.minTimeSecs.
  • Wyświetl dane wyjściowe za pomocą polecenia spark.databricks.queryWatchdog.minOutputRows.
  • Skonfiguruj współczynnik danych wyjściowych za pomocą polecenia spark.databricks.queryWatchdog.outputRatioThreshold.

Aby zapobiec tworzeniu zbyt wielu wierszy wyjściowych dla liczby wierszy wejściowych, można włączyć usługę Query Watchdog i skonfigurować maksymalną liczbę wierszy wyjściowych jako wielokrotność wierszy wejściowych. W tym przykładzie używamy współczynnika 1000 (wartość domyślna).

spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)

Ta ostatnia konfiguracja deklaruje, że każde zadanie nigdy nie powinno generować więcej niż 1000 razy więcej niż liczba wierszy wejściowych.

Napiwek

Współczynnik danych wyjściowych jest całkowicie dostosowywalny. Zalecamy rozpoczęcie od niższego poziomu i zapoznanie się z tym, jaki próg działa dobrze dla Ciebie i Twojego zespołu. Dobry punkt wyjścia to zakres od 1000 do 10 000.

Usługa Query Watchdog nie tylko uniemożliwia użytkownikom monopolizowanie zasobów obliczeniowych dla zadań, które nigdy nie zostaną ukończone, ale także pozwala zaoszczędzić czas dzięki szybkiemu awarii zapytania, które nigdy nie zostałoby ukończone. Na przykład następujące zapytanie zakończy się niepowodzeniem po kilku minutach, ponieważ przekracza stosunek.

SELECT
  z.id
  join_key,
  sum(z.id),
  count(z.id)
FROM
  (SELECT
    x.id,
    y.join_key
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key) z
GROUP BY join_key, z.id

Oto, co zobaczysz:

Wykonywanie zapytań względem usługi Watchdog

Zwykle wystarczy włączyć usługę Query Watchdog i ustawić współczynnik progu danych wyjściowych/wejściowych, ale istnieje również możliwość ustawienia dwóch dodatkowych właściwości: spark.databricks.queryWatchdog.minTimeSecs i spark.databricks.queryWatchdog.minOutputRows. Te właściwości określają minimalny czas uruchomienia danego zadania w zapytaniu przed jego anulowaniem i minimalną liczbą wierszy wyjściowych dla zadania w tym zapytaniu.

Możesz na przykład ustawić minTimeSecs wartość wyższą, jeśli chcesz nadać jej szansę na wygenerowanie dużej liczby wierszy na zadanie. Podobnie można ustawić spark.databricks.queryWatchdog.minOutputRows wartość dziesięć milionów, jeśli chcesz zatrzymać zapytanie dopiero po zadaniu w tym zapytaniu, które wygenerowało dziesięć milionów wierszy. Cokolwiek mniej i zapytanie powiedzie się, nawet jeśli współczynnik danych wyjściowych/wejściowych został przekroczony.

spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)

Napiwek

Jeśli skonfigurujesz usługę Query Watchdog w notesie, konfiguracja nie będzie się powtarzać podczas ponownego uruchamiania zasobów obliczeniowych. Jeśli chcesz skonfigurować usługę Query Watchdog dla wszystkich użytkowników obliczeń, zalecamy użycie konfiguracji obliczeniowej.

Wykrywanie zapytań w bardzo dużym zestawie danych

Inne typowe duże zapytanie może skanować dużą ilość danych z tabel big tables/datasets. Operacja skanowania może trwać przez długi czas i saturacji zasobów obliczeniowych (nawet odczytywanie metadanych dużej tabeli Hive może zająć dużo czasu). Możesz ustawić ustawienie maxHivePartitions , aby zapobiec pobieraniu zbyt wielu partycji z dużej tabeli programu Hive. Podobnie można również ustawić opcję maxQueryTasks ograniczania zapytań dotyczących bardzo dużego zestawu danych.

spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)

Kiedy należy włączyć usługę Query Watchdog?

Usługa Watchdog zapytań powinna być włączona na potrzeby obliczeń analitycznych ad hoc, w których analitycy SQL i analitycy danych współdzielą dane obliczeniowe, a administrator musi upewnić się, że zapytania "dobrze się bawią" ze sobą.

Kiedy należy wyłączyć usługę Query Watchdog?

Ogólnie rzecz biorąc, nie zalecamy chętnie anulowania zapytań używanych w scenariuszu ETL, ponieważ zwykle nie ma człowieka w pętli, aby naprawić błąd. Zalecamy wyłączenie usługi Query Watchdog dla wszystkich obliczeń analitycznych ad hoc.