API-интерфейсы функций Pandas

API-интерфейсы функций pandas позволяют напрямую применять собственную функцию Python, которая принимает и выводит экземпляры Pandas в кадр данных PySpark. Как и в случае с пользовательскими функциями Pandas, API-интерфейсы функций также используют Apache Arrow для передачи данных и pandas для работы с данными; Однако указания типов Python являются необязательными в API-интерфейсах функций Pandas.

Существует три типа API-интерфейсов функций Pandas:

  • Сгруппированная карта
  • Карта
  • Согруппированная карта

API-интерфейсы функций pandas используют ту же внутреннюю логику, что и при выполнении определяемой пользователем функции Pandas. Они имеют общие характеристики, такие как PyArrow, поддерживаемые типы SQL и конфигурации.

Дополнительные сведения см. в записи блога New Pandas UDFs and Python Type Hints в предстоящем выпуске Apache Spark 3.0.

Сгруппированная карта

Вы преобразуете сгруппированные данные с помощью groupBy().applyInPandas() для реализации шаблона "разделение-применение-объединение". Разделение, применение и объединение состоит из трех этапов:

  • Разделите данные на группы с помощью DataFrame.groupBy.
  • Примените функцию к каждой группе. Входные и выходные данные функции являются pandas.DataFrame. Входные данные содержат все строки и столбцы для каждой группы.
  • Объедините результаты в новый DataFrame.

Чтобы использовать groupBy().applyInPandas(), необходимо определить следующее:

  • Функция Python, которая определяет вычисления для каждой группы.
  • Объект StructType или строка, определяющие схему выходных данных. DataFrame

Метки столбцов возвращаемого pandas.DataFrame объекта должны соответствовать именам полей в определенной выходной схеме, если они указаны в виде строк, или соответствовать типам данных полей по позиции, если не строки, например целочисленные индексы. См. pandas. Кадр данных о том, как помечать столбцы при создании pandas.DataFrame.

Все данные для группы загружаются в память перед применением функции. Это может привести к исключению нехватки памяти, особенно в случае неравномерного распределения размеров групп. Конфигурация maxRecordsPerBatch не применяется к группам, и вы можете убедиться, что сгруппированные данные помещаются в доступную память.

В следующем примере показано, как использовать groupby().apply() для вычитания среднего значения из каждого значения в группе.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Подробные сведения об использовании см. в разделе pyspark.sql.GroupedData.applyInPandas.

Карта

Операции сопоставления с экземплярами Pandas выполняются для DataFrame.mapInPandas() преобразования итератора pandas.DataFrame в другой итератор pandas.DataFrame , представляющий текущий кадр данных PySpark, и возвращает результат в виде кадра данных PySpark.

Базовая функция принимает и выводит итератор pandas.DataFrame. Он может возвращать выходные данные произвольной длины в отличие от некоторых определяемых пользователем функций Pandas, таких как Series, в series.

В следующем примере показано, как использовать mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Подробные сведения об использовании см. в разделе pyspark.sql.DataFrame.mapInPandas.

Согруппированная карта

Для совместно сгруппированных операций сопоставления с экземплярами Pandas используйте DataFrame.groupby().cogroup().applyInPandas() для совместной группы два pySpark DataFrameпо общему ключу, а затем примените функцию Python к каждой совместной группе, как показано ниже:

  • Перемешивайте данные таким образом, чтобы группы каждого кадра данных, которые совместно используют ключ, были совместно группированы.
  • Примените функцию к каждой совместной группе. Входные данные функции — два pandas.DataFrame (с необязательным кортежем, представляющим ключ). Выходные данные функции — pandas.DataFrame.
  • Объедините элементы pandas.DataFrameиз всех групп в новый PySpark DataFrame.

Чтобы использовать groupBy().cogroup().applyInPandas(), необходимо определить следующее:

  • Функция Python, которая определяет вычисления для каждой совместной группы.
  • Объект StructType или строка, определяющая схему выходных данных PySpark DataFrame.

Метки столбцов возвращаемого pandas.DataFrame объекта должны соответствовать именам полей в определенной выходной схеме, если они указаны в виде строк, или соответствовать типам данных полей по позиции, если не строки, например целочисленные индексы. См. pandas. Кадр данных о том, как помечать столбцы при создании pandas.DataFrame.

Все данные для совместной группы загружаются в память перед применением функции. Это может привести к исключению нехватки памяти, особенно в случае неравномерного распределения размеров групп. Конфигурация для maxRecordsPerBatch не применяется, и вы можете убедиться, что совместно сгруппированные данные помещаются в доступную память.

В следующем примере показано, как использовать groupby().cogroup().applyInPandas() для выполнения между asof join двумя наборами данных.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Подробные сведения об использовании см. в разделе pyspark.sql.PandasCogroupedOps.applyInPandas.