pandas 関数 APIpandas function APIs

パンダの関数 Api を使用すると、パンダのインスタンスを取得して出力する Python ネイティブ関数を PySpark データフレームに直接適用できます。pandas function APIs enable you to directly apply a Python native function, which takes and outputs pandas instances, to a PySpark DataFrame. パンダの ユーザー定義関数と同様に、関数 api では、 Apache 矢印 を使用してデータを転送し、データを操作することができます。ただし、パンダの関数 Api では Python の型ヒントは省略可能です。Similar to pandas user-defined functions, function APIs also use Apache Arrow to transfer data and pandas to work with the data; however, Python type hints are optional in pandas function APIs.

パンダの関数 Api には、次の3種類があります。There are three types of pandas function APIs:

  • グループ化マップGrouped map
  • マップMap
  • グループ化マップCogrouped map

パンダの関数 Api は、UDF の実行で使用されるのと同じ内部ロジックを利用しています。pandas function APIs leverage the same internal logic that pandas UDF executions use. そのため、PyArrow、サポートされる SQL の種類、構成など、パンダの Udf と同じ特性を共有します。Therefore, it shares the same characteristics with pandas UDFs such as PyArrow, supported SQL types, and the configurations.

詳細については、 Apache Spark 3.0 の今後のリリースでの新しいパンダの udf と Python の型ヒントに関するブログ記事を参照してください。For more information, see the blog post New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.

グループ化マップGrouped map

「」では、グループ化されたデータを変換して groupBy().applyInPandas() 、"分割-適用-結合" パターンを実装します。You transform your grouped data via groupBy().applyInPandas() to implement the “split-apply-combine” pattern. 分割-適用-結合は、次の3つの手順で構成されます。Split-apply-combine consists of three steps:

  • を使用して、データをグループに分割し DataFrame.groupBy ます。Split the data into groups by using DataFrame.groupBy.
  • 各グループに関数を適用します。Apply a function on each group. 関数の入力と出力は両方とも pandas.DataFrame です。The input and output of the function are both pandas.DataFrame. 入力データには、各グループのすべての行と列が含まれます。The input data contains all the rows and columns for each group.
  • 結果を新しいに結合し DataFrame ます。Combine the results into a new DataFrame.

を使用するには groupBy().applyInPandas() 、次のように定義する必要があります。To use groupBy().applyInPandas(), you must define the following:

  • 各グループの計算を定義する Python 関数A Python function that defines the computation for each group
  • StructType出力のスキーマを定義するオブジェクトまたは文字列。DataFrameA StructType object or a string that defines the schema of the output DataFrame

返されるの列ラベルは、 pandas.DataFrame 文字列として指定されている場合は、定義された出力スキーマ内のフィールド名と一致する必要があります。また、文字列でない場合は、位置によってフィールドのデータ型に一致する必要があります (たとえば、整数のインデックス)。The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, for example, integer indices. パンダをご覧ください。を構築するときに列にラベルを付ける方法については、データフレームを使用 pandas.DataFrame してください。See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.

関数が適用される前に、グループのすべてのデータがメモリに読み込まれます。All data for a group is loaded into memory before the function is applied. これにより、特にグループサイズがずれている場合に、メモリ不足の例外が発生する可能性があります。This can lead to out of memory exceptions, especially if the group sizes are skewed. Maxレコード Perbatchの構成はグループに適用されないため、グループ化されたデータが使用可能なメモリに収まるようにすることができます。The configuration for maxRecordsPerBatch is not applied on groups and it is up to you to ensure that the grouped data fits into the available memory.

次の例では、を使用して、 groupby().apply() グループ内の各値から平均を減算する方法を示します。The following example shows how to use groupby().apply() to subtract the mean from each value in the group.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

使用法の詳細については、「 pyspark」を参照してください。For detailed usage, see pyspark.sql.GroupedData.applyInPandas.

マップMap

DataFrame.mapInPandas()の反復子を、 pandas.DataFrame 現在の PySpark データフレームを表すの別の反復子に変換 pandas.DataFrame し、その結果を PySpark データフレームとして返すために、パンダのインスタンスでマップ操作を実行します。You perform map operations with pandas instances by DataFrame.mapInPandas() in order to transform an iterator of pandas.DataFrame to another iterator of pandas.DataFrame that represents the current PySpark DataFrame and returns the result as a PySpark DataFrame.

基になる関数は、の反復子を取得して出力し pandas.DataFrame ます。The underlying function takes and outputs an iterator of pandas.DataFrame. 系列とシリーズのパンダの UDF など、一部のパンダ Udf とは対照的に、任意の長さの出力を返すことができます。It can return the output of arbitrary length in contrast to some pandas UDFs such as Series to Series pandas UDF.

mapInPandas() を使用する方法を次の例に示します。The following example shows how to use mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

使用法の詳細については、「 pyspark」を参照してください。For detailed usage, please see pyspark.sql.DataFrame.applyInPandas.

グループ化マップCogrouped map

パンダインスタンスでの cogrouped マップ操作の場合は、 DataFrame.groupby().cogroup().applyInPandas() 2 つの PySpark DataFrame s を共通キーでグループ化し、各 cogrouped に適用する Python 関数を使用します。For cogrouped map operations with pandas instances, use DataFrame.groupby().cogroup().applyInPandas() for two PySpark DataFrames to be cogrouped by a common key and then a Python function applied to each cogroup. 次の手順で構成されます。It consists of the following steps:

  • キーを共有する各データフレームのグループが一緒にグループ化されるようにデータをシャッフルします。Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.
  • 各 cogroup に関数を適用します。Apply a function to each cogroup. 関数の入力は2です pandas.DataFrame (省略可能なタプルはキーを表します)。The input of the function is two pandas.DataFrame (with an optional tuple representing the key). 関数の出力は pandas.DataFrame です。The output of the function is a pandas.DataFrame.
  • すべての pandas.DataFrame グループの s を新しい PySpark に結合し DataFrame ます。Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.

を使用するには groupBy().cogroup().applyInPandas() 、次のように定義する必要があります。To use groupBy().cogroup().applyInPandas(), you must define the following:

  • 各 cogroup の計算を定義する Python 関数。A Python function that defines the computation for each cogroup.
  • StructType出力 PySpark のスキーマを定義するオブジェクトまたは文字列 DataFrameA StructType object or a string that defines the schema of the output PySpark DataFrame.

返されるの列ラベルは、 pandas.DataFrame 文字列として指定されている場合は、定義された出力スキーマ内のフィールド名と一致する必要があります。また、文字列でない場合は、位置によってフィールドのデータ型に一致する必要があります (たとえば、整数のインデックス)。The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, for example, integer indices. パンダをご覧ください。を構築するときに列にラベルを付ける方法については、データフレームを使用 pandas.DataFrame してください。See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.

Cogroup のすべてのデータは、関数が適用される前にメモリに読み込まれます。All data for a cogroup is loaded into memory before the function is applied. これにより、特にグループサイズがずれている場合に、メモリ不足の例外が発生する可能性があります。This can lead to out of memory exceptions, especially if the group sizes are skewed. Maxレコード Perbatchの構成は適用されないので、グループ化されたデータが使用可能なメモリに収まるようにすることができます。The configuration for maxRecordsPerBatch is not applied and it is up to you to ensure that the cogrouped data fits into the available memory.

次の例は、を使用して groupby().cogroup().applyInPandas() asof join 2 つのデータセット間でを実行する方法を示しています。The following example shows how to use groupby().cogroup().applyInPandas() to perform an asof join between two datasets.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

使用法の詳細については、「 pyspark」を参照してください。For detailed usage, see pyspark.sql.PandasCogroupedOps.applyInPandas.