Skalární funkce definované uživatelem – Python

Tento článek obsahuje příklady uživatelem definované funkce Pythonu (UDF). Ukazuje, jak zaregistrovat funkce definované uživatelem, jak vyvolat funkce definované uživatelem a poskytuje upozornění na pořadí vyhodnocení dílčích výrazů ve Spark SQL.

V Databricks Runtime 14.0 a novějších můžete pomocí uživatelem definovaných tabulkových funkcí Pythonu (UDTFs) zaregistrovat funkce, které místo skalárních hodnot vracejí celé relace. Podívejte se, co jsou uživatelem definované funkce tabulek v Pythonu?

Poznámka:

V Databricks Runtime 12.2 LTS a níže nejsou definované uživatelem Pythonu a UDF Pandas podporovány v katalogu Unity na výpočetních prostředcích, které používají režim sdíleného přístupu. Skalární uživatelem a skalární uživatelem definované uživatelem v Pythonu jsou podporované ve službě Databricks Runtime 13.3 LTS a vyšší pro všechny režimy přístupu.

Ve službě Databricks Runtime 13.3 LTS a novějších můžete pomocí syntaxe SQL zaregistrovat skalární uživatelem definované uživatelem Pythonu do katalogu Unity. Viz uživatelem definované funkce (UDF) v katalogu Unity.

Registrace funkce jako funkce definované uživatelem

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

Volitelně můžete nastavit návratový typ definovaného uživatelem. Výchozí návratový typ je StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Volání funkce definovaná uživatelem ve Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Použití UDF s datovými rámci

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Alternativně můžete deklarovat stejnou funkci definovanou uživatelem pomocí syntaxe poznámek:

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Kontrola pořadí vyhodnocení a hodnoty null

Spark SQL (včetně SQL a rozhraní DATAFrame and Dataset API) nezaručuje pořadí vyhodnocení dílčích výrazů. Zejména vstupy operátoru nebo funkce nejsou nutně vyhodnoceny zleva doprava nebo v jiném pevném pořadí. Například logické AND výrazy OR nemají sémantiku "zkratování" zleva doprava.

Proto je nebezpečné spoléhat se na vedlejší účinky nebo pořadí vyhodnocení logických výrazů a pořadí WHEREHAVING a pořadí klauzulí, protože tyto výrazy a klauzule je možné změnit pořadí během optimalizace a plánování dotazů. Konkrétně platí, že pokud UDF spoléhá na sémantiku zkratování v SQL pro kontrolu hodnoty null, neexistuje žádná záruka, že se kontrola hodnoty null provede před vyvoláním funkce definované uživatelem. Příklad:

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Tato WHERE klauzule nezaručuje, že se funkce definovaná uživatelem strlen bude volat po vyfiltrování hodnot null.

Pokud chcete provést správnou kontrolu hodnoty null, doporučujeme provést některou z následujících akcí:

  • Nastavení samotné funkce definovaná uživatelem s podporou hodnoty null a provedení kontroly hodnoty null uvnitř samotného uživatelem definovaného uživatelem
  • Použití IF nebo CASE WHEN výrazy k ověření hodnoty null a vyvolání funkce definovaná uživatelem v podmíněné větvi
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok