Compartir a través de


¿Qué son las funciones de tabla definidas por el usuario de Python?

Importante

Esta característica está en versión preliminar pública.

Las funciones de tabla definidas por un usuario (UDTF) le permiten registrar funciones que devuelven tablas en lugar de valores escalares. Las UDTF funcionan de forma similar a expresiones de tabla comunes (CTE) cuando se hace referencia a las consultas SQL. Puede hacer referencia a UDTF en la cláusula FROM de una instrucción SQL y puede encadenar operadores adicionales de Spark SQL a los resultados.

Las UDTF se registran en SparkSession local y están aisladas en el nivel de cuaderno o trabajo.

Las UDTF se admiten en el proceso configurado con modos de acceso compartido asignados o sin aislamiento. No se pueden usar UDTF en modo de acceso compartido.

No se pueden registrar UDTF como objetos en Unity Catalog y las UDTF no se pueden usar con almacenes de SQL.

¿Cuál es la sintaxis básica de una UDTF?

Apache Spark implementa las UDTF de Python como clases de Python con un método obligatorio eval.

Los resultados se emiten como filas mediante yield.

Para que Apache Spark use la clase como una UDTF, debe importar la función PySpark udtf.

Databricks recomienda usar esta función como decorador y especificar siempre explícitamente los nombres y tipos de campo mediante la opción returnType.

En el ejemplo siguiente se crea una tabla sencilla a partir de entradas escalares mediante una UDTF:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, x: int, y: int):
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

Puede usar la sintaxis de Python *args e implementar lógica para controlar un número no especificado de valores de entrada. En el ejemplo siguiente se devuelve el mismo resultado al comprobar explícitamente la longitud de entrada y los tipos de los argumentos:

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

Registro de una UDTF

Puede registrar una UDTF en SparkSession actual para su uso en consultas SQL mediante la sintaxis siguiente:

spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)

En el ejemplo siguiente se registra una UDTF de Python en SQL:

spark.udtf.register("simple_udtf", SimpleUDTF)

Una vez registrado, puede usar la UDTF en SQL mediante el comando magic %sql o la función spark.sql(), como en los ejemplos siguientes:

%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")

Obtención de resultados

Las UDTF de Python se implementan con yield para devolver resultados. Los resultados siempre se devuelven como una tabla que contiene 0 o más filas con el esquema especificado.

Al pasar argumentos escalares, la lógica del método eval se ejecuta exactamente una vez con el conjunto de argumentos escalares pasados. Para los argumentos de tabla, el método eval se ejecuta una vez para cada fila de la tabla de entrada.

La lógica se puede escribir para devolver 0, 1 o muchas filas por entrada.

La siguiente UDTF muestra cómo devolver 0 o más filas para cada entrada separando los elementos de una lista separada por comas en entradas independientes:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

Pasar un argumento de tabla a una UDTF

Puede usar la palabra clave TABLE() de SQL para pasar un argumento de tabla a una UDTF. Puede usar un nombre de tabla o una consulta, como en los ejemplos siguientes:

TABLE(table_name);
TABLE(SELECT * FROM table_name);

Los argumentos de tabla se procesan una fila a la vez. Puede usar anotaciones de campo de columna PySpark estándar para interactuar con columnas de cada fila. En el ejemplo siguiente se muestra cómo importar explícitamente el tipo PySpark Row y, a continuación, filtrar la tabla pasada en el campo id:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# |  6|
# |  7|
# |  8|
# |  9|
# +---+

Pasar argumentos escalares a una UDTF

Puede pasar argumentos escalares a una UDTF mediante cualquier combinación de los valores siguientes:

  • Constantes escalares
  • Funciones escalares
  • Campos en una relación

Para pasar campos en una relación, debe registrar la UDTF y usar la palabra clave de SQL LATERAL.

Nota:

Puede usar alias de tabla en línea para desambiguar columnas.

En el ejemplo siguiente se muestra cómo usar LATERAL para pasar campos de una tabla a una UDTF:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

spark.udtf.register("itemize", Itemize)

spark.sql("""
    SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
    (2, 'spoons,'),
    (3, ''),
    (4, 'knives,cups') t(id, item_list),
    LATERAL itemize(id, item_list) b
""").show()

Establecimiento de valores predeterminados para UDTF

Opcionalmente, puede implementar un método __init__ para establecer valores predeterminados para las variables de clase a las que puede hacer referencia en la lógica de Python.

El método __init__ no acepta ningún argumento y no tiene acceso a variables ni información de estado en SparkSession.

Uso de Apache Arrow con UDTFs

Databricks recomienda usar Apache Arrow para las UDTF que reciben una pequeña cantidad de datos como entrada, pero generan una tabla grande.

Puede habilitar Arrow especificando el parámetro useArrow al declarar la UDTF, como en el ejemplo siguiente:

from pyspark.sql.functions import udtf

@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1