Funciones definidas por el usuario: Scala

Este artículo contiene ejemplos de funciones definidas por el usuario (UDF) de Scala. Muestra cómo registrar UDF, cómo invocar UDF y advertencias sobre el orden de evaluación de subexpresiones en Spark SQL.

Registro de una función como UDF

val squared = (s: Long) => {
  s * s
}
spark.udf.register("square", squared)

Llamada a la UDF en Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test

Uso de UDF con DataFrames

import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))

Orden de evaluación y comprobación de valores NULL

Las SQL Spark (incluidas SQL y las API dataframe y dataset) no garantizan el orden de evaluación de las subexpresiones. En concreto, las entradas de un operador o una función no se evalúan necesariamente de izquierda a derecha ni en ningún otro orden fijo. Por ejemplo, las expresiones lógicas y no tienen semántica de "cortocircuito" de izquierda a AND OR derecha.

Por lo tanto, es peligroso confiar en los efectos secundarios o el orden de evaluación de las expresiones booleanas y en el orden de las cláusulas y , ya que estas expresiones y cláusulas se pueden reordenar durante la optimización y el planeamiento de WHERE HAVING consultas. En concreto, si una UDF se basa en la semántica de cortocircuito en SQL para la comprobación de valores NULL, no hay ninguna garantía de que la comprobación nula se realizará antes de invocar la UDF. Por ejemplo,

spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee

Esta WHERE cláusula no garantiza que se strlen invoque la UDF después de filtrar los valores NULL.

Para realizar una comprobación correcta de los valores NULL, se recomienda realizar una de las siguientes acciones:

  • Hacer que la PROPIA UDF tenga en cuenta valores NULL y realizar la comprobación de valores NULL dentro de la propia UDF
  • Uso IF de CASE WHEN expresiones o para realizar la comprobación de valores NULL e invocar la UDF en una rama condicional
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok