Executar previsões em lote em grandes conjuntos de dados com o Serviço do Azure Machine LearningRun batch predictions on large data sets with Azure Machine Learning service

Neste artigo, você aprenderá como realizar previsões assíncronas em grandes quantidades de dados usando o Serviço do Azure Machine Learning.In this article, you'll learn how to make predictions on large quantities of data asynchronously, by using the Azure Machine Learning service.

A previsão em lote (ou pontuação do lote) fornece inferência econômica com taxa de transferência incomparável para aplicativos assíncronos.Batch prediction (or batch scoring) provides cost-effective inference, with unparalleled throughput for asynchronous applications. Os pipelines de previsão em lote podem ser dimensionados para realizar inferências em terabytes de dados de produção.Batch prediction pipelines can scale to perform inference on terabytes of production data. A previsão em lote é otimizada para previsões tipo fire-and-forget de alta taxa de transferência para uma grande coleção de dados.Batch prediction is optimized for high throughput, fire-and-forget predictions for a large collection of data.

Dica

Se o sistema exigir processamento de baixa latência (para processar rapidamente um único documento ou um pequeno conjunto de documentos), use a pontuação em tempo real em vez da previsão em lote.If your system requires low-latency processing (to process a single document or small set of documents quickly), use real-time scoring instead of batch prediction.

Nas etapas a seguir, você criará um pipeline de aprendizado de máquina para registrar um modelo de visão computacional pré-treinado (Inception-V3).In the following steps, you create a machine learning pipeline to register a pretrained computer vision model (Inception-V3). Em seguida, você usa o modelo pré-treinado pra fazer pontuação do lote em imagens disponíveis na sua conta de armazenamento de Blob do Azure.Then you use the pretrained model to do batch scoring on images available in your Azure Blob storage account. Essas imagens usadas na pontuação são imagens não rotuladas do conjunto de dados ImageNet.These images used for scoring are unlabeled images from the ImageNet dataset.

Pré-requisitosPrerequisites

Configurar recursos de aprendizado de máquinaSet up machine learning resources

As etapas a seguir configurarão os recursos necessários para executar um pipeline:The following steps set up the resources you need to run a pipeline:

  • Acesse o repositório de dados que já possui o modelo previamente treinado, rótulos de entrada e imagens para pontuação (isso já está configurado para você).Access the datastore that already has the pretrained model, input labels, and images to score (this is already set up for you).
  • Configure um repositório de dados para armazenar os resultados.Set up a datastore to store your outputs.
  • Configure objetos DataReference para apontar para os dados nos armazenamentos de dados anteriores.Configure DataReference objects to point to the data in the preceding datastores.
  • Configure as máquinas ou os clusters de computação em que as etapas do pipeline serão executadas.Set up compute machines or clusters where the pipeline steps will run.

Acessar os repositórios de dadosAccess the datastores

Primeiro, acesse o repositório de dados que possui o modelo, os rótulos e as imagens.First, access the datastore that has the model, labels, and images.

Você usará um contêiner de blob público nomeado sampledata na conta pipelinedata que contém imagens do conjunto de avaliação do ImageNet.You'll use a public blob container, named sampledata, in the pipelinedata account that holds images from the ImageNet evaluation set. O nome do repositório de dados desse contêiner público é images_datastore.The datastore name for this public container is images_datastore. Registre esse repositório de dados em seu espaço de trabalho:Register this datastore with your workspace:

# Public blob container details
account_name = "pipelinedata"
datastore_name="images_datastore"
container_name="sampledata"
 
batchscore_blob = Datastore.register_azure_blob_container(ws,
                      datastore_name=datastore_name,
                      container_name= container_name,
                      account_name=account_name,
                      overwrite=True)

Em seguida, configure para usar o armazenamento de dados padrão para as saídas.Next, set up to use the default datastore for the outputs.

Ao criar seu espaço de trabalho, Arquivos do Azure e armazenamento de Blob são conectados ao espaço de trabalho por padrão.When you create your workspace, Azure Files and Blob storage are attached to the workspace by default. Arquivos do Azure é o armazenamento de dados padrão para um espaço de trabalho, mas também é possível usar o armazenamento de Blob como um armazenamento de dados.Azure Files is the default datastore for a workspace, but you can also use Blob storage as a datastore. Para obter mais informações, consulte Opções de armazenamento do Azure.For more information, see Azure storage options.

def_data_store = ws.get_default_datastore()

Configurar referências de dadosConfigure data references

Agora, faça referência aos dados em seu pipeline como entradas para as etapas do pipeline.Now, reference the data in your pipeline as inputs to pipeline steps.

Uma fonte de dados em um pipeline é representada por um objeto DataReference .A data source in a pipeline is represented by a DataReference object. O objeto DataReference aponta para dados que residem ou são acessados a partir de um armazenamento de dados.The DataReference object points to data that lives in, or is accessible from, a datastore. Você precisa de objetosDataReference para o diretório usado para imagens de entrada, o diretório no qual o modelo pré-treinado está armazenado, o diretório de rótulos e o diretório de saída.You need DataReference objects for the directory used for input images, the directory in which the pretrained model is stored, the directory for labels, and the output directory.

input_images = DataReference(datastore=batchscore_blob, 
                             data_reference_name="input_images",
                             path_on_datastore="batchscoring/images",
                             mode="download")
                           
model_dir = DataReference(datastore=batchscore_blob, 
                          data_reference_name="input_model",
                          path_on_datastore="batchscoring/models",
                          mode="download")                          
                         
label_dir = DataReference(datastore=batchscore_blob, 
                          data_reference_name="input_labels",
                          path_on_datastore="batchscoring/labels",
                          mode="download")                          
                         
output_dir = PipelineData(name="scores", 
                          datastore=def_data_store, 
                          output_path_on_compute="batchscoring/results")

Configurar destino de computaçãoSet up compute target

No Azure Machine Learning, computação (ou destino de computação) refere-se a computadores ou clusters que executam as etapas computacionais no pipeline de aprendizado de máquina.In Azure Machine Learning, compute (or compute target) refers to the machines or clusters that perform the computational steps in your machine learning pipeline. Por exemplo, você pode criar uma Azure Machine Learning compute.For example, you can create an Azure Machine Learning compute.

compute_name = "gpucluster"
compute_min_nodes = 0
compute_max_nodes = 4
vm_size = "STANDARD_NC6"

if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and type(compute_target) is AmlCompute:
        print('Found compute target. just use it. ' + compute_name)
else:
    print('Creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(
                     vm_size = vm_size, # NC6 is GPU-enabled
                     vm_priority = 'lowpriority', # optional
                     min_nodes = compute_min_nodes, 
                     max_nodes = compute_max_nodes)

    # create the cluster
    compute_target = ComputeTarget.create(ws, 
                        compute_name, 
                        provisioning_config)
    
    compute_target.wait_for_completion(
                     show_output=True, 
                     min_node_count=None, 
                     timeout_in_minutes=20)

Preparar o modeloPrepare the model

Antes de usar o modelo previamente treinado, você precisa fazer o download do modelo e registrá-lo em seu espaço de trabalho.Before you can use the pretrained model, you'll need to download the model and register it with your workspace.

Fazer o download do modelo previamente treinadoDownload the pretrained model

Faça o download do modelo de visão computacional previamente treinado (InceptionV3) de http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz.Download the pretrained computer vision model (InceptionV3) from http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz. Em seguida, extraia-o para a subpasta models.Then extract it to the models subfolder.

import os
import tarfile
import urllib.request

model_dir = 'models'
if not os.path.isdir(model_dir):
    os.mkdir(model_dir)

url="http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz"
response = urllib.request.urlretrieve(url, "model.tar.gz")
tar = tarfile.open("model.tar.gz", "r:gz")
tar.extractall(model_dir)

Registre o modeloRegister the model

Veja como registrar o modelo:Here's how to register the model:

import shutil
from azureml.core.model import Model

# register downloaded model 
model = Model.register(
        model_path = "models/inception_v3.ckpt",
        model_name = "inception", # This is the name of the registered model
        tags = {'pretrained': "inception"},
        description = "Imagenet trained tensorflow inception",
        workspace = ws)

Gravar seu script de pontuaçãoWrite your scoring script

Aviso

O código a seguir é apenas um exemplo do que está contido no batch_score.py usado pelo bloco de anotações de exemplo.The following code is only a sample of what is contained in the batch_score.py used by the sample notebook. Será preciso criar seu próprio script de pontuação para o seu cenário.You’ll need to create your own scoring script for your scenario.

O script batch_score.py pega imagens de entrada em  dataset_path, modelos previamente treinados em model_dir, e gera results-label.txt em output_dir.The batch_score.py script takes input images in dataset_path, pretrained models in model_dir, and outputs results-label.txt to output_dir.

# Snippets from a sample scoring script
# Refer to the accompanying batch-scoring Notebook
# https://github.com/Azure/MachineLearningNotebooks/blob/master/pipeline/pipeline-batch-scoring.ipynb
# for the implementation script

# Get labels
def get_class_label_dict(label_file):
  label = []
  proto_as_ascii_lines = tf.gfile.GFile(label_file).readlines()
  for l in proto_as_ascii_lines:
    label.append(l.rstrip())
  return label

class DataIterator:
  # Definition of the DataIterator here
  
def main(_):
    # Refer to batch-scoring Notebook for implementation.
    label_file_name = os.path.join(args.label_dir, "labels.txt")
    label_dict = get_class_label_dict(label_file_name)
    classes_num = len(label_dict)
    test_feeder = DataIterator(data_dir=args.dataset_path)
    total_size = len(test_feeder.labels)

    # get model from model registry
    model_path = Model.get_model_path(args.model_name)
    with tf.Session() as sess:
        test_images = test_feeder.input_pipeline(batch_size=args.batch_size)
        with slim.arg_scope(inception_v3.inception_v3_arg_scope()):
            input_images = tf.placeholder(tf.float32, [args.batch_size, image_size, image_size, num_channel])
            logits, _ = inception_v3.inception_v3(input_images,
                                                        num_classes=classes_num,
                                                        is_training=False)
            probabilities = tf.argmax(logits, 1)

        sess.run(tf.global_variables_initializer())
        sess.run(tf.local_variables_initializer())
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        saver = tf.train.Saver()
        saver.restore(sess, model_path)
        out_filename = os.path.join(args.output_dir, "result-labels.txt")
            
        # copy the file to artifacts
        shutil.copy(out_filename, "./outputs/")

Compilar e executar o pipeline de pontuação em loteBuild and run the batch scoring pipeline

Você tem tudo o que precisa para compilar o pipeline, agora é só juntar tudo.You have everything you need to build the pipeline, so now put it all together.

Prepare o ambiente de execuçãoPrepare the run environment

Especifique as dependências conda de seu script.Specify the conda dependencies for your script. Você precisará deste objeto mais tarde quando criar a etapa do pipeline.You'll need this object later, when you create the pipeline step.

from azureml.core.runconfig import DEFAULT_GPU_IMAGE

cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.10.0", "azureml-defaults"])

# Runconfig
amlcompute_run_config = RunConfiguration(conda_dependencies=cd)
amlcompute_run_config.environment.docker.enabled = True
amlcompute_run_config.environment.docker.gpu_support = True
amlcompute_run_config.environment.docker.base_image = DEFAULT_GPU_IMAGE
amlcompute_run_config.environment.spark.precache_packages = False

Especifique o parâmetro para o pipelineSpecify the parameter for your pipeline

Crie um parâmetro de pipeline usando um objeto PipelineParameter com um valor padrão.Create a pipeline parameter by using a PipelineParameter object with a default value.

batch_size_param = PipelineParameter(
                    name="param_batch_size", 
                    default_value=20)

Criar a etapa do pipelineCreate the pipeline step

Crie a etapa do pipeline usando o script, a configuração do ambiente e os parâmetros.Create the pipeline step by using the script, environment configuration, and parameters. Especifique o destino de computação que você já anexou ao seu espaço de trabalho como o destino de execução do script.Specify the compute target you already attached to your workspace as the target of execution of the script. Use PythonScriptStep para criar a etapa do pipeline.Use PythonScriptStep to create the pipeline step.

inception_model_name = "inception_v3.ckpt"

batch_score_step = PythonScriptStep(
    name="batch_scoring",
    script_name="batch_score.py",
    arguments=["--dataset_path", input_images, 
               "--model_name", "inception",
               "--label_dir", label_dir, 
               "--output_dir", output_dir, 
               "--batch_size", batch_size_param],
    compute_target=compute_target,
    inputs=[input_images, label_dir],
    outputs=[output_dir],
    runconfig=amlcompute_run_config,
    source_directory=scripts_folder

Executar o pipelineRun the pipeline

Agora, execute o pipeline e analise a saída produzida.Now run the pipeline, and examine the output it produced. A saída terá uma pontuação correspondente a cada imagem de entrada.The output has a score corresponding to each input image.

# Run the pipeline
pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, 'batch_scoring').submit(pipeline, pipeline_params={"param_batch_size": 20})

# Wait for the run to finish (this might take several minutes)
pipeline_run.wait_for_completion(show_output=True)

# Download and review the output
step_run = list(pipeline_run.get_children())[0]
step_run.download_file("./outputs/result-labels.txt")

import pandas as pd
df = pd.read_csv("result-labels.txt", delimiter=":", header=None)
df.columns = ["Filename", "Prediction"]
df.head()

Publicar o pipelinePublish the pipeline

Quando estiver satisfeito com o resultado da execução, publique o pipeline para que você possa executá-lo com diferentes valores de entrada mais tarde.After you're satisfied with the outcome of the run, publish the pipeline so you can run it with different input values later. Ao publicar um pipeline, você obtém um ponto de extremidade REST.When you publish a pipeline, you get a REST endpoint. Esse ponto de extremidade aceita invocação do pipeline com o conjunto de parâmetros já incorporado, usando PipelineParameter.This endpoint accepts invoking of the pipeline with the set of parameters you have already incorporated by using PipelineParameter.

published_pipeline = pipeline_run.publish_pipeline(
    name="Inception_v3_scoring", 
    description="Batch scoring using Inception v3 model", 
    version="1.0")

Executar novamente o pipeline usando o ponto de extremidade RESTRerun the pipeline by using the REST endpoint

Para executar o pipeline novamente, será necessário um token de cabeçalho de autenticação do Azure Active Directory, conforme descrito na classe AzureCliAuthentication.To rerun the pipeline, you'll need an Azure Active Directory authentication header token, as described in AzureCliAuthentication class.

from azureml.pipeline.core import PublishedPipeline

rest_endpoint = published_pipeline.endpoint
# specify batch size when running the pipeline
response = requests.post(rest_endpoint, 
        headers=aad_token, 
        json={"ExperimentName": "batch_scoring",
               "ParameterAssignments": {"param_batch_size": 50}})

# Monitor the run
from azureml.pipeline.core.run import PipelineRun
published_pipeline_run = PipelineRun(ws.experiments["batch_scoring"], run_id)

RunDetails(published_pipeline_run).show()

Próximas etapasNext steps

Para ver isso funcionando de ponta a ponta, experimente o bloco de anotações de pontuação do lote no GitHub.To see this working end-to-end, try the batch scoring notebook in GitHub.

Saiba como executar blocos de anotações, seguindo o artigo Usar os blocos de anotações do Jupyter para explorar esse serviço.Learn how to run notebooks by following the article, Use Jupyter notebooks to explore this service.