Pandas-Funktions-APIspandas function APIs

Pandas-Funktionen-APIs ermöglichen es Ihnen, eine native python-Funktion, die Pandas-Instanzen annimmt, auf einen pyspark-dataframe anzuwenden.pandas function APIs enable you to directly apply a Python native function, which takes and outputs pandas instances, to a PySpark DataFrame. Ähnlich wie bei benutzerdefinierten Pandas-Funktionenverwenden Funktions-APIs auch Apache Pfeil zum Übertragen von Daten und Pandas, um mit den Daten zu arbeiten. Allerdings sind python-Typhinweise in Pandas-Funktionen-APIs optional.Similar to pandas user-defined functions, function APIs also use Apache Arrow to transfer data and pandas to work with the data; however, Python type hints are optional in pandas function APIs.

Es gibt drei Arten von Pandas-Funktionen-APIs:There are three types of pandas function APIs:

  • Gruppierte KarteGrouped map
  • ZuordnungMap
  • Cogruppierte KarteCogrouped map

Pandas-Funktions-APIs nutzen die gleiche interne Logik, die Pandas UDF-Ausführungen verwendet.pandas function APIs leverage the same internal logic that pandas UDF executions use. Daher gibt es die gleichen Merkmale wie bei Pandas-UDFs, wie z. b. pypfeil, unterstützte SQL-Typen und die Konfigurationen.Therefore, it shares the same characteristics with pandas UDFs such as PyArrow, supported SQL types, and the configurations.

Weitere Informationen finden Sie im Blogbeitrag New Pandas UDFs und python Type Hints (in der bevorstehenden Version von Apache Spark 3,0).For more information, see the blog post New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.

Gruppierte KarteGrouped map

Sie transformieren die gruppierten Daten über groupBy().applyInPandas() , um das Muster "Split-Apply-combine" zu implementieren.You transform your grouped data via groupBy().applyInPandas() to implement the “split-apply-combine” pattern. Split-Apply-Combine besteht aus drei Schritten:Split-apply-combine consists of three steps:

  • Teilen Sie die Daten mithilfe von in Gruppen auf DataFrame.groupBy .Split the data into groups by using DataFrame.groupBy.
  • Anwenden einer Funktion auf jede Gruppe.Apply a function on each group. Die Eingabe und die Ausgabe der Funktion sind beide pandas.DataFrame .The input and output of the function are both pandas.DataFrame. Die Eingabedaten enthalten alle Zeilen und Spalten für jede Gruppe.The input data contains all the rows and columns for each group.
  • Kombinieren Sie die Ergebnisse in einem neuen DataFrame .Combine the results into a new DataFrame.

Zum Verwenden von groupBy().applyInPandas() müssen Sie Folgendes definieren:To use groupBy().applyInPandas(), you must define the following:

  • Eine python-Funktion, die die Berechnung für jede Gruppe definiert.A Python function that defines the computation for each group
  • Ein- StructType Objekt oder eine Zeichenfolge, die das Schema der Ausgabe definiert.DataFrameA StructType object or a string that defines the schema of the output DataFrame

Die Spalten Bezeichnungen der zurückgegebenen pandas.DataFrame müssen entweder den Feldnamen im definierten Ausgabe Schema entsprechen, wenn Sie als Zeichen folgen angegeben sind, oder die Feld Datentypen nach Position, wenn keine Zeichen folgen vorliegen, z. b. ganzzahlige Indizes.The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, for example, integer indices. Siehe Pandas. Dataframe zum bezeichnen von Spalten beim Erstellen einer pandas.DataFrame .See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.

Alle Daten für eine Gruppe werden in den Arbeitsspeicher geladen, bevor die Funktion angewendet wird.All data for a group is loaded into memory before the function is applied. Dies kann zu Ausnahmen aufgrund von nicht genügend Arbeitsspeicher führen, insbesondere dann, wenn die Gruppengrößen verzerrt sind.This can lead to out of memory exceptions, especially if the group sizes are skewed. Die Konfiguration für maxrecordsperbatch wird nicht auf Gruppen angewendet, und Sie müssen sicherstellen, dass die gruppierten Daten in den verfügbaren Arbeitsspeicher passen.The configuration for maxRecordsPerBatch is not applied on groups and it is up to you to ensure that the grouped data fits into the available memory.

Im folgenden Beispiel wird gezeigt, wie verwendet wird groupby().apply() , um den Mittelwert von jedem Wert in der Gruppe zu subtrahieren.The following example shows how to use groupby().apply() to subtract the mean from each value in the group.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Ausführliche Informationen finden Sie unter pyspark. SQL. groupeddata. applyinpandas.For detailed usage, see pyspark.sql.GroupedData.applyInPandas.

ZuordnungMap

Sie führen Zuordnungs Vorgänge mit Pandas-Instanzen durch durch, DataFrame.mapInPandas() um einen Iterator von in pandas.DataFrame einen anderen Iterator von umzuwandeln pandas.DataFrame , der den aktuellen pyspark-dataframe darstellt und das Ergebnis als pyspark-dataframe zurückgibt.You perform map operations with pandas instances by DataFrame.mapInPandas() in order to transform an iterator of pandas.DataFrame to another iterator of pandas.DataFrame that represents the current PySpark DataFrame and returns the result as a PySpark DataFrame.

Die zugrunde liegende Funktion nimmt einen Iterator von an und gibt ihn aus pandas.DataFrame .The underlying function takes and outputs an iterator of pandas.DataFrame. Es kann im Gegensatz zu einigen Pandas-UDFs (z. b. Reihen bis Reihe Pandas UDF) die Ausgabe beliebiger Länge zurückgeben.It can return the output of arbitrary length in contrast to some pandas UDFs such as Series to Series pandas UDF.

Das folgende Beispiel veranschaulicht die Verwendung von mapInPandas():The following example shows how to use mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Ausführliche Informationen finden Sie unter pyspark. SQL. dataframe. applyinpandas.For detailed usage, please see pyspark.sql.DataFrame.applyInPandas.

Cogruppierte KarteCogrouped map

Verwenden Sie für cogruppierte Zuordnungs Vorgänge mit Pandas DataFrame.groupby().cogroup().applyInPandas() -Instanzen für zwei pyspark DataFrame -s, die nach einem gemeinsamen Schlüssel cogruppiert werden, und dann eine python-Funktion, die auf jede cogroup angewendet wird.For cogrouped map operations with pandas instances, use DataFrame.groupby().cogroup().applyInPandas() for two PySpark DataFrames to be cogrouped by a common key and then a Python function applied to each cogroup. Sie besteht aus den folgenden Schritten:It consists of the following steps:

  • Mischen Sie die Daten so, dass die Gruppen der einzelnen Datenrahmen, die einen Schlüssel gemeinsam verwenden, zusammen gruppiert werden.Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.
  • Wenden Sie für jede cogroup eine Funktion an.Apply a function to each cogroup. Die Eingabe der Funktion ist zwei pandas.DataFrame (mit einem optionalen Tupel, das den Schlüssel darstellt).The input of the function is two pandas.DataFrame (with an optional tuple representing the key). Bei der Ausgabe der-Funktion handelt es sich um eine pandas.DataFrame .The output of the function is a pandas.DataFrame.
  • Kombinieren Sie die pandas.DataFrame s von allen Gruppen zu einem neuen pyspark DataFrame .Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.

Zum Verwenden von groupBy().cogroup().applyInPandas() müssen Sie Folgendes definieren:To use groupBy().cogroup().applyInPandas(), you must define the following:

  • Eine python-Funktion, die die Berechnung für jede cogroup definiert.A Python function that defines the computation for each cogroup.
  • Ein- StructType Objekt oder eine Zeichenfolge, die das Schema der pyspark-Ausgabe definiert DataFrame .A StructType object or a string that defines the schema of the output PySpark DataFrame.

Die Spalten Bezeichnungen der zurückgegebenen pandas.DataFrame müssen entweder den Feldnamen im definierten Ausgabe Schema entsprechen, wenn Sie als Zeichen folgen angegeben sind, oder die Feld Datentypen nach Position, wenn keine Zeichen folgen vorliegen, z. b. ganzzahlige Indizes.The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, for example, integer indices. Siehe Pandas. Dataframe zum bezeichnen von Spalten beim Erstellen einer pandas.DataFrame .See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.

Alle Daten für eine cogroup werden in den Arbeitsspeicher geladen, bevor die Funktion angewendet wird.All data for a cogroup is loaded into memory before the function is applied. Dies kann zu Ausnahmen aufgrund von nicht genügend Arbeitsspeicher führen, insbesondere dann, wenn die Gruppengrößen verzerrt sind.This can lead to out of memory exceptions, especially if the group sizes are skewed. Die Konfiguration für " maxrecordsperbatch " wird nicht angewendet, und Sie müssen sicherstellen, dass die cogruppierten Daten in den verfügbaren Arbeitsspeicher passen.The configuration for maxRecordsPerBatch is not applied and it is up to you to ensure that the cogrouped data fits into the available memory.

Im folgenden Beispiel wird gezeigt, wie verwendet wird, groupby().cogroup().applyInPandas() um einen asof join zwischen zwei Datasets auszuführen.The following example shows how to use groupby().cogroup().applyInPandas() to perform an asof join between two datasets.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Ausführliche Informationen finden Sie unter pyspark. SQL. pandascogroupedops. applyinpandas.For detailed usage, see pyspark.sql.PandasCogroupedOps.applyInPandas.