Upgrade kroku paralelního spuštění na sadu SDK v2
V sadě SDK v2 se "Krok paralelního spuštění" konsoliduje do konceptu úlohy jako parallel job
. Paralelní úloha udržuje stejný cíl, aby uživatelé mohli zrychlit provádění úloh prostřednictvím distribuce opakovaných úloh na výkonných výpočetních clusterech s více uzly. Kromě kroku paralelního spuštění nabízí paralelní úloha v2 další výhody:
- Flexibilní rozhraní, které umožňuje uživateli definovat více vlastních vstupů a výstupů pro paralelní úlohu. Můžete je propojit s dalšími kroky pro využívání nebo správu jejich obsahu ve vstupním skriptu.
- Zjednodušte si vstupní schéma, které nahradí
Dataset
jako vstup pomocí konceptu v2data asset
. Jako vstupy pro paralelní úlohu můžete snadno použít místní soubory nebo identifikátor URI adresáře objektů blob. - Výkonnější funkce jsou vyvíjeny pouze v paralelních úlohách v2. Obnovte například neúspěšnou nebo zrušenou paralelní úlohu, abyste mohli pokračovat ve zpracování neúspěšných nebo nezpracovaných minidávek opětovným použitím úspěšného výsledku, abyste ušetřili duplicitní úsilí.
Pokud chcete upgradovat aktuální krok paralelního spuštění sady SDK verze 1 na verzi 2, budete muset provést
- Slouží
parallel_run_function
k vytvoření paralelní úlohy nahrazenímParallelRunConfig
aParallelRunStep
ve verzi 1. - Upgradujte kanál v1 na verzi 2. Pak vyvolejte paralelní úlohu v2 jako krok v kanálu v2. Podrobnosti o upgradu kanálu najdete v tématu Upgrade kanálu z verze 1 na v2 .
Poznámka: Skript pro zadávání uživatele je kompatibilní mezi krokem paralelního spuštění v1 a paralelní úlohou v2. Při upgradu úlohy paralelního spuštění tak můžete dál používat stejný entry_script.py.
Tento článek obsahuje porovnání scénářů v sadě SDK v1 a SDK v2. V následujících příkladech vytvoříme paralelní úlohu, která bude predikovat vstupní data v úloze kanálů. Dozvíte se, jak vytvořit paralelní úlohu a jak ji použít v úloze kanálu pro sadu SDK v1 i SDK v2.
Požadavky
- Příprava prostředí sady SDK v2: Instalace sady Azure Machine Learning SDK v2 pro Python
- Vysvětlení základu kanálu sady SDK v2: Vytvoření kanálu Služby Azure Machine Learning pomocí sady Python SDK v2
Vytvoření paralelního kroku
Sada SDK v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
SADA SDK v2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
Použití paralelního kroku v kanálu
Sada SDK v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
SADA SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
Mapování klíčových funkcí v sadě SDK v1 a SDK v2
Funkce v sadě SDK v1 | Hrubé mapování v sadě SDK v2 |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | Výstup |
as_mount datové sady | Vstup |
Mapování a konfigurace paralelních úloh
Sada SDK v1 | SADA SDK v2 | Description |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | Prostředí, ve které se bude spouštět trénovací úloha. |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | Uživatelský skript, který se spustí paralelně na více uzlech. |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | Počet neúspěšných mini dávek, které by bylo možné v této paralelní úloze ignorovat. Pokud je počet neúspěšných minidávek vyšší než tato prahová hodnota, bude paralelní úloha označena jako neúspěšná. -1 je výchozí číslo, což znamená ignorovat všechny neúspěšné minidávce během paralelní úlohy. |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | Agregujte všechny výnosy z každého spuštění minidávkové dávky a vypíšete je do tohoto souboru. Může odkazovat na jeden z výstupů paralelní úlohy pomocí výrazu ${{outputs.<>output_name}} |
ParallelRunConfig.node_count | parallel_run_function.instance_count | Volitelný počet instancí nebo uzlů používaných cílovým výpočetním objektem Výchozí hodnota je 1. |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | Maximální paralelismus, který má každá výpočetní instance. |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | Definujte velikost jednotlivých minidávek pro rozdělení vstupu. Pokud je input_data složka nebo sada souborů, definuje toto číslo počet souborů pro každou minidávku. Například 10, 100. Pokud input_data jsou tabulková data z mltable , toto číslo definuje fyzickou velikost proximát pro každou minidávku. Výchozí jednotka je Bajt a hodnota může přijímat řetězec, jako je 100 kB, 100 mb. |
ParallelRunConfig.source_directory | parallel_run_function.task.code | Místní nebo vzdálená cesta směřující na zdrojový kód. |
ParallelRunConfig.description | parallel_run_function.description | Popisný popis paralel |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | Řetězec názvu úrovně protokolování, který je definován v "protokolování". Možné hodnoty jsou WARNING, INFO a DEBUG. (Volitelné, výchozí hodnota je INFO.) Tuto hodnotu je možné nastavit prostřednictvím pipelineParameter. |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | Časový limit v sekundách pro spuštění vlastní funkce run(). Pokud je doba provádění vyšší než tato prahová hodnota, minidávka se přeruší a označí se jako neúspěšná minidávka, aby se aktivovalo opakování. |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_retries | Počet opakování při selhání minidávky nebo vypršení časového limitu. Pokud dojde k selhání všech opakovaných pokusů, bude minidávka označena jako neúspěšná při mini_batch_error_threshold výpočtu. |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | V kombinaci s append_row_to nastavením. |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | Počet neúspěšných mini dávek, které by bylo možné v této paralelní úloze ignorovat. Pokud je počet neúspěšných minidávek vyšší než tato prahová hodnota, bude paralelní úloha označena jako neúspěšná. -1 je výchozí číslo, což znamená ignorovat všechny neúspěšné minidávce během paralelní úlohy. |
ParallelRunConfig.allowed_failed_percent | sada parallel_run_function.task.program_arguments--allowed_failed_percent |
Podobně jako "allowed_failed_count", ale toto nastavení používá procento neúspěšných minidávek místo počtu selhání minidávkové dávky. Rozsah tohoto nastavení je [0, 100]. "100" je výchozí číslo, což znamená ignorovat všechny neúspěšné minidávce během paralelní úlohy. |
ParallelRunConfig.partition_keys | Probíhá vývoj. | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | Slovník názvů a hodnot proměnných prostředí. Tyto proměnné prostředí se nastavují v procesu, ve kterém se spouští uživatelský skript. |
ParallelRunStep.name | parallel_run_function.name | Název vytvořené paralelní úlohy nebo komponenty |
ParallelRunStep.inputs | parallel_run_function.inputs | Dikt vstupů používaných touto paralelou |
-- | parallel_run_function.input_data | Deklarujte, že data se mají rozdělit a zpracovat paralelně. |
ParallelRunStep.output | parallel_run_function.outputs | Výstupy této paralelní úlohy. |
ParallelRunStep.side_inputs | parallel_run_function.inputs | Definováno společně s inputs . |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | Argumenty paralelní úlohy. |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | Určete, zda bude paralel vrátit stejný výstup se stejným vstupem. |
Další kroky
Další informace najdete v dokumentaci tady: