Поделиться через


Справочник по языку для разностных динамических таблиц Python

В этой статье содержатся сведения о интерфейсе программирования Python для Delta Live Tables.

Дополнительные сведения об API SQL см. в Справочнике по языку SQL для разностных динамических таблиц.

Дополнительные сведения о настройке автозагрузчика см. в разделе "Что такое автозагрузчик?".

Ограничения

Интерфейс Python в Delta Live Tables имеет следующие ограничения:

  • Python table и view функции должны возвращать кадр данных. Некоторые функции, работающие с кадрами данных, не возвращают кадры данных и не должны использоваться. Так как преобразования DataFrame выполняются после разрешения полного графа потока данных, при использовании таких операций могут быть непреднамеренные побочные эффекты. Эти операции включают такие функции, как collect(), count(), toPandas(), save()и saveAsTable(). Однако эти функции можно включить вне определений table или view функций, так как этот код выполняется один раз во время этапа инициализации графа.
  • Функция pivot() не поддерживается. Операция pivot в Spark требует активной загрузки входных данных для вычисления схемы выходных данных. Эта возможность не поддерживается в разностных динамических таблицах.

dlt Импорт модуля Python

Функции Python разностных динамических таблиц определяются в модуле dlt. Конвейеры, реализованные с помощью API Python, должны импортировать этот модуль:

import dlt

Создание материализованного представления или потоковой таблицы Разностных динамических таблиц

В Python разностные динамические таблицы определяют, следует ли обновлять набор данных в виде материализованного представления или потоковой таблицы на основе определяющего запроса. Декоратор @table используется для определения материализованных представлений и потоковых таблиц.

Чтобы определить материализованное представление в Python, примените @table к запросу, который выполняет статическое чтение в источнике данных. Чтобы определить таблицу потоковой передачи, примените @table к запросу, который выполняет потоковое чтение в источнике данных. Оба типа набора данных имеют одну и ту же спецификацию синтаксиса, как показано ниже.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Создание представления разностных динамических таблиц

Чтобы определить представление в Python, примените декоратор @view. @table Как и декоратор, можно использовать представления в разностных динамических таблицах для статических или потоковых наборов данных. Ниже приведен синтаксис для определения представлений с помощью Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Пример. Определение таблиц и представлений

Чтобы определить таблицу или представление в Python, примените @dlt.view к функции или @dlt.table декоратору. Для назначения имени таблицы или представления можно использовать имя функции или параметр name. В следующем примере определяются два различных набора данных: представление с именем taxi_raw, которое принимает JSON-файл в качестве источника входных данных, и таблица с именем filtered_data, принимающая представление taxi_raw в качестве входных данных:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Пример. Доступ к набору данных, определенному в том же конвейере

Помимо чтения из внешних источников данных, можно получить доступ к наборам данных, определенным в одном конвейере с функцией Delta Live Tables read() . В следующем примере показано создание customers_filtered набора данных с помощью read() функции:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

Вы также можете использовать функцию spark.table() для доступа к набору данных, определенному в том же конвейере. При использовании функции spark.table() для доступа к набору данных, определенному в конвейере, в аргументе функции в начале ключевого слова LIVE добавляется имя набора данных:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Пример. Чтение из таблицы, зарегистрированной в хранилище метаданных

Чтобы считывать данные из таблицы, зарегистрированной в хранилище метаданных Hive, в аргументе функции опустить LIVE ключевое слово и при необходимости указать имя таблицы с именем базы данных:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Пример чтения из таблицы каталога Unity см . в конвейере каталога Unity.

Пример. Доступ к набору данных с помощью spark.sql

Можно также вернуть набор данных с помощью выражения spark.sql в функции запроса. Для чтения из внутреннего набора данных, добавьте в начало LIVE. к имени набора данных:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Создание таблицы для использования в качестве цели операций потоковой передачи

create_streaming_table() Используйте функцию для создания целевой таблицы для записей выходных данных операций потоковой передачи, включая apply_changes() и @append_flow выходных записей.

Примечание.

create_streaming_live_table() Не create_target_table() рекомендуется использовать функции. В Databricks рекомендуется обновить существующий код для использования функции create_streaming_table().

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
Аргументы
name

Тип: str

Имя таблицы.

Этот параметр является обязательным.
comment

Тип: str

Необязательное описание таблицы.
spark_conf

Тип: dict

Необязательный список конфигураций Spark для выполнения этого запроса.
table_properties

Тип: dict

Необязательный список свойств таблицы для таблицы.
partition_cols

Тип: array

Необязательный список из одного или нескольких столбцов, используемых для секционирования таблицы.
path

Тип: str

Дополнительное место хранения данных таблицы. Если значение не задано, система будет по умолчанию использовать место хранения конвейера.
schema

Тип: str или StructType.

Необязательное определение схемы для таблицы. Схемы можно определить как строку SQL DDL или с помощью Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Тип: dict

Необязательные ограничения качества данных для таблицы. См . несколько ожиданий.

Управление материализацией таблиц

Таблицы также обеспечивают дополнительный контроль над их материализацией:

  • Укажите, как секционируются таблицы с помощью partition_cols. Секционирование можно использовать для ускорения запросов.
  • Свойства таблицы можно задать при определении представления или таблицы. См . свойства таблицы Delta Live Table.
  • Задайте место хранения данных таблицы с помощью параметра path. По умолчанию данные таблицы хранятся в расположении хранилища конвейера, если path не задано.
  • Созданные столбцы можно использовать в определении схемы. См . пример. Указание столбцов схемы и секционирования.

Примечание.

Для таблиц меньше 1 ТБ размера Databricks рекомендует разрешить организации управления данными Delta Live Tables. Если таблица не будет расти за пределами терабайта, обычно не следует указывать столбцы секций.

Пример. Указание столбцов схемы и секционирования

При необходимости можно указать схему таблицы с помощью Python StructType или строки SQL DDL. При указании в строке DDL определение может включать созданные столбцы.

В следующем примере создается таблица sales с схемой, указанной с помощью Python StructType:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

В следующем примере схема таблицы с помощью строки DDL определяет созданный столбец и определяет столбец секционирования:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

По умолчанию разностные динамические таблицы выводят схему из определения table, если схема не указана.

Настройка таблицы потоковой передачи для пропуска изменений в исходной потоковой таблице

Примечание.

  • Флаг skipChangeCommits работает только с spark.readStream помощью option() функции. Этот флаг нельзя использовать в dlt.read_stream() функции.
  • Нельзя использовать skipChangeCommits флаг, если исходная потоковая таблица определена как цель функции apply_changes().

По умолчанию для потоковых таблиц требуются источники только для добавления. Если в таблице потоковой передачи используется другая потоковая таблица в качестве источника, а исходная потоковая таблица требует обновления или удаления, например GDPR "право быть забытым", флаг можно задать при чтении исходной таблицы потоковой передачи, skipChangeCommits чтобы игнорировать эти изменения. Дополнительные сведения об этом флаге см. в разделе "Игнорировать обновления и удаления".

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Свойства разностных динамических таблиц Python

В следующих таблицах описаны параметры и свойства, которые можно указать при определении таблиц и представлений с помощью разностных динамических таблиц:

@table или @view
name

Тип: str

Необязательное имя таблицы или представления. Если значение не определено, имя функции используется в качестве имени таблицы или представления.
comment

Тип: str

Необязательное описание таблицы.
spark_conf

Тип: dict

Необязательный список конфигураций Spark для выполнения этого запроса.
table_properties

Тип: dict

Необязательный список свойств таблицы для таблицы.
path

Тип: str

Дополнительное место хранения данных таблицы. Если значение не задано, система будет по умолчанию использовать место хранения конвейера.
partition_cols

Тип: a collection of str

Необязательная коллекция, например list, одного или нескольких столбцов, используемых для секционирования таблицы.
schema

Тип: str или StructType.

Необязательное определение схемы для таблицы. Схемы можно определить как строку SQL DDL или с помощью Python
StructType.
temporary

Тип: bool

Создайте таблицу, но не публикуйте метаданные для таблицы. Ключевое слово temporary указывает Delta Live Table создать таблицу, доступную конвейеру, но не должен быть доступ к ней за пределами конвейера. Чтобы сократить время обработки, временная таблица сохраняется в течение всего времени существования конвейера, создающего его, а не только одного обновления.

Значение по умолчанию —False.
Определение таблицы или представления
def <function-name>()

Функция Python, определяющая набор данных. nameЕсли параметр не задан, то <function-name> используется в качестве имени целевого набора данных.
query

Инструкция Spark SQL, возвращающая набор данных Spark или Koalas DataFrame.

Используйте dlt.read() или spark.table() для выполнения полного считывания из набора данных, определенного в том же конвейере. При использовании функции spark.table() для считывания из набора данных, определенного в том же конвейере, в аргументе функции в начало ключевого слова LIVE добавляется имя набора данных. Например, для чтения из набора данных с именем customers:

spark.table("LIVE.customers")

Можно также использовать функцию spark.table() для чтения из таблицы, зарегистрированной в хранилище метаданных, пропустив ключевое слово LIVE, и при необходимости дополнить имя таблицы именем базы данных:

spark.table("sales.customers")

Используйте dlt.read_stream() для выполнения потокового считывания из набора данных, определенного в том же конвейере.

Используйте функцию spark.sql, чтобы определить запрос SQL для создания возвращаемого набора данных.

Используйте синтаксис PySpark для определения запросов разностных динамических таблиц с Python.
Ожидания
@expect("description", "constraint")

Объявите ограничение качества данных, определяемое с помощью
description. Если строка нарушает ожидание, включите строку в целевой набор данных.
@expect_or_drop("description", "constraint")

Объявите ограничение качества данных, определяемое с помощью
description. Если строка нарушает ожидание, удалите строку из целевого набора данных.
@expect_or_fail("description", "constraint")

Объявите ограничение качества данных, определяемое с помощью
description. Если строка нарушает ожидание, немедленно прервите выполнение.
@expect_all(expectations)

Объявите одно или несколько ограничений качества данных.
expectations — это словарь Python, в котором ключ является описанием ожидания, а значение — ограничением ожидания. Если строка нарушает любое ожидание, включите строку в целевой набор данных.
@expect_all_or_drop(expectations)

Объявите одно или несколько ограничений качества данных.
expectations — это словарь Python, в котором ключ является описанием ожидания, а значение — ограничением ожидания. Если строка нарушает любое ожидание, удалите строку из целевого набора данных.
@expect_all_or_fail(expectations)

Объявите одно или несколько ограничений качества данных.
expectations — это словарь Python, в котором ключ является описанием ожидания, а значение — ограничением ожидания. Если строка нарушает любое ожидание, немедленно прервите выполнение.

Изменение записи данных с помощью Python в разностных динамических таблицах

Используйте функцию apply_changes() в API Python, чтобы использовать функции CDC Delta Live Tables. Интерфейс Python для разностных динамических таблиц также предоставляет функцию create_streaming_table(). Эту функцию можно использовать для создания целевой таблицы, требуемой функцией apply_changes().

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Примечание.

Поведение по умолчанию для событий INSERT и UPDATE заключается в применении upsert к событиям CDC из источника — обновление всех записей в целевой таблице, которые совпадают с указанными ключами, или вставка новой записи, если совпадающая запись не существует в целевой таблице. Способ обработки событий DELETE можно указать с помощью условия APPLY AS DELETE WHEN.

Внимание

Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы apply_changes необходимо также включить столбцы __START_AT и __END_AT с таким же типом данных, как в поле sequence_by.

См . РАЗДЕЛ API APPLY CHANGES: Упрощение записи измененных данных в разностных динамических таблицах.

Аргументы
target

Тип: str

Имя обновляемой таблицы. Функцию create_streaming_table() можно использовать для создания целевой таблицы перед выполнением apply_changes() функции.

Этот параметр является обязательным.
source

Тип: str

Источник данных с записями CDC.

Этот параметр является обязательным.
keys

Тип: list

Столбец или сочетание столбцов, которые однозначно идентифицируют запись в исходных данных. Используется для определения того, какие события CDC применяются к конкретным записям в целевой таблице.

Вы можете указать одно из следующего:

* Список строк: ["userId", "orderId"].
* Список функций col() Spark SQL: [col("userId"), col("orderId"].

Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId), но нельзя использовать col(source.userId).

Этот параметр является обязательным.
sequence_by

Тип: str или col().

Имя столбца, указывающего логический порядок событий CDC в исходных данных. Разностные динамические таблицы используют эту последовательность для обработки событий изменения, которые поступают неупорядоченно.

Вы можете указать одно из следующего:

* Строка: "sequenceNum".
* Функция col() Spark SQL: col("sequenceNum").

Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId), но нельзя использовать col(source.userId).

Этот параметр является обязательным.
ignore_null_updates

Тип: bool

Разрешает прием обновлений с подмножеством целевых столбцов. Если событие CDC совпадает с существующей записью и ignore_null_updates имеет значение True, столбцы с null сохранят существующие значения в целевом объекте. Это также относится к вложенным столбцам со значением null. Если ignore_null_updates имеет значение False, существующие значения будут перезаписаны значениями null.

Это необязательный параметр.

Значение по умолчанию — False.
apply_as_deletes

Тип: str или expr().

Указывает, в каких случаях событие CDC необходимо обрабатывать как DELETE, а не как upsert. Для обработки неупорядоченных данных удаленная запись временно сохраняется в виде отметки полного удаления в базовой разностной таблице, а в хранилище метаданных создается представление, которое отфильтровывает такие отметки. Интервал хранения можно настроить с помощью свойства таблицы
pipelines.cdc.tombstoneGCThresholdInSeconds.

Вы можете указать одно из следующего:

* Строка: "Operation = 'DELETE'".
* Функция expr() Spark SQL: expr("Operation = 'DELETE'").

Это необязательный параметр.
apply_as_truncates

Тип: str или expr().

Указывает, в каких случаях событие CDC необходимо обрабатывать как полную таблицу TRUNCATE. Так как это предложение активирует полное усечение целевой таблицы, его следует использовать только для конкретных вариантов использования, требующих этой функции.

Параметр apply_as_truncates поддерживается только для SCD типа 1. SCD типа 2 не поддерживает усечение.

Вы можете указать одно из следующего:

* Строка: "Operation = 'TRUNCATE'".
* Функция expr() Spark SQL: expr("Operation = 'TRUNCATE'").

Это необязательный параметр.
column_list

except_column_list

Тип: list

Подмножество столбцов для включения в целевую таблицу. Используйте column_list для указания полного списка включаемых столбцов. Используйте except_column_list для указания исключаемых столбцов. Вы можете объявить любое из этих значений в виде списка строк или в виде функций col() Spark SQL:

* column_list = ["userId", "name", "city"].
* column_list = [col("userId"), col("name"), col("city")]
* except_column_list = ["operation", "sequenceNum"]
* except_column_list = [col("operation"), col("sequenceNum")

Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId), но нельзя использовать col(source.userId).

Это необязательный параметр.

По умолчанию включаются все столбцы в целевой таблице, если в функцию не передается аргумент column_list или except_column_list.
stored_as_scd_type

Тип: str или int.

Определяет, следует ли хранить записи в виде SCD типа 1 или SCD типа 2.

Укажите значение 1 для SCD типа 1 или 2 для SCD типа 2.

Предложение не является обязательным.

Значение по умолчанию — SCD типа 1.
track_history_column_list

track_history_except_column_list

Тип: list

Подмножество выходных столбцов для отслеживания журнала в целевой таблице. Используется track_history_column_list для указания полного списка столбцов для отслеживания. Использование
track_history_except_column_list , чтобы указать столбцы, которые следует исключить из отслеживания. Можно объявить любое значение в виде списка строк или как функции Spark SQL col() : - track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId), но нельзя использовать col(source.userId).

Это необязательный параметр.

Значение по умолчанию — включать все столбцы в целевую таблицу, если нет или нет track_history_column_list .
track_history_except_column_list аргумент передается функции.