Självstudie: Skapa en Azure Machine Learning-pipeline för batchbedömning

I den här avancerade självstudien lär du dig att skapa en Azure Machine Learning pipeline för att köra ett batchbedömningsjobb. Pipelines för maskininlärning optimerar arbetsflödet med hastighet, portabilitet och återanvändning, så att du kan fokusera på maskininlärning i stället för infrastruktur och automatisering. När du har byggt och publicerat en pipeline konfigurerar du en REST-slutpunkt som du kan använda för att utlösa pipelinen från alla HTTP-bibliotek på valfri plattform.

I exemplet används en tränad modell för neuralt nätverk med Inception-V3 som implementerats i Tensorflow för att klassificera omärkta bilder.

I den här självstudien slutför du följande uppgifter:

  • Konfigurera arbetsyta
  • Ladda ned och lagra exempeldata
  • Skapa datauppsättningsobjekt för att hämta och mata ut data
  • Ladda ned, förbered och registrera modellen på din arbetsyta
  • Etablera beräkningsmål och skapa ett bedömningsskript
  • Använda klassen ParallelRunStep för asynkron batchbedömning
  • Skapa, köra och publicera en pipeline
  • Aktivera en REST-slutpunkt för pipelinen

Om du inte har någon Azure-prenumeration kan du skapa ett kostnadsfritt konto innan du börjar. Prova den kostnadsfria eller betalda versionen Azure Machine Learning idag.

Förutsättningar

  • Slutför snabbstarten: Kom igång med Azure Machine Learning om du inte redan har en Azure Machine Learning arbetsyta eller en beräkningsinstans.
  • När du har slutfört snabbstarten:
    1. Välj Notebooks i studio.
    2. Välj fliken Exempel.
    3. Öppna anteckningsboken tutorials/machine-learning-pipelines-advanced/tutorial-pipeline-batch-scoring-classification.ipynb.

Om du vill köra installationskursen i din egen lokala miljökan du komma åt självstudien på GitHub. Kör pip install azureml-sdk[notebooks] azureml-pipeline-core azureml-pipeline-steps pandas requests för att hämta de nödvändiga paketen.

Konfigurera arbetsytan och skapa ett datalager

Skapa ett arbetsyteobjekt från den befintliga Azure Machine Learning arbetsytan.

from azureml.core import Workspace
ws = Workspace.from_config()

Viktigt

Det här kodfragmentet förväntar sig att arbetsytans konfiguration sparas i den aktuella katalogen eller dess överordnade katalog. Mer information om hur du skapar en arbetsyta finns i Skapa och hantera Azure Machine Learning arbetsytor. Mer information om hur du sparar konfigurationen i filen finns i Skapa en konfigurationsfil för arbetsytan.

Skapa ett datalager för exempelbilder

På kontot pipelinedata hämtar du imagenet-utvärderingens offentliga dataexempel från den offentliga sampledata blobcontainern. Anropa register_azure_blob_container() för att göra data tillgängliga för arbetsytan under namnet images_datastore . Ange sedan arbetsytans standarddatalager som utdatalager. Använd utdatalagringen för att poängse utdata i pipelinen.

Mer information om hur du kommer åt data finns i Så här kommer du åt data.

from azureml.core.datastore import Datastore

batchscore_blob = Datastore.register_azure_blob_container(ws, 
                      datastore_name="images_datastore", 
                      container_name="sampledata", 
                      account_name="pipelinedata", 
                      overwrite=True)

def_data_store = ws.get_default_datastore()

Skapa datauppsättningsobjekt

När du skapar pipelines används objekt för att läsa data från arbetsytans datalager och objekt används för att överföra Dataset OutputFileDatasetConfig mellanliggande data mellan pipelinesteg.

Viktigt

Batchbedömningsexempel i den här självstudien använder bara ett pipelinesteg. I användningsfall som har flera steg innehåller det vanliga flödet följande steg:

  1. Använd Dataset objekt som indata för att hämta rådata, utföra viss transformering och sedan mata ut med ett OutputFileDatasetConfig -objekt.

  2. Använd OutputFileDatasetConfig utdataobjektet i föregående steg som ett indataobjekt. Upprepa det för efterföljande steg.

I det här scenariot skapar du objekt som motsvarar datalagerkatalogerna för både indatabilderna och klassificeringsetiketterna Dataset (y-test-värden). Du kan också skapa ett OutputFileDatasetConfig -objekt för batchbedömning av utdata.

from azureml.core.dataset import Dataset
from azureml.data import OutputFileDatasetConfig

input_images = Dataset.File.from_files((batchscore_blob, "batchscoring/images/"))
label_ds = Dataset.File.from_files((batchscore_blob, "batchscoring/labels/"))
output_dir = OutputFileDatasetConfig(name="scores")

Registrera datauppsättningarna på arbetsytan om du vill återanvända dem senare. Det här är valfritt.


input_images = input_images.register(workspace = ws, name = "input_images")
label_ds = label_ds.register(workspace = ws, name = "label_ds")

Ladda ned och registrera modellen

Ladda ned den förtränade Tensorflow-modellen för att använda den för batchbedömning i en pipeline. Skapa först en lokal katalog där du lagrar modellen. Ladda sedan ned och extrahera modellen.

import os
import tarfile
import urllib.request

if not os.path.isdir("models"):
    os.mkdir("models")
    
response = urllib.request.urlretrieve("http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz", "model.tar.gz")
tar = tarfile.open("model.tar.gz", "r:gz")
tar.extractall("models")

Registrera sedan modellen på din arbetsyta så att du enkelt kan hämta modellen i pipelineprocessen. I den register() statiska funktionen är model_name parametern den nyckel som du använder för att hitta din modell i hela SDK:n.

from azureml.core.model import Model
 
model = Model.register(model_path="models/inception_v3.ckpt",
                       model_name="inception",
                       tags={"pretrained": "inception"},
                       description="Imagenet trained tensorflow inception",
                       workspace=ws)

Skapa och koppla fjärrbearbetningsmålet

Maskininlärningspipelines kan inte köras lokalt, så du kör dem på molnresurser eller fjärrbearbetningsmål. Ett fjärrbearbetningsmål är en återanvändbar virtuell beräkningsmiljö där du kör experiment och arbetsflöden för maskininlärning.

Kör följande kod för att skapa ett GPU-aktiverat AmlCompute mål och koppla det sedan till din arbetsyta. Mer information om beräkningsmål finns i konceptuella artikeln.

from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.exceptions import ComputeTargetException
compute_name = "gpu-cluster"

# checks to see if compute target already exists in workspace, else create it
try:
    compute_target = ComputeTarget(workspace=ws, name=compute_name)
except ComputeTargetException:
    config = AmlCompute.provisioning_configuration(vm_size="STANDARD_NC6",
                                                   vm_priority="lowpriority", 
                                                   min_nodes=0, 
                                                   max_nodes=1)

    compute_target = ComputeTarget.create(workspace=ws, name=compute_name, provisioning_configuration=config)
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

Skriva ett bedömningsskript

Om du vill göra bedömningsbedömningen skapar du ett batchbedömningsskript med namnet batch_scoring.py och skriver det sedan till den aktuella katalogen. Skriptet tar indatabilder, tillämpar klassificeringsmodellen och matar sedan ut förutsägelserna till en resultatfil.

Skriptet batch_scoring.py tar följande parametrar, som skickas från du ParallelRunStep skapar senare:

  • --model_name: Namnet på den modell som används.
  • --labels_dir: Platsen för labels.txt filen.

Pipelineinfrastrukturen använder klassen ArgumentParser för att skicka parametrar till pipelinesteg. I följande kod får till exempel det första argumentet --model_name egenskapsidentifieraren model_name . I funktionen init() används för att komma åt den här Model.get_model_path(args.model_name) egenskapen.

%%writefile batch_scoring.py

import os
import argparse
import datetime
import time
import tensorflow as tf
from math import ceil
import numpy as np
import shutil
from tensorflow.contrib.slim.python.slim.nets import inception_v3

from azureml.core import Run
from azureml.core.model import Model
from azureml.core.dataset import Dataset

slim = tf.contrib.slim

image_size = 299
num_channel = 3


def get_class_label_dict(labels_dir):
    label = []
    labels_path = os.path.join(labels_dir, 'labels.txt')
    proto_as_ascii_lines = tf.gfile.GFile(labels_path).readlines()
    for l in proto_as_ascii_lines:
        label.append(l.rstrip())
    return label


def init():
    global g_tf_sess, probabilities, label_dict, input_images

    parser = argparse.ArgumentParser(description="Start a tensorflow model serving")
    parser.add_argument('--model_name', dest="model_name", required=True)
    parser.add_argument('--labels_dir', dest="labels_dir", required=True)
    args, _ = parser.parse_known_args()

    label_dict = get_class_label_dict(args.labels_dir)
    classes_num = len(label_dict)

    with slim.arg_scope(inception_v3.inception_v3_arg_scope()):
        input_images = tf.placeholder(tf.float32, [1, image_size, image_size, num_channel])
        logits, _ = inception_v3.inception_v3(input_images,
                                              num_classes=classes_num,
                                              is_training=False)
        probabilities = tf.argmax(logits, 1)

    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    g_tf_sess = tf.Session(config=config)
    g_tf_sess.run(tf.global_variables_initializer())
    g_tf_sess.run(tf.local_variables_initializer())

    model_path = Model.get_model_path(args.model_name)
    saver = tf.train.Saver()
    saver.restore(g_tf_sess, model_path)


def file_to_tensor(file_path):
    image_string = tf.read_file(file_path)
    image = tf.image.decode_image(image_string, channels=3)

    image.set_shape([None, None, None])
    image = tf.image.resize_images(image, [image_size, image_size])
    image = tf.divide(tf.subtract(image, [0]), [255])
    image.set_shape([image_size, image_size, num_channel])
    return image


def run(mini_batch):
    result_list = []
    for file_path in mini_batch:
        test_image = file_to_tensor(file_path)
        out = g_tf_sess.run(test_image)
        result = g_tf_sess.run(probabilities, feed_dict={input_images: [out]})
        result_list.append(os.path.basename(file_path) + ": " + label_dict[result[0]])
    return result_list

Tips

Pipelinen i den här självstudien har bara ett steg och skriver utdata till en fil. För pipelines med flera steg använder du också för att ArgumentParser definiera en katalog för att skriva utdata för indata till efterföljande steg. Ett exempel på hur du kan skicka data mellan flera pipelinesteg med hjälp av ArgumentParser designmönstret finns i anteckningsboken.

Skapa pipelinen

Innan du kör pipelinen skapar du ett objekt som definierar Python-miljön och skapar de beroenden som skriptet batch_scoring.py kräver. Det huvudsakliga beroendet som krävs är Tensorflow, men du installerar azureml-core och som krävs av azureml-dataprep[fuse] ParallelRunStep. Ange också stöd för Docker och Docker-GPU.

from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_GPU_IMAGE

cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.15.2",
                                            "azureml-core", "azureml-dataprep[fuse]"])
env = Environment(name="parallelenv")
env.python.conda_dependencies = cd
env.docker.base_image = DEFAULT_GPU_IMAGE

Skapa konfigurationen för att omsluta skriptet

Skapa pipelinesteget med hjälp av skriptet, miljökonfigurationen och parametrarna. Ange beräkningsmålet som du redan har kopplat till din arbetsyta.

from azureml.pipeline.steps import ParallelRunConfig

parallel_run_config = ParallelRunConfig(
    environment=env,
    entry_script="batch_scoring.py",
    source_directory=".",
    output_action="append_row",
    mini_batch_size="20",
    error_threshold=1,
    compute_target=compute_target,
    process_count_per_node=2,
    node_count=1
)

Skapa pipelinesteget

Ett pipelinesteg är ett objekt som kapslar in allt du behöver för att köra en pipeline, inklusive:

  • Miljö- och beroendeinställningar
  • Beräkningsresursen som pipelinen ska köras på
  • Indata och utdata samt eventuella anpassade parametrar
  • Referens till ett skript eller EN SDK-logik som ska köras under steget

Flera klasser ärver från den överordnade klassen PipelineStep . Du kan välja klasser för att använda specifika ramverk eller stackar för att skapa ett steg. I det här exemplet använder du klassen ParallelRunStep för att definiera steglogiken med hjälp av ett anpassat Python-skript. Om ett argument till skriptet antingen är indata till steget eller utdata från steget, måste argumentet definieras både i matrisen och antingen arguments input output parametern eller .

I scenarier där det finns mer än ett steg blir en objektreferens i outputs matrisen tillgänglig som indata för ett efterföljande pipelinesteg.

from azureml.pipeline.steps import ParallelRunStep
from datetime import datetime

parallel_step_name = "batchscoring-" + datetime.now().strftime("%Y%m%d%H%M")

label_config = label_ds.as_named_input("labels_input")

batch_score_step = ParallelRunStep(
    name=parallel_step_name,
    inputs=[input_images.as_named_input("input_images")],
    output=output_dir,
    arguments=["--model_name", "inception",
               "--labels_dir", label_config],
    side_inputs=[label_config],
    parallel_run_config=parallel_run_config,
    allow_reuse=False
)

En lista över alla klasser som du kan använda för olika stegtyper finns i stegpaketet.

Skicka pipelinen

Kör nu pipelinen. Skapa först ett Pipeline -objekt med hjälp av arbetsytans referens och pipelinesteget som du skapade. Parametern steps är en matris med steg. I det här fallet finns det bara ett steg för batchbedömning. Om du vill skapa pipelines som har flera steg placerar du stegen i ordning i den här matrisen.

Använd sedan funktionen Experiment.submit() för att skicka pipelinen för körning. Funktionen wait_for_completion matar ut loggar under pipeline-byggprocessen. Du kan använda loggarna för att se den aktuella förloppet.

Viktigt

Den första pipelinekörningen tar ungefär 15 minuter. Alla beroenden måste laddas ned, en Docker-avbildning skapas och Python-miljön etableras och skapas. Det tar betydligt kortare tid att köra pipelinen igen eftersom resurserna återanvänds i stället för att skapas. Den totala körningstiden för pipelinen beror dock på arbetsbelastningen för dina skript och de processer som körs i varje pipelinesteg.

from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, 'Tutorial-Batch-Scoring').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

Ladda ned och granska utdata

Kör följande kod för att ladda ned utdatafilen som skapats från batch_scoring.py skriptet. Utforska sedan bedömningsresultaten.

import pandas as pd

batch_run = next(pipeline_run.get_children())
batch_output = batch_run.get_output_data("scores")
batch_output.download(local_path="inception_results")

for root, dirs, files in os.walk("inception_results"):
    for file in files:
        if file.endswith("parallel_run_step.txt"):
            result_file = os.path.join(root, file)

df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["Filename", "Prediction"]
print("Prediction has ", df.shape[0], " rows")
df.head(10)

Publicera och köra från en REST-slutpunkt

Kör följande kod för att publicera pipelinen till din arbetsyta. På arbetsytan i Azure Machine Learning Studio kan du se metadata för pipelinen, inklusive körningshistorik och varaktigheter. Du kan också köra pipelinen manuellt från studio.

När du publicerar pipelinen kan du använda en REST-slutpunkt för att köra pipelinen från val annat HTTP-bibliotek på valfri plattform.

published_pipeline = pipeline_run.publish_pipeline(
    name="Inception_v3_scoring", description="Batch scoring using Inception v3 model", version="1.0")

published_pipeline

Om du vill köra pipelinen från REST-slutpunkten behöver du ett autentiseringshuvud av OAuth2-typ av bearer-typ. I följande exempel används interaktiv autentisering (i illustrationssyfte), men för de flesta produktionsscenarier som kräver automatisk eller huvudlös autentisering använder du autentisering med tjänstens huvudnamn enligt beskrivningen i den här artikeln.

Autentisering med tjänstens huvudnamn innebär att du skapar en appregistrering i Azure Active Directory. Först genererar du en klienthemlighet och sedan ger du tjänstens huvudnamn rollåtkomst till maskininlärningsarbetsytan. Använd klassen ServicePrincipalAuthentication för att hantera ditt autentiseringsflöde.

Både InteractiveLoginAuthentication och ServicePrincipalAuthentication ärver från AbstractAuthentication . I båda fallen använder du get_authentication_header() funktionen på samma sätt för att hämta -huvudet:

from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()

Hämta REST-URL:en från endpoint egenskapen för det publicerade pipelineobjektet. Du hittar även REST-URL:en på din arbetsyta i Azure Machine Learning Studio.

Skapa en HTTP POST-begäran till slutpunkten. Ange ditt autentiseringshuvud i begäran. Lägg till ett JSON-nyttolastobjekt som har experimentnamnet.

Skicka en begäran om att utlösa körningen. Inkludera kod för att komma Id åt nyckeln från svarsordlistan för att hämta värdet för körnings-ID:t.

import requests

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": "Tutorial-Batch-Scoring",
                               "ParameterAssignments": {"process_count_per_node": 6}})
run_id = response.json()["Id"]

Använd körnings-ID:t för att övervaka status för den nya körningen. Den nya körningen tar ytterligare 10–15 minuter att slutföra.

Den nya körningen ser ut ungefär som den pipeline som du körde tidigare i självstudien. Du kan välja att inte visa fullständiga utdata.

from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments["Tutorial-Batch-Scoring"], run_id)
RunDetails(published_pipeline_run).show()

Rensa resurser

Slutför inte det här avsnittet om du planerar att köra andra Azure Machine Learning självstudier.

Stoppa beräkningsinstansen

Om du använde en beräkningsinstans stoppar du den virtuella datorn när du inte använder den för att minska kostnaderna.

  1. I din arbetsyta väljer du Beräkna.

  2. I listan väljer du namnet på beräkningsinstansen.

  3. Välj Stoppa.

  4. När du är redo att använda servern igen väljer du Starta.

Ta bort allt

Om du inte planerar att använda de resurser som du skapade, tar du bort dem så att du inte debiteras:

  1. I den Azure Portal menyn till vänster väljer du Resursgrupper.
  2. I listan över resursgrupper väljer du den resursgrupp som du skapade.
  3. Välj Ta bort resursgrupp.
  4. Ange resursgruppsnamnet. Välj sedan Ta bort.

Du kan också behålla resursgruppen men ta bort en enstaka arbetsyta. Visa arbetsytans egenskaper och välj sedan Ta bort.

Nästa steg

I den här självstudien om pipelines för maskininlärning har du gjort följande:

  • Skapat en pipeline med miljöberoenden som ska köras på en fjärransluten GPU-beräkningsresurs.
  • Skapat ett bedömningsskript för att köra batchförutsägelser med hjälp av en förtränad Tensorflow-modell.
  • Publicerat en pipeline och aktiverat den för att köras från en REST-slutpunkt.

Fler exempel på hur du skapar pipelines med hjälp av Machine Learning SDK finns i notebook-lagringsplatsen.