Upgrade kroku paralelního spuštění na sadu SDK v2

V sadě SDK v2 se "Krok paralelního spuštění" konsoliduje do konceptu úlohy jako parallel job. Paralelní úloha udržuje stejný cíl, aby uživatelé mohli zrychlit provádění úloh prostřednictvím distribuce opakovaných úloh na výkonných výpočetních clusterech s více uzly. Kromě kroku paralelního spuštění nabízí paralelní úloha v2 další výhody:

  • Flexibilní rozhraní, které umožňuje uživateli definovat více vlastních vstupů a výstupů pro paralelní úlohu. Můžete je propojit s dalšími kroky pro využívání nebo správu jejich obsahu ve vstupním skriptu.
  • Zjednodušte si vstupní schéma, které nahradí Dataset jako vstup pomocí konceptu v2 data asset . Jako vstupy pro paralelní úlohu můžete snadno použít místní soubory nebo identifikátor URI adresáře objektů blob.
  • Výkonnější funkce jsou vyvíjeny pouze v paralelních úlohách v2. Obnovte například neúspěšnou nebo zrušenou paralelní úlohu, abyste mohli pokračovat ve zpracování neúspěšných nebo nezpracovaných minidávek opětovným použitím úspěšného výsledku, abyste ušetřili duplicitní úsilí.

Pokud chcete upgradovat aktuální krok paralelního spuštění sady SDK verze 1 na verzi 2, budete muset provést

  • Slouží parallel_run_function k vytvoření paralelní úlohy nahrazením ParallelRunConfig a ParallelRunStep ve verzi 1.
  • Upgradujte kanál v1 na verzi 2. Pak vyvolejte paralelní úlohu v2 jako krok v kanálu v2. Podrobnosti o upgradu kanálu najdete v tématu Upgrade kanálu z verze 1 na v2 .

Poznámka: Skript pro zadávání uživatele je kompatibilní mezi krokem paralelního spuštění v1 a paralelní úlohou v2. Při upgradu úlohy paralelního spuštění tak můžete dál používat stejný entry_script.py.

Tento článek obsahuje porovnání scénářů v sadě SDK v1 a SDK v2. V následujících příkladech vytvoříme paralelní úlohu, která bude predikovat vstupní data v úloze kanálů. Dozvíte se, jak vytvořit paralelní úlohu a jak ji použít v úloze kanálu pro sadu SDK v1 i SDK v2.

Požadavky

Vytvoření paralelního kroku

  • Sada SDK v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SADA SDK v2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

Použití paralelního kroku v kanálu

  • Sada SDK v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SADA SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

Mapování klíčových funkcí v sadě SDK v1 a SDK v2

Funkce v sadě SDK v1 Hrubé mapování v sadě SDK v2
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig Výstup
as_mount datové sady Vstup

Mapování a konfigurace paralelních úloh

Sada SDK v1 SADA SDK v2 Description
ParallelRunConfig.environment parallel_run_function.task.environment Prostředí, ve které se bude spouštět trénovací úloha.
ParallelRunConfig.entry_script parallel_run_function.task.entry_script Uživatelský skript, který se spustí paralelně na více uzlech.
ParallelRunConfig.error_threshold parallel_run_function.error_threshold Počet neúspěšných mini dávek, které by bylo možné v této paralelní úloze ignorovat. Pokud je počet neúspěšných minidávek vyšší než tato prahová hodnota, bude paralelní úloha označena jako neúspěšná.

-1 je výchozí číslo, což znamená ignorovat všechny neúspěšné minidávce během paralelní úlohy.
ParallelRunConfig.output_action parallel_run_function.append_row_to Agregujte všechny výnosy z každého spuštění minidávkové dávky a vypíšete je do tohoto souboru. Může odkazovat na jeden z výstupů paralelní úlohy pomocí výrazu ${{outputs.<>output_name}}
ParallelRunConfig.node_count parallel_run_function.instance_count Volitelný počet instancí nebo uzlů používaných cílovým výpočetním objektem Výchozí hodnota je 1.
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance Maximální paralelismus, který má každá výpočetní instance.
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size Definujte velikost jednotlivých minidávek pro rozdělení vstupu.

Pokud je input_data složka nebo sada souborů, definuje toto číslo počet souborů pro každou minidávku. Například 10, 100.

Pokud input_data jsou tabulková data z mltable, toto číslo definuje fyzickou velikost proximát pro každou minidávku. Výchozí jednotka je Bajt a hodnota může přijímat řetězec, jako je 100 kB, 100 mb.
ParallelRunConfig.source_directory parallel_run_function.task.code Místní nebo vzdálená cesta směřující na zdrojový kód.
ParallelRunConfig.description parallel_run_function.description Popisný popis paralel
ParallelRunConfig.logging_level parallel_run_function.logging_level Řetězec názvu úrovně protokolování, který je definován v "protokolování". Možné hodnoty jsou WARNING, INFO a DEBUG. (Volitelné, výchozí hodnota je INFO.) Tuto hodnotu je možné nastavit prostřednictvím pipelineParameter.
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout Časový limit v sekundách pro spuštění vlastní funkce run(). Pokud je doba provádění vyšší než tato prahová hodnota, minidávka se přeruší a označí se jako neúspěšná minidávka, aby se aktivovalo opakování.
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_retries Počet opakování při selhání minidávky nebo vypršení časového limitu. Pokud dojde k selhání všech opakovaných pokusů, bude minidávka označena jako neúspěšná při mini_batch_error_threshold výpočtu.
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to V kombinaci s append_row_to nastavením.
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold Počet neúspěšných mini dávek, které by bylo možné v této paralelní úloze ignorovat. Pokud je počet neúspěšných minidávek vyšší než tato prahová hodnota, bude paralelní úloha označena jako neúspěšná.

-1 je výchozí číslo, což znamená ignorovat všechny neúspěšné minidávce během paralelní úlohy.
ParallelRunConfig.allowed_failed_percent sada parallel_run_function.task.program_arguments
--allowed_failed_percent
Podobně jako "allowed_failed_count", ale toto nastavení používá procento neúspěšných minidávek místo počtu selhání minidávkové dávky.

Rozsah tohoto nastavení je [0, 100]. "100" je výchozí číslo, což znamená ignorovat všechny neúspěšné minidávce během paralelní úlohy.
ParallelRunConfig.partition_keys Probíhá vývoj.
ParallelRunConfig.environment_variables parallel_run_function.environment_variables Slovník názvů a hodnot proměnných prostředí. Tyto proměnné prostředí se nastavují v procesu, ve kterém se spouští uživatelský skript.
ParallelRunStep.name parallel_run_function.name Název vytvořené paralelní úlohy nebo komponenty
ParallelRunStep.inputs parallel_run_function.inputs Dikt vstupů používaných touto paralelou
-- parallel_run_function.input_data Deklarujte, že data se mají rozdělit a zpracovat paralelně.
ParallelRunStep.output parallel_run_function.outputs Výstupy této paralelní úlohy.
ParallelRunStep.side_inputs parallel_run_function.inputs Definováno společně s inputs.
ParallelRunStep.arguments parallel_run_function.task.program_arguments Argumenty paralelní úlohy.
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic Určete, zda bude paralel vrátit stejný výstup se stejným vstupem.

Další kroky

Další informace najdete v dokumentaci tady: