次の方法で共有


機械学習パイプライン内で (Azure Synapse Analytics で実行される) Apache Spark を使用する方法 (非推奨)

適用対象:Python SDK azureml v1

警告

Python SDK v1 で使用できる Azure Machine Learning とのAzure Synapse Analytics の統合は非推奨になっています。 ユーザーは、Azure Machine Learning にリンク サービスとして登録された Synapse ワークスペースを、引き続き使用できます。 ただし、新しい Synapse ワークスペースは、リンクされたサービスとして Azure Machine Learning に登録できなくなります。 CLI v2 と Python SDK v2 で利用できるサーバーレス Spark コンピューティングと、アタッチされた Synapse Spark プールを使うことをお勧めします。 詳細については、https://aka.ms/aml-spark を参照してください。

この記事では、Azure Synapse Analytics を利用する Apache Spark プールを、Azure Machine Learning パイプラインのデータ準備ステップのコンピューティング先として使う方法を説明します。 データの準備やトレーニングなどの特定のステップに適したコンピューティング リソースを 1 つのパイプラインで使用する方法について説明します。 データを Spark ステップ用に準備する方法と、次のステップに渡す方法についても説明します。

前提条件

Apache Spark プールは、Azure Synapse Analytics ワークスペースで作成して管理します。 Apache Spark プールを Azure Machine Learning ワークスペースと統合するには、Azure Synapse Analytics ワークスペースにリンクする必要があります。 Azure Machine Learning ワークスペースと Azure Synapse Analytics ワークスペースをリンクしたら、次のものを使って Apache Spark プールをアタッチできます

  • Azure Machine Learning スタジオ

  • Python SDK (後で説明します)

  • Azure Resource Manager (ARM) テンプレート。 詳しくは、ARM テンプレートの例をご覧ください

    • コマンド ラインを使って ARM テンプレートに従い、リンク サービスを追加し、次のコード サンプルを使って Apache Spark プールをアタッチできます。
    az deployment group create --name --resource-group <rg_name> --template-file "azuredeploy.json" --parameters @"azuredeploy.parameters.json"
    

重要

Synapse ワークスペースに正常にリンクするには、ユーザーが Synapse ワークスペースの所有者ロールを付与されている必要があります。 Azure portal でご自身のアクセス権を確認してください。

リンク サービスは、作成時にシステム割り当てマネージド ID (SAI) を取得します。 このリンク サービスの SAI に Synapse Studio から "Synapse Apache Spark 管理者" ロールを割り当てて、それが Spark ジョブを送信できるようにする必要があります (「Synapse Studio で Synapse RBAC ロールの割り当てを管理する方法」をご覧ください)。

また、Azure Machine Learning ワークスペースのユーザーに、Azure portal のリソース管理から "共同作成者" ロールを付与する必要があります。

このコードは、ワークスペース内のリンク サービスを取得する方法を示したものです。

from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration

ws = Workspace.from_config()

for service in LinkedService.list(ws) : 
    print(f"Service: {service}")

# Retrieve a known linked service
linked_service = LinkedService.get(ws, 'synapselink1')

最初に、Workspace.from_config()config.json ファイル内の構成を使って Azure Machine Learning ワークスペースにアクセスします。 (詳しくは、ワークスペース構成ファイルの作成に関する記事をご覧ください)。 次に、このコードにより、ワークスペースで利用できるすべてのリンク サービスが出力されます。 最後に、LinkedService.get() によって、'synapselink1' という名前のリンクされたサービスが取得されます。

Azure Machine Learning のコンピューティング先として Apache Spark プールをアタッチする

機械学習パイプラインのステップで Apache Spark プールを利用するには、次のコード サンプルで示すように、それをパイプライン ステップの ComputeTarget としてアタッチする必要があります。

from azureml.core.compute import SynapseCompute, ComputeTarget

attach_config = SynapseCompute.attach_configuration(
        linked_service = linked_service,
        type="SynapseSpark",
        pool_name="spark01") # This name comes from your Synapse workspace

synapse_compute=ComputeTarget.attach(
        workspace=ws,
        name='link1-spark01',
        attach_configuration=attach_config)

synapse_compute.wait_for_completion()

コードは、最初に SynapseCompute を構成します。 linked_service 引数は、前の手順で作成または取得した LinkedService オブジェクトです。 type 引数は SynapseSpark である必要があります。 SynapseCompute.attach_configuration()pool_name 引数は、Azure Synapse Analytics ワークスペースの既存のプールのそれと一致している必要があります。 Azure Synapse Analytics ワークスペースでの Apache Spark プールの作成について詳しくは、「クイック スタート: Synapse Studio を使用してサーバーレス Apache Spark プールを作成する」をご覧ください。 attach_config の型は ComputeTargetAttachConfiguration

構成を作成した後、WorkspaceComputeTargetAttachConfiguration の値、および Machine Learning ワークスペース内でコンピューティングの参照に使う名前を渡して、機械学習の ComputeTarget を作成します。 ComputeTarget.attach() の呼び出しは非同期であるため、このサンプルは、呼び出しが完了するまでブロックされます。

リンクされた Apache Spark プールを使用する SynapseSparkStep を作成する

サンプル ノートブックの Apache spark プールの Spark ジョブでは、単純な機械学習パイプラインが定義されています。 このノートブックでは、最初に、前のステップで定義した synapse_compute を利用したデータ準備手順が定義されています。 次に、ノートブックでは、トレーニングにいっそう適したコンピューティング先を利用するトレーニング ステップが定義されます。 サンプル ノートブックでは、タイタニック号の生存者のデータベースを使って、データの入力と出力を示します。 実際にデータをクリーンアップしたり、予測モデルを作成したりすることはありません。 このサンプルではトレーニングが実際に行われることはないため、トレーニング ステップでは安価な CPU ベースのコンピューティング リソースを使います。

データは、DatasetConsumptionConfig オブジェクトを通して機械学習パイプラインに送られます。このオブジェクトは表形式のデータまたはファイルのセットを保持できます。 データは、多くの場合、ワークスペースのデータストア内にある BLOB ストレージのファイルから取得されます。 このコード サンプルでは、機械学習パイプラインに対する入力を作成する一般的なコードを示します。

from azureml.core import Dataset

datastore = ws.get_default_datastore()
file_name = 'Titanic.csv'

titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, file_name)])
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")

# Example only: it wouldn't make sense to duplicate input data, especially one as tabular and the other as files
titanic_file_dataset = Dataset.File.from_files(path=[(datastore, file_name)])
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()

このコード サンプルでは、Titanic.csv ファイルが BLOB ストレージにあることが想定されています。 このコードでは、TabularDatasetFileDataset の両方としてファイルを読み取る方法を示します。 このコードはデモだけを目的としたものです。これは、入力を複製したり、単一のデータ ソースを、テーブルを含むリソースと、厳密なファイルの両方として解釈したりすると、ややこしくなるためです。

重要

FileDataset を入力として使うには、azureml-core のバージョン 1.20.0 以降が必要です。 これは、後で説明するように、Environment クラスで指定できます。 ステップが完了したら、次のコード サンプルで示すように、出力データを格納できます。

from azureml.data import HDFSOutputDatasetConfig
step1_output = HDFSOutputDatasetConfig(destination=(datastore,"test")).register_on_complete(name="registered_dataset")

このコード サンプルの datastore は、データを test という名前のファイルに格納します。 データは、Machine Learning ワークスペース内で registered_dataset という名前の Dataset として使用できます。

パイプラインのステップには、データに加えて、ステップごとの Python の依存関係がある場合があります。 さらに、個々の SynapseSparkStep オブジェクトでは、正確な Azure Synapse Apache Spark 構成を指定できます。 これを示すため、次のコード サンプルでは、azureml-core パッケージのバージョンが 1.20.0 以上である必要があることを指定しています。 前に説明したように、FileDataset を入力として使うには、azureml-core パッケージについてのこの要件が必要です。

from azureml.core.environment import Environment
from azureml.pipeline.steps import SynapseSparkStep

env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core>=1.20.0")

step_1 = SynapseSparkStep(name = 'synapse-spark',
                          file = 'dataprep.py',
                          source_directory="./code", 
                          inputs=[step1_input1, step1_input2],
                          outputs=[step1_output],
                          arguments = ["--tabular_input", step1_input1, 
                                       "--file_input", step1_input2,
                                       "--output_dir", step1_output],
                          compute_target = 'link1-spark01',
                          driver_memory = "7g",
                          driver_cores = 4,
                          executor_memory = "7g",
                          executor_cores = 2,
                          num_executors = 1,
                          environment = env)

このコードでは、Azure Machine Learning パイプラインの 1 つのステップが指定されています。 このコードの environment の値では特定の azureml-core バージョンが設定され、必要に応じて他の conda または pip の依存関係をコードで追加できます。

SynapseSparkStep は、ローカル コンピューターの ./code サブディレクトリを ZIP に圧縮してアップロードします。 コンピューティング サーバー上にそのディレクトリが再作成され、ステップによってそのディレクトリから dataprep.py スクリプトが実行されます。 そのステップの inputsoutputs は、前に説明した step1_input1step1_input2step1_output オブジェクトです。 dataprep.py スクリプト内でこれらの値にアクセスする最も簡単な方法は、これらを arguments という名前に関連付けることです。

SynapseSparkStep コンストラクターに対する次の引数セットは、Apache Spark を制御します。 compute_target は、以前にコンピューティング先としてアタッチされた 'link1-spark01' です。 その他のパラメーターでは、使用するメモリとコアが指定されます。

サンプル ノートブックでは、dataprep.py に対して次のコードを使います。

import os
import sys
import azureml.core
from pyspark.sql import SparkSession
from azureml.core import Run, Dataset

print(azureml.core.VERSION)
print(os.environ)

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--tabular_input")
parser.add_argument("--file_input")
parser.add_argument("--output_dir")
args = parser.parse_args()

# use dataset sdk to read tabular dataset
run_context = Run.get_context()
dataset = Dataset.get_by_id(run_context.experiment.workspace,id=args.tabular_input)
sdf = dataset.to_spark_dataframe()
sdf.show()

# use hdfs path to read file dataset
spark= SparkSession.builder.getOrCreate()
sdf = spark.read.option("header", "true").csv(args.file_input)
sdf.show()

sdf.coalesce(1).write\
.option("header", "true")\
.mode("append")\
.csv(args.output_dir)

この "データ準備" スクリプトでは実際のデータ変換は行われませんが、データの取得、Spark データフレームへの変換、基本的な Apache Spark 操作の方法がわかります。 Azure Machine Learning スタジオで出力を確認するには、次のスクリーンショットで示すように、子ジョブを開き、[出力とログ] タブを選んで、logs/azureml/driver/stdout ファイルを開きます。

Screenshot of Studio showing stdout tab of child job

パイプラインで SynapseSparkStep を使用する

次の例では、前のセクションで作成された SynapseSparkStep の出力を使っています。 パイプライン内の他のステップには、それら独自の環境があり、これらは手持ちのタスクに適したさまざまなコンピューティング リソース上で実行される場合があります。 サンプル ノートブックでは、小規模な CPU クラスターで "トレーニング ステップ" が実行されます。

from azureml.core.compute import AmlCompute

cpu_cluster_name = "cpucluster"

if cpu_cluster_name in ws.compute_targets:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
else:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', max_nodes=1)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    print('Allocating new CPU compute cluster')

cpu_cluster.wait_for_completion(show_output=True)

step2_input = step1_output.as_input("step2_input").as_download()

step_2 = PythonScriptStep(script_name="train.py",
                          arguments=[step2_input],
                          inputs=[step2_input],
                          compute_target=cpu_cluster_name,
                          source_directory="./code",
                          allow_reuse=False)

このコードでは、必要に応じて新しいコンピューティング リソースが作成されます。 その後、step1_output の結果がトレーニング ステップの入力に変換されます。 as_download() オプションは、データがコンピューティング リソースに移動されるため、アクセスが高速になることを意味します。 データが大きすぎてローカル コンピューティングのハード ドライブに収まらない場合は、as_mount() オプションを使って、FUSE ファイル システムでデータをストリーミングする必要があります。 この 2 番目のステップの compute_target'cpucluster' で、データ準備ステップで使用した 'link1-spark01' リソースではありません。 このステップでは、前のステップで使った dataprep.py スクリプトの代わりに、簡単な train.py スクリプトを使います。 サンプル ノートブックには、詳細な train.py スクリプトが含まれています。

すべてのステップを定義した後は、パイプラインを作成して実行できます。

from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)

このコードでは、Azure Synapse Analytics (step_1) を利用する Apache Spark プール上のデータ準備ステップと、トレーニング ステップ (step_2) で構成されるパイプラインが作成されます。 Azure は、ステップ間のデータの依存関係を調べて、実行グラフを計算します。 この場合は、単純な依存関係が 1 つだけあります。 ここでは、step2_input には step1_output が必ず必要です。

pipeline.submit の呼び出しにより、必要に応じて synapse-pipeline という名前の実験が作成され、その中でジョブが非同期的に開始されます。 パイプライン内の個々のステップは、このメイン ジョブの子ジョブとして実行され、スタジオの [実験] ページでそれらのステップを監視および確認できます。

次のステップ