APIs de função do Pandas

As APIs da função pandas permitem-lhe aplicar diretamente uma função nativa python, que leva e produz casos de pandas, para um PySpark DataFrame. Semelhante às funções definidas pelo utilizador dos pandas,as APIs da função também utilizam a Seta Apache para transferir dados e pandas para trabalhar com os dados; no entanto, as dicas do tipo Python são opcionais nas APIs da função pandas.

Existem três tipos de APIs de função pandas:

  • Mapa agrupado
  • Mapa
  • Mapa coagundo

as APIs da função pandas aproveitam a mesma lógica interna que as execuções UDF pandas usam. Portanto, partilha as mesmas características com pandas UDFs como PyArrow, tipos de SQL suportados, e as configurações.

Para mais informações, consulte o blog post New Pandas UDFs e Python Type Hints no próximo lançamento de Apache Spark 3.0.

Mapa agrupado

Transforma os seus dados agrupados groupBy().applyInPandas() para implementar o padrão "split-apply-combine". Split-apply-combine consiste em três etapas:

  • Divida os dados em grupos utilizando DataFrame.groupBy .
  • Aplicar uma função em cada grupo. A entrada e a saída da função são ambas pandas.DataFrame . Os dados de entrada contêm todas as linhas e colunas de cada grupo.
  • Combine os resultados num novo DataFrame .

Para groupBy().applyInPandas() utilizar, deve definir o seguinte:

  • Uma função Python que define o cálculo para cada grupo
  • Um StructType objeto ou uma corda que define o esquema da saída DataFrame

As etiquetas de coluna do devolvido pandas.DataFrame devem corresponder aos nomes de campo no esquema de saída definido, se especificado como cordas, ou corresponder os tipos de dados de campo por posição, se não cordas, por exemplo, índices inteiros. Ver pandas. DataFrame para como rotular colunas ao construir um pandas.DataFrame .

Todos os dados de um grupo são carregados na memória antes da função ser aplicada. Isto pode levar a exceções fora da memória, especialmente se os tamanhos do grupo forem distorcidos. A configuração para maxRecordsPerBatch não é aplicada em grupos e cabe-lhe a si garantir que os dados agrupados se enquadram na memória disponível.

O exemplo a seguir mostra como usar groupby().apply() para subtrair a média de cada valor do grupo.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Para uma utilização detalhada, consulte pyspark.sql.GroupedData.applyInPandas.

Mapa

Executa operações de mapa com casos de pandas DataFrame.mapInPandas() para transformar um iterador pandas.DataFrame de outro iterador pandas.DataFrame que representa o atual PySpark DataFrame e devolve o resultado como um PySpark DataFrame.

A função subjacente toma e produz um iterador de pandas.DataFrame . Pode devolver a produção de comprimento arbitrário em contraste com alguns pandas UDFs, como séries a pandas série UDF.

O exemplo a seguir mostra como mapInPandas() utilizar:

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Para uma utilização detalhada, consulte pyspark.sql.DataFrame.mapInPandas.

Mapa coagundo

Para operações de mapas coagundo com casos de pandas, use DataFrame.groupby().cogroup().applyInPandas() dois PySpark DataFrame s para ser coagrupado por uma chave comum e, em seguida, uma função Python aplicada a cada cogrupo. Consiste nos seguintes passos:

  • Baralhar os dados de modo a que os grupos de cada DataFrame que partilham uma chave sejam coagrupados em conjunto.
  • Aplicar uma função a cada cogrupo. A entrada da função é de dois pandas.DataFrame (com um tuple opcional representando a chave). A saída da função é de pandas.DataFrame .
  • Combine os pandas.DataFrame s de todos os grupos num novo PySpark. DataFrame

Para groupBy().cogroup().applyInPandas() utilizar, deve definir o seguinte:

  • Uma função Python que define o cálculo para cada cogrupo.
  • Um StructType objeto ou uma corda que define o esquema da saída PySpark DataFrame .

As etiquetas de coluna do devolvido pandas.DataFrame devem corresponder aos nomes de campo no esquema de saída definido, se especificado como cordas, ou corresponder os tipos de dados de campo por posição, se não cordas, por exemplo, índices inteiros. Ver pandas. DataFrame para como rotular colunas ao construir um pandas.DataFrame .

Todos os dados de um cogrupo são carregados na memória antes da função ser aplicada. Isto pode levar a exceções fora da memória, especialmente se os tamanhos do grupo forem distorcidos. A configuração para maxRecordsPerBatch não é aplicada e cabe-lhe a si garantir que os dados coagrupados se encaixam na memória disponível.

O exemplo a seguir mostra como usar groupby().cogroup().applyInPandas() para executar um conjunto de asof join dados entre dois conjuntos de dados.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Para uma utilização detalhada, consulte pyspark.sql.PandasCogroupedOps.applyInPandas.