Aracılığıyla paylaş


pandas kullanıcı tanımlı işlevleri

Pandas kullanıcı tanımlı işlev (UDF) (vektörleştirilmiş UDF olarak da bilinir), verileri aktarmak için Apache Arrow ve verilerle çalışmak için pandas kullanan kullanıcı tanımlı bir işlevdir. pandas UDF'leri, python UDF'lerine kıyasla performansı 100 kata kadar artırabilen vektörleştirilmiş işlemlere izin verir.

Arka plan bilgileri için Apache Spark 3.0'ın Yaklaşan SürümündeKi Yeni Pandas UDF'leri ve Python Tür İpuçları blog gönderisine bakın.

Anahtar sözcüğünü pandas_udf dekoratör olarak kullanarak bir pandas UDF tanımlarsınız ve işlevi python türü ipucuyla sarmalarsınız. Bu makalede pandas UDF'lerinin farklı türleri açıklanır ve tür ipuçlarıyla pandas UDF'lerinin nasıl kullanılacağı gösterilir.

Seriden Seriye UDF

Skaler işlemleri vektörleştirmek için Series to Series pandas UDF kullanırsınız. Bunları ve withColumngibi select API'lerle kullanabilirsiniz.

Python işlevi bir pandas Serisini giriş olarak almalı ve aynı uzunlukta bir pandas Serisi döndürmelidir ve bunları Python türü ipuçlarında belirtmelisiniz. Spark, sütunları toplu işlere bölerek, her toplu işlemin işlevini verilerin bir alt kümesi olarak çağırarak ve ardından sonuçları birleştirerek pandas UDF çalıştırır.

Aşağıdaki örnekte, 2 sütunun çarpımını hesaplayan bir pandas UDF'nin nasıl oluşturulacağı gösterilmektedir.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

Seriyi Yineleyiciden Seri UDF Yineleyicisine

Yineleyici UDF, aşağıdakiler dışında skaler pandas UDF ile aynıdır:

  • Python işlevi
    • Giriş olarak tek bir giriş toplu işlemi yerine toplu iş yineleyicisini alır.
    • Tek bir çıkış toplu işlemi yerine çıkış toplu işlemlerinin yineleyicisini döndürür.
  • Yineleyicideki çıkışın tamamının uzunluğu, girişin tamamının uzunluğuyla aynı olmalıdır.
  • Sarmalanan pandas UDF, giriş olarak tek bir Spark sütunu alır.

Python türü ipucunu ->Iterator[pandas.Series]olarak Iterator[pandas.Series] belirtmelisiniz.

Bu pandas UDF, UDF yürütmesi bazı durumların başlatılmasını gerektirdiğinde (örneğin, her giriş toplu işlemine çıkarım uygulamak için bir makine öğrenmesi modeli dosyası yükleme) yararlı olur.

Aşağıdaki örnekte yineleyici desteğiyle pandas UDF'nin nasıl oluşturulacağı gösterilmektedir.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

Birden çok Seriyi Yineleyiciden Seri UDF Yineleyicisine

Seri UDF'nin Iterator'ı için birden çok Serinin Yineleyicisi, Serinin Yineleyicisi ile Seri UDF'nin Yineleyicisi arasında benzer özelliklere ve kısıtlamalara sahiptir. Belirtilen işlev toplu işlemlerin yineleyicisini alır ve bir toplu iş yineleyicisi çıkarır. UDF yürütmesi bazı durumların başlatılmasını gerektirdiğinde de yararlıdır.

Farklar şunlardır:

  • Temel alınan Python işlevi pandas Series demetinin yineleyicisini alır.
  • Sarmalanan pandas UDF, giriş olarak birden çok Spark sütunu alır.

Tür ipuçlarını ->Iterator[pandas.Series]olarak Iterator[Tuple[pandas.Series, ...]] belirtirsiniz.

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

Seriden skaler UDF'ye

Skaler pandas UDF'lerine seriler Spark toplama işlevlerine benzer. A Series to scalar pandas UDF, bir veya daha fazla pandas Serisinden bir skaler değere bir toplama tanımlar ve burada her pandas Serisi bir Spark sütununu temsil eder. , , withColumngroupBy.aggve pyspark.sql.Window gibi selectAPI'lerle skaler pandas UDF'yi skaler pandas UDF için bir Seri kullanırsınız.

Tür ipucunu ->Anyolarak pandas.Series, ... ifade edebilirsiniz. Dönüş türü ilkel bir veri türü olmalı ve döndürülen skaler bir Python ilkel türü olabilir, örneğin int veya float veya numpy.float64gibi numpy.int64 bir NumPy veri türü olabilir. Any ideal olarak belirli bir skaler tür olmalıdır.

Bu UDF türü kısmi toplamayı desteklemez ve her grubun tüm verileri belleğe yüklenir.

Aşağıdaki örnekte, , groupByve window işlemleriyle selectortalamayı hesaplamak için bu UDF türünün nasıl kullanılacağı gösterilmektedir:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Ayrıntılı kullanım için bkz . pyspark.sql.functions.pandas_udf.

Kullanım

Ok toplu iş boyutunu ayarlama

Not

Bu yapılandırmanın paylaşılan erişim modu ve Databricks Runtime 13.3 LTS ile 14.2 arasında yapılandırılmış işlem üzerinde hiçbir etkisi yoktur.

Spark'taki veri bölümleri, JVM'de geçici olarak yüksek bellek kullanımına neden olabilecek Ok kaydı toplu işlemlerine dönüştürülür. Bellek dışı özel durumları önlemek için, yapılandırmayı her toplu iş için en fazla satır sayısını belirleyen bir tamsayıya ayarlayarak spark.sql.execution.arrow.maxRecordsPerBatch Ok kaydı toplu işlemlerinin boyutunu ayarlayabilirsiniz. Varsayılan değer, toplu iş başına 10.000 kayıttır. Sütun sayısı büyükse, değer buna göre ayarlanmalıdır. Bu sınır kullanıldığında, her veri bölümü işlenmek üzere 1 veya daha fazla kayıt toplu işlemine ayrılır.

Saat dilimi semantiği ile zaman damgası

Spark, zaman damgalarını dahili olarak UTC değerleri olarak depolar ve belirli bir saat dilimi olmadan getirilen zaman damgası verileri yerel saat olarak UTC'ye mikrosaniye çözünürlükle dönüştürülür.

Zaman damgası verileri Spark'ta dışarı aktarıldığında veya görüntülendiğinde, zaman damgası değerlerini yerelleştirmek için oturum saat dilimi kullanılır. Oturum saat dilimi yapılandırmayla spark.sql.session.timeZone ayarlanır ve varsayılan olarak JVM sistemi yerel saat dilimine ayarlanır. pandas, datetime64[ns]sütun başına isteğe bağlı saat dilimine sahip nanosaniye çözünürlüğüne sahip bir tür kullanırdatetime64.

Zaman damgası verileri Spark'tan pandas'a aktarıldığında, nanosaniyeye dönüştürülür ve her sütun Spark oturum saat dilimine dönüştürülür, ardından bu saat dilimine yerelleştirilir, bu da saat dilimini kaldırır ve değerleri yerel saat olarak görüntüler. Zaman damgası sütunları çağrılırken toPandas() veya pandas_udf bu durum oluşur.

Zaman damgası verileri pandas'tan Spark'a aktarıldığında UTC mikrosaniyesine dönüştürülür. Bu, pandas DataFrame ile çağrı createDataFrame yapılırken veya pandas UDF'den bir zaman damgası döndürülirken oluşur. Bu dönüştürmeler Spark'ın verileri beklenen biçimde olduğundan emin olmak için otomatik olarak yapılır, bu nedenle bu dönüştürmelerin hiçbirini kendiniz yapmanız gerekmez. Nanosaniye değerleri kesilir.

Standart bir UDF, zaman damgası verilerini Python datetime nesneleri olarak yükler ve bu da pandas zaman damgasından farklıdır. En iyi performansı elde etmek için pandas UDF'de zaman damgalarıyla çalışırken pandas zaman serisi işlevselliğini kullanmanızı öneririz. Ayrıntılar için bkz . Zaman Serisi / Tarih işlevselliği.

Örnek not defteri

Aşağıdaki not defteri, pandas UDF'leri ile başarabileceğiniz performans geliştirmelerini gösterir:

pandas UDF karşılaştırma not defteri

Not defterini alma