APIs da função pandas
As APIs da função pandas permitem-lhe aplicar diretamente uma função nativa do Python que utiliza e produz instâncias do pandas num DataFrame do PySpark. À semelhança das funções definidas pelo utilizador do pandas, as APIs de função também utilizam o Apache Arrow para transferir dados e pandas para trabalhar com os dados; no entanto, as sugestões de tipo Python são opcionais nas APIs da função pandas.
Existem três tipos de APIs de função pandas:
- Mapa agrupado
- Mapa
- Mapa cogrupo
as APIs da função pandas tiram partido da mesma lógica interna que a execução do Pandas UDF utiliza. Partilham características como o PyArrow, os tipos de SQL suportados e as configurações.
Para obter mais informações, consulte a publicação de blogue New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0 (New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0).
Mapa agrupado
Pode transformar os seus dados agrupados com groupBy().applyInPandas()
o para implementar o padrão "split-apply-combine". Split-apply-combine consiste em três passos:
- Divida os dados em grupos com
DataFrame.groupBy
. - Aplicar uma função em cada grupo. A entrada e o resultado da função são ambos
pandas.DataFrame
. Os dados de entrada contêm todas as linhas e colunas de cada grupo. - Combine os resultados num novo
DataFrame
.
Para utilizar groupBy().applyInPandas()
o , tem de definir o seguinte:
- Uma função Python que define a computação para cada grupo
- Um
StructType
objeto ou uma cadeia que define o esquema da saídaDataFrame
As etiquetas de coluna dos devolvidos pandas.DataFrame
têm de corresponder aos nomes dos campos no esquema de saída definido, se especificado como cadeias de carateres, ou corresponder aos tipos de dados do campo por posição, caso não sejam cadeias de carateres, por exemplo, índices inteiros. Veja pandas. DataFrame para saber como etiquetar colunas ao construir um pandas.DataFrame
.
Todos os dados de um grupo são carregados para a memória antes de a função ser aplicada. Isto pode levar a exceções de memória esgotada, especialmente se os tamanhos do grupo estiverem distorcidos. A configuração para maxRecordsPerBatch não é aplicada em grupos e cabe-lhe a si garantir que os dados agrupados se enquadram na memória disponível.
O exemplo seguinte mostra como utilizar groupby().apply()
para subtrair a média de cada valor no grupo.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Para obter uma utilização detalhada, consulte pyspark.sql.GroupedData.applyInPandas.
Mapa
Efetua operações de pandas.DataFrame
mapa com instâncias do pandas para DataFrame.mapInPandas()
transformar um iterador de pandas.DataFrame
noutro iterador que representa o DataFrame do PySpark atual e devolve o resultado como um DataFrame do PySpark.
A função subjacente utiliza e produz um iterador de pandas.DataFrame
. Pode devolver a saída de comprimento arbitrário em contraste com alguns UDFs do pandas, como Série a Série.
O exemplo seguinte mostra como utilizar mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Para obter uma utilização detalhada, consulte pyspark.sql.DataFrame.mapInPandas.
Mapa cogrupo
Para operações de mapa cogrupo com instâncias do pandas, utilize DataFrame.groupby().cogroup().applyInPandas()
para coagrupar dois PySparks DataFrame
por uma chave comum e, em seguida, aplicar uma função Python a cada cogrupo, conforme mostrado:
- Baralhar os dados de modo a que os grupos de cada DataFrame que partilham uma chave sejam agrupados em conjunto.
- Aplicar uma função a cada cogrupo. A entrada da função é duas
pandas.DataFrame
(com uma cadeia de identificação opcional que representa a chave). O resultado da função é umpandas.DataFrame
. - Combine os
pandas.DataFrame
s de todos os grupos num novo PySparkDataFrame
.
Para utilizar groupBy().cogroup().applyInPandas()
o , tem de definir o seguinte:
- Uma função Python que define a computação para cada cogrupo.
- Um
StructType
objeto ou uma cadeia que define o esquema do PySparkDataFrame
de saída .
As etiquetas de coluna dos devolvidos pandas.DataFrame
têm de corresponder aos nomes dos campos no esquema de saída definido, se especificado como cadeias de carateres, ou corresponder aos tipos de dados do campo por posição, caso não sejam cadeias de carateres, por exemplo, índices inteiros. Veja pandas. DataFrame para saber como etiquetar colunas ao construir um pandas.DataFrame
.
Todos os dados de um cogrupo são carregados para a memória antes de a função ser aplicada. Isto pode levar a exceções de memória esgotada, especialmente se os tamanhos do grupo estiverem distorcidos. A configuração para maxRecordsPerBatch não é aplicada e cabe-lhe a si certificar-se de que os dados agrupados se enquadram na memória disponível.
O exemplo seguinte mostra como utilizar groupby().cogroup().applyInPandas()
para executar um asof join
entre dois conjuntos de dados.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Para obter uma utilização detalhada, consulte pyspark.sql.PandasCogroupedOps.applyInPandas.