你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure 流分析查询中的输入验证

输入验证是用于防范格式错误的事件或意外事件影响主要查询逻辑的一种技术。 查询已升级为显式处理和检查记录,因此它们不会破坏主要逻辑。

为了实现输入验证,我们向查询添加了两个初始步骤。 首先确保提交给核心业务逻辑的架构符合其预期。 然后,会审异常,并可以选择将无效记录路由到辅助输出中。

带有输入验证的查询的结构如下:

WITH preProcessingStage AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		field1 AS in_field1,
		field2 AS in_field2,
		...

		-- Try casting fields in their expected type
		TRY_CAST(field1 AS bigint) as field1,
		TRY_CAST(field2 AS array) as field2,
		...

	FROM myInput TIMESTAMP BY myTimestamp
),

triagedOK AS (
	SELECT -- Only fields in their new expected type
		field1,
		field2,
		...
	FROM preProcessingStage
	WHERE ( ... ) -- Clauses make sure that the core business logic expectations are satisfied
),

triagedOut AS (
	SELECT -- All fields to ease diagnostic
		*
	FROM preProcessingStage
	WHERE NOT (...) -- Same clauses as triagedOK, opposed with NOT
)

-- Core business logic
SELECT
	...
INTO myOutput
FROM triagedOK
...

-- Audit output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- To a storage adapter that doesn't require strong typing, here blob/adls
FROM triagedOut

若要查看设置了输入验证的查询综合示例,请参阅以下部分:具有输入验证的查询示例

本文演示了如何实现此技术。

上下文

Azure 流分析 (ASA) 作业处理来自流的数据。 流是传输的已序列化原始数据序列(CSV、JSON、AVRO...)。若要从流中读取数据,应用程序需要知道所用的特定序列化格式。 在 ASA 中,必须在配置流输入时定义事件序列化格式。

反序列化数据后,需要应用一种架构,以赋予数据的意义。 架构是指流中字段的列表及其各自的数据类型。 使用 ASA 时,无需在输入级别设置传入数据的架构。 ASA 原生支持动态输入架构。 它预期字段(列)的列表及其类型在事件(行)之间会发生更改。 当未显式提供数据类型的情况下,ASA 还会推断数据类型,并根据需要尝试隐式强制转换类型。

动态架构处理是一项强大的功能,在流处理中发挥了关键的作用。 数据流通常包含来自多个源的数据,这些源具有多种事件类型,其中每种类型都有唯一的架构。 若要路由、筛选和处理此类流上的事件,不管这些事件的架构如何,ASA 都必须引入所有这些事件。

Illustration of a pipeline with two fleet of devices sending data with conflicting schemas

但是,动态架构处理提供的功能有一个潜在的缺点。 意外的事件可能会流经主要查询逻辑并破坏该逻辑。 例如,我们可以对 NVARCHAR(MAX) 类型的字段使用 ROUND。 ASA 会将其隐式转换为浮点值以匹配 ROUND 的签名。 在此处,我们预期或希望此字段始终包含数值。 但是,当我们收到字段设置为 "NaN" 的事件时,或者完全缺少该字段时,作业可能会失败。

通过输入验证,我们可将初步步骤添加到查询,以处理这种格式错误的事件。 我们主要使用 WITHTRY_CAST 来实现此方法。

场景:对不可靠事件生成者进行输入验证

我们将生成一个新的 ASA 作业,用于从单个事件中心引入数据。 与大多数情况下一样,我们无需对数据生成方负责。 此处的生成方是指多个硬件供应商销售的 IoT 设备。

与利益干系人面谈后,我们就序列化格式和架构达成了一致。 所有设备将此类消息推送到通用事件中心,即 ASA 作业的输入。

架构协定的定义如下:

字段名称 字段类型 字段说明
deviceId Integer 唯一的设备标识符
readingTimestamp datetime 由中心网关生成的消息时间
readingStr 字符串
readingNum Numeric
readingArray 字符串数组

这种定义在 JSON 序列化下提供了以下示例消息:

{
    "deviceId" : 1,
    "readingTimestamp" : "2021-12-10T10:00:00",
    "readingStr" : "A String",
    "readingNum" : 1.7,
    "readingArray" : ["A","B"]
}

我们已经可以看到架构协定与其实现之间的差异。 在 JSON 格式中,日期/时间没有数据类型。 日期/时间将作为字符串传输(参阅前面的 readingTimestamp)。 ASA 可以轻松解决该问题,但会显示需要验证和显式强制转换类型。 对于在 CSV 中序列化的数据而言更是如此,因为所有值随后将作为字符串传输。

另外还有一种差异。 ASA 使用自身的类型系统,而该系统与传入的类型不匹配。 如果 ASA 对整数 (bigint)、日期/时间、字符串 (nvarchar(max)) 和数组使用内置类型,则它仅通过浮点值支持数值。 对于大多数应用程序而言,这种不匹配并不是问题。 但在某些特殊情况下,它可能会导致精度略有偏差。 在这种情况下,我们会在新字段中将数值转换为字符串。 然后在下游,我们使用支持定点小数的系统来检测并纠正潜在的偏差。

回到我们的查询,在此处我们打算:

  • readingStr 传递给 JavaScript UDF
  • 统计数组中的记录数
  • readingNum 舍入到小数点后的第二位
  • 将数据插入 SQL 表中

目标 SQL 表具有以下架构:

CREATE TABLE [dbo].[readings](
    [Device_Id] int NULL,
    [Reading_Timestamp] datetime2(7) NULL,
    [Reading_String] nvarchar(200) NULL,
    [Reading_Num] decimal(18,2) NULL,
    [Array_Count] int NULL
) ON [PRIMARY]

良好的做法是在作业执行过程中将发生的情况映射到每个字段:

字段 输入 (JSON) 继承的类型 (ASA) 输出 (Azure SQL) 评论
deviceId 数字 bigint integer
readingTimestamp 字符串 nvarchar(MAX) datetime2
readingStr 字符串 nvarchar(MAX) nvarchar(200) 由 UDF 使用
readingNum 数字 FLOAT decimal(18,2) 有待舍入
readingArray array(string) array of nvarchar(MAX) integer 有待计数

先决条件

我们将使用“ASA 工具”扩展在 Visual Studio Code 中开发查询。 此教程的前几个步骤将引导你安装所需的组件。

在 VS Code 中,我们将本地运行和本地输入/输出结合使用,以避免产生成本,并加速调试循环。 不需要设置事件中心或 Azure SQL 数据库。

基础查询

让我们从一个不使用输入验证的基本实现开始。 我们将在下一部分添加输入验证。

我们将在 VS Code 中创建一个新的 ASA 项目

input 文件夹中创建一个名为 data_readings.json 的新 JSON 文件,并在其中添加以下记录:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingStr" : "Another String",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : -4.85436,
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : ["G","G"]
    }
]

然后定义一个名为 readings 的本地输入,并在其中引用上面创建的 JSON 文件。

配置后,该本地输入应如下所示:

{
    "InputAlias": "readings",
    "Type": "Data Stream",
    "Format": "Json",
    "FilePath": "data_readings.json",
    "ScriptType": "InputMock"
}

通过预览数据,可以看到我们的记录已正确加载。

右键单击 Functions 文件夹并选择 ASA: Add Function,创建名为 udfLen 的新 JavaScript UDF。 使用的代码是:

// Sample UDF that returns the length of a string for demonstration only: LEN will return the same thing in ASAQL
function main(arg1) {
    return arg1.length;
}

本地运行中不需要定义输出。 除非有多个输出,否则甚至不需要使用 INTO。 在 .asaql 文件中,可将现有查询替换为:

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
FROM readings AS r TIMESTAMP BY r.readingTimestamp
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

让我们快速浏览一下提交的查询:

  • 若要统计每个数组中的记录数,首先需要将记录解包。 我们将使用 CROSS APPLYGetArrayElements()此处提供了更多示例)
    • 这样,我们显示了查询中的两个数据集:原始输入和数组值。 为了确保不会混淆字段,我们定义了别名 (AS r) 并在每个位置使用了这些别名
    • 然后,若要对数组值实际运行 COUNT,需要使用 GROUP BY 进行聚合
    • 为此,必须定义一个时间窗口。 此处由于我们的逻辑不需要时间窗口,因此快照窗口是正确的选择
  • 我们还必须对所有字段运行 GROUP BY,并将其全部投影到 SELECT 中。 显式投影字段是良好的做法,因为 SELECT * 会让错误从输入流向输出
    • 如果我们定义了时间窗口,可能需要使用 TIMESTAMP BY 定义时间戳。 在此处,无需定义时间戳即可使逻辑正常工作。 对于本地运行,如果不使用 TIMESTAMP BY,所有记录将加载到单个时间戳,即运行开始时间。
  • 使用 UDF 来筛选 readingStr 少于两个字符的读数。 在此处我们应已使用 LEN。 我们只是出于演示目的使用了 UDF

可以启动运行并观察正在处理的数据:

deviceId readingTimestamp readingStr readingNum arrayCount
1 2021-12-10T10:00:00 一个字符串。 1.71 2
2 2021-12-10T10:01:00 另一个字符串 2.38 1
3 2021-12-10T10:01:20 第三个字符串 -4.85 3
1 2021-12-10T10:02:10 第四个字符串 1.21 2

现在我们知道查询可正常运行,接下来让我们针对更多数据测试查询。 将 data_readings.json 的内容替换为以下记录:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : "NaN",
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : {}
    }
]

在此处我们发现了以下问题:

  • 设备 #1 一切正常
  • 设备 #2 忘记了包含 readingStr
  • 设备 #3 将 NaN 作为数字发送
  • 设备 #4 发送了空记录而不是数组

现在运行作业不会获得正常的结果。 我们会收到以下错误消息之一:

设备 2 将提供:

[Error] 12/22/2021 10:05:59 PM : **System Exception** Function 'udflen' resulted in an error: 'TypeError: Unable to get property 'length' of undefined or null reference' Stack: TypeError: Unable to get property 'length' of undefined or null reference at main (Unknown script code:3:5)
[Error] 12/22/2021 10:05:59 PM :    at Microsoft.EventProcessing.HostedRuntimes.JavaScript.JavaScriptHostedFunctionsRuntime.

设备 3 将提供:

[Error] 12/22/2021 9:52:32 PM : **System Exception** The 1st argument of function round has invalid type 'nvarchar(max)'. Only 'bigint', 'float' is allowed.
[Error] 12/22/2021 9:52:32 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Arithmetics.Round(CompilerPosition pos, Object value, Object length)

设备 4 将提供:

[Error] 12/22/2021 9:50:41 PM : **System Exception** Cannot cast value of type 'record' to type 'array' in expression 'r . readingArray'. At line '9' and column '30'. TRY_CAST function can be used to handle values with unexpected type.
[Error] 12/22/2021 9:50:41 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Cast.ToArray(CompilerPosition pos, Object value, Boolean isUserCast)

每次都允许格式错误的记录在未经验证的情况下从输入流向主要查询逻辑。 现在我们认识到了输入验证的价值。

实现输入验证

让我们扩展上述查询以验证输入。

输入验证的第一步是定义核心业务逻辑的架构预期。 回顾原始要求,我们的主要逻辑是:

  • readingStr 传递给 JavaScript UDF 以度量其长度
  • 统计数组中的记录数
  • readingNum 舍入到小数点后的第二位
  • 将数据插入 SQL 表中

对于每一点,我们可以列出预期:

  • UDF 需要一个字符串类型的不能为 null 的参数(此处为 nvarchar(max))
  • GetArrayElements() 需要一个数组类型的参数,或 null 值
  • Round 需要一个 bigint 或浮点类型的参数,或 null 值
  • 我们不应依赖 ASA 执行隐式强制转换,而应该自行执行这种强制转换,并在查询中处理类型冲突

一种方法是调整主要逻辑来处理这些异常。 但在本例中,我们相信主要逻辑是完美的。 因此让我们改为验证传入的数据。

首先,使用 WITH 添加一个输入验证层作为查询的第一步。 我们将使用 TRY_CAST 将字段转换为其预期类型,并在转换失败时将其设置为 NULL

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
)

-- For debugging only
SELECT * FROM readingsValidated

对于我们使用的最后一个输入文件(包含错误的文件),此查询将返回以下集:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00 一个字符串。 1.7145 ["A","B"] 1 2021-12-10T10:00:00Z 一个字符串。 1.7145 ["A","B"]
2 2021-12-10T10:01:00 Null 2.378 ["C"] 2 2021-12-10T10:01:00Z Null 2.378 ["C"]
3 2021-12-10T10:01:20 第三个字符串 NaN ["D","E","F"] 3 2021-12-10T10:01:20Z 第三个字符串 NULL ["D","E","F"]
4 2021-12-10T10:02:10 第四个字符串 1.2126 {} 4 2021-12-10T10:02:10Z 第四个字符串 1.2126 NULL

可以看到,上述两个错误正在得到解决。 我们已将 NaN{} 转换为 NULL。 现在,我们确信这些记录已正确插入到目标 SQL 表中。

现在必须确定如何处理存在缺失值或无效值的记录。 经过一番讨论,我们决定拒绝存在空的/无效 readingArray 或缺失 readingStr 的记录。

因此我们添加了另一个层,用于会审验证输出与主要逻辑之间的记录:

WITH readingsValidated AS (
	...
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- For debugging only
SELECT * INTO Debug1 FROM readingsToBeProcessed
SELECT * INTO Debug2 FROM readingsToBeRejected

良好的做法是为两个输出编写一个 WHERE 子句,并在第二个输出中使用 NOT (...)。 这样,任何记录都不能从两个输出中排除并丢失。

现在我们获得了两个输出。 Debug1 包含要发送到主要逻辑的记录:

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00Z 一个字符串。 1.7145 ["A","B"]
3 2021-12-10T10:01:20Z 第三个字符串 Null ["D","E","F"]

Debug2 包含要拒绝的记录:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
2 2021-12-10T10:01:00 Null 2.378 ["C"] 2 2021-12-10T10:01:00Z NULL 2.378 ["C"]
4 2021-12-10T10:02:10 第四个字符串 1.2126 {} 4 2021-12-10T10:02:10Z 第四个字符串 1.2126 NULL

最后一步是将主要逻辑添加回来。 我们还将添加用于收集拒绝记录的输出。 此处最好是使用不强制执行强类型化的输出适配器,例如存储帐户。

在最后一部分可以找到完整查询。

WITH
readingsValidated AS (...),
readingsToBeProcessed AS (...),
readingsToBeRejected AS (...)

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

SELECT
	*
INTO BlobOutput
FROM readingsToBeRejected

这样,我们将获得以下 SQLOutput 集,并且不会出错:

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00Z 一个字符串。 1.7145 2
3 2021-12-10T10:01:20Z 第三个字符串 Null 3

其他两条记录已发送到 BlobOutput 供人工审查和后期处理。 我们的查询现在是安全的。

具有输入验证的查询示例

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- Core business logic
SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

-- Rejected output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- to a storage adapter that doesn't require strong typing, here blob/adls
FROM readingsToBeRejected

扩展输入验证

GetType 可用于显式检查类型。 它能够很好地处理投影中的 CASE 或设置级别的 WHEREGetType 还可用于根据元数据存储库动态检查传入的架构。 可以通过引用数据集加载存储库。

单元测试是确保查询具有复原能力的良好做法。 我们将构建一系列由输入文件及其预期输出组成的测试。 我们的查询必须与它生成的输出相匹配才能通过测试。 在 ASA 中,单元测试是通过 asa-streamanalytics-cicd npm 模块完成的。 应在部署管道中创建并测试具有各种格式错误事件的测试用例。

最后,可以在 VS Code 中进行一些小规模的集成测试。 可以通过在本地运行以生成实时输出将记录插入 SQL 表中。

获取支持

如需获取进一步的帮助,可前往 Azure 流分析的 Microsoft 问答页面

后续步骤