Compartilhar via


O que são funções Python de tabela definidas pelo usuário?

Importante

Esse recurso está em uma versão prévia.

Uma função de tabela definida pelo usuário (UDTF) permite registrar funções que retornam tabelas em vez de valores escalares. As UDTFs funcionam de forma semelhante às CTEs (expressões de tabela comuns) quando referenciadas em consultas SQL. Você faz referência a UDTFs na cláusula FROM de uma instrução SQL e pode encadear operadores adicionais do Spark SQL aos resultados.

As UDTFs são registradas no SparkSession local e isoladas no nível do notebook ou do trabalho.

As UDTFs têm suporte na computação configurada com modos de acesso compartilhado atribuídos ou sem isolamento. Você não pode usar UDTFs no modo de acesso compartilhado.

Não é possível registrar UDTFs como objetos no Catálogo do Unity e elas não podem ser usadas com SQL Warehouses.

Qual é a sintaxe básica de uma UDTF?

O Apache Spark implementa UDTFs Python como classes Python com um método eval obrigatório.

Você emite resultados como linhas usando yield.

Para que o Apache Spark use sua classe como UDTF, você precisará importar a função PySpark udtf.

O Databricks recomenda usar essa função como decorador e sempre especificar explicitamente nomes e tipos de campo com a opção returnType.

O exemplo abaixo cria uma tabela simples com base em entradas escalares por meio de uma UDTF:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, x: int, y: int):
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

Você pode usar a sintaxe do Python *args e implementar a lógica para lidar com um número indeterminado de valores de entrada. O exemplo abaixo retorna o mesmo resultado, mas verificando explicitamente os argumentos nos tipos e no tamanho da entrada:

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

Registrar uma UDTF

Você pode registrar uma UDTF no SparkSession atual para uso em consultas SQL por meio da seguinte sintaxe:

spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)

O exemplo abaixo registra uma UDTF do Python no SQL:

spark.udtf.register("simple_udtf", SimpleUDTF)

Depois de registrada, você pode usar a UDTF no SQL usando o comando magic %sql ou a função spark.sql(), como mostram os seguintes exemplos:

%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")

Gerando resultados

As UDTFs do Python são implementadas com yield para retornar resultados. Os resultados são sempre retornados como uma tabela que contém 0 ou mais linhas com o esquema especificado.

Na transmissão de argumentos escalares, a lógica no método eval será executada exatamente uma vez com o conjunto de argumentos escalares transmitidos. No caso de argumentos de tabela, o método eval é executado uma vez para cada linha na tabela de entrada.

A lógica pode ser gravada para retornar 0, 1 ou muitas linhas por entrada.

A UDTF a seguir demonstra o retorno de 0 ou mais linhas para cada entrada segmentando cada item de uma lista separada por vírgulas em entradas diferentes:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

Transmitir um argumento de tabela a uma UDTF

Você pode usar a palavra-chave SQL TABLE() para transmitir um argumento de tabela a uma UDTF. Você pode usar um nome de tabela ou uma consulta, como mostram os seguintes exemplos:

TABLE(table_name);
TABLE(SELECT * FROM table_name);

Os argumentos de tabela são processados uma linha de cada vez. Você pode usar anotações de campo de coluna PySpark padrão para interagir com as colunas em cada linha. O exemplo a seguir demonstra a importação explícita do tipo PySpark Row e, em seguida, a filtragem da tabela transmitida no campo id:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# |  6|
# |  7|
# |  8|
# |  9|
# +---+

Transmitir argumentos escalares a uma UDTF

Você pode transmitir argumentos escalares a uma UDTF pela combinação dos seguintes valores:

  • Constantes escalares
  • Funções escalares
  • Campos em uma relação

Para transmitir campos em uma relação, você precisará registrar a UDTF e usar a palavra-chave SQL LATERAL.

Observação

Você pode usar aliases de tabela em linha para desambiguar colunas.

O exemplo abaixo demonstra o uso de LATERAL para transmitir campos de uma tabela a uma UDTF:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

spark.udtf.register("itemize", Itemize)

spark.sql("""
    SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
    (2, 'spoons,'),
    (3, ''),
    (4, 'knives,cups') t(id, item_list),
    LATERAL itemize(id, item_list) b
""").show()

Definir valores padrão para UDTFs

Opcionalmente, você pode implementar um método __init__ a fim de definir valores padrão para variáveis de classe que podem ser referenciadas em sua lógica Python.

O método __init__ não aceita argumentos e não tem acesso a variáveis ou a informações de estado no SparkSession.

Usar o Apache Arrow com UDTFs

O Databricks recomenda usar o Apache Arrow com UDTFs que recebem uma pequena quantidade de dados como entrada, mas geram uma tabela grande.

Você pode habilitar o Arrow com a especificação do parâmetro useArrow na declaração da UDTF, como no seguinte exemplo:

from pyspark.sql.functions import udtf

@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1