增量实时表 Python 语言参考
本文提供了有关增量实时表 Python 编程接口的详细信息和示例。 有关完整的 API 规范,请参阅 Python API 规范。
有关 SQL API 的信息,请参阅增量实时表 SQL 语言参考。
Python 数据集
Python API 在 dlt 模块中定义。 必须在利用 Python API 实现的增量实时表管道中导入 dlt 模块。 向函数应用 @dlt.view 或 @dlt.table 修饰器,以使用 Python 定义视图或表。 你可以使用函数名称或 name 参数来分配表或视图名称。 以下示例定义了两个不同的数据集:一个将 JSON 文件作为输入源的 taxi_raw 视图,一个将 taxi_raw 视图作为输入的 filtered_data 表:
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return dlt.read("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return dlt.read("taxi_raw").where(...)
视图和表函数必须返回 Spark 数据帧或 Koalas 数据帧。 函数返回的 Koalas 数据帧将由增量实时表运行时转换为 Spark 数据集。
除了从外部数据源读取数据外,还可以使用增量实时表 read() 函数访问同一管道中定义的数据集。 以下示例演示如何使用 read() 函数创建 customers_filtered 数据集:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return dlt.read("customers_raw").where(...)
还可以使用 spark.table() 函数访问在元存储中注册的同一管道或表中定义的数据集。 使用 spark.table() 函数访问管道中定义的数据集时,在函数参数中的数据集名称前加上 LIVE 关键字:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredB():
return spark.table("LIVE.customers_raw").where(...)
如果要从元存储中注册的表读取数据,在函数参数中,忽略 LIVE 关键字,并选择性地使用数据库名称限定表名称:
@dlt.table
def customers():
return spark.table("sales.customers").where(...)
增量实时表可确保管道自动捕获数据集之间的依赖关系。 此依赖关系信息用于确定执行更新时的执行顺序,以及在管道的事件日志中记录世系信息。
你还可以在查询函数中使用 spark.sql 表达式返回数据集。 若要从内部数据集读取数据,请在数据集名称前追加 LIVE.:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
视图和表都具有以下可选属性:
comment:此数据集的用户可读说明。spark_conf:一个 Python 字典,包含仅用于执行此查询的 Spark 配置。- 使用预期强制实施的数据质量约束。
表还提供对其具体化的额外控制:
指定如何使用 对表进行分区。 可以使用分区来加快查询速度。
可以在定义视图或表时设置表属性。 有关更多详细信息,请参阅表属性。
使用
path设置为表数据设置存储位置。 默认情况下,如果未设置path,表数据将存储在管道存储位置中。可以选择性地使用 Python
StructType或 SQL DDL 字符串指定表架构。 下面的示例使用显式指定的架构创建一个名为sales的表:sales_schema = StructType([ StructField("customer_id", StringType(), True), StructField("number_of_line_items", StringType(), True), StructField("order_datetime", StringType(), True), StructField("order_number", LongType(), True)] ) @dlt.table( comment="Raw data on sales", schema=sales_schema) def sales(): return ("...") @dlt.table( comment="Raw data on sales", schema="customer_id STRING, customer_name STRING, number_of_line_items STRING, order_datetime STRING, order_number LONG") def sales(): return ("...")默认情况下,如果未指定架构,则增量实时表将从
table定义推断架构。
Python 库
要指定外部 Python 库,请使用 %pip install 魔术命令。 更新开始时,Delta Live Tables 会在运行任何表定义之前运行包含 %pip install 命令的所有单元格。 管道中包含的每个 Python 笔记本均可访问所有已安装的库。 以下示例将安装一个名为 logger 的包,并使其对管道中的任何 Python 笔记本全局可用:
%pip install logger
from logger import log_info
@dlt.table
def dataset():
log_info(...)
return dlt.read(..)
若要安装 Python wheel 包,请将 wheel 路径添加到 %pip install 命令。 已安装的 Python wheel 包可用于管道中的所有表。 以下示例从 DBFS 目录 /dbfs/dlt/ 安装名为 dltfns-1.0-py3-none-any.whl 的 wheel:
%pip install /dbfs/dlt/dltfns-1.0-py3-none-any.whl
Python API 规范
Python 模块
注意
Delta Live Tables Python 接口具有以下限制:
- 不支持
pivot()函数。 在数据集定义中使用pivot()函数会导致不确定的管道延迟。
增量实时表 Python 函数在 dlt 模块中定义。 利用 Python API 实现的管道必须导入此模块:
import dlt
创建表
要在 Python 中定义表,请应用 @table 装饰器。 @table 装饰器是 @create_table 装饰器的别名。
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
schema="schema-definition",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
创建视图
要在 Python 中定义视图,请应用 @view 装饰器。 @view 装饰器是 @create_view 装饰器的别名。
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Python 属性
| @table 或 @view |
|---|
| name 类型: str表或视图的可选名称。 如果未定义,将使用函数名称作为表名或视图名称。 |
| comment 类型: str表的可选说明。 |
| spark_conf 类型: dict用于执行此查询的 Spark 配置的可选列表。 |
| table_properties 类型: dict表的表属性可选列表。 |
| path 类型: str表数据的可选存储位置。 如果未设置,系统将默认为管道存储位置。 |
| partition_cols 类型: array包含一列或多列的可选列表,用于对表进行分区。 |
| 架构 类型: str 或 StructType表的可选架构定义。 架构可以定义为 SQL DDL 字符串,或使用 Python 定义 StructType. |
| 临时 类型: bool创建临时表。 此表中不保留任何元数据。 默认值为“False”。 |
| 表或视图定义 |
|---|
| def () 用于定义数据集的 Python 函数。 如果未设置 name 参数,则使用 <function-name> 作为目标数据集名称。 |
| query 一个 Spark SQL 语句,它返回 Spark Dataset 或 Koalas DataFrame。 使用 dlt.read() 或 spark.table() 从同一管道中定义的数据集执行完整读取操作。 使用 spark.table() 函数从同一管道中定义的数据集读取数据时,在函数参数中的数据集名称前加上 LIVE 关键字。 例如,从名为 customers 的数据集读取数据:spark.table("LIVE.customers")还可以使用 spark.table() 函数从元存储中注册的表中读取数据,方法是省略 LIVE 关键字,并选择性地使用数据库名称限定表名称:spark.table("sales.customers")使用 dlt.read_stream() 从同一管道中定义的数据集执行流式读取操作。使用 spark.sql 函数定义 SQL 查询,以创建返回数据集。使用 PySpark 语法通过 Python 定义 Delta Live Tables 查询。 |
| 预期 |
|---|
| @expect(“description”, “constraint”) 声明由以下参数确定的数据质量约束: description. 如果某行违反了预期,则在目标数据集中包含该行。 |
| @expect_or_drop(“description”, “constraint”) 声明由以下参数确定的数据质量约束: description. 如果某行违反了预期,则从目标数据集中删除该行。 |
| @expect_or_fail(“description”, “constraint”) 声明由以下参数确定的数据质量约束: description. 如果某行违反了预期,则立即停止执行。 |
| @expect_all(expectations) 声明一个或多个数据质量约束。 expectations 是一个 Python 字典,其中的键是预期说明,值是预期约束。 如果某行违反了其中一个预期,则在目标数据集中包含该行。 |
| @expect_all_or_drop(expectations) 声明一个或多个数据质量约束。 expectations 是一个 Python 字典,其中的键是预期说明,值是预期约束。 如果某行违反了任何预期,则从目标数据集中删除该行。 |
| @expect_all_or_fail(expectations) 声明一个或多个数据质量约束。 expectations 是一个 Python 字典,其中的键是预期说明,值是预期约束。 如果某行违反了任何预期,则立即停止执行。 |
表属性
除了 Delta Lake 支持的表属性外,还可以设置以下表属性。
| 表属性 |
|---|
| pipelines.autoOptimize.managed 默认: true启用或禁用此表的自动计划优化。 |
| pipelines.autoOptimize.zOrderCols 默认值:无 一个以逗号分隔的可选列名称列表,用于按 z 顺序对此表进行排序。 |
| pipelines.reset.allowed 默认: true控制是否允许对此表进行完全刷新。 |