Azure Machine Learning SDK v2 でコンポーネントを使用して機械学習パイプラインを作成して実行する

適用対象: Python SDK azure-ai-ml v2 (現行)

この記事では、Python SDK v2 を使って Azure Machine Learning パイプラインを構築し、データの準備、画像分類モデルのトレーニング、モデルのスコア付けという 3 つのステップを含む画像分類タスクを完了する方法について説明します。 機械学習パイプラインは、速度、移植性、再利用によってワークフローを最適化します。これにより、ユーザーはインフラストラクチャや自動化にではなく、機械学習に専念することができます。

この例では、Fashion MNIST データセットの画像を分類するために、小規模な Keras の畳み込みニューラル ネットワークをトレーニングします。 パイプラインは次のようになります。

Screenshot showing pipeline graph of the image classification Keras example.

この記事では、次のタスクを完了します。

  • パイプライン ジョブの入力データを準備する
  • データの準備、トレーニング、スコア付けを行う 3 つのコンポーネントを作成する
  • コンポーネントからパイプラインを構成する
  • コンピューティングを使ってワークスペースにアクセスする
  • パイプライン ジョブを送信する
  • コンポーネントとトレーニングされたニューラル ネットワークの出力を確認する
  • (省略可能) コンポーネントを登録し、ワークスペース内で再利用および共有する

Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。 無料版または有料版の Azure Machine Learning を今すぐお試しください。

前提条件

  • Azure Machine Learning ワークスペース - このワークスペースがまだない場合は、「リソースの作成チュートリアル」を完了してください。

  • Azure Machine Learning Python SDK v2 をインストールした Python 環境 (インストール手順)。概要のセクションを参照してください。 この環境は、Azure Machine Learning リソースを定義および制御するためのものであり、トレーニングのためにランタイムに使用される環境とは分離されています。

  • examples リポジトリを複製する

    トレーニング例を実行するには、最初に examples リポジトリを複製し、sdk ディレクトリに変更します。

    git clone --depth 1 https://github.com/Azure/azureml-examples --branch sdk-preview
    cd azureml-examples/sdk
    

対話型の Python セッションを開始する

この記事では、Azure Machine Learning 用の Python SDK を使用して、Azure Machine Learning パイプラインを作成および制御します。 この記事では、Python REPL 環境または Jupyter ノートブックでコード スニペットを対話的に実行することを前提としています。

この記事は、Azure Machine Learning の例リポジトリの sdk/python/jobs/pipelines/2e_image_classification_keras_minist_convnet ディレクトリにある image_classification_keras_minist_convnet.ipynb ノートブックに基づいています。

必要なライブラリをインポートする

この記事に必要な Azure Machine Learning の必須ライブラリをすべてインポートします。

# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

from azure.ai.ml import MLClient
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import load_component

パイプライン ジョブの入力データを準備する

この画像分類パイプラインの入力データを準備する必要があります。

Fashion-MNIST は、10 のクラスに分割されたファッションの画像のデータセットです。 各画像は 28x28 のグレースケール画像であり、6 万のトレーニングと 1 万のテストの画像があります。 Fashion-MNIST には従来の MNIST の手書き数字データベースよりも画像分類が難しいという問題があります。 これは、元の手書き数字データベースと同じ圧縮されたバイナリ形式で配布されます。

Web ベースのデータを参照するジョブの入力データを定義するには、以下を実行します。

from azure.ai.ml import Input

fashion_ds = Input(
    path="wasbs://demo@data4mldemo6150520719.blob.core.windows.net/mnist-fashion/"
)

Input を定義することにより、データソースの場所への参照を作成します。 データは既存の場所に残るので、追加のストレージ コストは発生しません。

パイプラインを構築するためのコンポーネントを作成する

画像分類タスクは、データの準備、モデルのトレーニング、モデルのスコア付けという 3 つのステップに分割できます。

Azure Machine Learning コンポーネントは、機械学習パイプラインで 1 つのステップを実行する自己完結型のコードです。 この記事では、画像分類タスクのために 3 つのコンポーネントを作成します。

  • トレーニングとテストのためにデータを準備する
  • トレーニング データを使って画像分類のためにニューラル ネットワークをトレーニングする
  • テスト データを使ってモデルにスコアを付ける

各コンポーネントについて、以下を準備する必要があります。

  1. 実行ロジックを含む Python スクリプトを準備する

  2. コンポーネントのインターフェイスを定義する

  3. 実行時環境、コンポーネントを実行するコマンドなど、コンポーネントのその他のメタデータを追加します。

次のセクションでは、コンポーネントを 2 種類の方法で作成します。最初の 2 つのコンポーネントには Python 関数を使用し、3 つ目のコンポーネントには YAML 定義を使用します。

データ準備コンポーネントを作成する

このパイプラインの最初のコンポーネントを使って、fashion_ds の圧縮されたデータ ファイルを、トレーニング用とスコアリング用の 2 つの csv ファイルに変換します。 このコンポーネントの定義には Python 関数を使います。

Azure Machine Learning 例リポジトリ内の例に従っている場合は、ソース ファイルは既に prep/ フォルダー内にあります。 このフォルダー内には、コンポーネントを構築するファイルが 2 つあります。コンポーネントを定義する prep_component.py と、コンポーネントの実行時環境を定義する conda.yaml です。

Python 関数を使ってコンポーネントを定義する

command_component() 関数をデコレーターとして使うことで、コンポーネントのインターフェイス、メタデータ、実行するコードを Python 関数から簡単に定義できます。 修飾された各 Python 関数は、パイプライン サービスで処理できる 1 つの静的仕様 (YAML) に変換されます。

# Converts MNIST-formatted files at the passed-in input path to training data output path and test data output path
import os
from pathlib import Path
from mldesigner import command_component, Input, Output


@command_component(
    name="prep_data",
    version="1",
    display_name="Prep Data",
    description="Convert data to CSV file, and split to training and test data",
    environment=dict(
        conda_file=Path(__file__).parent / "conda.yaml",
        image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
    ),
)
def prepare_data_component(
    input_data: Input(type="uri_folder"),
    training_data: Output(type="uri_folder"),
    test_data: Output(type="uri_folder"),
):
    convert(
        os.path.join(input_data, "train-images-idx3-ubyte"),
        os.path.join(input_data, "train-labels-idx1-ubyte"),
        os.path.join(training_data, "mnist_train.csv"),
        60000,
    )
    convert(
        os.path.join(input_data, "t10k-images-idx3-ubyte"),
        os.path.join(input_data, "t10k-labels-idx1-ubyte"),
        os.path.join(test_data, "mnist_test.csv"),
        10000,
    )


def convert(imgf, labelf, outf, n):
    f = open(imgf, "rb")
    l = open(labelf, "rb")
    o = open(outf, "w")

    f.read(16)
    l.read(8)
    images = []

    for i in range(n):
        image = [ord(l.read(1))]
        for j in range(28 * 28):
            image.append(ord(f.read(1)))
        images.append(image)

    for image in images:
        o.write(",".join(str(pix) for pix in image) + "\n")
    f.close()
    o.close()
    l.close()

上記のコードでは、@command_component デコレーターを使って、表示名 Prep Data のコンポーネントを定義しています。

  • name は、コンポーネントの一意の識別子です。

  • version はコンポーネントの現在のバージョンです。 1 つのコンポーネントに複数のバージョンを設定することができます。

  • display_name は、UI 内のコンポーネントを示すわかりやすい表示名であり、一意ではありません。

  • 通常、description を使って、このコンポーネントで完了できるタスクを記述します。

  • environment を使って、このコンポーネントの実行時環境を指定します。 このコンポーネントの環境では、Docker イメージを指定し、conda.yaml ファイルを参照します。

    conda.yaml ファイルには、次のようにコンポーネントに使われるすべてのパッケージが含まれています。

    name: imagekeras_prep_conda_env
    channels:
      - defaults
    dependencies:
      - python=3.7.11
      - pip=20.0
      - pip:
        - mldesigner==0.1.0b4
    
  • prepare_data_component 関数を使って、input_data に対して 1 つの入力を、training_datatest_data に対しては 2 つの出力を定義します。 input_data は入力データのパスです。 training_datatest_data はトレーニング データとテスト データの出力データ パスです。

  • このコンポーネントを使って、input_data のデータをトレーニング データ csv に変換して training_data にして、テスト データ csv を test_data に変換します。

スタジオ UI のコンポーネントは次のような外観です。

  • コンポーネントはパイプライン グラフのブロックです。
  • input_datatraining_datatest_data は、他のコンポーネントに接続してデータのストリーミングを行うコンポーネントのポートです。

Screenshot of the Prep Data component in the UI and code.

これで Prep Data コンポーネントのソース ファイルをすべて準備できました。

モデルのトレーニング コンポーネントを作成する

このセクションでは、Prep Data コンポーネントと同様に、画像分類モデルをトレーニングするコンポーネントを Python 関数で作成します。

違いは、トレーニングのロジックがより複雑になるため、元のトレーニング コードを別の Python ファイルに配置できる点です。

このコンポーネントのソース ファイルは、Azure Machine Learning 例リポジトリtrain/ フォルダーの下にあります。 このフォルダーには、コンポーネントを構築するファイルが 3 つあります。

  • train.py: モデルをトレーニングする実際のロジックが含まれています。
  • train_component.py: コンポーネントのインターフェイスを定義し、train.py の関数をインポートします。
  • conda.yaml: コンポーネントの実行時環境を定義します。

実行ロジックを含むスクリプトを取得する

train.py ファイルには通常の Python 関数が含まれており、これを使うと、画像分類のために Keras ニューラル ネットワークをトレーニングするトレーニング モデル ロジックを実行できます。 コードを確認するには、GitHub 上の train.py ファイルを参照してください。

Python 関数を使ってコンポーネントを定義する

トレーニング関数を適切に定義したら、Azure Machine Learning SDK v2 で @command_component を使って関数をコンポーネントとしてラップし、Azure Machine Learning パイプラインで使用できます。

import os
from pathlib import Path
from mldesigner import command_component, Input, Output


@command_component(
    name="train_image_classification_keras",
    version="1",
    display_name="Train Image Classification Keras",
    description="train image classification with keras",
    environment=dict(
        conda_file=Path(__file__).parent / "conda.yaml",
        image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
    ),
)
def keras_train_component(
    input_data: Input(type="uri_folder"),
    output_model: Output(type="uri_folder"),
    epochs=10,
):
    # avoid dependency issue, execution logic is in train() func in train.py file
    from train import train

    train(input_data, output_model, epochs)

上記のコードでは、@command_component を使って、表示名 Train Image Classification Keras のコンポーネントを定義しています。

  • keras_train_component 関数を使って、トレーニング データの取得元となる 1 つの入力 input_data、トレーニング時にエポックを指定する 1 つの入力 epochs、モデル ファイルを出力する 1 つの出力 output_model を定義します。 epochs の既定値は 10 です。 このコンポーネントの実行ロジックは、上記の train.pytrain() 関数によるものです。

モデルのトレーニング コンポーネントは、データの準備コンポーネントよりも構成が少し複雑です。 conda.yaml は次のような内容です。

name: imagekeras_train_conda_env
channels:
  - defaults
dependencies:
  - python=3.7.11
  - pip=20.2
  - pip:
    - mldesigner==0.1.0b12
    - azureml-mlflow==1.50.0
    - tensorflow==2.7.0
    - numpy==1.21.4
    - scikit-learn==1.0.1
    - pandas==1.3.4
    - matplotlib==3.2.2
    - protobuf==3.20.0

これで Train Image Classification Keras コンポーネントのソース ファイルをすべて準備できました。

モデルのスコア付けコンポーネントを作成する

このセクションでは、これまでのコンポーネントとは別に、Yaml の仕様とスクリプトを使って、トレーニング済みモデルにスコアを付けるコンポーネントを作成します。

Azure Machine Learning 例リポジトリ内の例に従っている場合は、ソース ファイルは既に score/ フォルダー内にあります。 このフォルダーには、コンポーネントを構築するファイルが 3 つあります。

  • score.py: コンポーネントのソース コードを格納します
  • score.yaml: コンポーネントのインターフェイスとその他の詳細を定義します。
  • conda.yaml: コンポーネントの実行時環境を定義します。

実行ロジックを含むスクリプトを取得する

score.py ファイルには、トレーニング モデル ロジックを実行する通常の Python 関数が含まれています。

from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.utils import to_categorical
from keras.callbacks import Callback
from keras.models import load_model

import argparse
from pathlib import Path
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import mlflow


def get_file(f):

    f = Path(f)
    if f.is_file():
        return f
    else:
        files = list(f.iterdir())
        if len(files) == 1:
            return files[0]
        else:
            raise Exception("********This path contains more than one file*******")


def parse_args():
    # setup argparse
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument(
        "--input_data", type=str, help="path containing data for scoring"
    )
    parser.add_argument(
        "--input_model", type=str, default="./", help="input path for model"
    )

    parser.add_argument(
        "--output_result", type=str, default="./", help="output path for model"
    )

    # parse args
    args = parser.parse_args()

    # return args
    return args


def score(input_data, input_model, output_result):

    test_file = get_file(input_data)
    data_test = pd.read_csv(test_file, header=None)

    img_rows, img_cols = 28, 28
    input_shape = (img_rows, img_cols, 1)

    # Read test data
    X_test = np.array(data_test.iloc[:, 1:])
    y_test = to_categorical(np.array(data_test.iloc[:, 0]))
    X_test = (
        X_test.reshape(X_test.shape[0], img_rows, img_cols, 1).astype("float32") / 255
    )

    # Load model
    files = [f for f in os.listdir(input_model) if f.endswith(".h5")]
    model = load_model(input_model + "/" + files[0])

    # Log metrics of the model
    eval = model.evaluate(X_test, y_test, verbose=0)

    mlflow.log_metric("Final test loss", eval[0])
    print("Test loss:", eval[0])

    mlflow.log_metric("Final test accuracy", eval[1])
    print("Test accuracy:", eval[1])

    # Score model using test data
    y_predict = model.predict(X_test)
    y_result = np.argmax(y_predict, axis=1)

    # Output result
    np.savetxt(output_result + "/predict_result.csv", y_result, delimiter=",")


def main(args):
    score(args.input_data, args.input_model, args.output_result)


# run script
if __name__ == "__main__":
    # parse args
    args = parse_args()

    # call main function
    main(args)

score.py のコードは、input_datainput_modeloutput_result という 3 つのコマンドライン引数を受け取ります。 このプログラムを実行すると、入力データを使って入力モデルにスコアを付け、スコアリング結果を出力することができます。

Yaml を使ってコンポーネントを定義する

このセクションでは、有効な YAML コンポーネント仕様形式でコンポーネント仕様を作成する方法について説明します。 このファイルによって、次の情報が指定されます。

  • メタデータ: name、display_name、version、type など。
  • インターフェイス: 入力と出力
  • コマンド、コード、および環境: コンポーネントの実行に使用されるコマンド、コード、および環境
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
type: command

name: score_image_classification_keras
display_name: Score Image Classification Keras
inputs:
  input_data: 
    type: uri_folder
  input_model:
    type: uri_folder
outputs:
  output_result:
    type: uri_folder
code: ./
command: python score.py --input_data ${{inputs.input_data}} --input_model ${{inputs.input_model}} --output_result ${{outputs.output_result}}
environment:
  conda_file: ./conda.yaml
  image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
  • name は、コンポーネントの一意の識別子です。 その表示名は Score Image Classification Keras です。
  • このコンポーネントは、2 つの入力と 1 つの出力があります。
  • このソース コード パスは code セクションで定義されており、クラウド内でコンポーネントを実行すると、そのパスのすべてのファイルがこのコンポーネントのスナップショットとしてアップロードされます。
  • command セクションには、このコンポーネントの実行時に実行するコマンドを指定します。
  • environment セクションには、docker イメージと conda yaml ファイルが含まれています。 ソース ファイルはサンプル リポジトリにあります。

これで、モデルのスコア付けコンポーネントのソース ファイルはすべて揃いました。

コンポーネントを読み込んでパイプラインを構築する

Python 関数で定義したデータの準備コンポーネントとモデルのトレーニング コンポーネントの場合は、通常の Python 関数と同様にコンポーネントをインポートできます。

次のコードでは、prep フォルダーの下にある prep_component.py ファイルと、train フォルダーの下にある train_component ファイルから、それぞれ prepare_data_component()keras_train_component() の関数をインポートしています。

%load_ext autoreload
%autoreload 2

# load component function from component python file
from prep.prep_component import prepare_data_component
from train.train_component import keras_train_component

# print hint of components
help(prepare_data_component)
help(keras_train_component)

yaml で定義されたスコア付けコンポーネントの場合は、load_component() 関数を使って読み込むことができます。

# load component function from yaml
keras_score_component = load_component(source="./score/score.yaml")

パイプラインを構築する

これで、パイプラインの構築に使うすべてのコンポーネントと入力データの作成と読み込みを完了しました。 これらをパイプラインに構成することができます。

Note

サーバーレス コンピューティングを使うには、先頭に from azure.ai.ml.entities import ResourceConfiguration を追加します。 そして、次のように置き換えます。

  • default_compute="serverless", を含む default_compute=cpu_compute_target,
  • train_node.resources = "ResourceConfiguration(instance_type="Standard_NC6s_v3",instance_count=2) を含む train_node.compute = gpu_compute_target
# define a pipeline containing 3 nodes: Prepare data node, train node, and score node
@pipeline(
    default_compute=cpu_compute_target,
)
def image_classification_keras_minist_convnet(pipeline_input_data):
    """E2E image classification pipeline with keras using python sdk."""
    prepare_data_node = prepare_data_component(input_data=pipeline_input_data)

    train_node = keras_train_component(
        input_data=prepare_data_node.outputs.training_data
    )
    train_node.compute = gpu_compute_target

    score_node = keras_score_component(
        input_data=prepare_data_node.outputs.test_data,
        input_model=train_node.outputs.output_model,
    )


# create a pipeline
pipeline_job = image_classification_keras_minist_convnet(pipeline_input_data=fashion_ds)

パイプラインには既定のコンピューティング cpu_compute_target があります。つまり、特定のノードにコンピューティングを指定しない場合、そのノードは既定のコンピューティング上で実行されます。

パイプラインは、パイプライン レベルの入力 pipeline_input_data があります。 パイプライン ジョブを送信するときに、パイプライン入力に値を割り当てることができます。

パイプラインには、prepare_data_node、train_node、score_node の 3 つのノードが含まれています。

  • prepare_data_nodeinput_data には pipeline_input_data の値が使われます。

  • train_nodeinput_data は prepare_data_node の training_data 出力のものです。

  • score_node の input_data は prepare_data_node の test_data 出力、input_model は train_node の output_model 出力のものです。

  • train_node によって CNN モデルがトレーニングされるので、そのコンピューティングを gpu_compute_target として指定できます。こうすることで、トレーニングのパフォーマンスが向上します。

パイプライン ジョブを送信する

パイプラインを構築したら、ワークスペースに送信します。 ジョブを送信するには、まずワークスペースに接続する必要があります。

ワークスペースにアクセスする

資格情報を構成する

DefaultAzureCredential を使用してワークスペースにアクセスします。 DefaultAzureCredential を使うと、ほとんどの Azure SDK 認証シナリオに対応できます。

うまくいかない場合は、その他の使用できる資格情報については、資格情報の構成例の記事と azure-identity のリファレンス ドキュメントを参照してください。

try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
    credential = InteractiveBrowserCredential()

コンピューティングを使ってワークスペースへのハンドルを取得する

Azure Machine Learning Services を管理する MLClient オブジェクトを作成します。 サーバーレス コンピューティングを使用する場合は、これらのコンピューティングを作成する必要はありません。

# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)

# Retrieve an already attached Azure Machine Learning Compute.
cpu_compute_target = "cpu-cluster"
print(ml_client.compute.get(cpu_compute_target))
gpu_compute_target = "gpu-cluster"
print(ml_client.compute.get(gpu_compute_target))

重要

このコード スニペットでは、ワークスペースの構成 json ファイルが現在のディレクトリまたはその親に保存されていることを想定しています。 ワークスペースの作成方法について詳しくは、ワークスペース リソースの作成に関するページを参照してください。 構成をファイルに保存する方法について詳しくは、「ワークスペース構成ファイルを作成する」を参照してください。

パイプライン ジョブをワークスペースに送信する

ワークスペースのハンドルを取得したので、パイプライン ジョブを送信できます。

pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="pipeline_samples"
)
pipeline_job

上記のコードでは、画像分類パイプライン ジョブを pipeline_samples という実験に送信しています。 実験が存在しない場合は、自動的に作成されます。 pipeline_input_datafashion_ds を使います。

pipeline_job を呼び出すと、次のような出力が生成されます。

Experimentsubmit の呼び出しが迅速完了し、次のような出力が生成されます。

実験 名前 種類 状態 詳細ページ
pipeline_samples sharp_pipe_4gvqx6h1fb pipeline 準備 Azure Machine Learning スタジオへのリンク。

パイプラインの実行を監視するには、リンクを開くか、次を実行して完了するまでブロックします。

# wait until the job completes
ml_client.jobs.stream(pipeline_job.name)

重要

パイプラインの初回実行には約 "15 分" かかります。 依存関係をすべてダウンロードする必要があるほか、Docker イメージの作成と Python 環境のプロビジョニングおよび作成が行われます。 パイプラインを再度実行する際は、それらのリソースが作成されるのではなく再利用されるので、時間が大幅に短縮されます。 ただし、パイプラインの総実行時間は、実際のスクリプトのワークロードと、各パイプライン ステップで実行される処理によって異なります。

出力をチェックアウトし、UI でパイプラインをデバッグする

パイプラインのジョブ詳細ページである Link to Azure Machine Learning studio を開くことができます。 次のようなパイプラインのグラフが表示されます。

Screenshot of the pipeline job detail page.

コンポーネントを右クリックして各コンポーネントのログと出力を確認することや、コンポーネントを選んで詳細ウィンドウを開くことができます。 UI でパイプラインをデバッグする方法については、パイプライン エラーをデバッグする方法に関する記事を参照してください。

(省略可能) コンポーネントをワークスペースに登録する

前のセクションでは、3 つのコンポーネントを使ってパイプラインを構築し、E2E で画像分類タスクを完了しました。 ワークスペースにコンポーネントを登録することで、ワークスペース内でコンポーネントを共有したり、再利用したりすることもできます。 たとえば、次のようにデータの準備コンポーネントを登録することができます。

try:
    # try get back the component
    prep = ml_client.components.get(name="prep_data", version="1")
except:
    # if not exists, register component using following code
    prep = ml_client.components.create_or_update(prepare_data_component)

# list all components registered in workspace
for c in ml_client.components.list():
    print(c)

ml_client.components.get() を使うと、登録されているコンポーネントを名前とバージョンを指定して取得できます。 ml_client.components.create_or_update() を使うと、あらかじめ Python 関数または yaml から読み込んだコンポーネントを登録できます。

次の手順