次の方法で共有


Apache Spark プールを使用したデータ ラングリング (非推奨)

適用対象:Python SDK azureml v1

警告

Python SDK v1 で利用できる Azure Machine Learning との Azure Synapse Analytics の統合は非推奨になっています。 ユーザーは、Azure Machine Learning に登録された Synapse ワークスペースをリンク サービスとして引き続き使用できます。 ただし、新しい Synapse ワークスペースは、リンクされたサービスとして Azure Machine Learning に登録できなくなります。 CLI v2 と Python SDK v2 で利用できるサーバーレス Spark コンピューティングと、アタッチされた Synapse Spark プールの使用をお勧めします。 詳細については、https://aka.ms/aml-spark を参照してください。

この記事では、Azure Synapse Analytics を利用して、Jupyter ノートブック内の専用 Synapse セッション内で、データ ラングリング タスクを対話的に実行する方法を学習します。 これらのタスクは Azure Machine Learning Python SDK を利用します。 Azure Machine Learning パイプラインの詳細については、「機械学習パイプライン内で (Azure Synapse Analytics を利用する) Apache Spark を使用する方法 (プレビュー)」を参照してください。 Synapse ワークスペースで Azure Synapse Analytics を使用する方法の詳細については、「Azure Synapse Analytics の概要シリーズ」を参照してください。

Azure Machine Learning と Azure Synapse Analytics の統合

Azure Synapse Analytics の Azure Machine Learning との統合 (プレビュー) を使用すると、対話的なデータ探索とデータ準備のための (Azure Synapse を利用する) Apache Spark プールをアタッチすることができます。 この統合により、機械学習モデルのトレーニングに使用する Python ノートブック内からそのすべてを利用することができる、大規模なデータ ラングリング用の専用のコンピューティング リソースを確保できます。

前提条件

データ ラングリング タスク用に Synapse Spark プールを起動する

Apache Spark プールを使用したデータ準備を開始するには、アタッチされた Spark Synapse のコンピューティング名を指定します。 この名前は、Azure Machine Learning スタジオの [アタッチされたコンピューティング] タブで確認できます。

get attached compute name

重要

引き続き Apache Spark プールを使用するためには、データ ラングリング タスクでどのコンピューティング リソースを使用するのかを指定する必要があります。 単一コード行には %synapse を使用し、複数行には %%synapse を使用します。

%synapse start -c SynapseSparkPoolAlias

セッションの開始後、以下のようにセッションのメタデータを確認できます。

%synapse meta

Apache Spark セッション中に使用する Azure Machine Learning 環境を指定できます。 この環境に指定された Conda 依存関係のみが有効になります。 Docker イメージはサポートされていません。

警告

環境 Conda 依存関係に指定された Python 依存関係は、Apache Spark プールではサポートされません。 現時点では、固定の Python バージョンのみがサポートされています。Python のバージョンを確認するにはスクリプトに sys.version_info を含めてください

次のコードは、環境変数 myenv を作成し、セッションの開始前に azureml-core のバージョン 1.20.0 および numpy のバージョン 1.17.0 をインストールします。 その後、Apache Spark セッションの start ステートメントにこの環境を含めることができます。


from azureml.core import Workspace, Environment

# creates environment with numpy and azureml-core dependencies
ws = Workspace.from_config()
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")
env.python.conda_dependencies.add_conda_package("numpy==1.17.0")
env.register(workspace=ws)

カスタム環境内の Apache Spark プールを使用したデータ準備を開始するには、Apache Spark プール名と Apache Spark セッション中に使用する環境を指定します。 サブスクリプション ID、機械学習ワークスペースのリソース グループ、機械学習ワークスペースの名前を指定することができます。

重要

リンクされた Synapse ワークスペースで [セッション レベル パッケージの許可] を必ず有効にしてください。

enable session level packages

%synapse start -c SynapseSparkPoolAlias -e myenv -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName

ストレージからデータを読み込む

Apache Spark セッションが開始されたら、準備したいデータを読み取ります。 データの読み込みは、Azure Blob Storage および Azure Data Lake Storage Generation 1 および 2 でサポートされます。

これらのストレージ サービスからデータを読み込むには、以下の 2 つの選択肢があります。

  • Hadoop 分散ファイル システム (HDFS) パスを使用して、ストレージから直接データを読み込む

  • 既存の Azure Machine Learning データセットからデータを読み取る

これらのストレージ サービスにアクセスするには、ストレージ BLOB データ閲覧者のアクセス許可が必要です。 これらのストレージ サービスにデータを書き戻すには、ストレージ BLOB データ共同作成者アクセス許可が必要となります。 詳細については、ストレージのアクセス許可とロールに関するページを参照してください。

Hadoop Distributed Files System (HDFS) パスを使用してデータを読み込む

対応する HDFS パスを使用してストレージからデータの読み込みと読み取りを行うには、データ アクセス認証の資格情報が利用できる必要があります。 これらの資格情報は、ストレージの種類によって異なります。 次のコード サンプルは、Shared Access Signature (SAS) トークンまたはアクセス キーを使用して、Azure Blob Storage から Spark データフレームにデータを読み取る方法を示しています。

%%synapse

# setup access key or SAS token
sc._jsc.hadoopConfiguration().set("fs.azure.account.key.<storage account name>.blob.core.windows.net", "<access key>")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net", "<sas token>")

# read from blob 
df = spark.read.option("header", "true").csv("wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv")

次のコード サンプルは、サービス プリンシパル資格情報を使用して、Azure Data Lake Storage Generation 1 (ADLS Gen 1) からデータを読み取る方法を示しています。

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.access.token.provider.type","ClientCredential")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.client.id", "<client id>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.credential", "<client secret>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.refresh.url",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("adl://<storage account name>.azuredatalakestore.net/<path>")

次のコード サンプルは、サービス プリンシパル資格情報を使用して、Azure Data Lake Storage Generation 2 (ADLS Gen 2) からデータを読み取る方法を示しています。

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net","OAuth")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("abfss://<container name>@<storage account>.dfs.core.windows.net/<path>")

登録済みデータセットからデータを読み取る

Spark データフレームに変換するのであれば、既存の登録済みデータセットをワークスペースに配置し、その上でデータ準備を実行することもできます。 次の例では、ワークスペースに対して認証を行い、BLOB ストレージ内のファイルを参照する登録済みの TabularDataset (blob_dset) を取得し、それを Spark データフレームに変換します。 データセットを Spark データフレームに変換する際は、pyspark のデータ探索ライブラリとデータ準備ライブラリを使用できます。

%%synapse

from azureml.core import Workspace, Dataset

subscription_id = "<enter your subscription ID>"
resource_group = "<enter your resource group>"
workspace_name = "<enter your workspace name>"

ws = Workspace(workspace_name = workspace_name,
               subscription_id = subscription_id,
               resource_group = resource_group)

dset = Dataset.get_by_name(ws, "blob_dset")
spark_df = dset.to_spark_dataframe()

データ ラングリング タスクを実行する

データを取得して探索した後、データ ラングリング タスクを実行することができます。 次のコード サンプルは、前のセクションの HDFS の例を拡張したものです。 これは、Survivor 列に基づいて、Spark データフレーム df 内のデータのフィルター処理を行い、そのリストを Age によってグループ化します。

%%synapse

from pyspark.sql.functions import col, desc

df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)

df.show()

データをストレージに保存して Spark セッションを停止する

データ探索とデータ準備が完了したら、後で使用できるよう準備済みのデータを Azure 上のストレージ アカウントに保存します。 次のコード サンプルでは、準備済みのデータを Azure Blob Storage に書き戻し、training_data ディレクトリにある元の Titanic.csv ファイルを上書きします。 ストレージへの書き戻しを行うには、ストレージ BLOB データ共同作成者のアクセス許可が必要となります。 詳細については、「BLOB データにアクセスするための Azure ロールの割り当て」を参照してください。

%% synapse

df.write.format("csv").mode("overwrite").save("wasbs://demo@dprepdata.blob.core.windows.net/training_data/Titanic.csv")

データ準備が完了し、準備済みのデータをストレージに保存したら、次のコマンドを使用して Apache Spark プールの使用を終了します。

%synapse stop

準備済みのデータを表すデータセットを作成する

準備済みのデータをモデル トレーニングに使用する準備ができたら、Azure Machine Learning データストアを使用してストレージに接続し、Azure Machine Learning データセットで使用したいファイルを指定します。

次のコード例は

  • 準備済みのデータを保存したストレージ サービスに接続するデータストアが作成済みであることを前提とします
  • get() メソッドを使用して、既存のデータストア (mydatastore) をワークスペース ws から取得します。
  • mydatastoretraining_data ディレクトリ内にある準備済みのデータ ファイルを参照する FileDataset (train_ds) を作成します
  • 変数 input1 を作成します。 後ほど、この変数を使用して、トレーニング タスクのためにコンピューティング先が利用できる train_ds データセットのデータ ファイルを作成します。
from azureml.core import Datastore, Dataset

datastore = Datastore.get(ws, datastore_name='mydatastore')

datastore_paths = [(datastore, '/training_data/')]
train_ds = Dataset.File.from_files(path=datastore_paths, validate=True)
input1 = train_ds.as_mount()

ScriptRunConfig を使用して Synapse Spark プールに実験の実行を送信する

データ ラングリング タスクを自動化して作成する準備ができたら、ScriptRunConfig オブジェクトを使用して、アタッチした Synapse Spark プールに実験の実行を送信できます。 同様に、Azure Machine Learning パイプラインがある場合は、パイプラインでのデータ準備ステップのために、コンピューティング先として Synapse Spark プールを指定するための SynapseSparkStep を使用することができます。 Synapse Spark プールでデータを利用できるかどうかはデータセットの種類で決まります。

  • FileDataset の場合は、as_hdfs() メソッドを使用できます。 実行が送信されると、データセットは Hadoop 分散ファイル システム (HFDS) として Synapse Spark プールで利用できるようになります
  • TabularDataset に対しては、as_named_input() メソッドを使用できます

次のコード サンプルは

  • 前のコード例で作成した FileDataset train_ds から変数 input2 を作成します
  • HDFSOutputDatasetConfiguration クラスを使用して変数 output を作成します。 実行が完了すると、このクラスによって、実行の出力をデータセット test としてデータストア mydatastore 内に保存できるようになります。 Azure Machine Learning ワークスペースでは、test データセットは registered_dataset という名前で登録されます
  • Synapse Spark プールでの実行のために、実行で使用するべき設定を構成します
  • ScriptRunConfig パラメーターを以下のように定義します
    • 実行に dataprep.py スクリプトを使用する
    • 入力として使用するデータと、そのデータを Synapse Spark プールで利用できるようにする方法を指定する
    • 出力データ output を保存する場所を指定する
from azureml.core import Dataset, HDFSOutputDatasetConfig
from azureml.core.environment import CondaDependencies
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig 
from azureml.core import Experiment

input2 = train_ds.as_hdfs()
output = HDFSOutputDatasetConfig(destination=(datastore, "test").register_on_complete(name="registered_dataset")

run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name

run_config.spark.configuration["spark.driver.memory"] = "1g" 
run_config.spark.configuration["spark.driver.cores"] = 2 
run_config.spark.configuration["spark.executor.memory"] = "1g" 
run_config.spark.configuration["spark.executor.cores"] = 1 
run_config.spark.configuration["spark.executor.instances"] = 1 

conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")

run_config.environment.python.conda_dependencies = conda_dep

script_run_config = ScriptRunConfig(source_directory = './code',
                                    script= 'dataprep.py',
                                    arguments = ["--file_input", input2,
                                                 "--output_dir", output],
                                    run_config = run_config)

run_config.spark.configuration と一般的な Spark 構成に関する詳細は、「SparkConfiguration クラス」と「Apache Spark の構成ドキュメント」を参照してください。

ScriptRunConfig オブジェクトの設定が完了したら、実行を送信できます。

from azureml.core import Experiment 

exp = Experiment(workspace=ws, name="synapse-spark") 
run = exp.submit(config=script_run_config) 
run

この例で使用する dataprep.py スクリプトに関する情報を含め、詳細については、ノートブック例を参照してください。

データの準備が完了すると、データをトレーニング ジョブの入力として使用できるようになります。 前述のコード例では、トレーニング ジョブの入力データとして registered_dataset を指定しています。

サンプルの Notebook

より詳しい概念や Azure Synapse Analytics と Azure Machine Learning の統合機能については、以下のノートブック例を参照してください。

次のステップ