你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

教程 1:使用托管特征存储开发和注册特征集

本系列教程介绍了特征如何无缝集成机器学习生命周期的所有阶段:原型制作、训练和操作化。

可以使用 Azure 机器学习托管特征存储来发现、创建和操作特征。 机器学习生命周期包括原型制作阶段,你可以在此阶段试验各种功能。 它还涉及操作化阶段,在此阶段中,部署模型,推理步骤查找功能数据。 功能充当机器学习生命周期中的结缔组织。 若要详细了解托管特征存储的基本概念,请参阅什么是托管特征存储?了解托管特征存储中的顶级实体

本教程介绍如何使用自定义转换创建特征集规范。 然后,使用该特征集生成训练数据、启用具体化和执行回填。 具体化可计算特征窗口的特征值,然后将这些值存储在具体化存储中。 然后,所有特征查询都可以使用具体化存储中的这些值。

在没有具体化的情况下,特征集查询动态地将转换应用于源,以便在返回值之前计算特征。 这个过程在原型制作阶段运行良好。 但是,对于生产环境中的训练和推理操作,建议具体化这些特征,以实现更高的可靠性和可用性。

本教程是托管特征存储教程系列的第一部分。 在此处,你将了解如何执行以下操作:

  • 创建新的最小特征存储资源。
  • 使用特征转换功能开发和本地测试特征集。
  • 向特征存储注册特征存储实体。
  • 向特征存储注册已开发的特征集。
  • 使用创建的特征生成示例训练数据帧。
  • 对特征集启用脱机具体化,回填特征数据。

本教程系列分为以下两个方向:

  • 仅 SDK 跟踪仅使用 Python SDK。 希望纯粹只基于 Python 进行开发和部署的话,请选择这一方向。
  • SDK 和 CLI 跟踪将 Python SDK 仅用于特征集的开发和测试,将 CLI 用于 CRUD(创建、更新和删除)操作。 此跟踪在首选 CLI/YAML 的持续集成和持续交付 (CI/CD) 或 GitOps 场景中很有用。

先决条件

在继续阅读本教程之前,请确保满足以下先决条件:

  • Azure 机器学习工作区。 有关工作区创建的更多信息,请参阅快速入门:创建工作区资源

  • 在用户帐户上,创建了特征存储的资源组的“所有者”角色。

    如果选择对本教程使用新资源组,则可以通过删除资源组轻松删除所有资源。

准备笔记本环境

本教程使用 Azure 机器学习 Spark 笔记本进行开发。

  1. 在 Azure 机器学习工作室环境下,在左侧窗格中选择“笔记本”,然后选择“示例”选项卡。

  2. 浏览到 featurestore_sample 目录(选择“Samples”>“SDK v2”>“sdk”>“python”>“featurestore_sample”),然后选择“克隆”。

    显示在 Azure 机器学习工作室中选择示例目录的屏幕截图。

  3. 此时会打开“选择目标目录”窗格。 选择“用户”目录,然后选择你的用户名,最后选择“克隆”。

    显示在 Azure 机器学习工作室中为示例资源选择目标目录位置的屏幕截图。

  4. 若要配置笔记本环境,必须上传 conda.yml 文件:

    1. 在左窗格中选择“笔记本”,然后选择“文件”选项卡。
    2. 浏览到 env 目录(选择 用户>your_user_name>featurestore_sample>项目>env),然后选择 conda.yml 文件。
    3. 选择下载

    显示在 Azure 机器学习工作室中选择 Conda YAML 文件的屏幕截图。

    1. 在顶部的导航“计算”下拉列表中选择“无服务器 Spark 计算”。 此操作可能需要一到两分钟。 等待顶部的状态栏显示“配置会话”
    2. 在顶部状态栏中选择“配置会话”
    3. 选择“Python 包”
    4. 选择“上传 conda 文件”
    5. 选择在本地设备上下载的 conda.yml 文件。
    6. (可选)增加会话超时(空闲时间,以分钟为单位)以减少无服务器 Spark 群集启动时间。
  5. 在 Azure 机器学习环境下,打开笔记本,然后选择“配置会话”。

    显示用于配置笔记本会话的选项的屏幕截图。

  6. 在“配置会话”面板上,选择“Python 包”。

  7. 上传 Conda 文件:

    1. 在“Python 包”选项卡上,选择“上传 Conda 文件”。
    2. 浏览到托管 Conda 文件的目录。
    3. 选择“conda.yml”,然后选择“打开”。

    显示托管 Conda 文件的目录的屏幕截图。

  8. 选择“应用”。

    显示 Conda 文件上传的屏幕截图。

启动 Spark 会话

# Run this cell to start the spark session (any code block will start the session ). This can take around 10 mins.
print("start spark session")

为示例设置根目录

import os

# Please update <your_user_alias> below (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"

if os.path.isdir(root_dir):
    print("The folder exists.")
else:
    print("The folder does not exist. Please create or fix the path")

设置 CLI

不适用。

注意

你将使用特征存储跨项目重复使用特征。 你将使用项目工作区(即 Azure 机器学习工作区)利用特征存储中的特征来训练推理模型。 许多项目工作区可以共享和重复使用同一功能商店。

本教程使用两个 SDK:

  • 特征存储 CRUD SDK

    你将使用与 Azure 机器学习工作区配合使用的相同的 MLClient(包名称为 azure-ai-ml)SDK。 功能商店作为一种工作区实现。 因此,此 SDK 用于特征存储、特征集和特征存储实体的 CRUD 操作。

  • 特征存储核心 SDK

    此 SDK (azureml-featurestore) 适用于特征集开发和使用。 本教程的后续步骤介绍了以下操作:

    • 制定特征集规范。
    • 检索特征数据。
    • 列出或获取已注册的特征集。
    • 生成和解析特征检索规范。
    • 使用时间点联接生成训练和推理数据。

本教程不需要明确介绍如何安装这些 SDK,因为前面的 conda.yml 说明已涵盖安装步骤

创建最小功能商店

  1. 设置特征存储参数,包括名称、位置和其他值。

    # We use the subscription, resource group, region of this active project workspace.
    # You can optionally replace them to create the resources in a different subsciprtion/resource group, or use existing resources.
    import os
    
    featurestore_name = "<FEATURESTORE_NAME>"
    featurestore_location = "eastus"
    featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
    featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
  2. 创建特征存储。

    from azure.ai.ml import MLClient
    from azure.ai.ml.entities import (
        FeatureStore,
        FeatureStoreEntity,
        FeatureSet,
    )
    from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
    
    ml_client = MLClient(
        AzureMLOnBehalfOfCredential(),
        subscription_id=featurestore_subscription_id,
        resource_group_name=featurestore_resource_group_name,
    )
    
    
    fs = FeatureStore(name=featurestore_name, location=featurestore_location)
    # wait for feature store creation
    fs_poller = ml_client.feature_stores.begin_create(fs)
    print(fs_poller.result())
  3. 初始化 Azure 机器学习特征存储核心 SDK 客户端。

    如本教程前面所述,特征存储核心 SDK 客户端用于开发和使用特征。

    # feature store client
    from azureml.featurestore import FeatureStoreClient
    from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
    
    featurestore = FeatureStoreClient(
        credential=AzureMLOnBehalfOfCredential(),
        subscription_id=featurestore_subscription_id,
        resource_group_name=featurestore_resource_group_name,
        name=featurestore_name,
    )
  4. 向你的用户标识授予特征存储的“Azure 机器学习数据科学家”角色。 从 Azure 门户获取 Microsoft Entra 对象 ID 值,如查找用户对象 ID 中所述。

    将“AzureML 数据科学家”角色分配给用户标识,以便它可以在特征存储工作区中创建资源。 权限传播可能需要一些时间。

    有关访问控制的详细信息,请参阅管理托管特征存储的访问控制

    your_aad_objectid = "<USER_AAD_OBJECTID>"
    
    !az role assignment create --role "AzureML Data Scientist" --assignee-object-id $your_aad_objectid --assignee-principal-type User --scope $feature_store_arm_id

原型和开发功能集

在以下步骤中,你将生成一个名为 transactions 的特征集,该特征集具有基于滚动窗口聚合的特征:

  1. 浏览 transactions 源数据。

    此笔记本使用可公开访问的 Blob 容器中托管的示例数据。 它只能通过 wasbs 驱动程序读取到 Spark 中。 使用自己的源数据创建特征集时,请将其托管在 Azure Data Lake Storage Gen2 帐户中,并在数据路径中使用 abfss 驱动程序。

    # remove the "." in the roor directory path as we need to generate absolute path to read from spark
    transactions_source_data_path = "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet"
    transactions_src_df = spark.read.parquet(transactions_source_data_path)
    
    display(transactions_src_df.head(5))
    # Note: display(training_df.head(5)) displays the timestamp column in a different format. You can can call transactions_src_df.show() to see correctly formatted value
  2. 在本地开发特征集。

    特征集规范是可以在本地开发和测试的特征集的自包含定义。 在此处,你将创建以下滚动窗口聚合特征:

    • transactions three-day count
    • transactions amount three-day sum
    • transactions amount three-day avg
    • transactions seven-day count
    • transactions amount seven-day sum
    • transactions amount seven-day avg

    查看特征转换代码文件:featurestore/featuresets/transactions/transformation_code/transaction_transform.py。 请注意为功能定义的滚动聚合。 这是一个 Spark 转换器。

    若要详细了解特征集和转换,请参阅什么是托管特征存储?

    from azureml.featurestore import create_feature_set_spec
    from azureml.featurestore.contracts import (
        DateTimeOffset,
        TransformationCode,
        Column,
        ColumnType,
        SourceType,
        TimestampColumn,
    )
    from azureml.featurestore.feature_source import ParquetFeatureSource
    
    transactions_featureset_code_path = (
        root_dir + "/featurestore/featuresets/transactions/transformation_code"
    )
    
    transactions_featureset_spec = create_feature_set_spec(
        source=ParquetFeatureSource(
            path="wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet",
            timestamp_column=TimestampColumn(name="timestamp"),
            source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
        ),
        feature_transformation=TransformationCode(
            path=transactions_featureset_code_path,
            transformer_class="transaction_transform.TransactionFeatureTransformer",
        ),
        index_columns=[Column(name="accountID", type=ColumnType.string)],
        source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
        temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
        infer_schema=True,
    )
  3. 导出为特征集规范。

    为了向特征存储注册特征集规范,你必须以特定格式保存该规范。

    查看生成的 transactions 特征集规范。 从文件树中打开以下文件以查看规范:featurestore/featuresets/accounts/spec/FeaturesetSpec.yaml。

    该规范包含以下元素:

    • source:对存储资源的引用。 在本例中,它是 blob 存储资源中的 parquet 文件。
    • features:特征及其数据类型的列表。 如果提供转换代码,则该代码必须返回映射到特征和数据类型的数据帧。
    • index_columns:访问特征集中的值所需的联接键。

    若要详细了解规范,请参阅了解托管特征存储中的顶级实体CLI (v2) 特征集 YAML 架构

    保留特征集规范具有另一个好处:特征集规范可以进行源代码管理。

    import os
    
    # Create a new folder to dump the feature set specification.
    transactions_featureset_spec_folder = (
        root_dir + "/featurestore/featuresets/transactions/spec"
    )
    
    # Check if the folder exists, create one if it does not exist.
    if not os.path.exists(transactions_featureset_spec_folder):
        os.makedirs(transactions_featureset_spec_folder)
    
    transactions_featureset_spec.dump(transactions_featureset_spec_folder, overwrite=True)

注册特征存储实体

最佳做法是,实体有助于在使用相同的逻辑实体的功能集中强制使用相同的联接键定义。 实体的示例包括帐户和客户。 实体通常只创建一次,然后在多个特征集之间重用。 若要了解详细信息,请参阅了解托管特征存储中的顶级实体

  1. 初始化特征存储 CRUD 客户端。

    如本教程前面所述,MLClient 用于创建、读取、更新和删除特征存储资产。 以下显示的笔记本代码单元格示例搜索你在前面的步骤中创建的特征存储。 在这里,你无法重复使用之前在本教程中使用的相同 ml_client,因为其范围限定在资源组级别。 设置合适的范围是创建功能商店的先决条件。

    在这个代码示例中,客户端的范围限定为功能商店级别。

    # MLClient for feature store.
    fs_client = MLClient(
        AzureMLOnBehalfOfCredential(),
        featurestore_subscription_id,
        featurestore_resource_group_name,
        featurestore_name,
    )
  2. 向特征存储注册 account 实体。

    创建一个 account 实体,该实体具有 string 类型的联接键 accountID

    from azure.ai.ml.entities import DataColumn, DataColumnType
    
    account_entity_config = FeatureStoreEntity(
        name="account",
        version="1",
        index_columns=[DataColumn(name="accountID", type=DataColumnType.STRING)],
        stage="Development",
        description="This entity represents user account index key accountID.",
        tags={"data_typ": "nonPII"},
    )
    
    poller = fs_client.feature_store_entities.begin_create_or_update(account_entity_config)
    print(poller.result())

将事务功能集注册到功能商店

使用此代码向特征存储注册特征集资产。 然后,可以重复使用该资产,并轻松共享它。 特征集资产注册提供托管功能,包括版本控制与具体化。 本教程系列中的后续步骤介绍了托管功能。

from azure.ai.ml.entities import FeatureSetSpecification

transaction_fset_config = FeatureSet(
    name="transactions",
    version="1",
    description="7-day and 3-day rolling aggregation of transactions featureset",
    entities=[f"azureml:account:1"],
    stage="Development",
    specification=FeatureSetSpecification(path=transactions_featureset_spec_folder),
    tags={"data_type": "nonPII"},
)

poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())

了解功能商店 UI

只能通过 SDK 和 CLI 创建和更新功能商店资产。 可以使用 UI 搜索或浏览特征存储:

  1. 打开 Azure 机器学习全球登陆页
  2. 在左窗格中,选择“特征存储”。
  3. 从可访问特征存储的列表中,选择之前在本教程中创建的特征存储。

授予存储 Blob 数据读取者角色对脱机存储中用户帐户的访问权限

必须将存储 Blob 数据读取者角色分配给你在脱机存储中的用户帐户。 这可确保用户帐户可以从脱机具体化存储读取具体化特征数据。

  1. 从 Azure 门户获取 Microsoft Entra 对象 ID 值,如查找用户对象 ID 中所述。

  2. 从特征存储 UI 中的特征存储“概述”页获取有关脱机具体化存储的信息。 可以在“脱机具体化存储”卡中找到脱机具体化存储的存储帐户订阅 ID、存储帐户资源组名称和存储帐户名称的值

    显示功能存储“概述”页上的脱机存储帐户信息的屏幕截图。

    有关访问控制的详细信息,请参阅管理托管特征存储的访问控制

    执行此代码单元以进行角色分配。 权限传播可能需要一些时间。

    # This utility function is created for ease of use in the docs tutorials. It uses standard azure API's.
    # You can optionally inspect it `featurestore/setup/setup_storage_uai.py`.
    import sys
    
    sys.path.insert(0, root_dir + "/featurestore/setup")
    from setup_storage_uai import grant_user_aad_storage_data_reader_role
    
    your_aad_objectid = "<USER_AAD_OBJECTID>"
    storage_subscription_id = "<SUBSCRIPTION_ID>"
    storage_resource_group_name = "<RESOURCE_GROUP>"
    storage_account_name = "<STORAGE_ACCOUNT_NAME>"
    
    grant_user_aad_storage_data_reader_role(
        AzureMLOnBehalfOfCredential(),
        your_aad_objectid,
        storage_subscription_id,
        storage_resource_group_name,
        storage_account_name,
    )

使用已注册的特征集生成训练数据帧

  1. 加载观察数据。

    观察数据通常涉及训练和推理中使用的核心数据。 此数据与功能数据联接以创建完整的训练数据资源。

    观察数据是在事件本身期间捕获的数据。 此处,它具有核心事务数据,包括事务 ID、帐户 ID 和事务量值。 由于你使用它进行训练,因此它还追加目标变量 (is_fraud)。

    observation_data_path = "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/observation_data/train/*.parquet"
    observation_data_df = spark.read.parquet(observation_data_path)
    obs_data_timestamp_column = "timestamp"
    
    display(observation_data_df)
    # Note: the timestamp column is displayed in a different format. Optionally, you can can call training_df.show() to see correctly formatted value
  2. 获取已注册的特征集并列出其特征。

    # Look up the featureset by providing a name and a version.
    transactions_featureset = featurestore.feature_sets.get("transactions", "1")
    # List its features.
    transactions_featureset.features
    # Print sample values.
    display(transactions_featureset.to_spark_dataframe().head(5))
  3. 选择成为训练数据一部分的特征。 然后,使用特征存储 SDK 来生成训练数据本身。

    from azureml.featurestore import get_offline_features
    
    # You can select features in pythonic way.
    features = [
        transactions_featureset.get_feature("transaction_amount_7d_sum"),
        transactions_featureset.get_feature("transaction_amount_7d_avg"),
    ]
    
    # You can also specify features in string form: featureset:version:feature.
    more_features = [
        f"transactions:1:transaction_3d_count",
        f"transactions:1:transaction_amount_3d_avg",
    ]
    
    more_features = featurestore.resolve_feature_uri(more_features)
    features.extend(more_features)
    
    # Generate training dataframe by using feature data and observation data.
    training_df = get_offline_features(
        features=features,
        observation_data=observation_data_df,
        timestamp_column=obs_data_timestamp_column,
    )
    
    # Ignore the message that says feature set is not materialized (materialization is optional). We will enable materialization in the subsequent part of the tutorial.
    display(training_df)
    # Note: the timestamp column is displayed in a different format. Optionally, you can can call training_df.show() to see correctly formatted value

    时间点联接将功能追加到训练数据。

transactions 特征集上启用脱机具体化

启用特征集具体化后,可以执行回填。 还可以计划重复具体化作业。 有关更多信息,请参阅本系列的教程三

根据功能数据大小在 yaml 文件中设置 spark.sql.shuffle.partitions

Spark 配置 spark.sql.shuffle.partitions 是一个 OPTIONAL 参数,当功能集具体化到脱机存储中时,可能会影响生成的 parquet 文件数(每天)。 此参数的默认值为 200。 最佳做法是避免生成许多小型 parquet 文件。 如果功能集具体化后脱机功能检索变慢,请转到脱机存储中的相应文件夹,检查问题是否涉及过多小 parquet 文件(每天),并相应地调整此参数的值。

注意

此笔记本中使用的示例数据很小。 因此,此参数在 featureset_asset_offline_enabled.yaml 文件中设置为 1。

from azure.ai.ml.entities import (
    MaterializationSettings,
    MaterializationComputeResource,
)

transactions_fset_config = fs_client._featuresets.get(name="transactions", version="1")

transactions_fset_config.materialization_settings = MaterializationSettings(
    offline_enabled=True,
    resource=MaterializationComputeResource(instance_type="standard_e8s_v3"),
    spark_configuration={
        "spark.driver.cores": 4,
        "spark.driver.memory": "36g",
        "spark.executor.cores": 4,
        "spark.executor.memory": "36g",
        "spark.executor.instances": 2,
        "spark.sql.shuffle.partitions": 1,
    },
    schedule=None,
)

fs_poller = fs_client.feature_sets.begin_create_or_update(transactions_fset_config)
print(fs_poller.result())

还可以将特征集资产保存为 YAML 资源。

## uncomment to run
transactions_fset_config.dump(
    root_dir
    + "/featurestore/featuresets/transactions/featureset_asset_offline_enabled.yaml"
)

回填 transactions 特征集的数据

如前所述,具体化计算特征窗口的特征值,并将这些计算值存储在具体化存储中。 特征具体化可提高计算值的可靠性和可用性。 现在,所有功能查询都使用来自具体化存储的值。 此步骤可对 18 个月的特征窗口执行一次性回填。

注意

可能需要确定回填数据窗口值。 该窗口必须与训练数据的窗口匹配。 例如,若要使用 18 个月的数据进行训练,则必须检索 18 个月的特征。 这意味着应对 18 个月的窗口进行回填。

此代码单元根据定义的特征窗口当前的“无”或“未完成”状态具体化数据

from datetime import datetime
from azure.ai.ml.entities import DataAvailabilityStatus

st = datetime(2022, 1, 1, 0, 0, 0, 0)
et = datetime(2023, 6, 30, 0, 0, 0, 0)

poller = fs_client.feature_sets.begin_backfill(
    name="transactions",
    version="1",
    feature_window_start_time=st,
    feature_window_end_time=et,
    data_status=[DataAvailabilityStatus.NONE],
)
print(poller.result().job_ids)
# Get the job URL, and stream the job logs.
fs_client.jobs.stream(poller.result().job_ids[0])

提示

  • timestamp 列应遵循 yyyy-MM-ddTHH:mm:ss.fffZ 格式。
  • feature_window_start_timefeature_window_end_time 的粒度限制为秒。 datetime 对象中提供的任何毫秒都将被忽略。
  • 仅当特征窗口中的数据与提交回填作业时定义的 data_status 匹配时,才会提交具体化作业。

打印特征集中的示例数据。 输出信息显示数据是从具体化存储中检索的。 get_offline_features() 方法检索训练和推理数据。 默认情况下,它还使用具体化存储。

# Look up the feature set by providing a name and a version and display few records.
transactions_featureset = featurestore.feature_sets.get("transactions", "1")
display(transactions_featureset.to_spark_dataframe().head(5))

进一步探索脱机特征具体化

可以在“具体化作业”UI 中探索特征集的特征具体化状态

  1. 打开 Azure 机器学习全球登陆页

  2. 在左窗格中,选择“特征存储”。

  3. 从可访问的特征存储列表中,选择为其执行回填的特征存储。

  4. 选择“具体化作业”选项卡

    显示功能集“具体化作业”用户界面的屏幕截图。

  • 数据具体化状态可以是

    • 完成(绿色)
    • 未完成(红色)
    • 挂起(蓝色)
    • 无(灰色)
  • 数据间隔表示具有相同数据具体化状态的连续数据部分。 例如,前面的快照在脱机具体化存储中具有 16 个数据间隔

  • 数据最多可以有 2,000 个数据间隔。 如果数据包含的数据间隔超过 2,000 个,请创建新的特征集版本

  • 可以在单个回填作业中提供多个数据状态(例如,["None", "Incomplete"])的列表。

  • 在回填期间,会为定义的特征窗口中的每个数据间隔提交一个新的具体化作业

  • 如果具体化作业处于挂起状态,或者该作业在尚未回填的数据间隔内运行,则不会在该数据间隔内提交新作业

  • 可以重试失败的具体化作业。

    注意

    获取失败的具体化作业的作业 ID:

    • 导航到特征集的“具体化作业”UI
    • 选择“状态”为“失败”的特定作业的“显示名称”
    • 在作业“概述”页上的“名称”属性下找到作业 ID。 它以 Featurestore-Materialization- 开头。

poller = fs_client.feature_sets.begin_backfill(
    name="transactions",
    version=version,
    job_id="<JOB_ID_OF_FAILED_MATERIALIZATION_JOB>",
)
print(poller.result().job_ids)

更新脱机具体化存储

  • 如果必须在特征存储级别更新脱机具体化存储,则特征存储中的所有特征集都应禁用脱机具体化。
  • 如果在功能集上禁用脱机具体化,则将重置脱机具体化存储中已具体化数据的具体化状态。 重置会使已具体化的数据不可用。 启用脱机具体化后,必须重新提交具体化作业。

本教程使用特征存储中的特征生成训练数据,启用脱机特征存储的具体化,并执行回填。 接下来,你将使用这些特征运行模型训练。

清理

本系列的教程五将介绍如何删除资源。

后续步骤