사용자 정의 스칼라 함수 - Python

이 문서에는 Python UDF(사용자 정의 함수) 예제가 포함되어 있습니다. UDF를 등록하는 방법, UDF를 호출하는 방법을 보여 주고 Spark SQL에서 하위 식의 평가 순서에 대한 주의 사항을 제공합니다.

Databricks Runtime 14.0 이상에서는 Python UDF(사용자 정의 테이블 함수)를 사용하여 스칼라 값 대신 전체 관계를 반환하는 함수를 등록할 수 있습니다. Python 사용자 정의 테이블 함수란?을 참조하세요.

참고 항목

Databricks Runtime 13.1 이하에서는 공유 액세스 모드를 사용하는 클러스터의 Unity 카탈로그에서 Python UDF 및 Pandas UDF가 지원되지 않습니다. 스칼라 Python UDF 및 스칼라 Pandas UDF는 모든 액세스 모드에 대해 Databricks Runtime 13.2 이상에서 지원됩니다.

Databricks Runtime 13.2 이상에서는 SQL 구문을 사용하여 Unity 카탈로그에 스칼라 Python UDF를 등록할 수 있습니다. Unity 카탈로그의 UDF(사용자 정의 함수)를 참조 하세요.

UDF로 함수 등록

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

필요에 따라 UDF의 반환 형식을 설정할 수 있습니다. 기본 반환 형식은 StringType입니다.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Spark SQL에서 UDF 호출

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

DataFrames에서 UDF 사용

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

또는 주석 구문을 사용하여 동일한 UDF를 선언할 수 있습니다.

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

평가 순서 및 Null 검사

Spark SQL(SQL과 DataFrame 및 데이터 세트 API 포함)은 하위 식의 평가 순서를 보장하지 않습니다. 특히 연산자 또는 함수 입력이 반드시 왼쪽에서 오른쪽 또는 다른 고정 순서로 평가되는 것은 아닙니다. 예를 들어 논리적 ANDOR 식에는 왼쪽에서 오른쪽 “단락” 의미 체계가 없습니다.

따라서 쿼리 최적화 및 계획 중에 식과 절의 순서가 변경될 수 있으므로 부울 식의 부작용 또는 평가 순서와 WHEREHAVING 절의 순서에 의존하는 것은 위험합니다. 특히 UDF가 SQL의 단락 의미 체계를 null 검사에 사용하는 경우 UDF를 호출하기 전에 Null 검사가 수행된다는 보장이 없습니다. 예를 들면 다음과 같습니다.

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

여기서 WHERE 절은 Null을 필터링한 후 strlen UDF가 호출되도록 보장하지 않습니다.

적절한 Null 검사를 수행하려면 다음 중 하나를 수행하는 것이 좋습니다.

  • UDF 자체가 Null을 인식하도록 설정하고 UDF 내에서 Null 검사를 수행합니다.
  • IF 또는 CASE WHEN 식을 사용하여 Null 검사를 수행하고 조건부 분기에서 UDF를 호출합니다.
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok