وظائف المعرفة من قبل المستخدم - بيثون

تحتوي هذه المقالة على أمثلة الدالة المعرفة من قبل المستخدم Python (UDF). وهو يبين كيفية تسجيل UDFs ، وكيفية استدعاء UDFs ، والمحاذير المتعلقة بترتيب تقييم التعبيرات الفرعية في Spark SQL.

تسجيل دالة ك UDF

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

يمكنك اختياريا تعيين نوع إرجاع UDF الخاص بك. نوع الإرجاع الافتراضي هو StringType .

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

اتصل ب UDF في سبارك SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

استخدام UDF مع إطارات البيانات

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

بدلا من ذلك، يمكنك تعريف UDF نفسه باستخدام بناء الجملة تعليق توضيحي:

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

أمر التقييم والتحقق من القيمة الخالية

لا تضمن spark SQL (بما في ذلك SQL و Api DataFrame و Dataset) ترتيب تقييم التعبيرات الفرعية. وعلى وجه الخصوص، لا يتم بالضرورة تقييم مدخلات عامل التشغيل أو الوظيفة من اليسار إلى اليمين أو بأي ترتيب ثابت آخر. على سبيل المثال، ANDOR لا تحتوي الدلالات المنطقية والتعبيرات على دلالات "الدوائر القصيرة" من اليسار إلى اليمين.

لذلك ، من الخطر الاعتماد على الآثار الجانبية أو ترتيب تقييم التعبيرات المنطقية ، وترتيب WHEREHAVING وبنود ، حيث يمكن إعادة ترتيب هذه التعبيرات والعبارات أثناء تحسين الاستعلام والتخطيط. على وجه التحديد، إذا كان UDF يعتمد على دلالات الدوائر القصيرة في SQL للتحقق من القيمة الخالية، فلا يوجد ضمان بأن الشيك الخالي سيحدث قبل استدعاء UDF. على سبيل المثال،

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

لا تضمن هذه WHERE الجملة strlen UDF ليتم استدعاؤها بعد تصفية القيم الخالية.

لتنفيذ تدقيق null الصحيح نوصي أن تقوم بأي مما يلي:

  • جعل UDF نفسه خالية علم والقيام التدقيق فارغة داخل UDF نفسها
  • استخدام IF أو تعبيرات للقيام الاختيار فارغة CASE WHEN واستدعاء UDF في فرع شرطي
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok