增量实时表数据质量约束

使用预期来定义对数据集内容的数据质量约束。 预期由说明、不变量以及当记录不符合不变量条件时要执行的操作组成。 使用 Python 修饰器或 SQL 约束子句将预期应用于查询。

expectexpect or dropexpect or fail 预期与 Python 或 SQL 查询结合使用,以定义单个数据质量约束。

可以使用 @expect_all@expect_all_or_drop@expect_all_or_fail 修饰器在 Python 管道中定义具有一个或多个数据质量约束的预期。 这些修饰器接受 Python 字典作为参数,其中键是预期名称,值是预期约束。

你可以通过查询增量实时表事件日志来查看数据质量指标,例如违反预期的记录数。

保留无效记录

当你想要保留不符合预期的记录时,请使用 expect 运算符。 不符合预期的记录将与有效记录一起添加到目标数据集中:

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

删除无效记录

使用 expect or drop 运算符可阻止处理无效记录。 不符合预期的记录将从目标数据集中删除:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

出现无效记录时失败

当无效记录不可接受时,使用 expect or fail 运算符在记录未通过验证时立即停止执行。 如果这项操作是表更新,系统会自动回滚事务:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

当管道因预期违规而失败时,必须在重新运行管道之前修复管道代码,以正确处理无效数据。

预期违规会修改转换的 Spark 查询计划,以跟踪检测和报告违规所需的信息。 对于许多查询,可以使用此信息来识别导致违规的输入记录。 下面是一个异常示例:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

多个预期

当未通过验证的记录应包含在目标数据集中时,请使用 expect_all 指定多个数据质量约束:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

当未通过验证的记录应从目标数据集中删除时,请使用 expect_all_or_drop 指定多个数据质量约束:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

当未通过验证的记录应停止管道执行时,请使用 expect_all_or_fail 指定多个数据质量约束:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

你还可以将一组预期定义为变量,并将其传递给管道中的一个或多个查询:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset