变更数据捕获与增量实时表结合使用

注意

本文介绍如何根据源数据中的更改来更新增量实时表管道中的表。 若要了解如何记录和查询 Delta 表的行级更改信息,请参阅更改数据馈送

重要

公共预览版提供对 SCD 类型 2 的 Delta Live Tables 支持。

可以在增量实时表中使用变更数据捕获 (CDC) 来根据源数据的更改更新表。 增量实时表 SQL 和 Python 接口支持 CDC。 Delta Live Tables 支持更新具有渐变维度 (SCD) 类型 1 和类型 2 的表:

  • 使用 SCD 类型 1 直接更新记录。 不为更新后的记录保留历史记录。
  • 使用 SCD 类型 2 可保留对记录的所有更新的历史记录。

为了表示更改的有效期,SCD 类型 2 使用生成的 __START_AT__END_AT 列存储每个更改。 Delta Live Tables 使用 SQL 中的 SEQUENCE BY 或 Python 中的 sequence_by 指定的列来生成 __START_AT__END_AT 列。

注意

__START_AT__END_AT 列的数据类型与指定的 SEQUENCE BY 字段的数据类型相同。

SQL

使用 APPLY CHANGES INTO 语句启用增量实时表 CDC 功能:

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[WHERE condition]
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
子句
KEYS

唯一标识源数据中的行的列或列组合。 这用于标识哪些 CDC 事件适用于目标表中的特定记录。

该语句是必需的。
WHERE

应用于源和目标以触发优化(例如分区修剪)的条件。 此条件不能用于删除源行;源中的所有 CDC 行都必须满足此条件,否则将引发错误。 使用 WHERE 子句是可选的,应在你的处理需要特定优化时使用。

此子句是可选的。
IGNORE NULL UPDATES

允许引入包含目标列子集的更新。 当 CDC 事件匹配现有行并指定 IGNORE NULL UPDATES 时,具有 null 的列将在目标中保留其现有值。 这也适用于值为 null 的嵌套列。

此子句是可选的。

默认设置是用 null 值覆盖现有列。
APPLY AS DELETE WHEN

指定何时应将 CDC 事件视为 DELETE 而不是更新插入。 为了处理乱序数据,被删除的行被暂时保留为基础 Delta 表中的无效标记,并在元存储中创建一个视图来筛选掉这些无效标记。 保留间隔可以配置为
pipelines.cdc.tombstoneGCThresholdInSeconds表属性

此子句是可选的。
SEQUENCE BY

指定源数据中 CDC 事件的逻辑顺序的列名。 增量实时表使用此排序来处理乱序到达的更改事件。

该语句是必需的。
COLUMNS

指定要包含在目标表中的列子集。 可以:

* 指定要包含的完整列列表:COLUMNS (userId, name, city)
* 指定要排除的列列表:COLUMNS * EXCEPT (operation, sequenceNum)

此子句是可选的。

当未指定 COLUMNS 子句时,默认设置是包含目标表中的所有列。
STORED AS

将记录存储为 SCD 类型 1 还是 SCD 类型 2。

此子句是可选的。

默认值为 SCD 类型 1。

INSERTUPDATE 事件的默认行为是从源更新插入 CDC 事件:更新目标表中与指定键匹配的任何行或在目标表中不存在匹配记录时插入新行。 可以使用 APPLY AS DELETE WHEN 条件指定对 DELETE 事件的处理。

Python

通过 Python API 中的 apply_changes() 函数使用增量实时表 CDC 功能。 Delta Live Tables Python CDC 接口还提供 create_streaming_live_table() 函数。 可以使用此函数创建 apply_changes() 函数所需的目标表。 参阅示例查询

应用更改函数

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>
)
自变量
目标

类型:str

要更新的表的名称。 可以在执行 apply_changes() 函数之前使用 create_streaming_live_table() 函数创建目标表。

此参数是必需的。
source

类型:str

包含 CDC 记录的数据源。

此参数是必需的。
keys

类型:list

唯一标识源数据中的行的列或列组合。 这用于标识哪些 CDC 事件适用于目标表中的特定记录。

可以指定以下任一项:

* 字符串列表:["userId", "orderId"]
* Spark SQL col() 函数列表:[col("userId"), col("orderId"]

col() 函数的参数不能包含限定符。 例如,可以使用 col(userId),但不能使用 col(source.userId)

此参数是必需的。
sequence_by

类型:strcol()

指定源数据中 CDC 事件的逻辑顺序的列名。 增量实时表使用此排序来处理乱序到达的更改事件。

可以指定以下任一项:

* 字符串:"sequenceNum"
* Spark SQL col() 函数:col("sequenceNum")

col() 函数的参数不能包含限定符。 例如,可以使用 col(userId),但不能使用 col(source.userId)

此参数是必需的。
ignore_null_updates

类型:bool

允许引入包含目标列子集的更新。 当 CDC 事件与现有行匹配且 ignore_null_updatesTrue 时,具有 null 的列将在目标中保留其现有值。 这也适用于值为 null 的嵌套列。 当 ignore_null_updatesFalse 时,现有值将被 null 值覆盖。

此参数是可选的。

默认值为 False
apply_as_deletes

类型:strexpr()

指定何时应将 CDC 事件视为 DELETE 而不是更新插入。 为了处理乱序数据,被删除的行被暂时保留为基础 Delta 表中的无效标记,并在元存储中创建一个视图来筛选掉这些无效标记。 保留间隔可以配置为
pipelines.cdc.tombstoneGCThresholdInSeconds表属性

可以指定以下任一项:

* 字符串:"Operation = 'DELETE'"
* Spark SQL expr() 函数:expr("Operation = 'DELETE'")

此参数是可选的。
column_list
except_column_list

类型:list

要包含在目标表中的列的子集。 使用 column_list 指定要包含的列的完整列表。 使用 except_column_list 指定要排除的列。 可以将任一值声明为字符串列表或 Spark SQL col() 函数:

* column_list = ["userId", "name", "city"].
* column_list = [col("userId"), col("name"), col("city")]
* except_column_list = ["operation", "sequenceNum"]
* except_column_list = [col("operation"), col("sequenceNum")

col() 函数的参数不能包含限定符。 例如,可以使用 col(userId),但不能使用 col(source.userId)

此参数是可选的。

当没有 column_listexcept_column_list 参数传递给函数时,默认设置是包含目标表中的所有列。
stored_as_scd_type

类型:strint

将记录存储为 SCD 类型 1 还是 SCD 类型 2。

对于 SCD 类型 1,将其设置为 1;对于 SCD 类型 2,将其设置为 2

此子句是可选的。

默认值为 SCD 类型 1。

INSERTUPDATE 事件的默认行为是从源更新插入 CDC 事件:更新目标表中与指定键匹配的任何行或在目标表中不存在匹配记录时插入新行。 可以使用 apply_as_deletes 参数指定 DELETE 事件的处理。

为输出记录创建目标表

使用 create_streaming_live_table() 函数为 apply_changes() 输出记录创建目标表。

注意

不建议使用 create_target_table() 函数。 Databricks 建议更新现有代码以使用 create_streaming_live_table() 函数。

create_streaming_live_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition"
)
参数
name

类型:str

表名称。

此参数是必需的。
comment

类型:str

表的可选说明。
spark_conf

类型:dict

用于执行此查询的 Spark 配置的可选列表。
table_properties

类型:dict

表的表属性可选列表。
partition_cols

类型:array

包含一列或多列的可选列表,用于对表进行分区。
path

类型:str

表数据的可选存储位置。 如果未设置,系统将默认为管道存储位置。
架构

类型:strStructType

表的可选架构定义。 架构可以定义为 SQL DDL 字符串,或使用 Python 定义
StructType.

在指定 apply_changes 目标表的架构时,还必须包含具有与 sequence_by 字段相同数据类型的 __START_AT__END_AT 列。 例如,如果目标表具有 key, STRINGvalue, STRINGsequencing, LONG 列:

create_streaming_live_table(
  name = "target",
  comment = "Target for CDC ingestion.",
  partition_cols=["value"],
  path="$tablePath",
  schema=
    StructType(
      [
        StructField('key', StringType()),
        StructField('value', StringType()),
        StructField('sequencing', LongType()),
        StructField('__START_AT', LongType()),
        StructField('__END_AT', LongType())
      ]
    )
)

注意

  • 必须确保在执行 APPLY CHANGES INTO 查询或 apply_changes 函数之前创建目标表。 参阅示例查询
  • 不会提供目标表的指标(例如输出行数)。
  • 即使没有更改任何列,SCD 类型 2 更新也会为每个输入行添加一个历史记录行。
  • APPLY CHANGES INTO 查询或 apply_changes 函数的目标不可用作流式实时表的源。 从 APPLY CHANGES INTO 查询或 apply_changes 函数的目标读取的表必须是实时表。
  • APPLY CHANGES INTO 查询或 apply_changes() 函数不支持期望。 要对源或目标数据集使用期望,请执行以下操作:
    • 通过定义具有所需期望的中间表来添加对源数据的期望,并将此数据集用作目标表的源。
    • 使用从目标表读取输入数据的下游表添加对目标数据的期望。

表属性

添加了下表属性以控制 DELETE 事件的无效标记管理行为:

表属性
pipelines.cdc.tombstoneGCThresholdInSeconds

设置此值以匹配无序数据之间的最高预期间隔。
pipelines.cdc.tombstoneGCFrequencyInSeconds

控制检查以清理无效标记的频率。

默认值:5 分钟

示例

这些示例演示基于源事件更新目标表的 Delta Live Tables SCD 类型 1 和类型 2 查询:

  1. 创建新的用户记录。
  2. 删除用户记录。
  3. 更新用户记录。 在 SCD 类型 1 示例中,最后的 UPDATE 操作延迟到达并从目标表中删除,展示了无序事件的处理。

下面是这些示例的输入记录。

userId name city operation sequenceNum
124 Raul 瓦哈卡 INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
123 null null DELETE 5
125 Mercedes Guadalajara UPDATE 5
125 Mercedes Mexicali UPDATE 4
123 Isabel 奇瓦瓦 UPDATE 4

运行 SCD 类型 1 示例后,目标表包含以下记录:

userId name city
124 Raul 瓦哈卡
125 Mercedes Guadalajara

运行 SCD 类型 2 示例后,目标表包含以下记录:

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 4
123 Isabel 奇瓦瓦 4 5
124 Raul 瓦哈卡 1 null
125 Mercedes Tijuana 2 4
125 Mercedes Mexicali 4 5
125 Mercedes Guadalajara 5 Null

生成测试数据

若要为此示例创建测试记录,请执行以下操作:

  1. 转到 Azure Databricks 登陆页,选择“创建笔记本”,或单击边栏中的 “创建”图标“创建”,然后从菜单中选择“笔记本”。 此时会显示“创建笔记本”对话框。

  2. 在“创建笔记本”对话框中为笔记本命名,例如“生成测试 CDC 记录”。 从“默认语言”下拉菜单中选择“SQL”。

  3. 如果有正在运行的群集,则会显示“群集”下拉列表。 选择要将笔记本附加到的群集。 在创建笔记本后,还可以创建要附加到的新群集。

  4. 单击“创建”。

  5. 复制以下查询并将其粘贴到新笔记本的第一个单元中:

    CREATE SCHEMA IF NOT EXISTS cdc_data;
    
    CREATE TABLE
      cdc_data.users
    AS SELECT
      col1 AS userId,
      col2 AS name,
      col3 AS city,
      col4 AS operation,
      col5 AS sequenceNum
    FROM (
      VALUES
      -- Initial load.
      (123, "Isabel",   "Monterrey",   "INSERT", 1),
      (124, "Raul",     "Oaxaca",      "INSERT", 1),
      -- One new user.
      (125, "Mercedes", "Tijuana",     "INSERT", 2),
      -- Isabel is removed from the system and Mercedes moved to Guadalajara.
      (123, null,       null,          "DELETE", 5),
      (125, "Mercedes", "Guadalajara", "UPDATE", 5),
      -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
      (123, "Isabel",   "Chihuahua",   "UPDATE", 4),
      (125, "Mercedes", "Mexicali",    "UPDATE", 4)
    );
    
  6. 若要运行笔记本并填充测试记录,请在最右侧的单元操作菜单 单元操作 中单击 “运行”图标 并选择“运行单元”,或按 Shift+Enter。

创建并运行 SCD 类型 1 示例管道

  1. 转到 Azure Databricks 登陆页,选择“创建笔记本”,或单击边栏中的 “创建”图标“创建”,然后从菜单中选择“笔记本”。 此时会显示“创建笔记本”对话框。
  2. 在“创建笔记本”对话框中为笔记本命名,例如“DLT CDC 示例”。 根据首选语言从“默认语言”下拉菜单中选择“Python”或“SQL”。 可将“群集”设置保留为默认值。 增量实时表运行时在运行管道之前创建群集。
  3. 单击“创建”。
  4. 复制 Python 或 SQL 查询并将其粘贴到笔记本的第一个单元中
  5. 创建新管道并在“笔记本库”字段中添加该笔记本。 若要发布管道处理输出,可以选择性地在“目标”字段中输入数据库名称。
  6. 启动管道。 如果配置了“目标”值,则可以查看和验证查询结果

示例查询

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_live_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING LIVE TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

创建并运行 SCD 类型 2 示例管道

  1. 转到 Azure Databricks 登陆页,选择“创建笔记本”,或单击边栏中的 “创建”图标“创建”,然后从菜单中选择“笔记本”。 此时会显示“创建笔记本”对话框。
  2. 在“创建笔记本”对话框中为笔记本命名,例如“DLT CDC 示例”。 根据首选语言从“默认语言”下拉菜单中选择“Python”或“SQL”。 可将“群集”设置保留为默认值。 增量实时表运行时在运行管道之前创建群集。
  3. 单击“创建”。
  4. 复制 Python 或 SQL 查询并将其粘贴到笔记本的第一个单元中
  5. 创建新管道并在“笔记本库”字段中添加该笔记本。 若要发布管道处理输出,可以选择性地在“目标”字段中输入数据库名称。
  6. 启动管道。 如果配置了“目标”值,则可以查看和验证查询结果

示例查询

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_live_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING LIVE TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;