API de fonctions pandaspandas function APIs

les API de fonction pandas vous permettent d’appliquer directement une fonction native Python, qui prend et génère des instances pandas, vers un tableau PySpark.pandas function APIs enable you to directly apply a Python native function, which takes and outputs pandas instances, to a PySpark DataFrame. À l’instar des fonctions définies par l’utilisateur pandas, lesAPI de fonction utilisent également la flèche Apache pour transférer des données et pandas pour travailler avec les données. Toutefois, les indicateurs de type Python sont facultatifs dans les API de fonction pandas.Similar to pandas user-defined functions, function APIs also use Apache Arrow to transfer data and pandas to work with the data; however, Python type hints are optional in pandas function APIs.

Il existe trois types d’API de fonction pandas :There are three types of pandas function APIs:

  • Carte groupéeGrouped map
  • CarteMap
  • Carte cogroupéeCogrouped map

les API de fonction pandas tirent parti de la logique interne que les exécutions de pandas UDF utilisent.pandas function APIs leverage the same internal logic that pandas UDF executions use. Par conséquent, il partage les mêmes caractéristiques avec les fonctions définies par l’utilisateur pandas, telles que PyArrow, les types SQL pris en charge et les configurations.Therefore, it shares the same characteristics with pandas UDFs such as PyArrow, supported SQL types, and the configurations.

Pour plus d’informations, consultez le billet de blog New pandas UDF and python type Hints dans la prochaine version de Apache Spark 3,0.For more information, see the blog post New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.

Carte groupéeGrouped map

Vous transformez vos données groupées par groupBy().applyInPandas() le biais de pour implémenter le modèle « fractionner-appliquer-combiner ».You transform your grouped data via groupBy().applyInPandas() to implement the “split-apply-combine” pattern. Split-Apply-combine se compose de trois étapes :Split-apply-combine consists of three steps:

  • Fractionner les données en groupes à l’aide de DataFrame.groupBy .Split the data into groups by using DataFrame.groupBy.
  • Appliquez une fonction sur chaque groupe.Apply a function on each group. L’entrée et la sortie de la fonction sont toutes les deux pandas.DataFrame .The input and output of the function are both pandas.DataFrame. Les données d’entrée contiennent toutes les lignes et les colonnes de chaque groupe.The input data contains all the rows and columns for each group.
  • Combiner les résultats dans un nouveau DataFrame .Combine the results into a new DataFrame.

Pour utiliser groupBy().applyInPandas() , vous devez définir les éléments suivants :To use groupBy().applyInPandas(), you must define the following:

  • Fonction Python qui définit le calcul pour chaque groupeA Python function that defines the computation for each group
  • StructTypeObjet ou chaîne qui définit le schéma de la sortieDataFrameA StructType object or a string that defines the schema of the output DataFrame

Les étiquettes de colonne du retourné pandas.DataFrame doivent correspondre aux noms de champs dans le schéma de sortie défini s’ils sont spécifiés en tant que chaînes, ou correspondre aux types de données de champ par position s’ils ne sont pas des chaînes, par exemple des indices d’entier.The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, for example, integer indices. Consultez pandas. Tableau pour savoir comment étiqueter des colonnes lors de la construction d’un pandas.DataFrame .See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.

Toutes les données d’un groupe sont chargées en mémoire avant que la fonction ne soit appliquée.All data for a group is loaded into memory before the function is applied. Cela peut entraîner des exceptions de mémoire insuffisante, en particulier si les tailles de groupe sont inclinées.This can lead to out of memory exceptions, especially if the group sizes are skewed. La configuration pour maxRecordsPerBatch n’est pas appliquée sur les groupes et c’est à vous de vérifier que les données regroupées tiennent dans la mémoire disponible.The configuration for maxRecordsPerBatch is not applied on groups and it is up to you to ensure that the grouped data fits into the available memory.

L’exemple suivant montre comment utiliser groupby().apply() pour soustraire la moyenne de chaque valeur du groupe.The following example shows how to use groupby().apply() to subtract the mean from each value in the group.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Pour plus d’informations sur l’utilisation, consultez pyspark. Sql. GroupedData. applyInPandas.For detailed usage, see pyspark.sql.GroupedData.applyInPandas.

CarteMap

Vous effectuez des opérations de mappage avec des instances pandas par afin de DataFrame.mapInPandas() transformer un itérateur de pandas.DataFrame en un autre itérateur de pandas.DataFrame qui représente le tableau PySpark actuel et retourne le résultat sous la forme d’un tableau PySpark.You perform map operations with pandas instances by DataFrame.mapInPandas() in order to transform an iterator of pandas.DataFrame to another iterator of pandas.DataFrame that represents the current PySpark DataFrame and returns the result as a PySpark DataFrame.

La fonction sous-jacente prend et génère un itérateur de pandas.DataFrame .The underlying function takes and outputs an iterator of pandas.DataFrame. Elle peut renvoyer la sortie d’une longueur arbitraire, contrairement à certaines fonctions définies par l’utilisateur, telles que les séries vers des pandas UDF.It can return the output of arbitrary length in contrast to some pandas UDFs such as Series to Series pandas UDF.

L'exemple suivant montre comment utiliser mapInPandas() :The following example shows how to use mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Pour plus d’informations sur l’utilisation, consultez pyspark. Sql. tableau. applyInPandas.For detailed usage, please see pyspark.sql.DataFrame.applyInPandas.

Carte cogroupéeCogrouped map

Pour les opérations de mappage cogroupées avec des instances de pandas, utilisez DataFrame.groupby().cogroup().applyInPandas() pour que deux PySpark DataFrame s soient cogroupées par une clé commune, puis une fonction python appliquée à chaque cogroupe.For cogrouped map operations with pandas instances, use DataFrame.groupby().cogroup().applyInPandas() for two PySpark DataFrames to be cogrouped by a common key and then a Python function applied to each cogroup. Il comprend les étapes suivantes :It consists of the following steps:

  • Lecture aléatoire des données de telle sorte que les groupes de chaque tableau partageant une clé sont cogroupés ensemble.Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.
  • Appliquez une fonction à chaque cogroupe.Apply a function to each cogroup. L’entrée de la fonction est deux pandas.DataFrame (avec un tuple facultatif représentant la clé).The input of the function is two pandas.DataFrame (with an optional tuple representing the key). La sortie de la fonction est pandas.DataFrame .The output of the function is a pandas.DataFrame.
  • Combiner les pandas.DataFrame s de tous les groupes dans un nouveau PySpark DataFrame .Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.

Pour utiliser groupBy().cogroup().applyInPandas() , vous devez définir les éléments suivants :To use groupBy().cogroup().applyInPandas(), you must define the following:

  • Fonction Python qui définit le calcul pour chaque cogroupe.A Python function that defines the computation for each cogroup.
  • StructTypeObjet ou chaîne qui définit le schéma du PySpark de sortie DataFrame .A StructType object or a string that defines the schema of the output PySpark DataFrame.

Les étiquettes de colonne du retourné pandas.DataFrame doivent correspondre aux noms de champs dans le schéma de sortie défini s’ils sont spécifiés en tant que chaînes, ou correspondre aux types de données de champ par position s’ils ne sont pas des chaînes, par exemple des indices d’entier.The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, for example, integer indices. Consultez pandas. Tableau pour savoir comment étiqueter des colonnes lors de la construction d’un pandas.DataFrame .See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.

Toutes les données d’un cogroupe sont chargées en mémoire avant que la fonction ne soit appliquée.All data for a cogroup is loaded into memory before the function is applied. Cela peut entraîner des exceptions de mémoire insuffisante, en particulier si les tailles de groupe sont inclinées.This can lead to out of memory exceptions, especially if the group sizes are skewed. La configuration pour maxRecordsPerBatch n’est pas appliquée et il vous revient de veiller à ce que les données cogroupées s’inscrivent dans la mémoire disponible.The configuration for maxRecordsPerBatch is not applied and it is up to you to ensure that the cogrouped data fits into the available memory.

L’exemple suivant montre comment utiliser groupby().cogroup().applyInPandas() pour effectuer un asof join entre deux datasets.The following example shows how to use groupby().cogroup().applyInPandas() to perform an asof join between two datasets.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Pour plus d’informations sur l’utilisation, consultez pyspark. Sql. PandasCogroupedOps. applyInPandas.For detailed usage, see pyspark.sql.PandasCogroupedOps.applyInPandas.