pandas function APIs

pandas function APIs enable you to directly apply a Python native function, which takes and outputs pandas instances, to a PySpark DataFrame. Similar to pandas user-defined functions, function APIs also use Apache Arrow to transfer data and pandas to work with the data; however, Python type hints are optional in pandas function APIs.

There are three types of pandas function APIs:

  • Grouped map
  • Map
  • Cogrouped map

pandas function APIs leverage the same internal logic that pandas UDF executions use. Therefore, they share the same characteristics as pandas UDFs, such as PyArrow, supported SQL types, and the configurations.

For more information, see the blog post New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.

Grouped map

You transform your grouped data via groupBy().applyInPandas() to implement the “split-apply-combine” pattern. Split-apply-combine consists of three steps:

  • Split the data into groups by using DataFrame.groupBy.
  • Apply a function on each group. The input and output of the function are both pandas.DataFrame. The input data contains all the rows and columns for each group.
  • Combine the results into a new DataFrame.

To use groupBy().applyInPandas(), you must define the following:

  • A Python function that defines the computation for each group
  • A StructType object or a string that defines the schema of the output DataFrame

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, for example, integer indices. See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.
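For illustration, here is a minimal sketch of the two labeling options for a pandas.DataFrame returned against the schema "id long, v double" used in the example below (the values are placeholders):

import pandas as pd

# String labels: the column names must match the schema field names ("id", "v").
named = pd.DataFrame({"id": [1, 2], "v": [1.0, 2.0]})

# Non-string labels (here the default integer index): columns are matched
# to the schema fields by position and must have compatible data types.
positional = pd.DataFrame([[1, 1.0], [2, 2.0]])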

All data for a group is loaded into memory before the function is applied. This can lead to out-of-memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied on groups, and it is up to you to ensure that the grouped data fits into the available memory.
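The setting referred to here is spark.sql.execution.arrow.maxRecordsPerBatch. A minimal sketch of adjusting it (it still bounds Arrow batch sizes for other transfers; it just does not cap per-group loading):

# Cap Arrow record batches at 5,000 rows. Note this does not limit how much
# data a single group loads into memory for applyInPandas.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")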

The following example shows how to use groupBy().applyInPandas() to subtract the mean from each value in the group.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+
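The function passed to applyInPandas can also take the grouping key as its first argument when the computation needs it. A minimal sketch reusing the DataFrame above (the function name is illustrative):

def subtract_mean_with_key(key, pdf):
    # key is a tuple of the grouping values, for example (1,)
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby("id").applyInPandas(
    subtract_mean_with_key, schema="id long, v double").show()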

For detailed usage, see pyspark.sql.GroupedData.applyInPandas.

Map

You perform map operations with pandas instances using DataFrame.mapInPandas(), which transforms an iterator of pandas.DataFrame representing the current PySpark DataFrame into another iterator of pandas.DataFrame and returns the result as a PySpark DataFrame.

The underlying function takes and outputs an iterator of pandas.DataFrame. It can return output of arbitrary length, in contrast to some pandas UDFs such as the Series to Series pandas UDF.

The following example shows how to use mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        # Keep only the rows where id equals 1 in each batch
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+
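Because the function consumes and produces an iterator of batches, the output length is not tied to the input length. A minimal sketch that emits each batch twice (the duplication is purely illustrative):

import pandas as pd

def duplicate_func(iterator):
    for pdf in iterator:
        # Yield each input batch twice; the output need not match the
        # input row for row.
        yield pd.concat([pdf, pdf])

df.mapInPandas(duplicate_func, schema=df.schema).show()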

For detailed usage, see pyspark.sql.DataFrame.mapInPandas.

Cogrouped map

For cogrouped map operations with pandas instances, use DataFrame.groupby().cogroup().applyInPandas() for two PySpark DataFrames to be cogrouped by a common key and then a Python function applied to each cogroup. It consists of the following steps:

  • Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.
  • Apply a function to each cogroup. The input of the function is two pandas.DataFrames (with an optional tuple representing the key; see the sketch after the example below). The output of the function is a pandas.DataFrame.
  • Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.

To use groupBy().cogroup().applyInPandas(), you must define the following:

  • A Python function that defines the computation for each cogroup.
  • A StructType object or a string that defines the schema of the output PySpark DataFrame.

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, for example, integer indices. See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.

All data for a cogroup is loaded into memory before the function is applied. This can lead to out-of-memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied, and it is up to you to ensure that the cogrouped data fits into the available memory.

The following example shows how to use groupby().cogroup().applyInPandas() to perform an asof join between two datasets.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    # Merge the two cogrouped frames on the nearest earlier "time" within each id
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+
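As in the grouped map case, the function can optionally take the cogroup key as its first argument. A minimal sketch of the three-argument form (the function name is illustrative):

def asof_join_with_key(key, l, r):
    # key is a tuple of the cogrouping values, for example (1,)
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join_with_key, schema="time int, id int, v1 double, v2 string").show()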

For detailed usage, see pyspark.sql.PandasCogroupedOps.applyInPandas.