Справочник по языку для разностных динамических таблиц Python
В этой статье содержатся сведения о интерфейсе программирования Python для Delta Live Tables.
Дополнительные сведения об API SQL см. в Справочнике по языку SQL для разностных динамических таблиц.
Дополнительные сведения о настройке автозагрузчика см. в разделе "Что такое автозагрузчик?".
Ограничения
Интерфейс Python в Delta Live Tables имеет следующие ограничения:
- Python
table
иview
функции должны возвращать кадр данных. Некоторые функции, работающие с кадрами данных, не возвращают кадры данных и не должны использоваться. Так как преобразования DataFrame выполняются после разрешения полного графа потока данных, при использовании таких операций могут быть непреднамеренные побочные эффекты. Эти операции включают такие функции, какcollect()
,count()
,toPandas()
,save()
иsaveAsTable()
. Однако эти функции можно включить вне определенийtable
илиview
функций, так как этот код выполняется один раз во время этапа инициализации графа. - Функция
pivot()
не поддерживается. Операцияpivot
в Spark требует активной загрузки входных данных для вычисления схемы выходных данных. Эта возможность не поддерживается в разностных динамических таблицах.
dlt
Импорт модуля Python
Функции Python разностных динамических таблиц определяются в модуле dlt
. Конвейеры, реализованные с помощью API Python, должны импортировать этот модуль:
import dlt
Создание материализованного представления или потоковой таблицы Разностных динамических таблиц
В Python разностные динамические таблицы определяют, следует ли обновлять набор данных в виде материализованного представления или потоковой таблицы на основе определяющего запроса. Декоратор @table
используется для определения материализованных представлений и потоковых таблиц.
Чтобы определить материализованное представление в Python, примените @table
к запросу, который выполняет статическое чтение в источнике данных. Чтобы определить таблицу потоковой передачи, примените @table
к запросу, который выполняет потоковое чтение в источнике данных. Оба типа набора данных имеют одну и ту же спецификацию синтаксиса, как показано ниже.
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
schema="schema-definition",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Создание представления разностных динамических таблиц
Чтобы определить представление в Python, примените декоратор @view
. @table
Как и декоратор, можно использовать представления в разностных динамических таблицах для статических или потоковых наборов данных. Ниже приведен синтаксис для определения представлений с помощью Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Пример. Определение таблиц и представлений
Чтобы определить таблицу или представление в Python, примените @dlt.view
к функции или @dlt.table
декоратору. Для назначения имени таблицы или представления можно использовать имя функции или параметр name
. В следующем примере определяются два различных набора данных: представление с именем taxi_raw
, которое принимает JSON-файл в качестве источника входных данных, и таблица с именем filtered_data
, принимающая представление taxi_raw
в качестве входных данных:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return dlt.read("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return dlt.read("taxi_raw").where(...)
Пример. Доступ к набору данных, определенному в том же конвейере
Помимо чтения из внешних источников данных, можно получить доступ к наборам данных, определенным в одном конвейере с функцией Delta Live Tables read()
. В следующем примере показано создание customers_filtered
набора данных с помощью read()
функции:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return dlt.read("customers_raw").where(...)
Вы также можете использовать функцию spark.table()
для доступа к набору данных, определенному в том же конвейере. При использовании функции spark.table()
для доступа к набору данных, определенному в конвейере, в аргументе функции в начале ключевого слова LIVE
добавляется имя набора данных:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredB():
return spark.table("LIVE.customers_raw").where(...)
Пример. Чтение из таблицы, зарегистрированной в хранилище метаданных
Чтобы считывать данные из таблицы, зарегистрированной в хранилище метаданных Hive, в аргументе функции опустить LIVE
ключевое слово и при необходимости указать имя таблицы с именем базы данных:
@dlt.table
def customers():
return spark.table("sales.customers").where(...)
Пример чтения из таблицы каталога Unity см . в конвейере каталога Unity.
Пример. Доступ к набору данных с помощью spark.sql
Можно также вернуть набор данных с помощью выражения spark.sql
в функции запроса. Для чтения из внутреннего набора данных, добавьте в начало LIVE.
к имени набора данных:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Создание таблицы для использования в качестве цели операций потоковой передачи
create_streaming_table()
Используйте функцию для создания целевой таблицы для записей выходных данных операций потоковой передачи, включая apply_changes() и @append_flow выходных записей.
Примечание.
create_streaming_live_table()
Не create_target_table()
рекомендуется использовать функции. В Databricks рекомендуется обновить существующий код для использования функции create_streaming_table()
.
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
Аргументы |
---|
name Тип: str Имя таблицы. Этот параметр является обязательным. |
comment Тип: str Необязательное описание таблицы. |
spark_conf Тип: dict Необязательный список конфигураций Spark для выполнения этого запроса. |
table_properties Тип: dict Необязательный список свойств таблицы для таблицы. |
partition_cols Тип: array Необязательный список из одного или нескольких столбцов, используемых для секционирования таблицы. |
path Тип: str Дополнительное место хранения данных таблицы. Если значение не задано, система будет по умолчанию использовать место хранения конвейера. |
schema Тип: str или StructType .Необязательное определение схемы для таблицы. Схемы можно определить как строку SQL DDL или с помощью Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail Тип: dict Необязательные ограничения качества данных для таблицы. См . несколько ожиданий. |
Управление материализацией таблиц
Таблицы также обеспечивают дополнительный контроль над их материализацией:
- Укажите, как секционируются таблицы с помощью
partition_cols
. Секционирование можно использовать для ускорения запросов. - Свойства таблицы можно задать при определении представления или таблицы. См . свойства таблицы Delta Live Table.
- Задайте место хранения данных таблицы с помощью параметра
path
. По умолчанию данные таблицы хранятся в расположении хранилища конвейера, еслиpath
не задано. - Созданные столбцы можно использовать в определении схемы. См . пример. Указание столбцов схемы и секционирования.
Примечание.
Для таблиц меньше 1 ТБ размера Databricks рекомендует разрешить организации управления данными Delta Live Tables. Если таблица не будет расти за пределами терабайта, обычно не следует указывать столбцы секций.
Пример. Указание столбцов схемы и секционирования
При необходимости можно указать схему таблицы с помощью Python StructType
или строки SQL DDL. При указании в строке DDL определение может включать созданные столбцы.
В следующем примере создается таблица sales
с схемой, указанной с помощью Python StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
В следующем примере схема таблицы с помощью строки DDL определяет созданный столбец и определяет столбец секционирования:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
По умолчанию разностные динамические таблицы выводят схему из определения table
, если схема не указана.
Настройка таблицы потоковой передачи для пропуска изменений в исходной потоковой таблице
Примечание.
- Флаг
skipChangeCommits
работает только сspark.readStream
помощьюoption()
функции. Этот флаг нельзя использовать вdlt.read_stream()
функции. - Нельзя использовать
skipChangeCommits
флаг, если исходная потоковая таблица определена как цель функции apply_changes().
По умолчанию для потоковых таблиц требуются источники только для добавления. Если в таблице потоковой передачи используется другая потоковая таблица в качестве источника, а исходная потоковая таблица требует обновления или удаления, например GDPR "право быть забытым", флаг можно задать при чтении исходной таблицы потоковой передачи, skipChangeCommits
чтобы игнорировать эти изменения. Дополнительные сведения об этом флаге см. в разделе "Игнорировать обновления и удаления".
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Свойства разностных динамических таблиц Python
В следующих таблицах описаны параметры и свойства, которые можно указать при определении таблиц и представлений с помощью разностных динамических таблиц:
@table или @view |
---|
name Тип: str Необязательное имя таблицы или представления. Если значение не определено, имя функции используется в качестве имени таблицы или представления. |
comment Тип: str Необязательное описание таблицы. |
spark_conf Тип: dict Необязательный список конфигураций Spark для выполнения этого запроса. |
table_properties Тип: dict Необязательный список свойств таблицы для таблицы. |
path Тип: str Дополнительное место хранения данных таблицы. Если значение не задано, система будет по умолчанию использовать место хранения конвейера. |
partition_cols Тип: a collection of str Необязательная коллекция, например list , одного или нескольких столбцов, используемых для секционирования таблицы. |
schema Тип: str или StructType .Необязательное определение схемы для таблицы. Схемы можно определить как строку SQL DDL или с помощью Python StructType . |
temporary Тип: bool Создайте таблицу, но не публикуйте метаданные для таблицы. Ключевое слово temporary указывает Delta Live Table создать таблицу, доступную конвейеру, но не должен быть доступ к ней за пределами конвейера. Чтобы сократить время обработки, временная таблица сохраняется в течение всего времени существования конвейера, создающего его, а не только одного обновления.Значение по умолчанию —False. |
Определение таблицы или представления |
---|
def <function-name>() Функция Python, определяющая набор данных. name Если параметр не задан, то <function-name> используется в качестве имени целевого набора данных. |
query Инструкция Spark SQL, возвращающая набор данных Spark или Koalas DataFrame. Используйте dlt.read() или spark.table() для выполнения полного считывания из набора данных, определенного в том же конвейере. При использовании функции spark.table() для считывания из набора данных, определенного в том же конвейере, в аргументе функции в начало ключевого слова LIVE добавляется имя набора данных. Например, для чтения из набора данных с именем customers :spark.table("LIVE.customers") Можно также использовать функцию spark.table() для чтения из таблицы, зарегистрированной в хранилище метаданных, пропустив ключевое слово LIVE , и при необходимости дополнить имя таблицы именем базы данных:spark.table("sales.customers") Используйте dlt.read_stream() для выполнения потокового считывания из набора данных, определенного в том же конвейере.Используйте функцию spark.sql , чтобы определить запрос SQL для создания возвращаемого набора данных.Используйте синтаксис PySpark для определения запросов разностных динамических таблиц с Python. |
Ожидания |
---|
@expect("description", "constraint") Объявите ограничение качества данных, определяемое с помощью description . Если строка нарушает ожидание, включите строку в целевой набор данных. |
@expect_or_drop("description", "constraint") Объявите ограничение качества данных, определяемое с помощью description . Если строка нарушает ожидание, удалите строку из целевого набора данных. |
@expect_or_fail("description", "constraint") Объявите ограничение качества данных, определяемое с помощью description . Если строка нарушает ожидание, немедленно прервите выполнение. |
@expect_all(expectations) Объявите одно или несколько ограничений качества данных. expectations — это словарь Python, в котором ключ является описанием ожидания, а значение — ограничением ожидания. Если строка нарушает любое ожидание, включите строку в целевой набор данных. |
@expect_all_or_drop(expectations) Объявите одно или несколько ограничений качества данных. expectations — это словарь Python, в котором ключ является описанием ожидания, а значение — ограничением ожидания. Если строка нарушает любое ожидание, удалите строку из целевого набора данных. |
@expect_all_or_fail(expectations) Объявите одно или несколько ограничений качества данных. expectations — это словарь Python, в котором ключ является описанием ожидания, а значение — ограничением ожидания. Если строка нарушает любое ожидание, немедленно прервите выполнение. |
Изменение записи данных с помощью Python в разностных динамических таблицах
Используйте функцию apply_changes()
в API Python, чтобы использовать функции CDC Delta Live Tables. Интерфейс Python для разностных динамических таблиц также предоставляет функцию create_streaming_table(). Эту функцию можно использовать для создания целевой таблицы, требуемой функцией apply_changes()
.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Примечание.
Поведение по умолчанию для событий INSERT
и UPDATE
заключается в применении upsert к событиям CDC из источника — обновление всех записей в целевой таблице, которые совпадают с указанными ключами, или вставка новой записи, если совпадающая запись не существует в целевой таблице. Способ обработки событий DELETE
можно указать с помощью условия APPLY AS DELETE WHEN
.
Внимание
Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы apply_changes
необходимо также включить столбцы __START_AT
и __END_AT
с таким же типом данных, как в поле sequence_by
.
См . РАЗДЕЛ API APPLY CHANGES: Упрощение записи измененных данных в разностных динамических таблицах.
Аргументы |
---|
target Тип: str Имя обновляемой таблицы. Функцию create_streaming_table() можно использовать для создания целевой таблицы перед выполнением apply_changes() функции.Этот параметр является обязательным. |
source Тип: str Источник данных с записями CDC. Этот параметр является обязательным. |
keys Тип: list Столбец или сочетание столбцов, которые однозначно идентифицируют запись в исходных данных. Используется для определения того, какие события CDC применяются к конкретным записям в целевой таблице. Вы можете указать одно из следующего: * Список строк: ["userId", "orderId"] .* Список функций col() Spark SQL: [col("userId"), col("orderId"] .Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId) , но нельзя использовать col(source.userId) .Этот параметр является обязательным. |
sequence_by Тип: str или col() .Имя столбца, указывающего логический порядок событий CDC в исходных данных. Разностные динамические таблицы используют эту последовательность для обработки событий изменения, которые поступают неупорядоченно. Вы можете указать одно из следующего: * Строка: "sequenceNum" .* Функция col() Spark SQL: col("sequenceNum") .Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId) , но нельзя использовать col(source.userId) .Этот параметр является обязательным. |
ignore_null_updates Тип: bool Разрешает прием обновлений с подмножеством целевых столбцов. Если событие CDC совпадает с существующей записью и ignore_null_updates имеет значение True , столбцы с null сохранят существующие значения в целевом объекте. Это также относится к вложенным столбцам со значением null . Если ignore_null_updates имеет значение False , существующие значения будут перезаписаны значениями null .Это необязательный параметр. Значение по умолчанию — False . |
apply_as_deletes Тип: str или expr() .Указывает, в каких случаях событие CDC необходимо обрабатывать как DELETE , а не как upsert. Для обработки неупорядоченных данных удаленная запись временно сохраняется в виде отметки полного удаления в базовой разностной таблице, а в хранилище метаданных создается представление, которое отфильтровывает такие отметки. Интервал хранения можно настроить с помощью свойства таблицыpipelines.cdc.tombstoneGCThresholdInSeconds .Вы можете указать одно из следующего: * Строка: "Operation = 'DELETE'" .* Функция expr() Spark SQL: expr("Operation = 'DELETE'") .Это необязательный параметр. |
apply_as_truncates Тип: str или expr() .Указывает, в каких случаях событие CDC необходимо обрабатывать как полную таблицу TRUNCATE . Так как это предложение активирует полное усечение целевой таблицы, его следует использовать только для конкретных вариантов использования, требующих этой функции.Параметр apply_as_truncates поддерживается только для SCD типа 1. SCD типа 2 не поддерживает усечение.Вы можете указать одно из следующего: * Строка: "Operation = 'TRUNCATE'" .* Функция expr() Spark SQL: expr("Operation = 'TRUNCATE'") .Это необязательный параметр. |
column_list except_column_list Тип: list Подмножество столбцов для включения в целевую таблицу. Используйте column_list для указания полного списка включаемых столбцов. Используйте except_column_list для указания исключаемых столбцов. Вы можете объявить любое из этих значений в виде списка строк или в виде функций col() Spark SQL:* column_list = ["userId", "name", "city"] .* column_list = [col("userId"), col("name"), col("city")] * except_column_list = ["operation", "sequenceNum"] * except_column_list = [col("operation"), col("sequenceNum") Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId) , но нельзя использовать col(source.userId) .Это необязательный параметр. По умолчанию включаются все столбцы в целевой таблице, если в функцию не передается аргумент column_list или except_column_list . |
stored_as_scd_type Тип: str или int .Определяет, следует ли хранить записи в виде SCD типа 1 или SCD типа 2. Укажите значение 1 для SCD типа 1 или 2 для SCD типа 2.Предложение не является обязательным. Значение по умолчанию — SCD типа 1. |
track_history_column_list track_history_except_column_list Тип: list Подмножество выходных столбцов для отслеживания журнала в целевой таблице. Используется track_history_column_list для указания полного списка столбцов для отслеживания. Использованиеtrack_history_except_column_list , чтобы указать столбцы, которые следует исключить из отслеживания. Можно объявить любое значение в виде списка строк или как функции Spark SQL col() : - track_history_column_list = ["userId", "name", "city"] . - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId) , но нельзя использовать col(source.userId) .Это необязательный параметр. Значение по умолчанию — включать все столбцы в целевую таблицу, если нет или нет track_history_column_list .track_history_except_column_list аргумент передается функции. |
Обратная связь
https://aka.ms/ContentUserFeedback.
Ожидается в ближайшее время: в течение 2024 года мы постепенно откажемся от GitHub Issues как механизма обратной связи для контента и заменим его новой системой обратной связи. Дополнительные сведения см. в разделеОтправить и просмотреть отзыв по