Función escalar definida por el usuario - Python

Este artículo contiene ejemplos de funciones definidas por el usuario (UDF) de Python. Muestra cómo registrar UDF, cómo invocar UDF y advertencias con respecto al orden de evaluación de las subexpresiones en Spark SQL.

En Databricks Runtime 14.0 y versiones posteriores, puede usar funciones de tabla definidas por el usuario (UDTF) de Python para registrar funciones que devuelven relaciones completas en lugar de valores escalares. Consulte ¿Qué son las funciones de tabla definidas por el usuario de Python?.

Nota:

En Databricks Runtime 12.2 LTS y versiones posteriores, las UDF de Python y Pandas no se admiten en Unity Catalog en el proceso que usa el modo de acceso compartido. Las UDF escalares de Python y las UDF escalares de Pandas se admiten en Databricks Runtime 13.3 LTS y versiones posteriores para todos los modos de acceso.

En Databricks Runtime 13.3 LTS y versiones posteriores, puede registrar UDF escalares de Python en Unity Catalog mediante la sintaxis SQL. Consulte Funciones definidas por el usuario (UDF) en Unity Catalog.

Registro de una función como UDF

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

Opcionalmente, puede establecer el tipo de valor devuelto de la UDF. El tipo de valor devuelto predeterminado es StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Llamada a la UDF en Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Uso de UDF con DataFrames

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Como alternativa, puede declarar la misma UDF mediante la sintaxis de anotación:

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Orden de evaluación y comprobación de valores NULL

Spark SQL (incluidas SQL y las DataFrame API y Dataset API) no garantiza el orden de evaluación de las subexpresiones. En concreto, las entradas de un operador o función no se evalúan necesariamente de izquierda a derecha ni en ningún otro orden fijo. Por ejemplo, las expresiones lógicas AND y OR no tienen semántica de "cortocircuito" de izquierda a derecha.

Por lo tanto, es peligroso basarse en los efectos secundarios o el orden de evaluación de las expresiones booleanas y el orden de las cláusulas WHERE y HAVING, ya que estas expresiones y cláusulas se pueden reordenar durante la optimización y el planeamiento de consultas. En concreto, si una UDF se basa en la semántica de cortocircuito en SQL para la comprobación de valores NULL, no hay ninguna garantía de que se realizará la comprobación de valores NULL antes de invocar la UDF. Por ejemplo,

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Esta cláusula WHERE no garantiza que se invoque la UDF strlen después de filtrar los valores NULL.

Para realizar una comprobación correcta de los valores NULL, se recomienda realizar una de las siguientes acciones:

  • Hacer que la UDF tenga en cuenta los valores NULL y realizar la comprobación de valores NULL dentro de la propia UDF
  • Uso de expresiones IF o CASE WHEN para realizar la comprobación de valores NULL e invocar la UDF en una rama condicional
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok