pandas function APIs

pandas function APIs enable you to directly apply a Python native function, which takes and outputs pandas instances, to a PySpark DataFrame. Similar to pandas user-defined functions, function APIs also use Apache Arrow to transfer data and pandas to work with the data; however, Python type hints are optional in pandas function APIs.
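
As a minimal sketch of the optional type hints (the function names and DataFrame below are hypothetical, not from the original text), the same pandas-native function can be written with or without annotations; both forms work with mapInPandas, which is covered below:

from typing import Iterator

import pandas as pd

def plus_one(iterator):
    # Without type hints: each element of the iterator is a pandas.DataFrame.
    for pdf in iterator:
        yield pdf + 1

def plus_one_typed(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # The same function with the optional type hints spelled out.
    for pdf in iterator:
        yield pdf + 1

df = spark.createDataFrame([(1,), (2,)], ("v",))
df.mapInPandas(plus_one, schema=df.schema).show()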

There are three types of pandas function APIs:

  • Grouped map
  • Map
  • Cogrouped map

pandas function APIs leverage the same internal logic that pandas UDF executions use. Therefore, they share the same characteristics with pandas UDFs, such as PyArrow, supported SQL types, and the configurations.
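
For example, the Arrow batch size setting that governs pandas UDFs also governs pandas function APIs; a brief sketch (the batch size value here is arbitrary):

# Cap the number of records per Arrow record batch transferred to the
# Python worker; this shared configuration affects pandas UDFs and
# pandas function APIs alike.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")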

For more information, see the blog post New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.

Grouped map

You transform your grouped data via groupBy().applyInPandas() to implement the “split-apply-combine” pattern. Split-apply-combine consists of three steps:

  • Split the data into groups by using DataFrame.groupBy.
  • Apply a function on each group. The input and output of the function are both pandas.DataFrame. The input data contains all the rows and columns for each group.
  • Combine the results into a new DataFrame.

To use groupBy().applyInPandas(), you must define the following:

  • A Python function that defines the computation for each group
  • A StructType object or a string that defines the schema of the output DataFrame

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, for example, integer indices. See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.
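
For illustration, the following two schema definitions are interchangeable ways to describe the same two-column output (a sketch using the field names from the example below):

from pyspark.sql.types import DoubleType, LongType, StructField, StructType

# The output schema as a DDL-formatted string...
schema_as_string = "id long, v double"

# ...and the equivalent StructType object.
schema_as_struct = StructType([
    StructField("id", LongType()),
    StructField("v", DoubleType()),
])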

All data for a group is loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied on groups, and it is up to you to ensure that the grouped data fits into the available memory.

The following example shows how to use groupby().applyInPandas() to subtract the mean from each value in the group.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+
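
As a related sketch not covered in the text above: if the grouped-map function is declared with two parameters, the grouping key is passed as the first argument as a tuple of the grouping values (subtract_mean_with_key is a hypothetical name):

def subtract_mean_with_key(key, pdf):
    # key is a tuple of the grouping values, e.g. (1,) for the group id == 1.
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby("id").applyInPandas(
    subtract_mean_with_key, schema="id long, v double").show()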

For detailed usage, see pyspark.sql.GroupedData.applyInPandas.

Map

You perform map operations with pandas instances via DataFrame.mapInPandas(), which transforms an iterator of pandas.DataFrame representing the current PySpark DataFrame into another iterator of pandas.DataFrame and returns the result as a PySpark DataFrame.

The underlying function takes and outputs an iterator of pandas.DataFrame. It can return output of arbitrary length, in contrast to some pandas UDFs such as the Series to Series pandas UDF.

The following example shows how to use mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+
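
To illustrate the arbitrary-length output mentioned above, a hypothetical sketch that emits more rows than it receives:

import pandas as pd

def duplicate_rows(iterator):
    # Yield each input batch twice; the output need not map 1:1 to the input rows.
    for pdf in iterator:
        yield pd.concat([pdf, pdf])

df.mapInPandas(duplicate_rows, schema=df.schema).show()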

For detailed usage, see pyspark.sql.DataFrame.mapInPandas.

Cogrouped map

For cogrouped map operations with pandas instances, use DataFrame.groupby().cogroup().applyInPandas() to cogroup two PySpark DataFrames by a common key and then apply a Python function to each cogroup. It consists of the following steps:

  • Shuffle the data such that the groups of each DataFrame that share a key are cogrouped together.
  • Apply a function to each cogroup. The input of the function is two pandas.DataFrames (with an optional tuple representing the key; see the key-aware sketch after the example below). The output of the function is a pandas.DataFrame.
  • Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.

To use groupBy().cogroup().applyInPandas(), you must define the following:

  • A Python function that defines the computation for each cogroup.
  • A StructType object or a string that defines the schema of the output PySpark DataFrame.

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, for example, integer indices. See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.

All data for a cogroup is loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied, and it is up to you to ensure that the cogrouped data fits into the available memory.

The following example shows how to use groupby().cogroup().applyInPandas() to perform an asof join between two datasets.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+
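
As noted in the steps above, the function can optionally take the key as a leading argument; a sketch reusing df1 and df2 from the example (asof_join_with_key is a hypothetical name):

def asof_join_with_key(key, l, r):
    # key is a tuple of the grouping values, e.g. (1,) for the cogroup id == 1.
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join_with_key, schema="time int, id int, v1 double, v2 string").show()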

For detailed usage, see pyspark.sql.PandasCogroupedOps.applyInPandas.