你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
教程:在 Jupyter Notebook 中使用 Python SDK v2(预览版)创建生产 ML 管道
适用于:
Python SDK azure-ai-ml v2(预览版)
重要
SDK v2 目前以公共预览版提供。 该预览版在提供时没有附带服务级别协议,建议不要将其用于生产工作负载。 某些功能可能不受支持或者受限。 有关详细信息,请参阅 Microsoft Azure 预览版补充使用条款。
注意
有关使用 SDK v1 生成管道的教程,请参阅教程:生成用于图像分类的 Azure 机器学习管道
在本教程中,你将使用使用 Azure 机器学习 (Azure ML) 通过 AzureML Python SDK v2(预览版)创建生产就绪的机器学习 (ML) 项目。
你将了解如何使用 AzureML Python SDK v2 执行以下操作:
- 连接到 Azure ML 工作区
- 创建 Azure ML 数据资产
- 创建可重用的 Azure ML 组件
- 创建、验证和运行 Azure ML 管道
- 将新训练的模型部署为终结点
- 调用 Azure ML 终结点进行推理
先决条件
- 完成快速入门:Azure 机器学习入门以执行以下操作:
- 创建工作区。
- 创建用于开发环境的基于云的计算实例。
- 创建用于训练模型的基于云的计算群集。
安装 SDK
你将在 Azure 机器学习工作室中完成以下试验设置并运行步骤。 此合并接口包括机器学习工具,所有技能级别的数据科学专业人员均可利用这些工具实现数据科学方案。
首先在计算实例上安装 v2 SDK:
登录到 Azure 机器学习工作室。
选择在先决条件中创建的订阅和工作区。
在左侧选择“计算”。
从“计算实例”列表中,找到你创建的实例。
选择“终端”,在计算实例上打开终端会话。
在终端窗口中,使用以下命令安装 Python SDK v2(预览版):
pip install --pre azure-ai-ml有关详细信息,请参阅安装 Python SDK v2。
克隆 azureml-examples 存储库
现在,在终端上运行以下命令:
git clone --depth 1 https://github.com/Azure/azureml-examples在左侧选择“笔记本”。
现在,在左侧选择“文件”
文件夹列表显示访问该工作区的每个用户。 选择你的文件夹,你将发现 azureml-samples 已克隆。
打开克隆的笔记本
打开已克隆到“用户文件”部分的“tutorials”文件夹 。
从 azureml-examples/tutorials/e2e-ds-experience/ 文件夹中选择 e2e-ml-workflow.ipynb 文件。
在顶部栏上,选择在快速入门:Azure 机器学习入门期间创建的计算实例以用于运行笔记本。
重要
本文的其余部分包含的内容与在笔记本中看到的内容相同。
如果要在继续阅读时运行代码,请立即切换到 Jupyter Notebook。 若要在笔记本中运行单个代码单元,请单击代码单元,然后按 Shift+Enter。 或者,通过从顶部工具栏中选择“全部运行”来运行整个笔记本
简介
在本教程中,你将创建一个 Azure ML 管道来训练用于信用违约预测的模型。 该管道将处理数据准备、训练和注册已训练模型的工作。 然后,你将运行该管道,部署模型并使用该模型。
提交管道后它在 AzureML 门户中显示的情况如下图所示。 这是一个相当简单的管道,我们将使用它来演练 AzureML SDK v2。
两个步骤分别是数据准备和训练。
设置管道资源
可以从 CLI、Python SDK 或工作室界面使用 Azure ML 框架。 在此示例中,你将使用 AzureML Python SDK v2 创建管道。
在创建管道之前,需要设置该管道使用的资源:
- 用于训练的数据集
- 运行管道的软件环境
- 要在其中运行作业的计算资源
连接到工作区
在深入了解代码之前,需要连接到 Azure ML 工作区。 工作区是 Azure 机器学习的顶级资源,为使用 Azure 机器学习时创建的所有项目提供了一个集中的处理位置。
# Handle to the workspace
from azure.ai.ml import MLClient
# Authentication package
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
try:
credential = DefaultAzureCredential()
# Check if given credential can get token successfully.
credential.get_token("https://management.azure.com/.default")
except Exception as ex:
# Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
credential = InteractiveBrowserCredential()
在下一个单元格中,输入你的订阅 ID、资源组名称和工作区名称。 若要查找你的订阅 ID,请执行以下操作:
- 在右上方的 Azure 机器学习工作室工具栏中,选择你的工作区名称。
- 在底部选择“查看 Azure 门户中的所有属性”
- 将 Azure 门户中的值复制到代码中。
# Get a handle to the workspace
ml_client = MLClient(
credential=credential,
subscription_id="<SUBSCRIPTION_ID>",
resource_group_name="<RESOURCE_GROUP>",
workspace_name="<AML_WORKSPACE_NAME>",
)
结果是用于管理其他资源和作业的工作区处理程序。
重要
创建 MLClient 不会连接到工作区。 客户端初始化是惰性操作,它会等待完成首次调用(在以下笔记本中,这种调用发生在数据集注册期间)。
从外部 URL 注册数据
用于训练的数据通常位于以下位置之一:
- 本地计算机
- Web
- 大数据存储服务(例如 Azure Blob、Azure Data Lake Storage、SQL)
Azure ML 使用 Data 对象来注册可重用的数据定义,并在管道中使用数据。 在以下部分,你将使用来自 Web URL 的一些数据作为一个示例。 也可以创建来自其他源的数据。
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes
web_path = "https://archive.ics.uci.edu/ml/machine-learning-databases/00350/default%20of%20credit%20card%20clients.xls"
credit_data = Data(
name="creditcard_defaults",
path=web_path,
type=AssetTypes.URI_FILE,
description="Dataset for credit card defaults",
tags={"source_type": "web", "source": "UCI ML Repo"},
version="1.0.0",
)
此代码刚刚创建了一个 Data 资产,该资产可用作你将在后续部分定义的管道的输入。 此外,可将数据集注册到工作区,使之可跨管道重用。
注册数据集后,可以:
- 在将来的管道中重用和共享该数据集
- 使用版本来跟踪对该数据集的修改
- 在 Azure ML 设计器(Azure ML 的管道创作 GUI)中使用该数据集
由于这是你首次调用工作区,因此系统可能要求你完成身份验证。 身份验证完成后,你将看到数据集注册完成消息。
credit_data = ml_client.data.create_or_update(credit_data)
print(
f"Dataset with name {credit_data.name} was registered to workspace, the dataset version is {credit_data.version}"
)
将来,可以使用 credit_dataset = ml_client.data.get("<DATA ASSET NAME>", version='<VERSION>') 从工作区提取同一数据集。
创建计算资源以运行管道
Azure ML 管道的每个步骤都可以使用不同的计算资源来运行该步骤的特定作业。 它可以是具有 Linux 或 Windows OS 的单节点或多节点计算机,也可以是 Spark 等特定计算结构。
在本部分中,你将预配 Linux 计算群集。
本教程只需要一个基本群集,因此我们将使用具有 2 个 vCPU 核心、7 GB RAM 的 Standard_DS3_v2 模型并创建一个 Azure ML 计算。
提示
如果你已经有一个计算群集,请将下面代码中的“cpu-cluster”替换为你的群集名称。 这样便不必再创建一个群集。
from azure.ai.ml.entities import AmlCompute
cpu_compute_target = "cpu-cluster"
try:
# let's see if the compute target already exists
cpu_cluster = ml_client.compute.get(cpu_compute_target)
print(
f"You already have a cluster named {cpu_compute_target}, we'll reuse it as is."
)
except Exception:
print("Creating a new cpu compute target...")
# Let's create the Azure ML compute object with the intended parameters
cpu_cluster = AmlCompute(
# Name assigned to the compute cluster
name="cpu-cluster",
# Azure ML Compute is the on-demand VM service
type="amlcompute",
# VM Family
size="STANDARD_DS3_V2",
# Minimum running nodes when there is no job running
min_instances=0,
# Nodes in cluster
max_instances=4,
# How many seconds will the node running after the job termination
idle_time_before_scale_down=180,
# Dedicated or LowPriority. The latter is cheaper but there is a chance of job termination
tier="Dedicated",
)
# Now, we pass the object to MLClient's create_or_update method
cpu_cluster = ml_client.begin_create_or_update(cpu_cluster)
print(
f"AMLCompute with name {cpu_cluster.name} is created, the compute size is {cpu_cluster.size}"
)为管道步骤创建作业环境
到目前为止,你已在计算实例(开发计算机)上创建了一个开发环境。 还需要对管道的每个步骤使用一个环境。 每个步骤可以使用自身的环境,或者,你可以对多个步骤使用某些通用环境。
在此示例中,你将使用 conda yaml 文件为作业创建一个 conda 环境。 首先,创建一个目录来存储文件。
import os
dependencies_dir = "./dependencies"
os.makedirs(dependencies_dir, exist_ok=True)
现在,在依赖项目录中创建文件。
%%writefile {dependencies_dir}/conda.yml
name: model-env
channels:
- conda-forge
dependencies:
- python=3.8
- numpy=1.21.2
- pip=21.2.4
- scikit-learn=0.24.2
- scipy=1.7.1
- pandas>=1.1,<1.2
- pip:
- inference-schema[numpy-support]==1.3.0
- xlrd==2.0.1
- mlflow== 1.26.1
- azureml-mlflow==1.42.0
规范包含一些要在管道中使用的普通包(numpy、pip),以及一些 Azure ML 特定的包(azureml-defaults、azureml-mlflow)。
不是非要包含 Azure ML 包才能运行 Azure ML 作业。 但是,添加这些包可与 Azure ML 交互以便记录指标和注册模型,所有这些指标和模型都包含在 Azure ML 作业中。 稍后你将在本教程的训练脚本中使用它们。
使用 yaml 文件在工作区中创建并注册此自定义环境:
from azure.ai.ml.entities import Environment
custom_env_name = "aml-scikit-learn"
pipeline_job_env = Environment(
name=custom_env_name,
description="Custom environment for Credit Card Defaults pipeline",
tags={"scikit-learn": "0.24.2"},
conda_file=os.path.join(dependencies_dir, "conda.yml"),
image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04:latest",
version="1.0",
)
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)
print(
f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)生成训练管道
创建运行管道所需的所有资产后,接下来可以使用 Azure ML Python SDK v2 生成管道本身。
Azure ML 管道是可重用的 ML 工作流,它通常由多个组件构成。 组件的典型生命周期为:
- 编写组件的 yaml 规范,或使用
ComponentMethod以编程方式创建规范。 - (可选)使用名称和版本在工作区中注册组件,使之可重用且可共享。
- 从管道代码加载该组件。
- 使用组件的输入、输出和参数实现管道
- 提交管道。
创建组件 1:数据准备(使用编程定义)
首先让我们创建第一个组件。 此组件处理数据的预处理。 预处理任务在 data_prep.py python 文件中执行。
首先为 data_prep 组件创建源文件夹:
import os
data_prep_src_dir = "./components/data_prep"
os.makedirs(data_prep_src_dir, exist_ok=True)
此脚本执行一个简单的任务:将数据拆分为训练数据集和测试数据集。
Azure ML 将数据集作为文件夹装载到计算中,因此,我们创建了一个辅助 select_first_file 函数来访问已装载的输入文件夹中的数据文件。
MLFlow 用于在管道运行期间记录参数和指标。
%%writefile {data_prep_src_dir}/data_prep.py
import os
import argparse
import pandas as pd
from sklearn.model_selection import train_test_split
import logging
import mlflow
def main():
"""Main function of the script."""
# input and output arguments
parser = argparse.ArgumentParser()
parser.add_argument("--data", type=str, help="path to input data")
parser.add_argument("--test_train_ratio", type=float, required=False, default=0.25)
parser.add_argument("--train_data", type=str, help="path to train data")
parser.add_argument("--test_data", type=str, help="path to test data")
args = parser.parse_args()
# Start Logging
mlflow.start_run()
print(" ".join(f"{k}={v}" for k, v in vars(args).items()))
print("input data:", args.data)
credit_df = pd.read_excel(args.data, header=1, index_col=0)
mlflow.log_metric("num_samples", credit_df.shape[0])
mlflow.log_metric("num_features", credit_df.shape[1] - 1)
credit_train_df, credit_test_df = train_test_split(
credit_df,
test_size=args.test_train_ratio,
)
# output paths are mounted as folder, therefore, we are adding a filename to the path
credit_train_df.to_csv(os.path.join(args.train_data, "data.csv"), index=False)
credit_test_df.to_csv(os.path.join(args.test_data, "data.csv"), index=False)
# Stop Logging
mlflow.end_run()
if __name__ == "__main__":
main()
准备好可以执行所需任务的脚本后,接下来请从该脚本创建一个 Azure ML 组件。
你将使用可运行命令行操作的通用 CommandComponent。 此命令行操作可以直接调用系统命令或运行脚本。 通过 ${{ ... }} 表示法在命令行中指定输入/输出。
from azure.ai.ml import command
from azure.ai.ml import Input, Output
data_prep_component = command(
name="data_prep_credit_defaults",
display_name="Data preparation for training",
description="reads a .xl input, split the input to train and test",
inputs={
"data": Input(type="uri_folder"),
"test_train_ratio": Input(type="number"),
},
outputs=dict(
train_data=Output(type="uri_folder", mode="rw_mount"),
test_data=Output(type="uri_folder", mode="rw_mount"),
),
# The source folder of the component
code=data_prep_src_dir,
command="""python data_prep.py \
--data ${{inputs.data}} --test_train_ratio ${{inputs.test_train_ratio}} \
--train_data ${{outputs.train_data}} --test_data ${{outputs.test_data}} \
""",
environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)
(可选)在工作区中注册组件供将来重用。
创建组件 2:训练(使用 yaml 定义)
要创建的第二个组件将使用训练数据和测试数据,基于模型训练一个树,然后返回输出模型。 你将使用 Azure ML 日志记录功能来记录和可视化学习进度。
前面你使用 CommandComponent 类创建了第一个组件。 这一次,你将使用 yaml 定义来定义第二个组件。 每种方法各有优势。 yaml 定义实际上可以随代码一起签入,并提供易于阅读的历史跟踪。 将内置的类记录和代码完成与使用 CommandComponent 的编程方法相结合可以更容易。
为此组件创建目录:
import os
train_src_dir = "./components/train"
os.makedirs(train_src_dir, exist_ok=True)
如此训练脚本中所示,训练模型后,模型文件将会保存并注册到工作区。 现在可以在推理终结点中使用已注册的模型。
对于此步骤的环境,你将使用一个内置(特选)的 Azure ML 环境。 标记 azureml 告知系统使用该特选环境中的名称。
首先,创建用于描述组件的 yaml 文件:
%%writefile {train_src_dir}/train.yml
# <component>
name: train_credit_defaults_model
display_name: Train Credit Defaults Model
# version: 1 # Not specifying a version will automatically update the version
type: command
inputs:
train_data:
type: uri_folder
test_data:
type: uri_folder
learning_rate:
type: number
registered_model_name:
type: string
outputs:
model:
type: uri_folder
code: .
environment:
# for this step, we'll use an AzureML curate environment
azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:21
command: >-
python train.py
--train_data ${{inputs.train_data}}
--test_data ${{inputs.test_data}}
--learning_rate ${{inputs.learning_rate}}
--registered_model_name ${{inputs.registered_model_name}}
--model ${{outputs.model}}
# </component>
现在创建并注册组件:
# importing the Component Package
from azure.ai.ml import load_component
# Loading the component from the yml file
train_component = load_component(path=os.path.join(train_src_dir, "train.yml"))# Now we register the component to the workspace
train_component = ml_client.create_or_update(train_component)
# Create (register) the component in your workspace
print(
f"Component {train_component.name} with Version {train_component.version} is registered"
)从组件创建管道
定义并注册两个组件后,接下来可以开始实现管道。
在此处,你将使用输入数据、拆分比和已注册模型的名称作为输入变量。 然后调用组件,并通过其输入/输出标识符来连接它们。 可以通过 .outputs 属性访问每个步骤的输出。
load_component() 返回的 python 函数的工作方式,与我们在管道中用来调用每个步骤的任何常规 python 函数的工作方式相同。
若要为管道编写代码,请使用特定的 @dsl.pipeline 修饰器来标识 Azure ML 管道。 在修饰器中,可以指定管道说明和默认资源,例如计算和存储。 像 python 函数一样,管道可以使用输入。 然后,你可以使用不同的输入创建单个管道的多个实例。
在此处,我们使用了输入数据、拆分比和已注册模型的名称作为输入变量。 然后我们调用组件,并通过其输入/输出标识符来连接它们。 可以通过 .outputs 属性访问每个步骤的输出。
# the dsl decorator tells the sdk that we are defining an Azure ML pipeline
from azure.ai.ml import dsl, Input, Output
@dsl.pipeline(
compute=cpu_compute_target,
description="E2E data_perp-train pipeline",
)
def credit_defaults_pipeline(
pipeline_job_data_input,
pipeline_job_test_train_ratio,
pipeline_job_learning_rate,
pipeline_job_registered_model_name,
):
# using data_prep_function like a python call with its own inputs
data_prep_job = data_prep_component(
data=pipeline_job_data_input,
test_train_ratio=pipeline_job_test_train_ratio,
)
# using train_func like a python call with its own inputs
train_job = train_component(
train_data=data_prep_job.outputs.train_data, # note: using outputs from previous step
test_data=data_prep_job.outputs.test_data, # note: using outputs from previous step
learning_rate=pipeline_job_learning_rate, # note: using a pipeline input as parameter
registered_model_name=pipeline_job_registered_model_name,
)
# a pipeline returns a dictionary of outputs
# keys will code for the pipeline output identifier
return {
"pipeline_job_train_data": data_prep_job.outputs.train_data,
"pipeline_job_test_data": data_prep_job.outputs.test_data,
}
现在,通过管道定义使用所选的数据集、拆分比以及为模型选择的名称来实例化管道。
registered_model_name = "credit_defaults_model"
# Let's instantiate the pipeline with the parameters of our choice
pipeline = credit_defaults_pipeline(
# pipeline_job_data_input=credit_data,
pipeline_job_data_input=Input(type="uri_file", path=web_path),
pipeline_job_test_train_ratio=0.2,
pipeline_job_learning_rate=0.25,
pipeline_job_registered_model_name=registered_model_name,
)提交作业
现在可以提交要在 Azure ML 中运行的作业。 这一次你将对 ml_client.jobs 使用 create_or_update。
在此处,你还将传递一个试验名称。 试验是对特定项目执行的所有迭代的容器。 在同一试验名称下提交的所有作业将在 Azure ML 工作室中彼此相邻地列出。
完成后,管道将在工作区中注册一个模型作为训练结果。
import webbrowser
# submit the pipeline job
pipeline_job = ml_client.jobs.create_or_update(
pipeline,
# Project's name
experiment_name="e2e_registered_components",
)
# open the pipeline in web browser
webbrowser.open(pipeline_job.services["Studio"].endpoint)
上面的单元格预期输出“False”。 可以使用上面的单元格中生成的链接来跟踪管道的进度。
选择每个组件时,你将看到有关该组件的结果的详细信息。 在此阶段,需要查看两个重要的部分:
Outputs+logs>user_logs>std_log.txt此部分显示脚本运行 sdtout。Outputs+logs>Metric此部分显示记录的不同指标。 在本例中, mlflowautologging已自动记录训练指标。
将模型部署为联机终结点
现在,将机器学习模型部署为 Azure 云中的 Web 服务。
若要部署机器学习服务,通常需要:
- 要部署的模型资产(归档、元数据)。 你已在训练组件中注册了这些资产。
- 一些要作为服务运行的代码。 这些代码根据给定的输入请求执行模型。 此入口脚本接收提交到已部署的 Web 服务的数据并将其传递给模型,然后将模型的响应返回给客户端。 该脚本特定于你的模型。 入口脚本必须能够理解模型期望和返回的数据。 使用 MLFlow 模型时,如本教程所示,会自动创建此脚本
创建新的联机终结点
注册模型并创建推理脚本后,接下来可以创建联机终结点。 终结点名称需在整个 Azure 区域中唯一。 对于本教程,你将使用 UUID 创建唯一的名称。
import uuid
# Creating a unique name for the endpoint
online_endpoint_name = "credit-endpoint-" + str(uuid.uuid4())[:8]from azure.ai.ml.entities import (
ManagedOnlineEndpoint,
ManagedOnlineDeployment,
Model,
Environment,
)
# create an online endpoint
endpoint = ManagedOnlineEndpoint(
name=online_endpoint_name,
description="this is an online endpoint",
auth_mode="key",
tags={
"training_dataset": "credit_defaults",
"model_type": "sklearn.GradientBoostingClassifier",
},
)
endpoint = ml_client.begin_create_or_update(endpoint)
print(f"Endpint {endpoint.name} provisioning state: {endpoint.provisioning_state}")
创建终结点后,可按如下方式检索它:
endpoint = ml_client.online_endpoints.get(name=online_endpoint_name)
print(
f'Endpint "{endpoint.name}" with provisioning state "{endpoint.provisioning_state}" is retrieved'
)将模型部署到终结点
创建终结点后,使用入口脚本部署模型。 每个终结点可以有多个部署,可以使用规则指定发送到这些部署的直接流量。 在此处,你将创建单个部署来处理 100% 的传入流量。 我们为部署选择了颜色名称,例如蓝色、绿色和红色部署,该名称可任选。
可以查看 Azure ML 工作室中的“模型”页,以识别已注册模型的最新版本。 或者,以下代码可以检索可用的最新版本号。
# Let's pick the latest version of the model
latest_model_version = max(
[int(m.version) for m in ml_client.models.list(name=registered_model_name)]
)
部署模型的最新版本。
注意
此部署预计需要大约 6 到 8 分钟。
# picking the model to deploy. Here we use the latest version of our registered model
model = ml_client.models.get(name=registered_model_name, version=latest_model_version)
# create an online deployment.
blue_deployment = ManagedOnlineDeployment(
name="blue",
endpoint_name=online_endpoint_name,
model=model,
instance_type="Standard_DS3_v2",
instance_count=1,
)
blue_deployment = ml_client.begin_create_or_update(blue_deployment)使用示例查询进行测试
将模型部署到终结点后,接下来可以使用它来运行推理。
按照评分脚本的 run 方法中所需的设计创建示例请求文件。
deploy_dir = "./deploy"
os.makedirs(deploy_dir, exist_ok=True)%%writefile {deploy_dir}/sample-request.json
{
"input_data": {
"columns": [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22],
"index": [0, 1],
"data": [
[20000,2,2,1,24,2,2,-1,-1,-2,-2,3913,3102,689,0,0,0,0,689,0,0,0,0],
[10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8]
]
}
}# test the blue deployment with some sample data
ml_client.online_endpoints.invoke(
endpoint_name=online_endpoint_name,
request_file="./deploy/sample-request.json",
deployment_name="blue",
)清理资源
如果你不打算使用终结点,请将其删除以停止使用该资源。 在删除终结点之前,请确保没有其他任何部署正在使用该终结点。
注意
此步骤预计需要大约 6 到 8 分钟。
ml_client.online_endpoints.begin_delete(name=online_endpoint_name)后续步骤
详细了解 Azure ML 日志记录。

