وظائف Pandas المعرفة من قبل المستخدم

وظيفة pandas المعرفة من قبل المستخدم (UDF) - والمعروفة أيضا باسم UDF المتجهة - هي دالة معرفة من قبل المستخدم تستخدم سهم Apache لنقل البيانات وpandas للعمل مع البيانات. تسمح Pandas UDFs بالعمليات المتجهة التي يمكن أن تزيد من الأداء حتى 100x مقارنة ب Python UDFs في الصف في كل مرة.

للحصول على معلومات أساسية، راجع منشور المدونة New Pandas UDFs وPython Type Hints في الإصدار القادم من Apache Spark 3.0.

يمكنك تعريف Pandas UDF باستخدام الكلمة الأساسية pandas_udf كمصمم والتفاف الدالة مع تلميح نوع Python. توضح هذه المقالة الأنواع المختلفة من Pandas UDFs وتوضح كيفية استخدام Pandas UDFs مع تلميحات النوع.

سلسلة إلى سلسلة UDF

يمكنك استخدام Series إلى Series pandas UDF لتحجيم العمليات العددية. يمكنك استخدامها مع واجهات برمجة التطبيقات مثل select و withColumn.

يجب أن تأخذ الدالة Python سلسلة pandas كإدخال وتعيد سلسلة pandas بنفس الطول، ويجب عليك تحديدها في تلميحات نوع Python. يقوم Spark بتشغيل Pandas UDF عن طريق تقسيم الأعمدة إلى دفعات، استدعاء الدالة لكل دفعة كمجموعة فرعية من البيانات، ثم تسلسل النتائج.

يوضح المثال التالي كيفية إنشاء Pandas UDF يحسب منتج عمودين.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

مكرر سلسلة إلى مكرر سلسلة UDF

UDF المكرر هو نفس Pandas UDF العددي باستثناء:

  • الدالة Python
    • يأخذ مكرر الدفعات بدلا من دفعة إدخال واحدة كإدخل.
    • إرجاع مكرر دفعات الإخراج بدلا من دفعة إخراج واحدة.
  • يجب أن يكون طول الإخراج بأكمله في المكرر هو نفس طول الإدخال بأكمله.
  • يأخذ Pandas UDF الملتف عمود Spark واحدا كمدخل.

يجب تحديد تلميح نوع Python ك Iterator[pandas.Series] ->Iterator[pandas.Series].

يعد Pandas UDF مفيدا عندما يتطلب تنفيذ UDF تهيئة بعض الحالات، على سبيل المثال، تحميل ملف نموذج التعلم الآلي لتطبيق الاستدلال على كل دفعة إدخال.

يوضح المثال التالي كيفية إنشاء Pandas UDF بدعم المكرر.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

مكرر سلسلة متعددة إلى مكرر سلسلة UDF

يتميز مكرر سلسلة متعددة إلى مكرر من سلسلة UDF بخصائص وقيود مماثلة لمكرر السلسلة إلى مكرر سلسلة UDF. تأخذ الدالة المحددة مكرر الدفعات وتخرج مكرر الدفعات. من المفيد أيضا عندما يتطلب تنفيذ UDF تهيئة بعض الحالات.

الاختلافات هي:

  • تأخذ دالة Python الأساسية تكرار مجموعة من سلسلة pandas.
  • تأخذ Pandas UDF الملتفة أعمدة Spark متعددة كمدخل.

يمكنك تحديد تلميحات النوع ك Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

سلسلة إلى UDF عددي

تشبه السلسلة إلى Pandas UDFs العددية وظائف Spark التجميعية. تعرف السلسلة إلى Pandas UDF العددي التجميع من سلسلة pandas واحدة أو أكثر إلى قيمة عددية، حيث تمثل كل سلسلة بانداز عمود Spark. يمكنك استخدام سلسلة لتحجيم Pandas UDF مع واجهات برمجة التطبيقات مثل selectو withColumngroupBy.aggو و pyspark.sql.Window.

يمكنك التعبير عن تلميح النوع ك pandas.Series, ... ->Any. يجب أن يكون نوع الإرجاع نوع بيانات بدائيا، ويمكن أن يكون العددي الذي تم إرجاعه إما نوع Python بدائي، على سبيل المثال، int أو float نوع بيانات NumPy مثل numpy.int64 أو numpy.float64. Any يجب أن يكون من الناحية المثالية نوع عددي محدد.

لا يدعم هذا النوع من UDF التجميع الجزئي ويتم تحميل كافة البيانات لكل مجموعة في الذاكرة.

يوضح المثال التالي كيفية استخدام هذا النوع من UDF لحساب الوسط مع selectgroupByالعمليات و وwindow:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

للحصول على استخدام مفصل، راجع pyspark.sql.functions.pandas_udf.

الاستخدام

تعيين حجم دفعة السهم

إشعار

لا يؤثر هذا التكوين على الحساب الذي تم تكوينه باستخدام وضع الوصول المشترك وDatabricks Runtime 13.3 LTS إلى 14.2.

يتم تحويل أقسام البيانات في Spark إلى دفعات سجل الأسهم، والتي يمكن أن تؤدي مؤقتا إلى استخدام ذاكرة عالية في JVM. لتجنب استثناءات نفاد الذاكرة المحتملة، يمكنك ضبط حجم دفعات سجل السهم عن طريق تعيين spark.sql.execution.arrow.maxRecordsPerBatch التكوين إلى عدد صحيح يحدد الحد الأقصى لعدد الصفوف لكل دفعة. القيمة الافتراضية هي 10000 سجل لكل دفعة. إذا كان عدد الأعمدة كبيرا، يجب تعديل القيمة وفقا لذلك. باستخدام هذا الحد، يتم تقسيم كل قسم بيانات إلى دفعة واحدة أو أكثر من دفعات السجلات للمعالجة.

الطابع الزمني مع دلالات المنطقة الزمنية

يخزن Spark داخليا الطوابع الزمنية كقيم UTC، ويتم تحويل بيانات الطابع الزمني التي تم إحضارها دون منطقة زمنية محددة كوقت محلي إلى UTC بدقة ميكرو ثانية.

عند تصدير بيانات الطابع الزمني أو عرضها في Spark، يتم استخدام المنطقة الزمنية لجلسة العمل لترجمة قيم الطابع الزمني. يتم تعيين المنطقة الزمنية spark.sql.session.timeZone للجلسة مع التكوين والإعدادات الافتراضية إلى المنطقة الزمنية المحلية لنظام JVM. تستخدم datetime64 pandas نوعا بدقة nanosecond، ، datetime64[ns]مع منطقة زمنية اختيارية على أساس كل عمود.

عند نقل بيانات الطابع الزمني من Spark إلى pandas، يتم تحويلها إلى nanoseconds ويتم تحويل كل عمود إلى المنطقة الزمنية لجلسة Spark ثم ترجمتها إلى تلك المنطقة الزمنية، والتي تزيل المنطقة الزمنية وتعرض القيم كوقت محلي. يحدث هذا عند استدعاء toPandas() أعمدة الطابع الزمني أو pandas_udf باستخدامها.

عند نقل بيانات الطابع الزمني من pandas إلى Spark، يتم تحويلها إلى ميكرو ثانية UTC. يحدث هذا عند الاتصال createDataFrame ب pandas DataFrame أو عند إرجاع طابع زمني من Pandas UDF. تتم هذه التحويلات تلقائيا للتأكد من أن Spark يحتوي على بيانات بالتنسيق المتوقع، لذلك ليس من الضروري إجراء أي من هذه التحويلات بنفسك. يتم اقتطاع أي قيم نانو ثانية.

يقوم UDF القياسي بتحميل بيانات الطابع الزمني ككائنات Python datetime، والتي تختلف عن الطابع الزمني ل pandas. للحصول على أفضل أداء، نوصي باستخدام وظائف السلاسل الزمنية ل pandas عند العمل مع الطوابع الزمنية في Pandas UDF. للحصول على التفاصيل، راجع وظيفة Time Series / Date.

مثال لدفتر الملاحظات

يوضح دفتر الملاحظات التالي تحسينات الأداء التي يمكنك تحقيقها باستخدام Pandas UDFs:

دفتر الملاحظات القياسي ل pandas UDFs

الحصول على دفتر الملاحظات