pandas 関数 API

pandas 関数 API を使用すると、pandas インスタンスを取得して出力する Python ネイティブ関数を PySpark DataFrame に直接適用できます。 pandas ユーザー定義関数と同様に、関数 API でも Apache Arrow を使用してデータを転送し、pandas を使用してデータを操作します。ただし、pandas 関数 API では、Python の型ヒントは省略可能です。

pandas 関数 API には 3 つの種類があります。

  • グループ化マップ
  • マップ
  • 複合グループ化マップ

pandas 関数 API は、pandas UDF 実行で使用されるのと同じ内部ロジックを利用します。 PyArrow、サポートされている SQL の種類、構成などの同じ特性を持っています。

詳細については、ブログ記事「Apache Spark 3.0 の今後のリリースにおける新しい Pandas UDF と Python の型ヒント」を参照してください。

グループ化マップ

グループ化されたデータを groupBy().applyInPandas() を使用して変換して、"分割、適用、結合" パターンを実装します。 "分割、適用、結合" は、3 つの手順で構成されます。

  • DataFrame.groupBy を使用してデータをグループに分割します。
  • 各グループに関数を適用します。 関数の入力と出力はどちらも pandas.DataFrame です。 入力データには、各グループのすべての行と列が含まれます。
  • 結果を新しい DataFrame に結合します。

groupBy().applyInPandas() を使用するには、次のものを定義する必要があります。

  • 各グループの計算を定義する Python 関数
  • 出力 DataFrame のスキーマを定義する StructType オブジェクトまたは文字列

返される pandas.DataFrame の列ラベルは、文字列として指定されている場合は、定義された出力スキーマ内のフィールド名と一致する必要があります。また、文字列でない場合は、位置に基づくフィールドのデータ型に一致する必要があります (たとえば、整数インデックス)。 pandas.DataFrame を作成するときに列にラベルを付ける方法については、「pandas.DataFrame」を参照してください。

関数が適用される前に、グループのすべてのデータがメモリに読み込まれます。 そのため、特にグループ サイズが偏っている場合にメモリ不足例外が発生する可能性があります。 maxRecordsPerBatch の構成はグループに適用されません。グループ化されたデータが使用可能なメモリに収まるようにするのはユーザーの責任です。

次の例は、groupby().apply() を使用してグループ内の各値から平均を減算する方法を示しています。

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

詳細な使用法については、「pyspark.sql.GroupedData.applyInPandas」を参照してください。

マップ

pandas.DataFrame の反復子を現在の PySpark DataFrame を表す pandas.DataFrame の別の反復子に変換し、その結果を PySpark DataFrame として返すために、DataFrame.mapInPandas() によって pandas インスタンスでマップ操作を実行します。

基になる関数は、pandas.DataFrame の反復子を取得して出力します。 "系列から系列" などの一部の pandas UDF とは対照的に、任意の長さの出力を返すことができます。

次の例は、mapInPandas() を使用する方法を示しています。

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

詳細な使用法については、「pyspark.sql.DataFrame.mapInPandas」を参照してください。

複合グループ化マップ

pandas インスタンスとの複合グループ化マップ操作の場合、DataFrame.groupby().cogroup().applyInPandas() を使用して、共通のキーによって 2 つの PySpark DataFrame を復号グループ化した後に、次に示すように各複合グループに Python 関数を適用します。

  • キーを共有する各 DataFrame のグループが一緒に複合グループ化されるように、データをシャッフルします。
  • 各複合グループに関数を適用します。 関数の入力は 2 つの pandas.DataFrame です (キーを表す省略可能なタプルを使用します)。 関数の出力は pandas.DataFrame です。
  • すべてのグループの pandas.DataFrame を新しい PySpark DataFrame に結合します。

groupBy().cogroup().applyInPandas() を使用するには、次のものを定義する必要があります。

  • 各複合グループの計算を定義する Python 関数。
  • 出力 PySpark DataFrame のスキーマを定義する StructType オブジェクトまたは文字列。

返される pandas.DataFrame の列ラベルは、文字列として指定されている場合は、定義された出力スキーマ内のフィールド名と一致する必要があります。また、文字列でない場合は、位置に基づくフィールドのデータ型に一致する必要があります (たとえば、整数インデックス)。 pandas.DataFrame を作成するときに列にラベルを付ける方法については、「pandas.DataFrame」を参照してください。

関数が適用される前に、複合グループのすべてのデータがメモリに読み込まれます。 そのため、特にグループ サイズが偏っている場合にメモリ不足例外が発生する可能性があります。 maxRecordsPerBatch の構成は適用されません。複合グループ化されたデータが使用可能なメモリに収まるようにするのはユーザーの責任です。

次の例は、groupby().cogroup().applyInPandas() を使用して 2 つのデータセット間で asof join を実行する方法を示しています。

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

詳細な使用法については、「pyspark.sql.PandasCogroupedOps.applyInPandas」を参照してください。