API de fonctions pandas

Les API de fonction Pandas permettent d’appliquer directement une fonction native Python, qui prend et génère des instances Pandas, sur un DataFrame PySpark. Tout comme les fonctions Pandas définies par l’utilisateur, les API de fonction utilisent Apache Arrow pour transférer des données et Pandas pour travailler avec les données. Toutefois, les indicateurs de type Python sont facultatifs dans les API de fonction Pandas.

Il existe trois types d’API de fonction Pandas :

  • Mappage groupé
  • Mappage
  • Mappage cogroupé

Les API de fonction Pandas tirent parti de même la logique interne que l’exécution de fonctions Pandas définies par l’utilisateur. Ils partagent des caractéristiques telles que PyArrow, les types SQL supportés, et les configurations.

Pour plus d’informations, consultez le billet de blog Nouvelles fonctions Pandas définies par l’utilisateur et indicateurs de type Python dans la prochaine version d’Apache Spark 3.0.

Mappage groupé

Vous transformez vos données groupées en utilisant groupBy().applyInPandas() pour implémenter le modèle « fractionner-appliquer-combiner ». Le processus se déroule en trois étapes :

  • Fractionner les données en groupes à l’aide de DataFrame.groupBy.
  • Appliquer une fonction sur chaque groupe. L’entrée et la sortie de la fonction sont toutes les deux des pandas.DataFrame. Les données d’entrée contiennent toutes les lignes et toutes les colonnes de chaque groupe.
  • Combiner les résultats dans un nouveau DataFrame.

Pour pouvoir utiliser groupBy().applyInPandas(), vous devez définir les éléments suivants :

  • Fonction Python qui définit le calcul pour chaque groupe
  • Objet ou chaîne StructType qui définit le schéma du DataFrame de sortie

Les étiquettes de colonnes du pandas.DataFrame retourné doivent correspondre au nom des champs dans le schéma de sortie défini si elles sont spécifiées sous forme de chaînes, ou au type de données des champs par position si elles sont d’un autre type, par exemple des indices entiers. Pour savoir comment étiqueter des colonnes lors de la construction d’un pandas.DataFrame, consultez pandas.DataFrame.

Toutes les données d’un groupe sont chargées en mémoire avant que la fonction ne soit appliquée. Cela peut entraîner des exceptions de mémoire insuffisante, en particulier si les groupes sont de taille asymétrique. La configuration de maxRecordsPerBatch n’est pas appliquée sur les groupes. Il vous appartient de vérifier que les données groupées tiennent dans la mémoire disponible.

L’exemple suivant montre comment utiliser groupby().apply() pour soustraire la moyenne de chaque valeur du groupe.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Pour plus d’informations sur l’utilisation, consultez pyspark.sql.GroupedData.applyInPandas.

Mappage

Les opérations de mappage sur des instances Pandas sont effectuées par DataFrame.mapInPandas() afin de transformer un itérateur de pandas.DataFrame en un autre itérateur de pandas.DataFrame qui représente le DataFrame PySpark actuel et retourne le résultat sous la forme d’un DataFrame PySpark.

La fonction sous-jacente prend et génère un itérateur de pandas.DataFrame. Elle peut retourner une sortie d’une longueur arbitraire, contrairement à certaines fonctions définies par l’utilisateur pandas comme la fonction de série à série.

L'exemple suivant montre comment utiliser mapInPandas() :

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Pour plus d’informations sur l’utilisation, consultez pyspark.sql.DataFrame.mapInPandas.

Mappage cogroupé

Pour les opérations de carte cogroupées avec des instances pandas, utilisezDataFrame.groupby().cogroup().applyInPandas() pour cogrouper deux PySpark DataFrames par une clé commune et appliquez ensuite une fonction Python à chaque cogroupe comme indiqué :

  • Brasser les données de sorte que les groupes de chaque DataFrame partageant une clé soient cogroupés.
  • Appliquer une fonction à chaque cogroupe. La fonction prend en entrée deux pandas.DataFrame (avec un tuple facultatif représentant la clé). La sortie de la fonction est pandas.DataFrame.
  • Combiner les pandas.DataFrame de tous les groupes dans un nouveau DataFrame PySpark.

Pour pouvoir utiliser groupBy().cogroup().applyInPandas(), vous devez définir les éléments suivants :

  • Fonction Python qui définit le calcul pour chaque cogroupe
  • Objet ou chaîne StructType qui définit le schéma du DataFrame PySpark de sortie

Les étiquettes de colonnes du pandas.DataFrame retourné doivent correspondre au nom des champs dans le schéma de sortie défini si elles sont spécifiées sous forme de chaînes, ou au type de données des champs par position si elles sont d’un autre type, par exemple des indices entiers. Pour savoir comment étiqueter des colonnes lors de la construction d’un pandas.DataFrame, consultez pandas.DataFrame.

Toutes les données d’un cogroupe sont chargées en mémoire avant que la fonction ne soit appliquée. Cela peut entraîner des exceptions de mémoire insuffisante, en particulier si les groupes sont de taille asymétrique. La configuration de maxRecordsPerBatch n’est pas appliquée. Il vous appartient de vérifier que les données cogroupées tiennent dans la mémoire disponible.

L’exemple suivant montre comment utiliser groupby().cogroup().applyInPandas() pour effectuer une opération asof join entre deux jeux de données.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Pour plus d’informations sur l’utilisation, consultez pyspark.sql.PandasCogroupedOps.applyInPandas.