användardefinierade pandas-funktioner

En användardefinierad pandas-funktion (UDF) – även kallad vektoriserad UDF – är en användardefinierad funktion som använder Apache Arrow för att överföra data och pandas för att arbeta med data. Pandas UDF:er tillåter vektoriserade åtgärder som kan öka prestandan med upp till 100 gånger jämfört med python-UDF:er i rad i taget.

Bakgrundsinformation finns i blogginlägget New Pandas UDF:er och Python Type Hints i den kommande versionen av Apache Spark 3.0 och Optimera konvertering mellan PySpark och Pandas DataFrames.

Du definierar en Pandas UDF med nyckelordet pandas_udf som dekoratör och omsluter funktionen med en Python-typtips. Den här artikeln beskriver de olika typerna av Pandas UDF:er och visar hur du använder Pandas UDF:er med typtips.

Serie-till-serie-UDF

Du använder en serie till serie pandas UDF för att vektorisera skalära åtgärder. Du kan använda dem med API:er som select och withColumn.

Python-funktionen bör ta en Pandas-serie som indata och returnera en Pandas-serie med samma längd, och du bör ange dessa i Tips av Python-typ. Spark kör en Pandas UDF genom att dela upp kolumner i batchar, anropa funktionen för varje batch som en delmängd av data och sedan sammanfoga resultaten.

I följande exempel visas hur du skapar en Pandas UDF som beräknar produkten av 2 kolumner.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

Iterator av serie till iterator av serie UDF

En iterator-UDF är samma som en skalär Pandas UDF förutom:

  • Python-funktionen
    • Tar en iterator av batchar i stället för en enda indatabatch som indata.
    • Returnerar en iterator för utdatabatcherna i stället för en enda utdatabatch.
  • Längden på hela utdata i iteratorn ska vara samma som längden på hela indata.
  • Den omslutna Pandas UDF tar en enda Spark-kolumn som indata.

Du bör ange tips av Python-typ som Iterator[pandas.Series] ->Iterator[pandas.Series].

Denna Pandas UDF är användbar när UDF-körningen kräver initiering av något tillstånd, till exempel att läsa in en maskininlärningsmodellfil för att tillämpa slutsatsdragning för varje indatabatch.

I följande exempel visas hur du skapar en Pandas UDF med iteratorstöd.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

Iterator för flera serier till iterator i serie UDF

En iterator för flera serier till Iterator i serie UDF har liknande egenskaper och begränsningar som Iterator of Series to Iterator of Series UDF. Den angivna funktionen tar en iterator av batchar och matar ut en iterator med batchar. Det är också användbart när UDF-körningen kräver att vissa tillstånd initieras.

Skillnaderna är:

  • Den underliggande Python-funktionen tar en iterator för en tupplar av Pandas Series.
  • Den omslutna Pandas UDF tar flera Spark-kolumner som indata.

Du anger typtipsen som Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

Serie till skalär UDF

Serier till skalär pandas UDF:er liknar Spark-mängdfunktioner. En serie till skalär Pandas UDF definierar en aggregering från en eller flera Pandas-serier till ett skalärt värde, där varje Pandas-serie representerar en Spark-kolumn. Du använder en serie för att skala Pandas UDF med API:er som select, withColumn, groupBy.aggoch pyspark.sql.Window.

Du uttrycker typtipset som pandas.Series, ... ->Any. Returtypen ska vara en primitiv datatyp och den returnerade skalären kan vara antingen en Python-primitiv typ, int till exempel eller float en NumPy-datatyp som numpy.int64 eller numpy.float64. Any bör helst vara en specifik skalär typ.

Den här typen av UDF stöder inte partiell aggregering och alla data för varje grupp läses in i minnet.

I följande exempel visas hur du använder den här typen av UDF för att beräkna medelvärdet med select, groupByoch window åtgärder:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Detaljerad användning finns i pyspark.sql.functions.pandas_udf.

Användning

Ange batchstorlek för pil

Datapartitioner i Spark konverteras till pilpostbatchar, vilket tillfälligt kan leda till hög minnesanvändning i JVM. För att undvika möjliga undantag från minnesbrist kan du justera storleken på pilpostbatcherna genom att ange konfigurationen spark.sql.execution.arrow.maxRecordsPerBatch till ett heltal som avgör det maximala antalet rader för varje batch. Standardvärdet är 10 000 poster per batch. Om antalet kolumner är stort bör värdet justeras i enlighet med detta. Med den här gränsen är varje datapartition indelad i 1 eller flera postbatchar för bearbetning.

Tidsstämpel med tidszonssemantik

Spark lagrar tidsstämplar internt som UTC-värden, och tidsstämpeldata som tas in utan en angiven tidszon konverteras som lokal tid till UTC med mikrosekundersupplösning.

När tidsstämpeldata exporteras eller visas i Spark används sessionstidszonen för att lokalisera tidsstämpelvärdena. Sessionstidszonen anges med konfigurationen spark.sql.session.timeZone och är som standard den lokala tidszonen för JVM-systemet. Pandas använder en datetime64 typ med nanosekunders upplösning, , datetime64[ns]med valfri tidszon per kolumn.

När tidsstämpeldata överförs från Spark till Pandas konverteras de till nanosekunder och varje kolumn konverteras till Spark-sessionens tidszon och lokaliseras sedan till tidszonen, vilket tar bort tidszonen och visar värden som lokal tid. Detta inträffar när du anropar toPandas() eller pandas_udf med tidsstämpelkolumner.

När tidsstämpeldata överförs från Pandas till Spark konverteras de till UTC-mikrosekunder. Detta inträffar när du anropar createDataFrame med en Pandas DataFrame eller när du returnerar en tidsstämpel från en Pandas UDF. Dessa konverteringar görs automatiskt för att säkerställa att Spark har data i förväntat format, så det är inte nödvändigt att göra någon av dessa konverteringar själv. Nanosekunder trunkeras.

En standard-UDF läser in tidsstämpeldata som Python datetime-objekt, vilket skiljer sig från en Pandas-tidsstämpel. För att få bästa möjliga prestanda rekommenderar vi att du använder pandas-tidsseriefunktioner när du arbetar med tidsstämplar i en Pandas UDF. Mer information finns i Time Series/Date-funktioner.

Exempelnotebook-fil

Följande notebook-fil illustrerar de prestandaförbättringar som du kan uppnå med Pandas UDF:er:

Pandas UDF:er benchmark notebook

Hämta notebook-fil