Publique e rastreie pipelines de aprendizado de máquina

APLICA-SE A:Python SDK azureml v1

Este artigo mostrará como compartilhar um pipeline de aprendizado de máquina com seus colegas ou clientes.

Os pipelines de aprendizado de máquina são fluxos de trabalho reutilizáveis para tarefas de aprendizado de máquina. Um benefício dos pipelines é o aumento da colaboração. Você também pode fazer pipelines de versão, permitindo que os clientes usem o modelo atual enquanto você está trabalhando em uma nova versão.

Pré-requisitos

Publicar um pipeline

Depois de ter um pipeline instalado e em execução, você pode publicar um pipeline para que ele seja executado com entradas diferentes. Para que o ponto de extremidade REST de um pipeline já publicado aceite parâmetros, você deve configurar seu pipeline para usar PipelineParameter objetos para os argumentos que variarão.

  1. Para criar um parâmetro de pipeline, use um objeto PipelineParameter com um valor padrão.

    from azureml.pipeline.core.graph import PipelineParameter
    
    pipeline_param = PipelineParameter(
      name="pipeline_arg",
      default_value=10)
    
  2. Adicione este PipelineParameter objeto como um parâmetro a qualquer uma das etapas no pipeline da seguinte maneira:

    compareStep = PythonScriptStep(
      script_name="compare.py",
      arguments=["--comp_data1", comp_data1, "--comp_data2", comp_data2, "--output_data", out_data3, "--param1", pipeline_param],
      inputs=[ comp_data1, comp_data2],
      outputs=[out_data3],
      compute_target=compute_target,
      source_directory=project_folder)
    
  3. Publique este pipeline que aceitará um parâmetro quando invocado.

    published_pipeline1 = pipeline_run1.publish_pipeline(
         name="My_Published_Pipeline",
         description="My Published Pipeline Description",
         version="1.0")
    
  4. Depois de publicar seu pipeline, você pode verificá-lo na interface do usuário. O ID do pipeline é a identificação exclusiva do pipeline publicado.

    Screenshot showing published pipeline detail.

Executar um pipeline publicado

Todos os pipelines publicados têm um ponto de extremidade REST. Com o ponto de extremidade do pipeline, você pode disparar uma execução do pipeline de qualquer sistema externo, incluindo clientes não-Python. Esse ponto de extremidade permite a "repetibilidade gerenciada" em cenários de pontuação e retreinamento em lote.

Importante

Se você estiver usando o controle de acesso baseado em função do Azure (Azure RBAC) para gerenciar o acesso ao seu pipeline, defina as permissões para seu cenário de pipeline (treinamento ou pontuação).

Para invocar a execução do pipeline anterior, você precisa de um token de cabeçalho de autenticação do Microsoft Entra. A obtenção desse token é descrita na referência de classe AzureCliAuthentication e no bloco de anotações Authentication in Azure Machine Learning .

from azureml.pipeline.core import PublishedPipeline
import requests

response = requests.post(published_pipeline1.endpoint,
                         headers=aad_token,
                         json={"ExperimentName": "My_Pipeline",
                               "ParameterAssignments": {"pipeline_arg": 20}})

O json argumento para a solicitação POST deve conter, para a ParameterAssignments chave, um dicionário contendo os parâmetros do pipeline e seus valores. Além disso, o json argumento pode conter as seguintes chaves:

Chave Descrição
ExperimentName O nome do experimento associado a esse ponto de extremidade
Description Texto de forma livre descrevendo o ponto de extremidade
Tags Pares chave-valor de forma livre que podem ser usados para rotular e anotar solicitações
DataSetDefinitionValueAssignments Dicionário usado para alterar conjuntos de dados sem reciclagem (ver discussão abaixo)
DataPathAssignments Dicionário usado para alterar caminhos de dados sem reciclagem (veja a discussão abaixo)

Executar um pipeline publicado usando C#

O código a seguir mostra como chamar um pipeline de forma assíncrona a partir de C#. O trecho de código parcial mostra apenas a estrutura da chamada e não faz parte de um exemplo da Microsoft. Ele não mostra classes completas ou tratamento de erros.

[DataContract]
public class SubmitPipelineRunRequest
{
    [DataMember]
    public string ExperimentName { get; set; }

    [DataMember]
    public string Description { get; set; }

    [DataMember(IsRequired = false)]
    public IDictionary<string, string> ParameterAssignments { get; set; }
}

// ... in its own class and method ... 
const string RestEndpoint = "your-pipeline-endpoint";

using (HttpClient client = new HttpClient())
{
    var submitPipelineRunRequest = new SubmitPipelineRunRequest()
    {
        ExperimentName = "YourExperimentName", 
        Description = "Asynchronous C# REST api call", 
        ParameterAssignments = new Dictionary<string, string>
        {
            {
                // Replace with your pipeline parameter keys and values
                "your-pipeline-parameter", "default-value"
            }
        }
    };

    string auth_key = "your-auth-key"; 
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", auth_key);

    // submit the job
    var requestPayload = JsonConvert.SerializeObject(submitPipelineRunRequest);
    var httpContent = new StringContent(requestPayload, Encoding.UTF8, "application/json");
    var submitResponse = await client.PostAsync(RestEndpoint, httpContent).ConfigureAwait(false);
    if (!submitResponse.IsSuccessStatusCode)
    {
        await WriteFailedResponse(submitResponse); // ... method not shown ...
        return;
    }

    var result = await submitResponse.Content.ReadAsStringAsync().ConfigureAwait(false);
    var obj = JObject.Parse(result);
    // ... use `obj` dictionary to access results
}

Executar um pipeline publicado usando Java

O código a seguir mostra uma chamada para um pipeline que requer autenticação (consulte Configurar a autenticação para recursos e fluxos de trabalho do Azure Machine Learning). Se o pipeline for implantado publicamente, você não precisará das chamadas que produzem authKey. O trecho de código parcial não mostra a classe Java e o clichê de manipulação de exceções. O código usa Optional.flatMap para encadear funções que podem retornar um vazio Optional. O uso de flatMap encurta e esclarece o código, mas note que getRequestBody() engole exceções.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
// JSON library
import com.google.gson.Gson;

String scoringUri = "scoring-endpoint";
String tenantId = "your-tenant-id";
String clientId = "your-client-id";
String clientSecret = "your-client-secret";
String resourceManagerUrl = "https://management.azure.com";
String dataToBeScored = "{ \"ExperimentName\" : \"My_Pipeline\", \"ParameterAssignments\" : { \"pipeline_arg\" : \"20\" }}";

HttpClient client = HttpClient.newBuilder().build();
Gson gson = new Gson();

HttpRequest tokenAuthenticationRequest = tokenAuthenticationRequest(tenantId, clientId, clientSecret, resourceManagerUrl);
Optional<String> authBody = getRequestBody(client, tokenAuthenticationRequest);
Optional<String> authKey = authBody.flatMap(body -> Optional.of(gson.fromJson(body, AuthenticationBody.class).access_token);;
Optional<HttpRequest> scoringRequest = authKey.flatMap(key -> Optional.of(scoringRequest(key, scoringUri, dataToBeScored)));
Optional<String> scoringResult = scoringRequest.flatMap(req -> getRequestBody(client, req));
// ... etc (`scoringResult.orElse()`) ... 

static HttpRequest tokenAuthenticationRequest(String tenantId, String clientId, String clientSecret, String resourceManagerUrl)
{
    String authUrl = String.format("https://login.microsoftonline.com/%s/oauth2/token", tenantId);
    String clientIdParam = String.format("client_id=%s", clientId);
    String resourceParam = String.format("resource=%s", resourceManagerUrl);
    String clientSecretParam = String.format("client_secret=%s", clientSecret);

    String bodyString = String.format("grant_type=client_credentials&%s&%s&%s", clientIdParam, resourceParam, clientSecretParam);

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(authUrl))
        .POST(HttpRequest.BodyPublishers.ofString(bodyString))
        .build();
    return request;
}

static HttpRequest scoringRequest(String authKey, String scoringUri, String dataToBeScored)
{
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(scoringUri))
        .header("Authorization", String.format("Token %s", authKey))
        .POST(HttpRequest.BodyPublishers.ofString(dataToBeScored))
        .build();
    return request;

}

static Optional<String> getRequestBody(HttpClient client, HttpRequest request) {
    try {
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            System.out.println(String.format("Unexpected server response %d", response.statusCode()));
            return Optional.empty();
        }
        return Optional.of(response.body());
    }catch(Exception x)
    {
        System.out.println(x.toString());
        return Optional.empty();
    }
}

class AuthenticationBody {
    String access_token;
    String token_type;
    int expires_in;
    String scope;
    String refresh_token;
    String id_token;
    
    AuthenticationBody() {}
}

Alterar conjuntos de dados e caminhos de dados sem reciclagem

Você pode querer treinar e inferir em diferentes conjuntos de dados e caminhos de dados. Por exemplo, você pode querer treinar em um conjunto de dados menor, mas inferência no conjunto de dados completo. Você alterna conjuntos de dados com a DataSetDefinitionValueAssignments chave no argumento da json solicitação. Você alterna caminhos de dados com DataPathAssignmentso . A técnica para ambos é semelhante:

  1. No script de definição de pipeline, crie um PipelineParameter para o conjunto de dados. Crie um DatasetConsumptionConfig ou DataPath a partir do PipelineParameter:

    tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')
    tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_dataset)
    tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)
    
  2. No script ML, acesse o conjunto de dados especificado dinamicamente usando Run.get_context().input_datasets:

    from azureml.core import Run
    
    input_tabular_ds = Run.get_context().input_datasets['tabular_dataset']
    dataframe = input_tabular_ds.to_pandas_dataframe()
    # ... etc ...
    

    Observe que o script ML acessa o valor especificado para o () e não o DatasetConsumptionConfig valor do PipelineParameter (tabular_datasettabular_ds_param).

  3. No script de definição de pipeline, defina o DatasetConsumptionConfig parâmetro as a como :PipelineScriptStep

    train_step = PythonScriptStep(
        name="train_step",
        script_name="train_with_dataset.py",
        arguments=["--param1", tabular_ds_consumption],
        inputs=[tabular_ds_consumption],
        compute_target=compute_target,
        source_directory=source_directory)
    
    pipeline = Pipeline(workspace=ws, steps=[train_step])
    
  4. Para alternar conjuntos de dados dinamicamente em sua chamada REST de inferência, use DataSetDefinitionValueAssignments:

    tabular_ds1 = Dataset.Tabular.from_delimited_files('path_to_training_dataset')
    tabular_ds2 = Dataset.Tabular.from_delimited_files('path_to_inference_dataset')
    ds1_id = tabular_ds1.id
    d22_id = tabular_ds2.id
    
    response = requests.post(rest_endpoint, 
                             headers=aad_token, 
                             json={
                                "ExperimentName": "MyRestPipeline",
                               "DataSetDefinitionValueAssignments": {
                                    "tabular_ds_param": {
                                        "SavedDataSetReference": {"Id": ds1_id #or ds2_id
                                    }}}})
    

Os blocos de anotações Showcasing Dataset e PipelineParameter e Showcasing DataPath e PipelineParameter têm exemplos completos dessa técnica.

Criar um ponto de extremidade de pipeline versionado

Você pode criar um Pipeline Endpoint com vários pipelines publicados por trás dele. Essa técnica oferece um ponto de extremidade REST fixo à medida que você itera e atualiza seus pipelines de ML.

from azureml.pipeline.core import PipelineEndpoint

published_pipeline = PublishedPipeline.get(workspace=ws, id="My_Published_Pipeline_id")
pipeline_endpoint = PipelineEndpoint.publish(workspace=ws, name="PipelineEndpointTest",
                                            pipeline=published_pipeline, description="Test description Notebook")

Enviar um trabalho para um ponto de extremidade de pipeline

Você pode enviar um trabalho para a versão padrão de um ponto de extremidade de pipeline:

pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name="PipelineEndpointTest")
run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment")
print(run_id)

Você também pode enviar um trabalho para uma versão específica:

run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment", pipeline_version="0")
print(run_id)

O mesmo pode ser feito usando a API REST:

rest_endpoint = pipeline_endpoint_by_name.endpoint
response = requests.post(rest_endpoint, 
                         headers=aad_token, 
                         json={"ExperimentName": "PipelineEndpointExperiment",
                               "RunSource": "API",
                               "ParameterAssignments": {"1": "united", "2":"city"}})

Usar pipelines publicados no estúdio

Você também pode executar um pipeline publicado a partir do estúdio:

  1. Entre no estúdio do Azure Machine Learning.

  2. Veja o seu espaço de trabalho.

  3. À esquerda, selecione Pontos de extremidade.

  4. Na parte superior, selecione Pontos de extremidade de pipeline. list of machine learning published pipelines

  5. Selecione um pipeline específico para executar, consumir ou revisar os resultados de execuções anteriores do ponto de extremidade do pipeline.

Desativar um pipeline publicado

Para ocultar um pipeline da sua lista de pipelines publicados, desative-o, no estúdio ou no SDK:

# Get the pipeline by using its ID from Azure Machine Learning studio
p = PublishedPipeline.get(ws, id="068f4885-7088-424b-8ce2-eeb9ba5381a6")
p.disable()

Você pode ativá-lo novamente com p.enable(). Para obter mais informações, consulte PublishedPipeline class reference.

Próximos passos