您现在访问的是微软AZURE全球版技术文档网站,若需要访问由世纪互联运营的MICROSOFT AZURE中国区技术文档网站,请访问 https://docs.azure.cn.

将 Azure 流分析与 Azure 机器学习集成(预览版)

可以在 Azure 流分析作业中将机器学习模型实现为用户定义的函数 (UDF),以便基于流输入数据执行实时评分和预测。 Azure 机器学习可让你使用任何流行的开源工具(例如 Tensorflow、scikit-learn 或 PyTorch)来准备、训练和部署模型。

先决条件

在将机器学习模型作为函数添加到流分析作业之前,请完成以下步骤:

  1. 使用 Azure 机器学习将模型部署为 Web 服务

  2. 评分脚本应该包含由 Azure 机器学习用来生成架构规范的示例输入和输出。 流分析使用该架构来理解 Web 服务的函数签名。 可以此示例 swagger 定义为参考来确保已正确设置。

  3. 确保 Web 服务接受并返回 JSON 序列化的数据。

  4. Azure Kubernetes 服务中部署模型,以进行大规模生产部署。 如果 Web 服务无法处理来自作业的请求数,流分析作业的性能将会下降,从而影响延迟。 仅当你使用 Azure 门户时,Azure 容器实例中部署的模型才受支持。 流分析中尚不支持使用 Azure 机器学习设计器生成的模型。

将机器学习模型添加到作业

可直接从 Azure 门户或 Visual Studio Code 将 Azure 机器学习函数添加到流分析作业。

Azure 门户

  1. 在 Azure 门户中导航到你的流分析作业,在“作业拓扑”下选择“函数”。 然后从“+添加”下拉菜单中,选择“Azure 机器学习服务” 。

    添加 Azure 机器学习 UDF

  2. 在“Azure 机器学习服务函数”窗体中填写以下属性值:

    配置 Azure 机器学习 UDF

Visual Studio Code

  1. 在 Visual Studio Code 中打开流分析项目,再右键单击“函数”文件夹。 然后,选择“添加函数”。 从下拉列表中选择“机器学习 UDF”。

    在 VS Code 中添加 UDF

    在 VS Code 中添加 Azure 机器学习 UDF

  2. 输入函数名称,然后使用 CodeLens 中的“从订阅中选择”,在配置文件中填写设置。

    在 VS Code 中选择 Azure 机器学习 UDF

    在 VS Code 中配置 Azure 机器学习 UDF

下表描述了流分析中的 Azure 机器学习服务函数的每个属性。

属性 说明
函数别名 输入一个名称以在查询中调用函数。
订阅 你的 Azure 订阅。
Azure 机器学习工作区 用于将模型部署为 Web 服务的 Azure 机器学习工作区。
部署 托管模型的 Web 服务。
函数签名 从 API 的架构规范推理出的 Web 服务签名。 如果签名无法加载,请检查是否已在评分脚本中提供了用于自动生成架构的示例输入和输出。
每个分区的并行请求数 这是一项高级配置,用于优化大规模吞吐量。 此数字表示从作业的每个分区发送到 Web 服务的并发请求数。 具有 6 个或更少流单元 (SU) 的作业有一个分区。 具有 12 个 SU 的作业有两个分区,具有 18 个 SU 的作业有三个分区,依此类推。

例如,如果你的作业有两个分区,而你将此参数设置为 4,那么将会有 8 个并发请求从该作业发送到 Web 服务。 在目前的公共预览版中,此值默认为 20,且无法更新。
最大批数 这是一项高级配置,用于优化大规模吞吐量。 此数字表示在发送到 Web 服务的单个请求中要一起进行批处理的最大事件数。

支持的输入参数

当流分析查询调用某个 Azure 机器学习 UDF 时,作业将创建一个要发送到 Web 服务的 JSON 序列化请求。 该请求基于模型特定的架构。 必须在评分脚本中提供示例输入和输出,以自动生成架构。 该架构使流分析能够为任何支持的数据类型(例如 numpy、pandas 和 PySpark)构造 JSON 序列化请求。 可以在单个请求中对多个输入事件一起进行批处理。

以下流分析查询示例演示如何调用 Azure 机器学习 UDF:

SELECT udf.score(<model-specific-data-structure>)
INTO output
FROM input

流分析仅支持为 Azure 机器学习函数传递一个参数。 在将数据作为输入传递给机器学习 UDF 之前,可能需要准备该数据。 必须确保 ML UDF 的输入不为 NULL,因为 NULL 输入将导致作业失败。

向 UDF 传递多个输入参数

机器学习模型的最常见输入示例是 numpy 数组和数据帧。 可以使用 JavaScript UDF 创建数组,并使用 WITH 子句创建 JSON 序列化的数据帧。

创建输入数组

可以创建接受 N 个输入的 JavaScript UDF,并创建可用作 Azure 机器学习 UDF 输入的数组。

function createArray(vendorid, weekday, pickuphour, passenger, distance) {
    'use strict';
    var array = [vendorid, weekday, pickuphour, passenger, distance]
    return array;
}

将 JavaScript UDF 添加到作业后,可以使用以下查询调用 Azure 机器学习 UDF:

WITH 
ModelInput AS (
#use JavaScript UDF to construct array that will be used as input to ML UDF
SELECT udf.createArray(vendorid, weekday, pickuphour, passenger, distance) as inputArray
FROM input
)

SELECT udf.score(inputArray)
INTO output
FROM ModelInput
#validate inputArray is not null before passing it to ML UDF to prevent job from failing
WHERE inputArray is not null

以下 JSON 是一个示例请求:

{
    "data": [
        ["1","Mon","12","1","5.8"],
        ["2","Wed","10","2","10"]
    ]
}

创建 Pandas 或 PySpark 数据帧

可以使用 WITH 子句来创建可作为输入传递给 Azure 机器学习 UDF 的 JSON 序列化数据帧,如下所示。

以下查询通过选择所需字段创建一个数据帧,并使用该数据帧作为 Azure 机器学习 UDF 的输入。

WITH 
Dataframe AS (
SELECT vendorid, weekday, pickuphour, passenger, distance
FROM input
)

SELECT udf.score(Dataframe)
INTO output
FROM Dataframe

以下 JSON 是来自上述查询的示例请求:

{
    "data": [{
            "vendorid": "1",
            "weekday": "Mon",
            "pickuphour": "12",
            "passenger": "1",
            "distance": "5.8"
        }, {
            "vendorid": "2",
            "weekday": "Tue",
            "pickuphour": "10",
            "passenger": "2",
            "distance": "10"
        }
    ]
}

优化 Azure 机器学习 UDF 的性能

将模型部署到 Azure Kubernetes 服务时,可以分析模型以确定资源利用率。 还可以为部署启用 App Insights,以了解请求速率、响应时间和失败率。

如果你的方案使用较高的事件吞吐量,那么你可能需要更改流分析中的以下参数,以实现最佳性能和较低的端到端延迟:

  1. 最大批计数。
  2. 每个分区的并行请求数。

确定适当的批大小

部署 Web 服务后,发送具有不同批大小的示例请求。批大小从 50 开始,以 100 为增量递增。 例如 200、500、1000、2000,等等。 你将发现,在达到特定的批大小后,响应延迟会增大。 该特定大小(超过该值后,响应延迟会增大)应是作业的最大批计数。

确定每个分区的并行请求数

经过最佳缩放后,流分析作业应该能够将多个并行请求发送到 Web 服务,并在几毫秒内收到响应。 Web 服务的响应延迟可能会直接影响流分析作业的延迟和性能。 如果从作业调用 Web 服务花费了很长时间,那么你可能会发现水印延迟增大,还可能会发现积压的输入事件数量增加。

若要防止此类延迟,请确保已为 Azure Kubernetes 服务 (AKS) 群集预配了适当数量的节点和副本。 Web 服务高度可用并返回成功响应至关重要。 如果作业收到来自 Web 服务的“服务不可用”响应 (503),它会使用指数退避不断重试。 除“成功”(200) 和“服务不可用”(503) 以外的任何响应都会导致作业进入失败状态。

后续步骤