Share via


対話型ワークフローでの大規模なクエリの処理

対話型データ ワークフローの課題は、大規模なクエリを処理することです。 これには、大量の出力行を生成するクエリ、多数の外部パーティションをフェッチするクエリ、または非常に大きなデータセットに対して計算を行うクエリが含まれます。 これらのクエリは非常に低速になり、コンピューティング リソースを飽和状態にさせ、他のユーザーが同じコンピューティングを共有するのを困難にさせる場合があります。

Query Watchdog は、大規模なクエリの最も一般的な原因を調べ、しきい値を超えるクエリを終了させることで、クエリがコンピューティング リソースを占有するのを防ぐプロセスです。 この記事では、Query Watchdog を有効にして構成する方法について説明します。

重要

Query Watchdog は、UI を使用して作成されたすべての汎用コンピューティングに対して有効になります。

問題を起こすクエリの例

アナリストが、Just-In-Time データ ウェアハウスでいくつかのアドホック クエリを実行しています。 アナリストは共有の自動スケール コンピューティングを使用します。これにより、複数のユーザーが同時に 1 つのコンピューティングを簡単に使用できます。 それぞれ 100 万行のテーブルが 2 つあるとします。

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)

spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_x")
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_y")

これらのテーブル サイズは、Apache Spark で管理可能です。 ただし、各行に空の文字列を含む join_key 列が含まれています。 これは、データが完全にクリーンではない場合や、一部のキーが他のキーよりも一般的であるようなデータ スキューがある場合に発生する可能性があります。 これらの空の結合キーは、他の値よりもはるかに一般的です。

次のコードでは、アナリストはキーによってこれら 2 つのテーブルを結合しています。これにより 1 兆個の結果の出力が生成され、これらすべてが 1 つの Executor (" " キーを取得する Executor) で生成されます。

SELECT
  id, count(id)
FROM
  (SELECT
    x.id
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY id

このクエリは実行されているように見えます。 しかし、データを理解していないため、アナリストには、ジョブの実行過程で 1 つのタスクのみが残ってるように見えます。 クエリが完了することはなく、アナリストには、うまくいかなかった理由についての不満と混乱が残ります。

この場合では、問題のある結合キーは 1 つのみです。 さらに多い場合もありえます。

Query Watchdog を有効にして構成する

Query Watchdog を有効にして構成するには、次の手順が必要です。

  • spark.databricks.queryWatchdog.enabled で Watchdog を有効にします。
  • spark.databricks.queryWatchdog.minTimeSecs でタスク ランタイムを構成します。
  • spark.databricks.queryWatchdog.minOutputRows で出力を表示します。
  • spark.databricks.queryWatchdog.outputRatioThreshold で出力比率を構成します。

クエリが入力行数に対して作成する出力行が多すぎるようになるのを防ぐには、Query Watchdog を有効にし、出力行の最大数を入力行数の倍数となるよう構成します。 この例では、比率 1000 (既定値) を使用します。

spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)

後者の構成では、特定のタスクで入力行数の 1,000 倍を超える行数を生成しないことを宣言しています。

ヒント

出力比率は完全にカスタマイズ可能です。 低目の設定から始め、自分とチームに合ったしきい値を確認することをお勧めします。 1,000 から 10,000 の範囲から始めることをお勧めします。

Query Watchdog では、完了しないジョブのためにユーザーがコンピューティング リソースを占有することを防ぐだけでなく、完了しないであろうクエリを高速で失敗させることで時間を節約します。 たとえば、次のクエリは比率を超えるため、数分で失敗します。

SELECT
  z.id
  join_key,
  sum(z.id),
  count(z.id)
FROM
  (SELECT
    x.id,
    y.join_key
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key) z
GROUP BY join_key, z.id

こうした場合に表示される情報を次に示します。

Query Watchdog

通常、Query Watchdog を有効にし、出力/入力しきい値の比率を設定することで十分ですが、spark.databricks.queryWatchdog.minTimeSecsspark.databricks.queryWatchdog.minOutputRows という 2 つの追加のプロパティを設定するという選択肢もあります。 これらのプロパティは、クエリ内の特定のタスクをキャンセルするまでに実行する必要がある最小時間と、そのクエリ内のタスクの出力行の最小数を指定します。

たとえば、タスクごとに多数の行を生成する可能性を許容する場合は、minTimeSecs をより大きな値に設定できます。 同様に、クエリ内のタスクが 1,000 万行を生成するまではクエリを停止しない場合は、spark.databricks.queryWatchdog.minOutputRows を 1,000 万に設定できます。 出力/入力比率を超えた場合でも、設定値を超えなければクエリは正常に終了します。

spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)

ヒント

ノートブックで Query Watchdog を構成する場合、コンピューティングの再起動時に構成が保持されません。 コンピューティングのすべてのユーザーに対して Query Watchdog を構成する場合は、コンピューティング構成を使用することをお勧めします。

非常に大規模なデータセットに対するクエリを検出する

もう 1 つのよくある大規模なクエリは、大規模なテーブル/データセットから大量のデータがスキャンされるという場合です。 スキャン操作が長時間続き、コンピューティング リソースが飽和状態になります (大規模な Hive テーブルのメタデータを読み取る場合でも、かなりの時間がかかる場合があります)。 maxHivePartitions を設定すると、大規模な Hive テーブルから多くのパーティションがフェッチされるのを防ぐことができます。 同様に、maxQueryTasks を設定して、非常に大規模なデータセットに対するクエリを制限することもできます。

spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)

Query Watchdog を有効にするべきケース

SQL アナリストとデータ サイエンティストが特定のコンピューティングを共有しており、管理者は、クエリが "妥当な" 状況で実行されるのを確認する必要がある、アドホック分析コンピューティングに対しては、Query Watchdog を有効にするべきです。

Query Watchdog を無効にするべきケース

一般に、ETL シナリオで使用されるクエリをむやみにキャンセルすることは、ループ内でエラーを修正するのが通常は人ではないため、お勧めしません。 アドホック分析コンピューティングを除くすべてのコンピューティングに対して Query Watchdog を無効にすることをお勧めします。