pandas 用戶定義函式

pandas 使用者定義函式 (UDF)也稱為向量化 UDF—是使用者定義函式,會使用 Apache Arrow 來傳輸數據和 pandas 來處理數據。 pandas UDF 允許向量化作業,相較於一次 Python UDF 的數據列,可提升高達 100 倍的效能。

如需背景資訊,請參閱即將推出的Apache Spark 3.0版本中的新 Pandas UDF 和 Python 類型提示部落格文章

您可以使用 關鍵詞pandas_udf來定義 pandas UDF 作為裝飾專案,並以 Python 類型提示包裝函式。 本文說明不同類型的 pandas UDF,並示範如何搭配類型提示使用 pandas UDF。

數列到數列 UDF

您可以使用數列至系列 pandas UDF 來向量化純量作業。 您可以使用它們搭配 API,例如 selectwithColumn

Python 函式應採用 pandas 數列做為輸入,並傳回相同長度的 pandas 系列,您應該在 Python 類型提示中指定這些專案。 Spark 會執行 pandas UDF,方法是將數據行分割成批次,呼叫每個批次的函式作為數據的子集,然後串連結果。

下列範例示範如何建立可計算 2 個數據行乘積的 pandas UDF。

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

數列反覆運算器到數列 UDF 的反覆運算器

反覆運算器 UDF 與純量 pandas UDF 相同,不同之處在於:

  • Python 函式
    • 採用批次反覆運算器,而不是單一輸入批次作為輸入。
    • 傳回輸出批次的反覆運算器,而不是單一輸出批次。
  • 反覆運算器中整個輸出的長度應該與整個輸入的長度相同。
  • 包裝的 pandas UDF 會採用單一 Spark 數據行作為輸入。

您應該將 Python 類型提示指定為 Iterator[pandas.Series] ->Iterator[pandas.Series]

當 UDF 執行需要初始化某些狀態時,此 pandas UDF 很有用,例如,載入機器學習模型檔案以將推斷套用至每個輸入批次。

下列範例示範如何使用反覆運算器支援建立 pandas UDF。

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

多個數列反覆運算器到數列 UDF 的反覆運算器

數列 UDF 反覆運算器有多個數列反覆運算器的反覆運算器,其特性和限制與 數位 UDF 的反覆運算器類似。 指定的函式會採用批次反覆運算器,並輸出批次反覆運算器。 當UDF執行需要初始化某些狀態時,它也很有用。

差異包括:

  • 基礎 Python 函式會採用 pandas 系列 Tuple 的反覆運算器
  • 包裝的 pandas UDF 會採用 多個 Spark 數據行做為輸入。

您會將類型提示指定為 Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series]

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

純量 UDF 的數列

對純量 pandas UDF 的數列類似於 Spark 聚合函數。 純量 pandas UDF 的數列會定義一或多個 pandas Series 到純量值的匯總,其中每個 pandas 數列都代表 Spark 數據行。 您可以使用數列搭配 、、 withColumngroupBy.aggpyspark.sql.WindowselectAPI 來純量 pandas UDF。

您會將類型提示表示為 pandas.Series, ... ->Any。 傳回型別應該是基本數據類型,而傳回的純量可以是 Python 基本類型, int 例如 或 float 或 NumPy 資料類型,例如 numpy.int64numpy.float64Any 在理想情況下應該是特定的純量類型。

這種類型的 UDF 不支援 部分匯總,而且每個群組的所有數據都會載入記憶體中。

下列範例示範如何使用這種類型的 UDF 來計算具有 selectgroupBywindow 作業的平均值:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

如需詳細的使用方式,請參閱 pyspark.sql.functions.pandas_udf

使用方式

設定箭號批次大小

注意

此設定不會影響使用共用存取模式和 Databricks Runtime 13.3 LTS 到 14.2 設定的計算。

Spark 中的數據分割會轉換成箭頭記錄批次,這可暫時導致 JVM 中的高記憶體使用量。 若要避免記憶體不足的例外狀況,您可以將組態設定為 spark.sql.execution.arrow.maxRecordsPerBatch 整數,以調整箭號記錄批次的大小,以決定每個批次的數據列數目上限。 每個批次的預設值為 10,000 筆記錄。 如果數據行數目很大,則應該據以調整值。 使用此限制,每個數據分割區會分成 1 或多個記錄批次進行處理。

具有時區語意的時間戳

Spark 會在內部將時間戳儲存為 UTC 值,而不含指定時區的時間戳數據會轉換成 UTC,並以微秒解析度轉換成 UTC。

在Spark中匯出或顯示時間戳數據時,工作階段時區會用來當地語系化時間戳值。 會話時區會設定為組態, spark.sql.session.timeZone 並預設為 JVM 系統本機時區。 pandas 會使用具有 datetime64 nanosecond 解析的類型, datetime64[ns]且具有每個數據行的選擇性時區。

當時間戳數據從 Spark 傳輸到 pandas 時,它會轉換成 nanoseconds,而每個數據行都會轉換成 Spark 會話時區,然後當地語系化為該時區,這會移除時區,並將值顯示為當地時間。 呼叫 或 pandas_udf 使用時間戳數據行時toPandas(),就會發生此情況。

當時間戳數據從 pandas 傳輸到 Spark 時,它會轉換成 UTC 微秒。 當使用 pandas DataFrame 呼叫 createDataFrame ,或從 pandas UDF 傳回時間戳時,就會發生這種情況。 這些轉換會自動完成,以確保Spark具有預期的格式數據,因此您不需要自行執行任何轉換。 任何 nanosecond 值會截斷。

標準 UDF 會將時間戳數據載入為 Python datetime 物件,這與 pandas 時間戳不同。 若要獲得最佳效能,建議您在使用 pandas UDF 中的時間戳時,使用 pandas 時間序列功能。 如需詳細資訊,請參閱 時間序列/日期功能

範例筆記本

下列筆記本說明您可以使用 pandas UDF 達成的效能改進:

pandas UDF 基準檢驗筆記本

取得筆記本