APIs da função pandas

As APIs da função pandas permitem-lhe aplicar diretamente uma função nativa do Python que utiliza e produz instâncias do pandas num DataFrame do PySpark. À semelhança das funções definidas pelo utilizador do pandas, as APIs de função também utilizam o Apache Arrow para transferir dados e pandas para trabalhar com os dados; no entanto, as sugestões de tipo Python são opcionais nas APIs da função pandas.

Existem três tipos de APIs de função pandas:

  • Mapa agrupado
  • Mapa
  • Mapa cogrupo

as APIs da função pandas tiram partido da mesma lógica interna que a execução do Pandas UDF utiliza. Partilham características como o PyArrow, os tipos de SQL suportados e as configurações.

Para obter mais informações, consulte a publicação de blogue New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0 (New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0).

Mapa agrupado

Pode transformar os seus dados agrupados com groupBy().applyInPandas() o para implementar o padrão "split-apply-combine". Split-apply-combine consiste em três passos:

  • Divida os dados em grupos com DataFrame.groupBy.
  • Aplicar uma função em cada grupo. A entrada e o resultado da função são ambos pandas.DataFrame. Os dados de entrada contêm todas as linhas e colunas de cada grupo.
  • Combine os resultados num novo DataFrame.

Para utilizar groupBy().applyInPandas()o , tem de definir o seguinte:

  • Uma função Python que define a computação para cada grupo
  • Um StructType objeto ou uma cadeia que define o esquema da saída DataFrame

As etiquetas de coluna dos devolvidos pandas.DataFrame têm de corresponder aos nomes dos campos no esquema de saída definido, se especificado como cadeias de carateres, ou corresponder aos tipos de dados do campo por posição, caso não sejam cadeias de carateres, por exemplo, índices inteiros. Veja pandas. DataFrame para saber como etiquetar colunas ao construir um pandas.DataFrame.

Todos os dados de um grupo são carregados para a memória antes de a função ser aplicada. Isto pode levar a exceções de memória esgotada, especialmente se os tamanhos do grupo estiverem distorcidos. A configuração para maxRecordsPerBatch não é aplicada em grupos e cabe-lhe a si garantir que os dados agrupados se enquadram na memória disponível.

O exemplo seguinte mostra como utilizar groupby().apply() para subtrair a média de cada valor no grupo.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Para obter uma utilização detalhada, consulte pyspark.sql.GroupedData.applyInPandas.

Mapa

Efetua operações de pandas.DataFrame mapa com instâncias do pandas para DataFrame.mapInPandas() transformar um iterador de pandas.DataFrame noutro iterador que representa o DataFrame do PySpark atual e devolve o resultado como um DataFrame do PySpark.

A função subjacente utiliza e produz um iterador de pandas.DataFrame. Pode devolver a saída de comprimento arbitrário em contraste com alguns UDFs do pandas, como Série a Série.

O exemplo seguinte mostra como utilizar mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Para obter uma utilização detalhada, consulte pyspark.sql.DataFrame.mapInPandas.

Mapa cogrupo

Para operações de mapa cogrupo com instâncias do pandas, utilize DataFrame.groupby().cogroup().applyInPandas() para coagrupar dois PySparks DataFramepor uma chave comum e, em seguida, aplicar uma função Python a cada cogrupo, conforme mostrado:

  • Baralhar os dados de modo a que os grupos de cada DataFrame que partilham uma chave sejam agrupados em conjunto.
  • Aplicar uma função a cada cogrupo. A entrada da função é duas pandas.DataFrame (com uma cadeia de identificação opcional que representa a chave). O resultado da função é um pandas.DataFrame.
  • Combine os pandas.DataFrames de todos os grupos num novo PySpark DataFrame.

Para utilizar groupBy().cogroup().applyInPandas()o , tem de definir o seguinte:

  • Uma função Python que define a computação para cada cogrupo.
  • Um StructType objeto ou uma cadeia que define o esquema do PySpark DataFramede saída .

As etiquetas de coluna dos devolvidos pandas.DataFrame têm de corresponder aos nomes dos campos no esquema de saída definido, se especificado como cadeias de carateres, ou corresponder aos tipos de dados do campo por posição, caso não sejam cadeias de carateres, por exemplo, índices inteiros. Veja pandas. DataFrame para saber como etiquetar colunas ao construir um pandas.DataFrame.

Todos os dados de um cogrupo são carregados para a memória antes de a função ser aplicada. Isto pode levar a exceções de memória esgotada, especialmente se os tamanhos do grupo estiverem distorcidos. A configuração para maxRecordsPerBatch não é aplicada e cabe-lhe a si certificar-se de que os dados agrupados se enquadram na memória disponível.

O exemplo seguinte mostra como utilizar groupby().cogroup().applyInPandas() para executar um asof join entre dois conjuntos de dados.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Para obter uma utilização detalhada, consulte pyspark.sql.PandasCogroupedOps.applyInPandas.