användardefinierade pandas-funktioner
En användardefinierad pandas-funktion (UDF) – även kallad vektoriserad UDF – är en användardefinierad funktion som använder Apache Arrow för att överföra data och pandas för att arbeta med data. Pandas UDF:er tillåter vektoriserade åtgärder som kan öka prestandan med upp till 100 gånger jämfört med python-UDF:er i rad i taget.
Bakgrundsinformation finns i blogginlägget New Pandas UDF:er och Python Type Hints i den kommande versionen av Apache Spark 3.0 och Optimera konvertering mellan PySpark och Pandas DataFrames.
Du definierar en Pandas UDF med nyckelordet pandas_udf
som dekoratör och omsluter funktionen med en Python-typtips.
Den här artikeln beskriver de olika typerna av Pandas UDF:er och visar hur du använder Pandas UDF:er med typtips.
Serie-till-serie-UDF
Du använder en serie till serie pandas UDF för att vektorisera skalära åtgärder.
Du kan använda dem med API:er som select
och withColumn
.
Python-funktionen bör ta en Pandas-serie som indata och returnera en Pandas-serie med samma längd, och du bör ange dessa i Tips av Python-typ. Spark kör en Pandas UDF genom att dela upp kolumner i batchar, anropa funktionen för varje batch som en delmängd av data och sedan sammanfoga resultaten.
I följande exempel visas hur du skapar en Pandas UDF som beräknar produkten av 2 kolumner.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
Iterator av serie till iterator av serie UDF
En iterator-UDF är samma som en skalär Pandas UDF förutom:
- Python-funktionen
- Tar en iterator av batchar i stället för en enda indatabatch som indata.
- Returnerar en iterator för utdatabatcherna i stället för en enda utdatabatch.
- Längden på hela utdata i iteratorn ska vara samma som längden på hela indata.
- Den omslutna Pandas UDF tar en enda Spark-kolumn som indata.
Du bör ange tips av Python-typ som Iterator[pandas.Series]
->Iterator[pandas.Series]
.
Denna Pandas UDF är användbar när UDF-körningen kräver initiering av något tillstånd, till exempel att läsa in en maskininlärningsmodellfil för att tillämpa slutsatsdragning för varje indatabatch.
I följande exempel visas hur du skapar en Pandas UDF med iteratorstöd.
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1
df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)
@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
y = y_bc.value # initialize states
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any
df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+
Iterator för flera serier till iterator i serie UDF
En iterator för flera serier till Iterator i serie UDF har liknande egenskaper och begränsningar som Iterator of Series to Iterator of Series UDF. Den angivna funktionen tar en iterator av batchar och matar ut en iterator med batchar. Det är också användbart när UDF-körningen kräver att vissa tillstånd initieras.
Skillnaderna är:
- Den underliggande Python-funktionen tar en iterator för en tupplar av Pandas Series.
- Den omslutna Pandas UDF tar flera Spark-kolumner som indata.
Du anger typtipsen som Iterator[Tuple[pandas.Series, ...]]
->Iterator[pandas.Series]
.
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
Serie till skalär UDF
Serier till skalär pandas UDF:er liknar Spark-mängdfunktioner.
En serie till skalär Pandas UDF definierar en aggregering från en eller flera Pandas-serier till ett skalärt värde, där varje Pandas-serie representerar en Spark-kolumn.
Du använder en serie för att skala Pandas UDF med API:er som select
, withColumn
, groupBy.agg
och pyspark.sql.Window.
Du uttrycker typtipset som pandas.Series, ...
->Any
. Returtypen ska vara en primitiv datatyp och den returnerade skalären kan vara antingen en Python-primitiv typ, int
till exempel eller float
en NumPy-datatyp som numpy.int64
eller numpy.float64
. Any
bör helst vara en specifik skalär typ.
Den här typen av UDF stöder inte partiell aggregering och alla data för varje grupp läses in i minnet.
I följande exempel visas hur du använder den här typen av UDF för att beräkna medelvärdet med select
, groupBy
och window
åtgärder:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
Detaljerad användning finns i pyspark.sql.functions.pandas_udf.
Användning
Ange batchstorlek för pil
Datapartitioner i Spark konverteras till pilpostbatchar, vilket tillfälligt kan leda till hög minnesanvändning i JVM. För att undvika möjliga undantag från minnesbrist kan du justera storleken på pilpostbatcherna genom att ange konfigurationen spark.sql.execution.arrow.maxRecordsPerBatch
till ett heltal som avgör det maximala antalet rader för varje batch. Standardvärdet är 10 000 poster per batch. Om antalet kolumner är stort bör värdet justeras i enlighet med detta. Med den här gränsen är varje datapartition indelad i 1 eller flera postbatchar för bearbetning.
Tidsstämpel med tidszonssemantik
Spark lagrar tidsstämplar internt som UTC-värden, och tidsstämpeldata som tas in utan en angiven tidszon konverteras som lokal tid till UTC med mikrosekundersupplösning.
När tidsstämpeldata exporteras eller visas i Spark används sessionstidszonen för att lokalisera tidsstämpelvärdena. Sessionstidszonen anges med konfigurationen spark.sql.session.timeZone
och är som standard den lokala tidszonen för JVM-systemet. Pandas använder en datetime64
typ med nanosekunders upplösning, , datetime64[ns]
med valfri tidszon per kolumn.
När tidsstämpeldata överförs från Spark till Pandas konverteras de till nanosekunder och varje kolumn konverteras till Spark-sessionens tidszon och lokaliseras sedan till tidszonen, vilket tar bort tidszonen och visar värden som lokal tid. Detta inträffar när du anropar toPandas()
eller pandas_udf
med tidsstämpelkolumner.
När tidsstämpeldata överförs från Pandas till Spark konverteras de till UTC-mikrosekunder. Detta inträffar när du anropar createDataFrame
med en Pandas DataFrame eller när du returnerar en tidsstämpel från en Pandas UDF. Dessa konverteringar görs automatiskt för att säkerställa att Spark har data i förväntat format, så det är inte nödvändigt att göra någon av dessa konverteringar själv. Nanosekunder trunkeras.
En standard-UDF läser in tidsstämpeldata som Python datetime-objekt, vilket skiljer sig från en Pandas-tidsstämpel. För att få bästa möjliga prestanda rekommenderar vi att du använder pandas-tidsseriefunktioner när du arbetar med tidsstämplar i en Pandas UDF. Mer information finns i Time Series/Date-funktioner.
Exempelnotebook-fil
Följande notebook-fil illustrerar de prestandaförbättringar som du kan uppnå med Pandas UDF:er: