Partilhar via


Funções definidas pelo utilizador do Pandas

Uma função definida pelo usuário pandas (UDF) — também conhecida como UDF vetorizada — é uma função definida pelo usuário que usa a seta Apache para transferir dados e pandas para trabalhar com os dados. Os pandas UDFs permitem operações vetorizadas que podem aumentar o desempenho em até 100x em comparação com UDFs Python linha-a-vez.

Para obter informações básicas, consulte a postagem do blog New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.

Você define um UDF pandas usando a palavra-chave pandas_udf como decorador e envolve a função com uma dica de tipo Python. Este artigo descreve os diferentes tipos de pandas UDFs e mostra como usar pandas UDFs com dicas de tipo.

Série a Série UDF

Você usa um UDF de pandas Series to Series para vetorizar operações escalares. Você pode usá-los com APIs como select e withColumn.

A função Python deve tomar uma série pandas como entrada e retornar uma série pandas do mesmo comprimento, e você deve especificá-las nas dicas de tipo Python. O Spark executa um UDF pandas dividindo colunas em lotes, chamando a função para cada lote como um subconjunto dos dados e, em seguida, concatenando os resultados.

O exemplo a seguir mostra como criar um UDF pandas que calcula o produto de 2 colunas.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

Iterador de Série para Iterador de Série UDF

Um iterador UDF é o mesmo que um pandas escalar UDF, exceto:

  • A função Python
    • Usa um iterador de lotes em vez de um único lote de entrada como entrada.
    • Retorna um iterador de lotes de saída em vez de um único lote de saída.
  • O comprimento de toda a saída no iterador deve ser o mesmo que o comprimento de toda a entrada.
  • O UDF de pandas embrulhado usa uma única coluna Spark como entrada.

Você deve especificar a dica de tipo Python como Iterator[pandas.Series] ->Iterator[pandas.Series].

Este pandas UDF é útil quando a execução UDF requer a inicialização de algum estado, por exemplo, carregando um arquivo de modelo de aprendizado de máquina para aplicar inferência a cada lote de entrada.

O exemplo a seguir mostra como criar um UDF pandas com suporte a iteradores.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

Iterador de várias séries para iterador de séries UDF

Um Iterador de várias séries para Iterador de Série UDF tem características e restrições semelhantes como Iterador de Série para Iterador de Série UDF. A função especificada usa um iterador de lotes e produz um iterador de lotes. Também é útil quando a execução UDF requer a inicialização de algum estado.

As diferenças são:

  • A função Python subjacente leva um iterador de uma tupla de pandas Series.
  • O UDF de pandas embrulhado usa várias colunas do Spark como entrada.

Você especifica as dicas de tipo como Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

Série para escalar UDF

As UDFs de séries para pandas escalares são semelhantes às funções agregadas Spark. Uma Série para pandas escalares UDF define uma agregação de uma ou mais séries de pandas para um valor escalar, onde cada série de pandas representa uma coluna Spark. Você usa uma série para escalar pandas UDF com APIs como select, withColumn, groupBy.agge pyspark.sql.Window.

Você expressa a dica de tipo como pandas.Series, ... ->Any. O tipo de retorno deve ser um tipo de dados primitivo, e o escalar retornado pode ser um tipo primitivo Python, por exemplo, int ou float um tipo de dados NumPy, como numpy.int64 ou numpy.float64. Any idealmente deve ser um tipo escalar específico.

Este tipo de UDF não suporta agregação parcial e todos os dados de cada grupo são carregados na memória.

O exemplo a seguir mostra como usar esse tipo de UDF para calcular a média com select, groupBye window operações:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Para uso detalhado, consulte pyspark.sql.functions.pandas_udf.

Utilização

Definindo o tamanho do lote de seta

Nota

Essa configuração não tem impacto na computação configurada com o modo de acesso compartilhado e o Databricks Runtime 13.3 LTS a 14.2.

As partições de dados no Spark são convertidas em lotes de registro de seta, o que pode levar temporariamente ao alto uso de memória na JVM. Para evitar possíveis exceções de falta de memória, você pode ajustar o tamanho dos lotes de registro de seta definindo a spark.sql.execution.arrow.maxRecordsPerBatch configuração como um inteiro que determina o número máximo de linhas para cada lote. O valor padrão é 10.000 registros por lote. Se o número de colunas for grande, o valor deve ser ajustado em conformidade. Usando esse limite, cada partição de dados é dividida em 1 ou mais lotes de registro para processamento.

Carimbo de data/hora com semântica de fuso horário

O Spark armazena internamente carimbos de data/hora como valores UTC, e os dados de carimbo de data/hora trazidos sem um fuso horário especificado são convertidos como hora local em UTC com resolução de microssegundos.

Quando os dados de carimbo de data/hora são exportados ou exibidos no Spark, o fuso horário da sessão é usado para localizar os valores de carimbo de data/hora. O fuso horário da sessão é definido com a spark.sql.session.timeZone configuração e o padrão é o fuso horário local do sistema JVM. O Pandas usa um datetime64 tipo com resolução de nanossegundos, datetime64[ns]com fuso horário opcional por coluna.

Quando os dados de carimbo de data/hora são transferidos do Spark para pandas, eles são convertidos em nanossegundos e cada coluna é convertida para o fuso horário da sessão do Spark e, em seguida, localizada para esse fuso horário, o que remove o fuso horário e exibe valores como hora local. Isso ocorre ao chamar toPandas() ou pandas_udf com colunas de carimbo de data/hora.

Quando os dados de carimbo de data/hora são transferidos de pandas para o Spark, eles são convertidos em microssegundos UTC. Isso ocorre ao chamar createDataFrame com um DataFrame pandas ou ao retornar um carimbo de data/hora de um UDF pandas. Essas conversões são feitas automaticamente para garantir que o Spark tenha dados no formato esperado, portanto, não é necessário fazer nenhuma dessas conversões sozinho. Todos os valores de nanossegundos são truncados.

Um UDF padrão carrega dados de carimbo de data/hora como objetos datetime do Python, o que é diferente de um carimbo de data/hora dos pandas. Para obter o melhor desempenho, recomendamos que você use a funcionalidade de séries temporais de pandas ao trabalhar com carimbos de data/hora em um UDF de pandas. Para obter detalhes, consulte Funcionalidade Série Temporal/Data.

Bloco de notas de exemplo

O caderno a seguir ilustra as melhorias de desempenho que você pode alcançar com pandas UDFs:

pandas UDFs caderno de referência

Obter o bloco de notas