Aracılığıyla paylaş


Apache Spark'ı (Azure Synapse Analytics tarafından desteklenen) makine öğrenmesi işlem hattınızda kullanma (kullanım dışı)

ŞUNUN IÇIN GEÇERLIDIR:Python SDK azureml v1

Uyarı

Python SDK v1'de kullanılabilen Azure Machine Learning ile Azure Synapse Analytics tümleştirmesi kullanım dışıdır. Kullanıcılar, Azure Machine Learning'e kayıtlı Synapse çalışma alanını bağlı hizmet olarak kullanmaya devam edebilir. Öte yandan artık yeni Synapse çalışma alanları Azure Machine Learning'e bağlı hizmet olarak kaydedilemez. CLI v2 ve Python SDK v2'de kullanılabilen sunucusuz Spark işlem ve bağlı Synapse Spark havuzları kullanmanızı öneririz. Daha fazla bilgi için https://aka.ms/aml-spark adresini ziyaret edin.

Bu makalede, Azure Machine Learning işlem hattında veri hazırlama adımı için işlem hedefi olarak Azure Synapse Analytics tarafından desteklenen Apache Spark havuzlarını kullanmayı öğreneceksiniz. Tek bir işlem hattının veri hazırlama veya eğitim gibi belirli bir adıma uygun işlem kaynaklarını nasıl kullanabileceğini öğrenirsiniz. Ayrıca Verilerin Spark adımı için nasıl hazırlandığını ve bir sonraki adıma nasıl geçtiğini öğreneceksiniz.

Önkoşullar

Apache Spark havuzlarınızı bir Azure Synapse Analytics çalışma alanında oluşturup yönetirsiniz. Apache Spark havuzunu bir Azure Machine Learning çalışma alanıyla tümleştirmek için Azure Synapse Analytics çalışma alanına bağlanmanız gerekir. Azure Machine Learning çalışma alanınızı ve Azure Synapse Analytics çalışma alanlarınızı bağladıktan sonra ile bir Apache Spark havuzu ekleyebilirsiniz

  • Azure Machine Learning Studio

  • Python SDK'sı, daha sonra açıklandığı gibi

  • Azure Resource Manager (ARM) şablonu. Daha fazla bilgi için Örnek ARM şablonu'nu ziyaret edin

    • KOMUT satırını kullanarak ARM şablonunu izleyebilir, bağlı hizmeti ekleyebilir ve apache Spark havuzunu şu kod örneğiyle ekleyebilirsiniz:
    az deployment group create --name --resource-group <rg_name> --template-file "azuredeploy.json" --parameters @"azuredeploy.parameters.json"
    

Önemli

Synapse çalışma alanına başarıyla bağlanmak için Synapse çalışma alanının Sahip rolüne sahip olmanız gerekir. Azure portalda erişiminizi denetleyin.

Bağlı hizmet, oluşturma zamanında sistem tarafından atanan bir yönetilen kimlik (SAI) alır. Spark işini gönderebilmesi için bu bağlantı hizmeti SAI'ye Synapse Studio'dan "Synapse Apache Spark yöneticisi" rolünü atamanız gerekir (bkz . Synapse Studio'da Synapse RBAC rol atamalarını yönetme).

Azure Machine Learning çalışma alanının kullanıcısını kaynak yönetiminin Azure portalından "Katkıda Bulunan" rolüne de sahip olmanız gerekir.

Bu kod, çalışma alanınızda bağlı hizmetlerin nasıl alınacaklarını gösterir:

from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration

ws = Workspace.from_config()

for service in LinkedService.list(ws) : 
    print(f"Service: {service}")

# Retrieve a known linked service
linked_service = LinkedService.get(ws, 'synapselink1')

İlk olarak, Workspace.from_config() dosyadaki yapılandırmayla Azure Machine Learning çalışma alanınıza erişir config.json . (Daha fazla bilgi için Çalışma alanı yapılandırma dosyası oluşturun). Ardından kod, çalışma alanında kullanılabilen tüm bağlı hizmetleri yazdırır. Son olarak, LinkedService.get() adlı 'synapselink1'bağlı bir hizmeti alır.

Apache spark havuzunuzu Azure Machine Learning için işlem hedefi olarak ekleme

Apache spark havuzunuzu kullanarak makine öğrenmesi işlem hattınızdaki bir adımı desteklemek için, bu kod örneğinde gösterildiği gibi ComputeTarget bunu işlem hattı adımına eklemeniz gerekir:

from azureml.core.compute import SynapseCompute, ComputeTarget

attach_config = SynapseCompute.attach_configuration(
        linked_service = linked_service,
        type="SynapseSpark",
        pool_name="spark01") # This name comes from your Synapse workspace

synapse_compute=ComputeTarget.attach(
        workspace=ws,
        name='link1-spark01',
        attach_configuration=attach_config)

synapse_compute.wait_for_completion()

Kod ilk olarak öğesini yapılandırıyor SynapseCompute. linked_service Bağımsız değişken, önceki adımda oluşturduğunuz veya aldığınız nesnedirLinkedService. type Bağımsız değişken olmalıdırSynapseSpark. içindeki SynapseCompute.attach_configuration() bağımsız değişkeninin pool_name Azure Synapse Analytics çalışma alanınızdaki mevcut havuzla eşleşmesi gerekir. Azure Synapse Analytics çalışma alanında Apache spark havuzu oluşturma hakkında daha fazla bilgi için Hızlı Başlangıç: Synapse Studio kullanarak sunucusuz Apache Spark havuzu oluşturma adresini ziyaret edin. Türü attach_config şeklindedir ComputeTargetAttachConfiguration.

Yapılandırma oluşturulduktan sonra ve değerlerini ve ComputeTargetAttachConfiguration makine öğrenmesi ComputeTarget çalışma alanında işlem için başvurmak istediğiniz adı geçirerek Workspace bir makine öğrenmesi oluşturun. çağrısı ComputeTarget.attach() zaman uyumsuz olduğundan, çağrı tamamlanana kadar örnek engellenir.

Bağlantılı Apache Spark havuzunu kullanan bir SynapseSparkStep oluşturma

Apache Spark havuzundaki örnek not defteri Spark işi, basit bir makine öğrenmesi işlem hattını tanımlar. İlk olarak, not defteri önceki adımda tanımlanan tarafından synapse_compute desteklenen bir veri hazırlama adımı tanımlar. Ardından not defteri, eğitim için daha uygun bir işlem hedefi tarafından desteklenen bir eğitim adımı tanımlar. Örnek not defteri, veri girişini ve çıkışını göstermek için Titanic hayatta kalma veritabanını kullanır. Aslında verileri temizlemez veya tahmine dayalı bir model oluşturmaz. Bu örnek gerçekten eğitim içermediğinden, eğitim adımı ucuz, CPU tabanlı bir işlem kaynağı kullanır.

Veriler, tablo verilerini veya dosya kümelerini tutabilen nesneler aracılığıyla DatasetConsumptionConfig bir makine öğrenmesi işlem hattına akar. Veriler genellikle çalışma alanı veri deposundaki blob depolamadaki dosyalardan gelir. Bu kod örneği, makine öğrenmesi işlem hattı için giriş oluşturan tipik kodu gösterir:

from azureml.core import Dataset

datastore = ws.get_default_datastore()
file_name = 'Titanic.csv'

titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, file_name)])
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")

# Example only: it wouldn't make sense to duplicate input data, especially one as tabular and the other as files
titanic_file_dataset = Dataset.File.from_files(path=[(datastore, file_name)])
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()

Kod örneği, dosyanın Titanic.csv blob depolama alanında olduğunu varsayar. Kod, dosyanın hem de olarak TabularDatasetFileDatasetnasıl okunmasını gösterir. Bu kod yalnızca tanıtım amaçlıdır, çünkü girişleri yinelemek veya tek bir veri kaynağını hem tablo içeren bir kaynak olarak hem de kesinlikle dosya olarak yorumlamak kafa karıştırıcı olabilir.

Önemli

Giriş olarak kullanmak FileDataset için en az 1.20.0sürümünü azureml-core kullanmanız gerekir. Bunu daha sonra açıklandığı gibi sınıfıyla Environment belirtebilirsiniz. Bir adım tamamlandığında, çıkış verilerini şu kod örneğinde gösterildiği gibi depolayabilirsiniz:

from azureml.data import HDFSOutputDatasetConfig
step1_output = HDFSOutputDatasetConfig(destination=(datastore,"test")).register_on_complete(name="registered_dataset")

Bu kod örneğinde datastore , verileri adlı testbir dosyada depolar. Veriler makine öğrenmesi çalışma alanında adıyla registered_datasetolarak Datasetkullanılabilir.

Bir işlem hattı adımında verilere ek olarak adım başına Python bağımlılıkları da olabilir. Ayrıca, tek tek SynapseSparkStep nesneler tam Azure Synapse Apache Spark yapılandırmasını belirtebilir. Bunu göstermek için aşağıdaki kod örneği, paket sürümünün azureml-core en az 1.20.0olması gerektiğini belirtir. Daha önce belirtildiği gibi, giriş olarak kullanmak FileDataset için paket için azureml-core bu gereksinim gereklidir.

from azureml.core.environment import Environment
from azureml.pipeline.steps import SynapseSparkStep

env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core>=1.20.0")

step_1 = SynapseSparkStep(name = 'synapse-spark',
                          file = 'dataprep.py',
                          source_directory="./code", 
                          inputs=[step1_input1, step1_input2],
                          outputs=[step1_output],
                          arguments = ["--tabular_input", step1_input1, 
                                       "--file_input", step1_input2,
                                       "--output_dir", step1_output],
                          compute_target = 'link1-spark01',
                          driver_memory = "7g",
                          driver_cores = 4,
                          executor_memory = "7g",
                          executor_cores = 2,
                          num_executors = 1,
                          environment = env)

Bu kod, Azure Machine Learning işlem hattında tek bir adım belirtir. environment Bu kodun değeri belirli azureml-core bir sürümü ayarlar ve kod gerektiğinde diğer conda veya pip bağımlılıklarını ekleyebilir.

Alt SynapseSparkStep dizini yerel bilgisayardan sıkıştırıp karşıya yükler ./code . Bu dizin işlem sunucusunda yeniden oluşturulur ve adım bu dizinden betiği çalıştırır dataprep.py . inputs Bu adımın ve outputs sayısı, daha önce ele alınan , step1_input2ve step1_output nesneleridirstep1_input1. Betik içindeki bu değerlere erişmenin dataprep.py en kolay yolu, bunları adlı argumentsile ilişkilendirmektir.

Oluşturucunun sonraki bağımsız değişken SynapseSparkStep kümesi Apache Spark'ı denetler. compute_target, 'link1-spark01' daha önce işlem hedefi olarak eklediğimiz hedeftir. Diğer parametreler kullanmak istediğimiz belleği ve çekirdekleri belirtir.

Örnek not defteri için dataprep.pybu kodu kullanır:

import os
import sys
import azureml.core
from pyspark.sql import SparkSession
from azureml.core import Run, Dataset

print(azureml.core.VERSION)
print(os.environ)

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--tabular_input")
parser.add_argument("--file_input")
parser.add_argument("--output_dir")
args = parser.parse_args()

# use dataset sdk to read tabular dataset
run_context = Run.get_context()
dataset = Dataset.get_by_id(run_context.experiment.workspace,id=args.tabular_input)
sdf = dataset.to_spark_dataframe()
sdf.show()

# use hdfs path to read file dataset
spark= SparkSession.builder.getOrCreate()
sdf = spark.read.option("header", "true").csv(args.file_input)
sdf.show()

sdf.coalesce(1).write\
.option("header", "true")\
.mode("append")\
.csv(args.output_dir)

Bu "veri hazırlama" betiği herhangi bir gerçek veri dönüştürme işlemi gerçekleştirmez, ancak verileri alma, Spark veri çerçevesine dönüştürme ve bazı temel Apache Spark işleme işlemlerinin nasıl yapılacağını gösterir. çıktıyı Azure Machine Learning stüdyosu bulmak için alt işi açın, Çıkışlar + günlükler sekmesini seçin ve bu ekran görüntüsünde gösterildiği gibi dosyayı açınlogs/azureml/driver/stdout:

Screenshot of Studio showing stdout tab of child job

İşlem hattında kullanma SynapseSparkStep

Sonraki örnek, önceki bölümde oluşturulan çıktıyı SynapseSparkStepkullanır. İşlem hattındaki diğer adımlar kendi benzersiz ortamlarına sahip olabilir ve eldeki göreve uygun farklı işlem kaynaklarında çalıştırılabilir. Örnek not defteri küçük bir CPU kümesinde "eğitim adımını" çalıştırır:

from azureml.core.compute import AmlCompute

cpu_cluster_name = "cpucluster"

if cpu_cluster_name in ws.compute_targets:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
else:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', max_nodes=1)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    print('Allocating new CPU compute cluster')

cpu_cluster.wait_for_completion(show_output=True)

step2_input = step1_output.as_input("step2_input").as_download()

step_2 = PythonScriptStep(script_name="train.py",
                          arguments=[step2_input],
                          inputs=[step2_input],
                          compute_target=cpu_cluster_name,
                          source_directory="./code",
                          allow_reuse=False)

Bu kod gerekirse yeni işlem kaynağını oluşturur. Ardından, sonucu eğitim adımının girişine dönüştürür step1_output . bu as_download() seçenek, verilerin işlem kaynağına taşınarak daha hızlı erişime neden olduğu anlamına gelir. Veriler yerel işlem sabit sürücüsüne sığmayacak kadar büyükse, dosya sistemiyle FUSE veri akışı yapmak için seçeneğini kullanmanız as_mount() gerekir. compute_target Bu ikinci adımın adı, 'cpucluster'veri hazırlama adımında kullandığınız kaynak değildir'link1-spark01'. Bu adım, önceki adımda kullandığınız betik yerine dataprep.py basit train.py bir betik kullanır. Örnek not defterinde betiğin train.py ayrıntıları bulunur.

Tüm adımlarınızı tanımladıktan sonra işlem hattınızı oluşturabilir ve çalıştırabilirsiniz.

from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)

Bu kod, Azure Synapse Analytics () ve eğitim adımı (step_1step_2) tarafından desteklenen Apache Spark havuzlarında veri hazırlama adımından oluşan bir işlem hattı oluşturur. Azure, yürütme grafını hesaplama adımları arasındaki veri bağımlılıklarını inceler. Bu durumda yalnızca bir basit bağımlılık vardır. Burada, step2_input mutlaka gerektirir step1_output.

Çağrı pipeline.submit , gerekirse adlı synapse-pipelinebir Deneme oluşturur ve zaman uyumsuz olarak içinde bir İş başlatır. İşlem hattındaki tek tek adımlar bu ana işin Alt İşleri olarak çalıştırılır ve Studio'nun Denemeler sayfası bu adımları izleyebilir ve gözden geçirebilir.

Sonraki adımlar