نشر وتتبع خطوط أنابيب التعلم الآلي

ستوضح لك هذه المقالة كيفية مشاركة خط أنابيب التعلم الآلي مع زملائك أو عملائك.

خطوط أنابيب التعلم الآلي هي مهام سير عمل قابلة لإعادة الاستخدام لمهام التعلم الآلي. إحدى فوائد خطوط الأنابيب هي زيادة التعاون. يمكنك أيضا إصدار خطوط الأنابيب، مما يسمح للعملاء باستخدام النموذج الحالي أثناء العمل على إصدار جديد.

المتطلبات الأساسية

نشر خط أنابيب

بمجرد تشغيل خط أنابيب وتشغيله، يمكنك نشر خط أنابيب بحيث يعمل بمدخلات مختلفة. بالنسبة لنقطة نهاية REST لخط أنابيب منشور بالفعل لقبول المعلمات، يجب تكوين خط الأنابيب الخاص بك لاستخدام PipelineParameter الكائنات للوسيطات التي ستختلف.

  1. لإنشاء معلمة خط أنابيب، استخدم كائن PipelineParameter بقيمة افتراضية.

    from azureml.pipeline.core.graph import PipelineParameter
    
    pipeline_param = PipelineParameter(
      name="pipeline_arg",
      default_value=10)
    
  2. إضافة هذا PipelineParameter الكائن كمعلمة إلى أي من الخطوات في خط الأنابيب كما يلي:

    compareStep = PythonScriptStep(
      script_name="compare.py",
      arguments=["--comp_data1", comp_data1, "--comp_data2", comp_data2, "--output_data", out_data3, "--param1", pipeline_param],
      inputs=[ comp_data1, comp_data2],
      outputs=[out_data3],
      compute_target=compute_target,
      source_directory=project_folder)
    
  3. نشر خط الأنابيب هذا الذي سيقبل معلمة عند استدعائها.

    published_pipeline1 = pipeline_run1.publish_pipeline(
         name="My_Published_Pipeline",
         description="My Published Pipeline Description",
         version="1.0")
    

تشغيل خط أنابيب منشور

تحتوي جميع خطوط الأنابيب المنشورة على نقطة نهاية REST. باستخدام نقطة نهاية خط الأنابيب، يمكنك تشغيل خط الأنابيب من أي أنظمة خارجية، بما في ذلك عملاء غير Python. تتيح نقطة النهاية هذه "التكرار المدار" في سيناريوهات تسجيل الدفعات وإعادة التدريب.

هام

إذا كنت تستخدم التحكم في الوصول المستند إلى دور Azure (Azure RBAC) لإدارة الوصول إلى خط الأنابيب، فقم بتعيين أذونات سيناريو خط الأنابيب (التدريب أو التسجيل).

لاستدعاء تشغيل خط الأنابيب السابق، تحتاج إلى رمز مميز لرأس مصادقة Azure Active Directory. يتم وصف الحصول على مثل هذا الرمز المميز في مرجع فئة AzureCliAuthentication وفي دفتر الملاحظات المصادقة في Azure التعلم الآلي.

from azureml.pipeline.core import PublishedPipeline
import requests

response = requests.post(published_pipeline1.endpoint,
                         headers=aad_token,
                         json={"ExperimentName": "My_Pipeline",
                               "ParameterAssignments": {"pipeline_arg": 20}})

يجب أن تحتوي json الوسيطة الخاصة بطلب POST ، بالنسبة للمفتاح ParameterAssignments ، على قاموس يحتوي على معلمات خط الأنابيب وقيمها. بالإضافة إلى ذلك، قد تحتوي الوسيطة json على المفاتيح التالية:

المفتاح الوصف
ExperimentName اسم التجربة المرتبطة بنقطة النهاية هذه
Description نص حر يصف نقطة النهاية
Tags أزواج القيم الرئيسية ذات الشكل الحر التي يمكن استخدامها لتسمية الطلبات والتعليق عليها
DataSetDefinitionValueAssignments القاموس المستخدم لتغيير مجموعات البيانات دون إعادة التدريب (انظر المناقشة أدناه)
DataPathAssignments القاموس المستخدم لتغيير مسارات البيانات دون إعادة التدريب (انظر المناقشة أدناه)

تشغيل خط أنابيب منشور باستخدام C #

توضح التعليمة البرمجية التالية كيفية استدعاء خط أنابيب بشكل غير متزامن من C #. يعرض مقتطف التعليمات البرمجية الجزئي بنية المكالمة فقط وليس جزءا من عينة Microsoft. لا يعرض فصولا كاملة أو معالجة للأخطاء.

[DataContract]
public class SubmitPipelineRunRequest
{
    [DataMember]
    public string ExperimentName { get; set; }

    [DataMember]
    public string Description { get; set; }

    [DataMember(IsRequired = false)]
    public IDictionary<string, string> ParameterAssignments { get; set; }
}

// ... in its own class and method ... 
const string RestEndpoint = "your-pipeline-endpoint";

using (HttpClient client = new HttpClient())
{
    var submitPipelineRunRequest = new SubmitPipelineRunRequest()
    {
        ExperimentName = "YourExperimentName", 
        Description = "Asynchronous C# REST api call", 
        ParameterAssignments = new Dictionary<string, string>
        {
            {
                // Replace with your pipeline parameter keys and values
                "your-pipeline-parameter", "default-value"
            }
        }
    };

    string auth_key = "your-auth-key"; 
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", auth_key);

    // submit the job
    var requestPayload = JsonConvert.SerializeObject(submitPipelineRunRequest);
    var httpContent = new StringContent(requestPayload, Encoding.UTF8, "application/json");
    var submitResponse = await client.PostAsync(RestEndpoint, httpContent).ConfigureAwait(false);
    if (!submitResponse.IsSuccessStatusCode)
    {
        await WriteFailedResponse(submitResponse); // ... method not shown ...
        return;
    }

    var result = await submitResponse.Content.ReadAsStringAsync().ConfigureAwait(false);
    var obj = JObject.Parse(result);
    // ... use `obj` dictionary to access results
}

تشغيل خط أنابيب منشور باستخدام Java

تعرض التعليمة البرمجية التالية استدعاء إلى خط أنابيب يتطلب المصادقة (راجع إعداد المصادقة ل Azure التعلم الآلي الموارد ومهام سير العمل). إذا تم نشر خط الأنابيب الخاص بك بشكل عام ، فلن تحتاج إلى المكالمات التي تنتج authKey. لا يعرض مقتطف التعليمات البرمجية الجزئي فئة Java ولوحة الغلايات التي تتعامل مع الاستثناءات. يستخدم الرمز Optional.flatMap لتسلسل الدالات معا التي قد ترجع فارغة Optional. استخدام flatMap يقصر ويوضح الرمز ، ولكن لاحظ أن getRequestBody() يبتلع الاستثناءات.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
// JSON library
import com.google.gson.Gson;

String scoringUri = "scoring-endpoint";
String tenantId = "your-tenant-id";
String clientId = "your-client-id";
String clientSecret = "your-client-secret";
String resourceManagerUrl = "https://management.azure.com";
String dataToBeScored = "{ \"ExperimentName\" : \"My_Pipeline\", \"ParameterAssignments\" : { \"pipeline_arg\" : \"20\" }}";

HttpClient client = HttpClient.newBuilder().build();
Gson gson = new Gson();

HttpRequest tokenAuthenticationRequest = tokenAuthenticationRequest(tenantId, clientId, clientSecret, resourceManagerUrl);
Optional<String> authBody = getRequestBody(client, tokenAuthenticationRequest);
Optional<String> authKey = authBody.flatMap(body -> Optional.of(gson.fromJson(body, AuthenticationBody.class).access_token);;
Optional<HttpRequest> scoringRequest = authKey.flatMap(key -> Optional.of(scoringRequest(key, scoringUri, dataToBeScored)));
Optional<String> scoringResult = scoringRequest.flatMap(req -> getRequestBody(client, req));
// ... etc (`scoringResult.orElse()`) ... 

static HttpRequest tokenAuthenticationRequest(String tenantId, String clientId, String clientSecret, String resourceManagerUrl)
{
    String authUrl = String.format("https://login.microsoftonline.com/%s/oauth2/token", tenantId);
    String clientIdParam = String.format("client_id=%s", clientId);
    String resourceParam = String.format("resource=%s", resourceManagerUrl);
    String clientSecretParam = String.format("client_secret=%s", clientSecret);

    String bodyString = String.format("grant_type=client_credentials&%s&%s&%s", clientIdParam, resourceParam, clientSecretParam);

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(authUrl))
        .POST(HttpRequest.BodyPublishers.ofString(bodyString))
        .build();
    return request;
}

static HttpRequest scoringRequest(String authKey, String scoringUri, String dataToBeScored)
{
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(scoringUri))
        .header("Authorization", String.format("Token %s", authKey))
        .POST(HttpRequest.BodyPublishers.ofString(dataToBeScored))
        .build();
    return request;

}

static Optional<String> getRequestBody(HttpClient client, HttpRequest request) {
    try {
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            System.out.println(String.format("Unexpected server response %d", response.statusCode()));
            return Optional.empty();
        }
        return Optional.of(response.body());
    }catch(Exception x)
    {
        System.out.println(x.toString());
        return Optional.empty();
    }
}

class AuthenticationBody {
    String access_token;
    String token_type;
    int expires_in;
    String scope;
    String refresh_token;
    String id_token;
    
    AuthenticationBody() {}
}

تغيير مجموعات البيانات ومسارات البيانات دون إعادة التدريب

قد ترغب في التدريب والاستدلال على مجموعات البيانات ومسارات البيانات المختلفة. على سبيل المثال ، قد ترغب في التدريب على مجموعة بيانات أصغر ولكن الاستدلال على مجموعة البيانات الكاملة. تبديل مجموعات البيانات باستخدام DataSetDefinitionValueAssignments المفتاح الموجود في وسيطة json الطلب. تبديل مسارات البيانات باستخدام DataPathAssignments. التقنية لكليهما متشابهة:

  1. في البرنامج النصي لتعريف خط الأنابيب، قم بإنشاء مجموعة PipelineParameter بيانات. إنشاء DatasetConsumptionConfig أو DataPath من PipelineParameter:

    tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')
    tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_dataset)
    tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)
    
  2. في البرنامج النصي ML الخاص بك، قم بالوصول إلى مجموعة البيانات المحددة ديناميكيا باستخدام Run.get_context().input_datasets:

    from azureml.core import Run
    
    input_tabular_ds = Run.get_context().input_datasets['tabular_dataset']
    dataframe = input_tabular_ds.to_pandas_dataframe()
    # ... etc ...
    

    لاحظ أن البرنامج النصي ML يصل إلى القيمة المحددة ل DatasetConsumptionConfig () وليس قيمة PipelineParameter (tabular_datasettabular_ds_param).

  3. في البرنامج النصي لتعريف خط الأنابيب، قم بتعيين المعلمة DatasetConsumptionConfig كمعلمة إلى PipelineScriptStep:

    train_step = PythonScriptStep(
        name="train_step",
        script_name="train_with_dataset.py",
        arguments=["--param1", tabular_ds_consumption],
        inputs=[tabular_ds_consumption],
        compute_target=compute_target,
        source_directory=source_directory)
    
    pipeline = Pipeline(workspace=ws, steps=[train_step])
    
  4. للتبديل بين مجموعات البيانات ديناميكيا في مكالمة REST الاستدلالية، استخدم DataSetDefinitionValueAssignments:

    tabular_ds1 = Dataset.Tabular.from_delimited_files('path_to_training_dataset')
    tabular_ds2 = Dataset.Tabular.from_delimited_files('path_to_inference_dataset')
    ds1_id = tabular_ds1.id
    d22_id = tabular_ds2.id
    
    response = requests.post(rest_endpoint, 
                             headers=aad_token, 
                             json={
                                "ExperimentName": "MyRestPipeline",
                               "DataSetDefinitionValueAssignments": {
                                    "tabular_ds_param": {
                                        "SavedDataSetReference": {"Id": ds1_id #or ds2_id
                                    }}}})
    

تحتوي دفاتر الملاحظات التي تعرض مجموعة البيانات و PipelineParameter و Displaying DataPath و PipelineParameter على أمثلة كاملة لهذه التقنية.

إنشاء نقطة نهاية خط أنابيب تم إصدارها

يمكنك إنشاء نقطة نهاية خط أنابيب مع العديد من خطوط الأنابيب المنشورة خلفها. تمنحك هذه التقنية نقطة نهاية REST ثابتة أثناء تكرار خطوط أنابيب ML وتحديثها.

from azureml.pipeline.core import PipelineEndpoint

published_pipeline = PublishedPipeline.get(workspace=ws, name="My_Published_Pipeline")
pipeline_endpoint = PipelineEndpoint.publish(workspace=ws, name="PipelineEndpointTest",
                                            pipeline=published_pipeline, description="Test description Notebook")

إرسال مهمة إلى نقطة نهاية خط أنابيب

يمكنك إرسال مهمة إلى الإصدار الافتراضي من نقطة نهاية خط الأنابيب:

pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name="PipelineEndpointTest")
run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment")
print(run_id)

يمكنك أيضا إرسال وظيفة إلى إصدار معين:

run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment", pipeline_version="0")
print(run_id)

يمكن تحقيق الشيء نفسه باستخدام واجهة برمجة تطبيقات REST:

rest_endpoint = pipeline_endpoint_by_name.endpoint
response = requests.post(rest_endpoint, 
                         headers=aad_token, 
                         json={"ExperimentName": "PipelineEndpointExperiment",
                               "RunSource": "API",
                               "ParameterAssignments": {"1": "united", "2":"city"}})

استخدام خطوط الأنابيب المنشورة في الاستوديو

يمكنك أيضا تشغيل خط أنابيب منشور من الاستوديو:

  1. سجّل الدخول إلى استوديو التعلم الآلي من Azure.

  2. عرض مساحة العمل الخاصة بك.

  3. على يمين الصفحة، اختر نقاط النهاية.

  4. في الجزء العلوي، حدد نقاط نهاية خط الأنابيب. list of machine learning published pipelines

  5. حدد خط أنابيب معين لتشغيل أو استهلاك أو مراجعة نتائج عمليات التشغيل السابقة لنقطة نهاية خط الأنابيب.

تعطيل خط أنابيب منشور

لإخفاء خط أنابيب من قائمة خطوط الأنابيب المنشورة، يمكنك تعطيله، إما في الاستوديو أو من SDK:

# Get the pipeline by using its ID from Azure Machine Learning studio
p = PublishedPipeline.get(ws, id="068f4885-7088-424b-8ce2-eeb9ba5381a6")
p.disable()

يمكنك تمكينه مرة أخرى باستخدام p.enable(). لمزيد من المعلومات، راجع مرجع فئة PublishPipeline .

الخطوات التالية