增量实时表 Python 语言参考

本文提供了有关增量实时表 Python 编程接口的详细信息和示例。 有关完整的 API 规范,请参阅 Python API 规范

有关 SQL API 的信息,请参阅增量实时表 SQL 语言参考

Python 数据集

Python API 在 dlt 模块中定义。 必须在利用 Python API 实现的增量实时表管道中导入 dlt 模块。 向函数应用 @dlt.view@dlt.table 修饰器,以使用 Python 定义视图或表。 你可以使用函数名称或 name 参数来分配表或视图名称。 以下示例定义了两个不同的数据集:一个将 JSON 文件作为输入源的 taxi_raw 视图,一个将 taxi_raw 视图作为输入的 filtered_data 表:

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

视图和表函数必须返回 Spark 数据帧或 Koalas 数据帧。 函数返回的 Koalas 数据帧将由增量实时表运行时转换为 Spark 数据集。

除了从外部数据源读取数据外,还可以使用增量实时表 read() 函数访问同一管道中定义的数据集。 以下示例演示如何使用 read() 函数创建 customers_filtered 数据集:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

还可以使用 spark.table() 函数访问在元存储中注册的同一管道或表中定义的数据集。 使用 spark.table() 函数访问管道中定义的数据集时,在函数参数中的数据集名称前加上 LIVE 关键字:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

如果要从元存储中注册的表读取数据,在函数参数中,忽略 LIVE 关键字,并选择性地使用数据库名称限定表名称:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

增量实时表可确保管道自动捕获数据集之间的依赖关系。 此依赖关系信息用于确定执行更新时的执行顺序,以及在管道的事件日志中记录世系信息。

你还可以在查询函数中使用 spark.sql 表达式返回数据集。 若要从内部数据集读取数据,请在数据集名称前追加 LIVE.

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

视图和表都具有以下可选属性:

  • comment:此数据集的用户可读说明。
  • spark_conf:一个 Python 字典,包含仅用于执行此查询的 Spark 配置。
  • 使用预期强制实施的数据质量约束。

表还提供对其具体化的额外控制:

  • 指定如何使用 对表进行分区。 可以使用分区来加快查询速度。

  • 可以在定义视图或表时设置表属性。 有关更多详细信息,请参阅表属性

  • 使用 path 设置为表数据设置存储位置。 默认情况下,如果未设置 path,表数据将存储在管道存储位置中。

  • 可以选择性地使用 Python StructType 或 SQL DDL 字符串指定表架构。 下面的示例使用显式指定的架构创建一个名为 sales 的表:

    sales_schema = StructType([
      StructField("customer_id", StringType(), True),
      StructField("number_of_line_items", StringType(), True),
      StructField("order_datetime", StringType(), True),
      StructField("order_number", LongType(), True)]
    )
    
    @dlt.table(
      comment="Raw data on sales",
      schema=sales_schema)
    def sales():
      return ("...")
    
    @dlt.table(
      comment="Raw data on sales",
      schema="customer_id STRING, customer_name STRING, number_of_line_items STRING, order_datetime STRING, order_number LONG")
    def sales():
      return ("...")
    

    默认情况下,如果未指定架构,则增量实时表将从 table 定义推断架构。

Python 库

要指定外部 Python 库,请使用 %pip install 魔术命令。 更新开始时,Delta Live Tables 会在运行任何表定义之前运行包含 %pip install 命令的所有单元格。 管道中包含的每个 Python 笔记本均可访问所有已安装的库。 以下示例将安装一个名为 logger 的包,并使其对管道中的任何 Python 笔记本全局可用:

%pip install logger

from logger import log_info

@dlt.table
def dataset():
    log_info(...)
    return dlt.read(..)

若要安装 Python wheel 包,请将 wheel 路径添加到 %pip install 命令。 已安装的 Python wheel 包可用于管道中的所有表。 以下示例从 DBFS 目录 /dbfs/dlt/ 安装名为 dltfns-1.0-py3-none-any.whl 的 wheel:

%pip install /dbfs/dlt/dltfns-1.0-py3-none-any.whl

请参阅安装包含 %pip 的 wheel 包

Python API 规范

Python 模块

注意

Delta Live Tables Python 接口具有以下限制:

  • 不支持 pivot() 函数。 在数据集定义中使用 pivot() 函数会导致不确定的管道延迟。

增量实时表 Python 函数在 dlt 模块中定义。 利用 Python API 实现的管道必须导入此模块:

import dlt

创建表

要在 Python 中定义表,请应用 @table 装饰器。 @table 装饰器是 @create_table 装饰器的别名。

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

创建视图

要在 Python 中定义视图,请应用 @view 装饰器。 @view 装饰器是 @create_view 装饰器的别名。

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Python 属性

@table 或 @view
name

类型:str

表或视图的可选名称。 如果未定义,将使用函数名称作为表名或视图名称。
comment

类型:str

表的可选说明。
spark_conf

类型:dict

用于执行此查询的 Spark 配置的可选列表。
table_properties

类型:dict

表的表属性可选列表。
path

类型:str

表数据的可选存储位置。 如果未设置,系统将默认为管道存储位置。
partition_cols

类型:array

包含一列或多列的可选列表,用于对表进行分区。
架构

类型:strStructType

表的可选架构定义。 架构可以定义为 SQL DDL 字符串,或使用 Python 定义
StructType.
临时

类型:bool

创建临时表。 此表中不保留任何元数据。

默认值为“False”。
表或视图定义
def ()

用于定义数据集的 Python 函数。 如果未设置 name 参数,则使用 <function-name> 作为目标数据集名称。
query

一个 Spark SQL 语句,它返回 Spark Dataset 或 Koalas DataFrame。

使用 dlt.read()spark.table() 从同一管道中定义的数据集执行完整读取操作。 使用 spark.table() 函数从同一管道中定义的数据集读取数据时,在函数参数中的数据集名称前加上 LIVE 关键字。 例如,从名为 customers 的数据集读取数据:

spark.table("LIVE.customers")

还可以使用 spark.table() 函数从元存储中注册的表中读取数据,方法是省略 LIVE 关键字,并选择性地使用数据库名称限定表名称:

spark.table("sales.customers")

使用 dlt.read_stream() 从同一管道中定义的数据集执行流式读取操作。

使用 spark.sql 函数定义 SQL 查询,以创建返回数据集。

使用 PySpark 语法通过 Python 定义 Delta Live Tables 查询。
预期
@expect(“description”, “constraint”)

声明由以下参数确定的数据质量约束:
description. 如果某行违反了预期,则在目标数据集中包含该行。
@expect_or_drop(“description”, “constraint”)

声明由以下参数确定的数据质量约束:
description. 如果某行违反了预期,则从目标数据集中删除该行。
@expect_or_fail(“description”, “constraint”)

声明由以下参数确定的数据质量约束:
description. 如果某行违反了预期,则立即停止执行。
@expect_all(expectations)

声明一个或多个数据质量约束。
expectations 是一个 Python 字典,其中的键是预期说明,值是预期约束。 如果某行违反了其中一个预期,则在目标数据集中包含该行。
@expect_all_or_drop(expectations)

声明一个或多个数据质量约束。
expectations 是一个 Python 字典,其中的键是预期说明,值是预期约束。 如果某行违反了任何预期,则从目标数据集中删除该行。
@expect_all_or_fail(expectations)

声明一个或多个数据质量约束。
expectations 是一个 Python 字典,其中的键是预期说明,值是预期约束。 如果某行违反了任何预期,则立即停止执行。

表属性

除了 Delta Lake 支持的表属性外,还可以设置以下表属性。

表属性
pipelines.autoOptimize.managed

默认:true

启用或禁用此表的自动计划优化。
pipelines.autoOptimize.zOrderCols

默认值:无

一个以逗号分隔的可选列名称列表,用于按 z 顺序对此表进行排序。
pipelines.reset.allowed

默认:true

控制是否允许对此表进行完全刷新。