Movendo dados para e entre as etapas de pipeline de ML (Python)

APLICA-SE A:azureml do SDK do Python v1

Este artigo fornece código para importação, transformação e movimentação de dados entre etapas em um pipeline de Azure Machine Learning. Para obter uma visão geral de como os dados funcionam no Azure Machine Learning, consulte Acessar dados nos serviços de armazenamento do Azure. Para obter os benefícios e a estrutura de pipelines de Azure Machine Learning, confira O que são pipelines de Azure Machine Learning?

Este artigo mostra como:

  • Usar Dataset objetos para dados preexistentes
  • Acessar dados em suas etapas
  • Dividir Dataset dados em subconjuntos, como subconjuntos de treinamento e validação
  • Criar OutputFileDatasetConfig objetos para transferir dados para a próxima etapa do pipeline
  • Usar OutputFileDatasetConfig objetos como entrada para etapas de pipeline
  • Criar novos Dataset objetos a partir dos OutputFileDatasetConfig quais deseja persistir

Pré-requisitos

Você precisa de:

Usar Dataset objetos para dados preexistentes

A maneira preferida de ingerir dados em um pipeline é usar um objeto DataSet. Dataset os objetos representam dados persistentes disponíveis em um workspace.

Há várias maneiras de criar e registrar Dataset objetos. Os conjuntos de dados tabulares são para a disponibilidade delimitada em um ou mais arquivos. Os conjuntos de dados de arquivos são para dados binários (como imagens) ou para dados que você analisa. As maneiras programáticas mais simples de criar Dataset objetos são usar BLOBs existentes no armazenamento do espaço de trabalho ou URLs públicas:

datastore = Datastore.get(workspace, 'training_data')
iris_dataset = Dataset.Tabular.from_delimited_files(DataPath(datastore, 'iris.csv'))

datastore_path = [
    DataPath(datastore, 'animals/dog/1.jpg'),
    DataPath(datastore, 'animals/dog/2.jpg'),
    DataPath(datastore, 'animals/cat/*.jpg')
]
cats_dogs_dataset = Dataset.File.from_files(path=datastore_path)

Para obter mais opções sobre como criar conjuntos de dados com diferentes opções e de fontes diferentes, registrá-los e revisá-los na interface do usuário do Azure Machine Learning, entender como o tamanho dos dados interage com a capacidade de computação e versões deles, consulte Criar conjuntos de dados Azure Machine Learning.

Passar conjuntos de dados para o script

Para passar o caminho do conjuntos de dados para o script, use Dataset o método do as_named_input() objeto. Você pode passar o DatasetConsumptionConfigobjeto resultante para o script como um argumento ou, usando o inputsargumento para o script de pipeline, você pode recuperar o conjuntos de dados usando Run.get_context().input_datasets[].

Depois de criar uma entrada nomeada, você pode escolher seu modo de acesso (para FileDataset apenas): as_mount() ou as_download(). Se o script processa todos os arquivos em seu conjuntos de dados e o disco em seu recurso de computação for grande o suficiente para o conjuntos de dados, o modo de acesso de download será a melhor opção. O modo de acesso por download evita a sobrecarga de streaming dos dados em runtime. Se o script acessar um subconjunto do conjuntos de dados ou for muito grande para sua computação, use o modo de acesso de montagem. Para obter mais informações, leia Montar versus Baixar

Para passar um conjuntos de dados para a etapa do pipeline:

  1. Use TabularDataset.as_named_input() ou FileDataset.as_named_input() (nenhum 's' no final) para criar um DatasetConsumptionConfig objeto
  2. Para FileDataset apenas:. Use as_mount() ou as_download() para definir o modo de acesso. TabularDataset não dá suporte à definição do modo de acesso.
  3. Passe os conjuntos de dados para as etapas do pipeline usando o arguments ou o inputs argumento

O snippet a seguir mostra o padrão comum de combinação dessas etapas no construtor PythonScriptStep, usando iris_dataset (TabularDataset):


train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    inputs=[iris_dataset.as_named_input('iris')]
)

Observação

Você precisaria substituir os valores de todos esses argumentos (ou seja, "train_data""train.py", cluster e iris_dataset) por seus próprios dados. O snippet acima mostra apenas a forma da chamada e não faz parte de um exemplo da Microsoft.

Você também pode usar métodos como random_split() e take_sample() para criar várias entradas ou reduzir a quantidade de dados passados para a etapa do pipeline:

seed = 42 # PRNG seed
smaller_dataset = iris_dataset.take_sample(0.1, seed=seed) # 10%
train, test = smaller_dataset.random_split(percentage=0.8, seed=seed)

train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    inputs=[train.as_named_input('train'), test.as_named_input('test')]
)

Acessar conjuntos de dados em seu script

Entradas nomeadas para o script de etapa de pipeline estão disponíveis como um dicionário dentro do Run objeto. Recupere o Run objeto ativo usando Run.get_context() e, em seguida, recupere o dicionário de entradas nomeadas usando input_datasets. Se você passou o DatasetConsumptionConfig objeto usando o arguments argumento em vez do inputs argumento, acesse os dados usando o ArgParser código. Ambas as técnicas são demonstradas nos trechos a seguir:

O script de definição do pipeline

# Code for demonstration only: It would be very confusing to split datasets between `arguments` and `inputs`
train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    # datasets passed as arguments
    arguments=['--training-folder', train.as_named_input('train').as_download()],
    # datasets passed as inputs
    inputs=[test.as_named_input('test').as_download()]
)

O script train.py referenciado a partir do PythonScriptStep

# In pipeline script
parser = argparse.ArgumentParser()
# Retreive the dataset passed as an argument
parser.add_argument('--training-folder', type=str, dest='train_folder', help='training data folder mounting point')
args = parser.parse_args()
training_data_folder = args.train_folder
# Retrieve the dataset passed as an input
testing_data_folder = Run.get_context().input_datasets['test']

O valor passado é o caminho para o(s) arquivo(s) do conjunto de dados.

Também é possível acessar um registrado Dataset diretamente. Como os conjuntos de dado registrados são persistentes e compartilhados em um espaço de trabalho, você pode recuperá-los diretamente:

run = Run.get_context()
ws = run.experiment.workspace
ds = Dataset.get_by_name(workspace=ws, name='mnist_opendataset')

Observação

Os trechos de código anteriores mostram a forma das chamadas e não fazem parte de um exemplo da Microsoft. Você deve substituir os vários argumentos por valores de seu próprio projeto.

Usar OutputFileDatasetConfig para dados intermediários

Embora Dataset os objetos representem apenas dados persistentes, os objetos OutputFileDatasetConfig podem ser usados para saída de dados temporários de etapas de pipeline e dados de saída persistentes. OutputFileDatasetConfig dá suporte à gravação de dados no armazenamento de BLOBs, FileShare, adlsgen1 ou adlsgen2. Ele dá suporte ao modo de montagem e ao modo de carregamento. No modo de montagem, os arquivos gravados no diretório montado são armazenados permanentemente quando o arquivo é fechado. No modo de carregamento, os arquivos gravados no diretório de saída são carregados no final do trabalho. Se o trabalho falhar ou for cancelado, o diretório de saída não será carregado.

OutputFileDatasetConfig o comportamento padrão do objeto é gravar no repositório de armazenamento padrão do espaço de trabalho. Passe seus OutputFileDatasetConfig objetos para o PythonScriptStep com o arguments parâmetro.

from azureml.data import OutputFileDatasetConfig
dataprep_output = OutputFileDatasetConfig()
input_dataset = Dataset.get_by_name(workspace, 'raw_data')

dataprep_step = PythonScriptStep(
    name="prep_data",
    script_name="dataprep.py",
    compute_target=cluster,
    arguments=[input_dataset.as_named_input('raw_data').as_mount(), dataprep_output]
    )

Observação

As gravações simultâneas em um OutputFileDatasetConfig falharão. Não tente usar um único OutputFileDatasetConfig simultaneamente. Não compartilhe um único em OutputFileDatasetConfig uma situação de multiprocessamento, como ao usar o treinamento distribuído.

Usar OutputFileDatasetConfig como saídas de uma etapa de treinamento

No PythonScriptStep do pipeline, você pode recuperar os caminhos de saída disponíveis usando os argumentos do programa. Se esta etapa for a primeira e iniciar os dados de saída, você deve criar o diretório no caminho especificado. Em seguida, você pode gravar os arquivos que você deseja que fiquem no OutputFileDatasetConfig.

parser = argparse.ArgumentParser()
parser.add_argument('--output_path', dest='output_path', required=True)
args = parser.parse_args()

# Make directory for file
os.makedirs(os.path.dirname(args.output_path), exist_ok=True)
with open(args.output_path, 'w') as f:
    f.write("Step 1's output")

Ler OutputFileDatasetConfig como entradas para etapas não iniciais

Depois que a etapa inicial do pipeline grava alguns dados no OutputFileDatasetConfig caminho e se torna uma saída dessa etapa inicial, ela pode ser usada como uma entrada para uma etapa posterior.

No seguinte código:

  • step1_output_data indica que a saída do PythonScriptStep é escrita no armazenamento de dados step1 do ADLS Gen 2, my_adlsgen2 no modo de acesso de upload. Saiba mais sobre como configurar permissões de função para gravar dados de volta em datastores do ADLS Gen 2.

  • Após step1 a conclusão e a saída ser escrita no destino indicado por step1_output_data, a etapa 2 estará pronta para uso step1_output_data como entrada.

# get adls gen 2 datastore already registered with the workspace
datastore = workspace.datastores['my_adlsgen2']
step1_output_data = OutputFileDatasetConfig(name="processed_data", destination=(datastore, "mypath/{run-id}/{output-name}")).as_upload()

step1 = PythonScriptStep(
    name="generate_data",
    script_name="step1.py",
    runconfig = aml_run_config,
    arguments = ["--output_path", step1_output_data]
)

step2 = PythonScriptStep(
    name="read_pipeline_data",
    script_name="step2.py",
    compute_target=compute,
    runconfig = aml_run_config,
    arguments = ["--pd", step1_output_data.as_input()]

)

pipeline = Pipeline(workspace=ws, steps=[step1, step2])

Dica

A leitura dos dados no script python step2.py é igual à documentada anteriormente em Acessar conjuntos de dados em seu script; use ArgumentParser para adicionar um argumento --pd no seu script para acessar os dados.

Registrar OutputFileDatasetConfig objetos para reutilização

Se você quiser disponibilizar o OutputFileDatasetConfig por mais tempo do que a duração do experimento, registre-o em seu workspace para compartilha-lo e reutiliza-lo entre experimentos.

step1_output_ds = step1_output_data.register_on_complete(
    name='processed_data', 
    description = 'files from step1'
)

Excluir OutputFileDatasetConfig conteúdo quando não for mais necessário

O Azure não exclui automaticamente os dados intermediários gravados com OutputFileDatasetConfig. Para evitar encargos de armazenamento para grandes quantidades de dados indevidamente, você deve:

Cuidado

Exclua somente dados intermediários após 30 dias da última data de alteração dos dados. Excluir os dados anteriormente pode fazer com que a execução do pipeline falhe porque o pipeline assumirá que os dados intermediários existem dentro do período de 30 dias para reutilização.

  • Exclua programaticamente os dados intermediários no final de um trabalho de pipeline, quando eles não forem mais necessários.
  • Usar o armazenamento de blob com uma política de armazenamento de curto prazo para dados intermediários (consulte Otimizar custos automatizando as camadas de acesso de Armazenamento de Blobs do Azure). Essa política só pode ser definida como um armazenamento de dados não padrão de um workspace. Use OutputFileDatasetConfig para exportar dados intermediários para outro armazenamento de dados que não seja o padrão.
    # Get adls gen 2 datastore already registered with the workspace
    datastore = workspace.datastores['my_adlsgen2']
    step1_output_data = OutputFileDatasetConfig(name="processed_data", destination=(datastore, "mypath/{run-id}/{output-name}")).as_upload()
    
  • Revisar e excluir regularmente os dados que não são mais necessários.

Para obter mais informações, consulte Planejar e gerenciar custos para Azure Machine Learning.

Próximas etapas