問題: 非決定的なカスタム UDF が原因でジョブがハングする Apache SparkProblem: Apache Spark Jobs Hang Due to Non-deterministic Custom UDF

問題点Problem

Spark ユーザー定義関数 (UDF) の非決定的な動作によって Apache Spark ジョブが無期限にハングすることがあります。Sometimes Apache Spark jobs hang indefinitely due to the non-deterministic behavior of a Spark User-Defined Function (UDF). このような関数の例を次に示します。Here is an example of such a function:

val convertorUDF = (commentCol: String) =>
    {
              #UDF definition
    }
val translateColumn = udf(convertorUDF)

@No__t_0 API を使用してこの UDF を呼び出し、結果の DataFrame に何らかのフィルター変換を適用すると、各レコードに対して複数回実行され、アプリケーションのパフォーマンスに影響を与える可能性があります。If you call this UDF using the withColumn() API and then apply some filter transformation on the resulting DataFrame, the UDF could potentially execute multiple times for each record, affecting application performance.

val translatedDF = df.withColumn("translatedColumn", translateColumn( df("columnToTranslate")))
val filteredDF = translatedDF.filter(!translatedDF("translatedColumn").contains("Invalid URL Provided")) && !translatedDF("translatedColumn").contains("Unable to connect to Microsoft API"))

原因Cause

場合によっては、決定論的な UDF の動作が不明確で、UDF の定義に応じて重複の呼び出しが実行されることがあります。Sometimes a deterministic UDF can behave nondeterministically, performing duplicate invocations depending on the definition of the UDF. この動作は、データフレームで UDF を使用して withColumn() API を使用して列を追加し、結果の DataFrame に変換 (フィルター) を適用する場合によく見られます。You often see this behavior when you use a UDF on a DataFrame to add an additional column using the withColumn() API, and then apply a transformation (filter) to the resulting DataFrame.

ソリューションSolution

Udf は決定的である必要があります。UDFs must be deterministic. 最適化のため、重複した呼び出しが削除されたり、関数がクエリに存在する回数よりも多くの時間を呼び出したりする可能性があります。Due to optimization, duplicate invocations might be eliminated or the function can be invoked more times than it is present in the query.

最適なオプションは、UDF を使用している DataFrame をキャッシュすることです。The better option is to cache the DataFrame where you are using the UDF. @No__t_0 に大量のデータが含まれている場合は、Parquet フォーマットファイルに書き込むのが最適です。If the DataFrame contains a large amount of data, then writing it to a Parquet format file is optimal.

次のコードを使用して、結果をキャッシュすることができます。You can use the following code to cache the result:

val translatedDF = df.withColumn("translatedColumn", translateColumn( df("columnToTranslate"))).cache()