Udostępnij za pośrednictwem


Funkcje tabeli zdefiniowane przez użytkownika w języku Python (UDF)

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej w środowisku Databricks Runtime 14.3 LTS lub nowszym.

Funkcja tabeli zdefiniowana przez użytkownika (UDTF) umożliwia rejestrowanie funkcji, które zwracają tabele zamiast wartości skalarnych. W przeciwieństwie do funkcji skalarnych, które zwracają pojedynczą wartość wyniku z każdego wywołania, każdy element UDTF jest wywoływany w klauzuli instrukcji FROM SQL i zwraca całą tabelę jako dane wyjściowe.

Każde wywołanie UDTF może akceptować zero lub więcej argumentów. Te argumenty mogą być wyrażeniami skalarnych lub argumentami tabeli reprezentującymi całe tabele wejściowe.

Podstawowa składnia udTF

Platforma Apache Spark implementuje funkcje UDF języka Python jako klasy języka Python z obowiązkową eval metodą używaną yield do emitowania wierszy wyjściowych.

Aby użyć klasy jako funkcji UDTF, należy zaimportować funkcję PySpark udtf . Usługa Databricks zaleca używanie tej funkcji jako dekoratora i jawnego określania nazw pól i typów przy użyciu returnType opcji (chyba że klasa definiuje metodę analyze zgodnie z opisem w późniejszej sekcji).

Następujący program UDTF tworzy tabelę przy użyciu stałej listy dwóch argumentów liczb całkowitych:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Rejestrowanie funkcji UDTF

Funkcje zdefiniowane przez użytkownika są rejestrowane w środowisku lokalnym SparkSession i są izolowane na poziomie notesu lub zadania.

Nie można zarejestrować funkcji UDTFs jako obiektów w wykazie aparatu Unity, a funkcje UDF nie mogą być używane z magazynami SQL.

Możesz zarejestrować funkcję UDTF do bieżącej SparkSession funkcji do użycia w zapytaniach SQL za pomocą funkcji spark.udtf.register(). Podaj nazwę funkcji SQL i klasę UDTF języka Python.

spark.udtf.register("get_sum_diff", GetSumDiff)

Wywoływanie zarejestrowanego udTF

Po zarejestrowaniu można użyć funkcji UDTF w języku SQL przy użyciu %sql polecenia magic lub spark.sql() funkcji:

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

Korzystanie ze strzałki Apache

Jeśli funkcja UDTF odbiera niewielką ilość danych jako dane wejściowe, ale generuje dużą tabelę, usługa Databricks zaleca użycie narzędzia Apache Arrow. Można ją włączyć, określając useArrow parametr podczas deklarowania funkcji UDTF:

@udtf(returnType="c1: int, c2: int", useArrow=True)

Listy argumentów zmiennych — *args i **kwargs

Możesz użyć języka Python *args lub **kwargs składni i zaimplementować logikę do obsługi nieokreślonej liczby wartości wejściowych.

Poniższy przykład zwraca ten sam wynik podczas jawnego sprawdzania długości danych wejściowych i typów argumentów:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Oto ten sam przykład, ale przy użyciu argumentów słów kluczowych:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Definiowanie schematu statycznego w czasie rejestracji

Funkcja UDTF zwraca wiersze ze schematem wyjściowym obejmującym uporządkowaną sekwencję nazw kolumn i typów. Jeśli schemat UDTF zawsze powinien pozostać taki sam dla wszystkich zapytań, można określić statyczny, stały schemat po dekoratorze @udtf . Musi to być element :StructType

StructType().add("c1", StringType())

Lub ciąg DDL reprezentujący typ struktury:

c1: string

Obliczanie schematu dynamicznego w czasie wywołania funkcji

Funkcje zdefiniowane przez użytkownika mogą również programowo obliczać schemat danych wyjściowych dla każdego wywołania w zależności od wartości argumentów wejściowych. W tym celu zdefiniuj metodę statyczną o nazwie analyze , która akceptuje zero lub więcej parametrów odpowiadających argumentom podanym do określonego wywołania UDTF.

Każdy argument analyze metody jest wystąpieniem AnalyzeArgument klasy, która zawiera następujące pola:

AnalyzeArgument pole klasy opis
dataType Typ argumentu wejściowego jako DataType. W przypadku argumentów tabeli wejściowej StructType jest to reprezentacja kolumn tabeli.
value Wartość argumentu wejściowego jako Optional[Any]. None Dotyczy to argumentów tabeli lub argumentów skalarnych literału, które nie są stałe.
isTable Czy argument wejściowy jest tabelą jako .BooleanType
isConstantExpression Czy argument wejściowy jest wyrażeniem składanym stałym jako BooleanType.

Metoda analyze zwraca wystąpienie AnalyzeResult klasy, które zawiera schemat tabeli wyników jako StructType plus kilka pól opcjonalnych. Jeśli funkcja UDTF akceptuje argument tabeli wejściowej, AnalyzeResult może również zawierać żądany sposób partycjonowania i porządkować wiersze tabeli wejściowej w kilku wywołaniach UDTF, zgodnie z opisem w dalszej części.

AnalyzeResult pole klasy opis
schema Schemat tabeli wyników jako StructType.
withSinglePartition Czy wysyłać wszystkie wiersze wejściowe do tego samego wystąpienia klasy UDTF co BooleanType.
partitionBy Jeśli ustawiono wartość niepustą, wszystkie wiersze z każdą unikatową kombinacją wartości wyrażeń partycjonowania są używane przez oddzielne wystąpienie klasy UDTF.
orderBy Jeśli ustawiono wartość niepustą, określa kolejność wierszy w ramach każdej partycji.
select Jeśli jest ustawiona wartość niepusta, jest to sekwencja wyrażeń, które funkcja UDTF określa dla katalizatora w celu obliczenia względem kolumn w wejściowym argumencie TABLE. Funkcja UDTF odbiera jeden atrybut wejściowy dla każdej nazwy na liście w kolejności, w której są wymienione.

Ten analyze przykład zwraca jedną kolumnę wyjściową dla każdego wyrazu w argumencie ciągu wejściowego.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

Przekazywanie stanu do przyszłych eval wywołań

Metoda analyze może służyć jako wygodne miejsce do przeprowadzenia inicjowania, a następnie przekazać wyniki do przyszłych eval wywołań metody dla tego samego wywołania UDTF.

W tym celu utwórz podklasę i zwróć wystąpienie podklasy AnalyzeResult podklasy analyze z metody . Następnie dodaj dodatkowy argument do __init__ metody , aby zaakceptować to wystąpienie.

Ten analyze przykład zwraca stały schemat danych wyjściowych, ale dodaje informacje niestandardowe w metadanych wyników, które mają być używane przez przyszłe __init__ wywołania metod:

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Zwracanie wierszy wyjściowych

Metoda eval jest uruchamiana raz dla każdego wiersza argumentu tabeli wejściowej (lub tylko raz, jeśli nie podano argumentu tabeli), po którym następuje jedno wywołanie terminate metody na końcu. Metoda zwraca zero lub więcej wierszy, które są zgodne ze schematem wyników, generując krotki, listy lub pyspark.sql.Row obiekty.

Ten przykład zwraca wiersz, podając krotkę trzech elementów:

def eval(self, x, y, z):
  yield (x, y, z)

Można również pominąć nawiasy:

def eval(self, x, y, z):
  yield x, y, z

Dodaj przecinek końcowy, aby zwrócić wiersz z tylko jedną kolumną:

def eval(self, x, y, z):
  yield x,

Można również uzyskać pyspark.sql.Row obiekt.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

W tym przykładzie terminate zwracane są wiersze wyjściowe z metody przy użyciu listy języka Python. Stan wewnątrz klasy można przechowywać we wcześniejszych krokach w ocenie udTF w tym celu.

def terminate(self):
  yield [self.x, self.y, self.z]

Przekazywanie argumentów skalarnych do funkcji UDTF

Argumenty skalarne można przekazać do formatu UDTF jako wyrażenia stałe składające się z wartości literałów lub funkcji na ich podstawie. Na przykład:

SELECT * FROM udtf(42, group => upper("finance_department"));

Przekazywanie argumentów tabeli do funkcji UDTF

Funkcje UDF języka Python mogą akceptować tabelę danych wejściowych jako argument oprócz argumentów wejściowych skalarnych. Pojedynczy protokół UDTF może również akceptować argument tabeli i wiele argumentów skalarnych.

Następnie dowolne zapytanie SQL może podać tabelę wejściową przy użyciu słowa kluczowego TABLE , po którym następują nawiasy otaczające odpowiedni identyfikator tabeli, na przykład TABLE(t). Alternatywnie możesz przekazać podzapytywanie tabeli, na przykład TABLE(SELECT a, b, c FROM t) lub TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

Argument tabeli wejściowej jest następnie reprezentowany jako pyspark.sql.Row argument eval metody z jednym wywołaniem eval metody dla każdego wiersza w tabeli wejściowej. Możesz użyć standardowych adnotacji pól kolumn PySpark do interakcji z kolumnami w każdym wierszu. W poniższym przykładzie pokazano jawne zaimportowanie typu PySpark Row , a następnie filtrowanie przekazanej tabeli w id polu:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

Aby wysłać zapytanie do funkcji, użyj słowa kluczowego TABLE SQL:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Określanie partycjonowania wierszy wejściowych z wywołań funkcji

Podczas wywoływania funkcji UDTF z argumentem tabeli dowolne zapytanie SQL może podzielić tabelę wejściową na kilka wywołań UDTF na podstawie wartości co najmniej jednej kolumny tabeli wejściowej.

Aby określić partycję, użyj PARTITION BY klauzuli w wywołaniu funkcji po argumencie TABLE . Gwarantuje to, że wszystkie wiersze wejściowe z każdą unikatową kombinacją wartości kolumn partycjonowania będą używane przez dokładnie jedno wystąpienie klasy UDTF.

Należy pamiętać, że oprócz prostych odwołań do kolumn klauzula PARTITION BY akceptuje również dowolne wyrażenia na podstawie kolumn tabeli wejściowej. Można na przykład określić LENGTH ciąg, wyodrębnić miesiąc z daty lub połączyć dwie wartości.

Można również określić WITH SINGLE PARTITION zamiast PARTITION BY żądać tylko jednej partycji, w której wszystkie wiersze wejściowe muszą być używane przez dokładnie jedno wystąpienie klasy UDTF.

W ramach każdej partycji można opcjonalnie określić wymaganą kolejność wierszy wejściowych, ponieważ metoda UDTF eval je używa. W tym celu podaj klauzulę ORDER BY po klauzuli PARTITION BY lub WITH SINGLE PARTITION opisanej powyżej.

Rozważmy na przykład następujące funkcje UDTF:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Opcje partycjonowania można określić podczas wywoływania funkcji UDTF za pośrednictwem tabeli wejściowej na różne sposoby:

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Określanie partycjonowania wierszy wejściowych z analyze metody

Należy pamiętać, że dla każdego z powyższych sposobów partycjonowania tabeli wejściowej podczas wywoływania funkcji UDF w zapytaniach SQL istnieje odpowiedni sposób, aby metoda UDTF analyze automatycznie określała tę samą metodę partycjonowania.

  • Zamiast wywoływać funkcję UDTF jako SELECT * FROM udtf(TABLE(t) PARTITION BY a), możesz zaktualizować analyze metodę , aby ustawić pole partitionBy=[PartitioningColumn("a")] i po prostu wywołać funkcję przy użyciu polecenia SELECT * FROM udtf(TABLE(t)).
  • Za pomocą tego samego tokenu, zamiast określać TABLE(t) WITH SINGLE PARTITION ORDER BY b w zapytaniu SQL, można ustawić analyze pola withSinglePartition=true , a orderBy=[OrderingColumn("b")] następnie po prostu przekazać TABLE(t)polecenie .
  • Zamiast przekazywać TABLE(SELECT a FROM t) zapytanie SQL, możesz ustawić analyze , select=[SelectedColumn("a")] a następnie przekazać polecenie TABLE(t).

W poniższym przykładzie analyze zwraca stały schemat danych wyjściowych, wybiera podzbiór kolumn z tabeli wejściowej i określa, że tabela wejściowa jest partycjonowana na kilka wywołań UDTF na podstawie wartości date kolumny:

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])