增量实时表数据质量约束
使用预期来定义对数据集内容的数据质量约束。 预期由说明、不变量以及当记录不符合不变量条件时要执行的操作组成。 使用 Python 修饰器或 SQL 约束子句将预期应用于查询。
将 expect、expect or drop 和 expect or fail 预期与 Python 或 SQL 查询结合使用,以定义单个数据质量约束。
可以使用 @expect_all、@expect_all_or_drop 和 @expect_all_or_fail 修饰器在 Python 管道中定义具有一个或多个数据质量约束的预期。 这些修饰器接受 Python 字典作为参数,其中键是预期名称,值是预期约束。
你可以通过查询增量实时表事件日志来查看数据质量指标,例如违反预期的记录数。
保留无效记录
当你想要保留不符合预期的记录时,请使用 expect 运算符。 不符合预期的记录将与有效记录一起添加到目标数据集中:
Python
@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
删除无效记录
使用 expect or drop 运算符可阻止处理无效记录。 不符合预期的记录将从目标数据集中删除:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
出现无效记录时失败
当无效记录不可接受时,使用 expect or fail 运算符在记录未通过验证时立即停止执行。 如果这项操作是表更新,系统会自动回滚事务:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
当管道因预期违规而失败时,必须在重新运行管道之前修复管道代码,以正确处理无效数据。
预期违规会修改转换的 Spark 查询计划,以跟踪检测和报告违规所需的信息。 对于许多查询,可以使用此信息来识别导致违规的输入记录。 下面是一个异常示例:
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
多个预期
当未通过验证的记录应包含在目标数据集中时,请使用 expect_all 指定多个数据质量约束:
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
当未通过验证的记录应从目标数据集中删除时,请使用 expect_all_or_drop 指定多个数据质量约束:
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
当未通过验证的记录应停止管道执行时,请使用 expect_all_or_fail 指定多个数据质量约束:
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
你还可以将一组预期定义为变量,并将其传递给管道中的一个或多个查询:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create cleaned and prepared dataset