本文章是由機器翻譯。

TPL

平行處理工作的水平延展性

Jesus Aguilar

下載代碼示例

任務並行庫 (TPL),介紹了在 Microsoft.NET 框架中 4,授權應用程式開發人員創建的解決方案,利用並行處理的多核的電腦的電源。然而,在許多情況下,垂直方向 (通過添加更多內核) 擴展能力限制由多項因素,包括成本和主辦的限制。在這種情況下,如果需要,可擴充性最好分發處理跨陣列伺服器 ; 雲託管是這樣一個示例。在本文中我將描述概念的解決方案來實現此目的使用多個.NET 框架 4.5 的新功能的關鍵方面 (包括執行)。

基本假設

我將描述的方法需要幾種技術超越協力廠商物流,其中包括:

  • 任務並行庫 (TPL)
  • Windows Communication Foundation (WCF)
  • 託管的擴充性框架 (MEF)

請注意我將討論這些只有在我想要解決的問題。我假設你有好的理解,這些技術。

用戶端任務遠端、 任務協調員和任務執行節點

遠端任務用戶端是將隱藏引起的語義使用分散式的環境的複雜性的用戶端層。遠端任務用戶端直接交互的任務協調員,然後成為底層基礎架構的進入點。在高級別任務協調員具有下列屬性:

  1. 它是接觸的唯一與客戶點。
  2. 它公開必要的服務,要求執行的任務上的可擴展的平臺,以及取消的給定任務。
  3. 它可以處理頻寬限制和佇列的任務執行請求,它支援環境的健康運行。

任務執行節點是任務將在其中運行的進程的主機。將由協力廠商物流執行的任務的實際實現駐留在任務執行節點中。

下面是這些邏輯層的主要方面和資訊的流動:

  1. 遠端任務用戶端請求一個或多個任務的執行。
  2. 任務協調員提交的任務執行節點的請求。
  3. 任務執行節點執行的任務,並更新任務協調員在每個請求的狀態。
  4. 任務協調員更新用戶端的每個請求執行的結果。
  5. 任務執行節點駐留在負載平衡器後面,這樣可以添加更多節點,根據需要,提供橫向擴展的能力。

圖 1 描述了邏輯層和資訊的流動。

Scaling Tasks Horizontally
圖 1 水準縮放任務

請注意如何任務執行節點更新的任務協調員,然後更新遠端任務用戶端。我要描述基於在用戶端與任務協調員和任務協調員和任務執行的節點之間的雙向交流的實現。WCF 而言,這意味著允許任務執行節點回電話的任務協調員,以及隨後的任務協調員,更新用戶端,這樣做的雙工通道的使用。我將展示 WebSockets 來實現這一方針雙向通信的使用。WebSockets 運輸作為新綁定.NET 框架 4.5 中實現的可供 Windows 8。你會發現在綁定詳細資訊 bit.ly/SOLNiU

在用戶端和任務協調員

現在,您已瞭解三個主要的邏輯層 — — 遠端任務用戶端、 任務協調員和任務執行節點 — — 讓我們首先討論執行遠端任務的用戶端。請注意在使用時在整篇文章的"客戶"一詞,我所指遠端任務用戶端。

正如我剛才所說,用戶端的價值主張是隱藏的基礎元件的複雜性的能力。它實現這一點的一種方法是通過提供一個 API,給本地執行的任務,儘管他們可能會執行其他地方的印象。中的代碼圖 2 顯示 RemoteTaskClient 類的公共方法。

圖 2 公共方法的類 RemoteTaskClient

public class RemoteTaskClient<TResult> : IDisposable
{
  public void AddRequest(string typeName, 
    string[] parameters, CancellationToken tk)
  {...}
  public void AddRequest(string typeName, string[] parameters)
  {...}
  public Task<TResult>[] SubmitRequests()
  {...}
  public RemoteTaskClient(string taskCoodinatorEndpointAddress)
  {...}
  public void Dispose()
  {...}
}

您可以使用 AddRequest 方法添加遠端執行的請求。 您需要為每個請求指定的類型名稱 (這是實際執行包含基礎設施將作為協力廠商物流的任務遠端運行的委託的類型) 和關聯的參數。 然後,您可以提交的請求是通過 SubmitRequest 方法。 提交請求的結果是陣列的協力廠商物流的任務,一個用於每個請求。 這種方法將允許您管理由此產生的協力廠商物流任務,如果他們是在本地一樣。 例如,可以提交各種請求和等他們到完整,像這樣:

using (var c = new RemoteTaskClient<int>("..."))
  {
    c.AddRequest("...", null);
    c.AddRequest("...", null);
    var ts = c.SubmitRequests();
    Task.WaitAll(ts);
    foreach (var t in ts)
      Console.WriteLine(t.Result);
  }

RemoteTaskClient 的執行細節之前,讓我們看看服務操作和任務協調員公開的資料合同。 審查執行情況 RemoteTaskClient 之前瞭解這些合同將給你額外的上下文,因為依賴這些服務的用戶端執行。

中的代碼圖 3 顯示服務操作的任務協調員公開給用戶端。 SubmitRequest 操作,通過用戶端要求的執行一個或多個協力廠商物流的任務的能力。 用戶端還可以請求特定的協力廠商物流任務的並不完整,通過 CancelTask 操作被的取消。 請注意 UpdateStatus 操作是一個回檔。 它是通過用戶端執行的任務協調員將在用戶端狀態更新此回檔合同。

圖 3 服務操作

[ServiceContract(CallbackContract = typeof(ITaskUpdateCallback))]
  public interface ITaskCoordinator
  {
    [OperationContract(IsOneWay = true)]
    void SubmitRequest(List<STask> stask);
    [OperationContract]
    bool CancelTask(string Id);       
  }
  public interface ITaskUpdateCallback
  {
    [OperationContract (IsOneWay = true)]
    void UpdateStatus(string id, STaskStatus status, string result);
  }

讓我們看看資料合同,表示任務執行請求。 這是用戶端將發送到的任務協調員,反過來會將發生的實際執行的任務執行節點請求提交的資料實體。 中所示的類 STask, 圖 4,模特任務執行請求。 用戶端可以使用 STaskTypeName 和 STaskParameters 的屬性,設置它想要使用相關的參數執行的任務類型。 任務協調員將使用 Id 屬性作為邏輯層可用於將請求與實際系統中運行的協力廠商物流任務相關聯的唯一識別碼。

圖 4 STask 類

[DataContract]
  public class STask
  {
    [DataMember]
    public string Id
    { get; set; }
    [DataMember]
    public string STaskTypeName
    { get; set; }
    [DataMember]
    public string[] STaskParameters
    { get; set; }
  }

現在讓我們回到 RemoteTaskClient,並討論如何,我正打算將本地的協力廠商物流任務與任務執行的節點中執行的結果關聯起來。TPL 具有方便的類,TaskCompletionSource <TResult>,我可以使用創建協力廠商物流的任務,並控制其生命週期。這一機制讓我當給定的任務完成、 被取消或出現故障時的信號。在這裡的含義是必須對 TaskCompletionSource 的實例的相關轉到任務執行節點 (通過任務協調員) 的每個請求。為此,我實施中所示的類 ClientRequestInfo, 圖 5

圖 5 類 ClientRequestInfo

internal class ClientRequestInfo<TResult>
{
  internal STask TaskExecutionRequest
  { get; set; }
  internal TaskCompletionSource<TResult> CompletionSource
  { get; set; }
  internal ClientRequestInfo(string typeName, string[] args)
  {
    TaskExecutionRequest = new STask()
      {Id = Guid.NewGuid().ToString(), STaskTypeName =typeName,
        STaskParameters = args };
    CompletionSource = new TaskCompletionSource<TResult>();
  }
}

圖 6 顯示此類的建構函式的實現。

圖 6 ClientRequestInfo 建構函式

ITaskCoordinator _client;           
ConcurrentDictionary<string, ClientRequestInfo<TResult>> 
  _requests = new ConcurrentDictionary<string, 
  ClientRequestInfo<TResult>>();
  public RemoteTaskClient(string taskCoordinatorEndpointAddress)
  {           
    var factory = new DuplexChannelFactory<ITaskCoordinator>
       (new InstanceContext(new CallbackHandler<TResult>(_requests)),
       new NetHttpBinding(),
       new EndpointAddress(taskCoordinatorEndpointAddress));
    _client = factory.CreateChannel();
    ((IClientChannel)_client).Open();
  }

請注意我打開任務協調員的雙工通道並創建回檔 CallbackHandler 類型的實例。CallbackHandler 接收作為參數 _requests,其中包含 ClientRequestInfo 的實例。理由是 _requests 字典包含所有活動的情況下,用戶端的請求 (和與它們相關聯的 TaskCompletionSource 的實例),CallbackHandler 將處理中的更新任務協調員。因為多個服務請求將更新 _requests 字典,我需要確保執行緒安全,因此,需要創建此實例的 ConcurrentDictionary。

圖 7 顯示 CallbackHandler 類的實現。

圖 7 CallbackHandler 類

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class CallbackHandler<TResult> : ITaskUpdateCallback
{
  ConcurrentDictionary<string, ClientRequestInfo<TResult>> _requests;
  public void UpdateStatus(string id, STaskStatus status, Object result)
    {
      ClientRequestInfo<TResult> info;
      if (_requests.TryRemove(id, out info))
      {                           
        switch (status)
        {
          case STaskStatus.
Completed: info.CompletionSource.SetResult(
              (TResult)result);
            break;
          case STaskStatus.Canceled: 
            info.CompletionSource.SetCanceled();
            break;
          case STaskStatus.Faulted: 
            info.CompletionSource.SetException(
              (Exception)result);
            break;
        }
      }
}
  internal CallbackHandler(ConcurrentDictionary<string,
    ClientRequestInfo<TResult>> requests)
  {
    requests = requests;
  }
}

接下來,讓我們看的 AddRequest 和 SubmitRequest 的方法,執行,如中所示圖 8

圖 8 AddRequest 和 SubmitRequest 方法

public void AddRequest(string typeName, string[] parameters, 
  CancellationToken tk)
{
  var info = new ClientRequestInfo<TResult>(typeName, args);
  _buffer.Add(info);
  tk.Register(()=> _client.CancelTask(info.TaskExecutionRequest.Id));
}
public void AddRequest(string typeName, string[] parameters)
  {
    _buffer.Add(new ClientRequestInfo<TResult>(typeName, parameters));
  }
public Task<TResult>[] SubmitRequests()
  {
    if (_buffer.Count == 0)
      return null;
    var req = _buffer.Select((r) =>
    {
      _requests.TryAdd(r.TaskExecutionRequest.Id, r);
      return r.TaskExecutionRequest;                
    });
    _client.SubmitRequest(req.ToList<STask>());
    var ret =  _buffer.Select(r =>
      r.CompletionSource.Task).ToArray<Task<TResult>>();
    _buffer.Clear();
    return ret;
  }

跟蹤用戶端的請求

正如你看到在上一節中,僅與任務協調員進行交互的用戶端和它是任務協調員來處理來自用戶端的請求以及隨後的協力廠商物流任務的執行的結果更新用戶端的責任。 隨著用戶端,這就要求堅持以某種形式的原始請求。 它還需要跟蹤的相應的回檔實例 (它允許與用戶端通信) ; (需要,您將看到以後,在取消方案) ; 連接相關聯的任務執行節點到通道 團體與任務執行節點 (以確定不再需要該頻道的時候) ; 對單個調用關聯的所有任務執行請求的唯一識別碼 以及地位和執行的結果。 圖 9 顯示 STaskInfo 類,將此資訊的實體的定義。 此外,我將使用 ConcurrentDictionary < TKey、 TValue > 的單個實例 作為持久性機制。

圖 9,STaskInfo 和 CoordinatorCoNtext 的類

public class STaskInfo
  {
    public string ExecutionRequestId
    { get; set; }
    public STask ClientRequest
    { get; set; }
    public ITaskUpdateCallback CallbackChannel
    { get; private set; }
    public ITaskExecutionNode ExecutionRequestChannel
    { get; set; }
    public STaskInfo(ITaskUpdateCallback callback)
    {
      CallbackChannel = callback;
    }
  }
public static class CoordinatorContext
{
...
private static readonly ConcurrentDictionary<string, STaskInfo> 
  _submissionTracker =
  new ConcurrentDictionary<string, STaskInfo>();
...
}

最後,注意,_submissionTracker 包含在 CoordinatorCoNtext 類中。我將使用此類執行任務協調員的主要功能。

處理用戶端請求

任務協調員是唯一條目的客戶,這意味著它必須能夠處理作為許多用戶端同時保證任務執行節點變得飽和 (在資源) 盡可能請求。這並不像看起來那麼簡單。為了更好地解釋潛在的挑戰,讓我們看看一個簡單的解決方案:

  1. 任務協調員公開通過該用戶端提交任務執行請求的服務操作。
  2. 任務協調員提交這些請求執行它們的任務執行節點和跟蹤的這些請求 — — 也就是說,它仍然存在狀態。

圖 10 演示此提交過程的基本實現。

圖 10 執行提交的過程

public class TaskCoordinatorService : ITaskCoordinator
{
...
public void SubmitRequest(List<STask> stasks)
  {    
    CoordinatorContext.SendTasksToTaskHandler(stasks);
  }
...
}
public static class CoordinatorContext
{
...
internal static void SendTaskRequestToTaskExecutionNode(List<STask> stasks)
  {
  var clientFactory = //Client factory creation logic.. 
  var channel = clientFactory.CreateChannel();
  foreach (var stask in stasks)
    _submissionTracker.TryAdd(stask.Id, stask);
  try
  {
    ((IClientChannel)channel).Open();
    channel.Start(stasks);
  }
  catch (CommunicationException ex)
  {   
    // Error handling and logging ...
}
  finally
  {
    if (((IClientChannel)channel).State != CommunicationState.Faulted)               
       ((IClientChannel)channel).Close();
  }
}
...
}

但是,這種過於簡單的實現不會很好工作在某些情況下:

  • 如果用戶端提交大量的單個請求中的任務,他們都將最終在單個任務執行節點中,從而導致 (假設有一個以上的任務執行節點) 的可用資源不均衡的利用率。
  • 在峰值負載情況下,系統可能會排氣任務執行節點中的可用資源,如果執行的協力廠商物流任務的數目超過了這些資源可以處理。什麼作為協力廠商物流的任務執行綁定到特定資源 (如記憶體),在繁忙的情況下可以增加可能使系統停止回應時,這可能是這種情況。

油門

應付這種挑戰的方法是以某種方式"管理"任務執行請求,只得通過系統。在這種情況下,您可以認為任務協調員作為節流控制器。討論節流過程之前,然而,讓我們回顧油門的語義,連同節流過程,我將使用減輕這些風險。

第一種方案可以緩解的上限任務協調員可以提交一個請求中的任務執行節點的任務執行請求的數量。我會打電話給這個節氣門 maxSTasks­PerRequest。使用此方法,負載平衡器演算法將能夠完成其工作的負載均衡的可用任務執行節點。

第二種情況是更具挑戰性。看似合理的解決辦法是帽的任務執行節點將在特定號碼執行的任務數。我會把這個節氣門稱為 maxNumberOfTasks。

除此油門,該解決方案可受益于有限制的任務正在執行基於其類型的數量的另一個節氣。要解釋為什麼這是非常有用,讓我們考慮方案中的任務執行節點有兩種類型的部署,T1 和 T2 的任務。是 CPU 綁定的 T1 和 T2 是 O 綁定的磁片。在這種情況下,提交的 T1 任務的執行請求的用戶端的輸送量是更有可能會受受約束的相同類型的活動任務 — — T1 次數如此越高任務較大影響。T2 任務受不同的約束,因為他們對 T1 任務產生的影響並不相同。有能力通過類型手段限制執行的任務可以控制可以運行多少個 T1 任務,在任何特定時間,讓我以最大限度的 CPU 資源,因此,總輸送量。我將這個節氣稱為 maxNumberOfTasksByType。

佇列和頻寬限制

現在,你瞭解的油門和油門如何能有效地維持任務執行節點的健康運作的語義,讓我們看看當達到油門指定的限制時,會發生什麼 — — 就是實際的節流過程。

一種選擇是只是引發異常。然而,這會影響整體輸送量的解決方案,因為用戶端將引發檢查特定的錯誤或故障,然後重新提交請求,直到任務協調員可以成功地處理它們的開銷。另一種方法是使用伺服器端佇列暫時從用戶端和一個監視器類似過程 (提交者的進程),按固定時間間隔將從佇列讀取請求,並提交給任務執行節點請求。我將使用審批過程來執行實際的限制,因為提交者從請求佇列讀取通過考慮下列規則:

  1. 蓋,可以是 de-queued 至 maxSTasksPerRequest 的請求的數量。
  2. 如果達到節流 maxNumberOfTasks,則阻止德排隊請求的請求佇列將保持原樣。
  3. 如果達到節流 maxNumberOfTasksByType,類似,然後再進行排隊請求回到到請求佇列。請求進行排隊再次允許繼續處理其他類型的任務。此策略提供平等機會的執行佇列中的所有任務。然而,在某些情況下,可能會考慮使用優先順序佇列。你會發現很好的參考,在 bit.ly/NF0xQq

圖 11 說明了此過程。

The Submission Process
圖 11 提交的過程

我會開始通過顯示代碼描述這一進程的執行情況 (見圖 12) 為排隊的請求在請求中的佇列,它接收來自用戶端的請求的 SubmitRequest 服務操作。

圖 12 SubmitRequest 服務操作

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskCoordinatorService : ITaskCoordinator
  {
    public void SubmitRequest(List<STask> stasks)
    {           
      CoordinatorContext.EnqueueRequestsInRequestQ(stasks);
    }   
    ...
}
public static class CoordinatorContext
{
...
internal static void EnqueueRequestsInRequestQ(List<STask> stasks)
  {
    var callback =
      OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
    foreach (var stask in stasks)               
      _requestQ.Enqueue(new STaskInfo(callback) { ClientRequest = stask });
  }
...
}

接下來,讓我們看一下中顯示的審批過程執行圖 13

圖 13 提交者執行

public static class CoordinatorContext
{
  ...
static CoordinatorContext()
  {
    Submitter(...);
  }
  private static async void Submitter(int interval)
  {
    while (true)
    {
      await Task.Delay(interval);
      SendTaskRequestToTaskExecutionNode(
      GetTasksFromRequestQ());
    }
  }
  ...
}

圖 12圖 13,你可以看到在請求佇列和提交者請求任務 de-queuing (讀) 從請求佇列服務操作 enqueing (書面)。在這種情況下,您需要確保資料的基礎結構 — — 佇列 — —­是執行緒安全的。幸運的是,有正是針對這一點,<T> ConcurrentQueue 一類。所以我將請求作為基礎存儲庫中使用這種類型的單個實例。

public static class CoordinatorContext
{
  ...
private static readonly ConcurrentQueue<STaskInfo> _requestQ =
  new ConcurrentQueue<STaskInfo>();
  ...
}

現在,讓我們回顧 GetTasksFromRequestQ,讀取任務時執行間隔方法的實現。正是在這種方法節流過程發生與上文所述的油門應用的地方。圖 14 演示如何實現這一進程。

圖 14 GetTasksFromRequestQ 執行

public static class CoordinatorContext
{
  ...internal static List<STaskInfo> GetTasksFromRequestQ()
{
  var ret = new List<STaskInfo>();
  var maxSTasksPerRequest = //From a configuration
  var maxNumberOfTasks = //From a configuration
  var count =  // Count of submitted or executing tasks
  var countByType = // Enumerable of count by type
  for (int i = 0; i < maxSTasksPerRequest; i++)
  {
    STaskInfo info;
    if (count + i == maxNumberOfTasks || !_requestQ.TryDequeue(out info))
      return ret;
    var countTT = // Count of submitted or executing tasks of
                  // the type of the current item
    if (countTT == GetMaxNumberOfTasksByType(info.ClientRequest.STaskTypeName))
    { _requestQ.Enqueue(info); }
    else ret.Add(info);
  }
  return ret;
}
}
private static int GetMaxNumberOfTasksByType(string taskTypeName)
{
  // Logic to read from a configuration repository the value by task type name
}
...
}

在執行的目標圖 14 獲得允許進程以評估頻寬限制條件的數位。 圖 15 顯示似是而非的 LINQ 查詢,可以執行針對 _submissionTracker,以及一個包含返回的專案 (ret) 獲取這些值的清單。 請注意,這種做法可能會成功,這會降低性能。 如果是這樣,作為替代您可以實施一套的執行緒安全計數器遞增或遞減為專案添加或刪除從提交跟蹤器實例並使用這些計數器而不是直接查詢的並行的詞典。

圖 15 的頻寬限制值

var countByType = (from t in _submissionTracker.Values
                   group t by t.ClientRequest.STaskTypeName into g
                   select new
                   {
                      TypeName =  g.Key,
                      Count = g.Count()
                   });
var count = countByType.Sum(c => c.Count);
var countTT = (from tt in countByType
               where tt.TypeName == info.ClientRequest.STaskTypeName
               select tt.Count).SingleOrDefault()+ 
                   ret.Where((rt) => rt.ClientRequest.STaskTypeName == 
                   info.ClientRequest.STaskTypeName)
                   .Count();

向任務執行節點發送請求並處理結果

到目前為止我討論的任務協調員管理請求的方式。 讓我們看看如何任務協調員提交到現在考慮節流過程的任務執行節點的請求。 為了提供更好的上下文,讓我們首先查看任務執行節點 (通過負載平衡器) 公開的服務操作:

[ServiceContract( CallbackContract = typeof(ITaskUpdateCallback))]
  public interface ITaskExecutionNode
  {
    [OperationContract]
    void Start(List<STask> stask);
    [OperationContract]
    void Cancel(string Id);
  }

正如其名稱表明,這些行動的目的是要啟動的任務執行請求的清單,並請求取消某一任務。 服務合同利用相同的回檔合同,要更新的任務協調員通過執行合同。

圖 16 顯示更新的任務協調員在哪裡存儲 STaskInfo 的 SendTaskToTaskExecutionNode 方法實現中 _submissionTracker 實例並調用任務執行的節點上啟動服務操作。

圖 16 SendTaskToTaskExecutionNode 和支援方法

internal static void SendTaskRequestToTaskExecutionNode(List<STaskInfo> staskInfos)
  {
  if (staskInfos.Count() == 0)
    return;
  var channel = new DuplexChannelFactory<ITaskExecutionNode>(
                new InstanceContext(new CallbackHandler()),
                new NetHttpBinding(), new EndpointAddress(“http://.../”))
                .CreateChannel();
  try
  {
    var requestId = Guid.NewGuid().ToString();
    var reqs = staskInfos.Select(s => AddRequestToTracker(requestId,s, channel))
      .Where(s => s != null);
    ((IChannel)channel).Open();
    channel.Start(reqs.ToList<STask>());
  }
  catch (CommunicationException ex)
  {
    foreach (var stask in staskInfos)
      HandleClientUpdate(stask.ClientRequest.Id, STaskStatus.Faulted, ex);
  }
  }
private static STask AddRequestToTracker(string requestId,
  STaskInfo info, ITaskExecutionNode channel)
{
  info.ExecutionRequestId = requestId;
  info.ExecutionRequestChannel = channel;
  if (_submissionTracker.TryAdd(info.ClientRequest.Id, info))
    return info.ClientRequest;
  HandleClientUpdate(info.ClientRequest.Id, STaskStatus.Faulted,
    new Exception(“Failed to add “));
  return null;
}

請注意 SendTaskToTaskExecutionNode 方法創建回檔實例來處理任務執行節點中的任務執行的結果:

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
  public class CallbackHandler : ITaskUpdateCallback
  {
    public void UpdateStatus(string id, STaskStatus status, string result)
    {
      CoordinatorContext.HandleClientUpdate (id, status, result);
    }
  }

CallbackHandler 通過調用 HandleClientUpdate 方法處理回檔操作。 此方法檢索和從 submitterTracker 中刪除相應的 STaskInfo 實例並執行回檔到用戶端,更新結果。 此外,如果這是組中的最後一個請求,則關閉任務協調員和任務執行節點之間的通道。 圖 17 顯示 HandleClientUpdate 方法的實現。

圖 17 HandleClientUpdate 和支援方法

internal async static void HandleClientUpdate(
  string staskId, STaskStatus status, object result)
  {
    STaskInfo info;
    if (!_submissionTracker.TryGetValue(staskId, out info))
      throw new Exception(“Could not get task from the tracker”);
try
  {
    await Task.Run(() =>
      info.CallbackChannel.UpdateStatus(info.ClientRequest.Id, status, result));
    RemoveComplete(info.ClientRequest.Id);           
  }
catch(AggregateException ex)
  {
  // ...
}
  }
private static void RemoveComplete(string staskId)
  {
    STaskInfo info;
    if (!_submissionTracker.TryRemove(staskId, out info))
      throw new Exception(“Failed to be removed from the tracking collection”);
    if (_submissionTracker.Values.Where((t) => t.ExecutionRequestId ==
      info.ExecutionRequestId).Count() == 0)
      CloseTaskRequestChannel((IChannel)info.ExecutionRequestChannel);
  }
  private static void CloseTaskRequestChannel(IChannel channel)
  {
    if (channel != null && channel.State != CommunicationState.Faulted)
      channel.Close();
  }

任務執行者

在用戶端代碼中,類型名稱是必需的參數之一添加請求時。 此值最終使得任務執行節點。 TypeName 值執行公開一個函數委託,它封裝打算作為並行任務運行的功能並駐留在任務執行的所有節點中的介面的類型名稱。 我就會調用此介面 IRunnableTask。 此介面的實現者應期望作為參數接收的解除標記和一個參數陣列,從用戶端。 委託還應該返回結果的任務。 這裡是介面:

public interface IRunnableTask
{
  Func<Object> Run(CancellationToken ct, params string[] taskArgs );
}

在任務執行節點啟動任務

在高級別任務執行節點是負責"改造"成協力廠商物流可以執行的實際任務的任務執行請求 — — 即,啟動協力廠商物流任務。 圖 18 演示如何實現此過程中,我將討論。

圖 18 開始任務

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskExecutionNodeHandler : ITaskExecutionNode
{              
  public void Start(List<STask> stasks)
  {
    var callback =
      OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
    foreach (var t in stasks)
      TaskExecutionContext.Start(t,callback);           
  }
...
}
public static class TaskExecutionContext
{
...
internal static void Start(STask stask, ITaskUpdateCallback callback)
{
  try
  {
    // Step 1.a
    var rtasks = CompositionUtil.ContainerInstance.GetExports<IRunnableTask>();
    // Step 1.b
    var rtask = from t in rtasks
                where t.Value.GetType().FullName == stask.STaskTypeName
                select t.Value;
    // Step 2
    var cs = new CancellationTokenSource();
    var ct = cs.Token;
    TaskExecutionContext._cancellationSources.TryAdd(stask.Id, cs);
    // Step 3 
    Task<Object>
      .Run(rtask.First().Run(ct, stask.STaskParameters), ct)
      .ContinueWith(tes => UpdateStatus(tes, stask, callback));
  }
  catch (Exception ex)
  {
    ...
}
}
...
}

步驟 1 (b):在此階段,需要創建實例的 IRunnableTask,將返回一個委託,將作為類型的用戶端請求的任務運行任務執行節點。 為此,我利用 MEF 和.NET 框架 4.5,允許無屬性的配置方法中的新功能。 中的代碼圖 19 創建單個容器實例,匯出所有的 IRunnableTask 實現位於目錄"擴展。"關於 MEF 有關的詳細資訊和無屬性的配置方法,請參見 2012 年 6 月 MSDN 雜誌文章,"無屬性的方法對配置 MEF"在 msdn.microsoft.com/magazine/jj133818

圖 19 創建容器

internal static  class CompositionUtil
{
  private readonly static Lazy<CompositionContainer> _container =
    new Lazy<CompositionContainer>(() =>
    {
      var builder = new RegistrationBuilder();
      builder.ForTypesDerivedFrom<IRunnableTask>()                         
             .Export<IRunnableTask>()
             .SetCreationPolicy(CreationPolicy.NonShared);
      var cat = new DirectoryCatalog(“extensions”, builder);
      return new CompositionContainer(cat, true, null);
    }
  ,true);
  internal static CompositionContainer ContainerInstance
  {
    get { return _container.Value; }
  } 
}

現在我們再回中的代碼圖 18。 該代碼使用容器獲取出口的 IRunnableTask,類型,然後選擇具有匹配的用戶端請求的類型名稱的實例。 請注意我作出關鍵假設是只有一個任務實例對應的用戶端請求的類型。 這是我使用 LINQ 查詢返回的第一個實例的原因。

쭮힗 2좬其實創建該協力廠商物流任務之前, 的代碼創建解除標記源和解除標記。 我會跟蹤的 ConcurrentDictionary < TKey、 TValue > 的單個實例中的取消源。 當用戶端請求取消任務執行節點將使用此取消源的清單。 下面是此實例的定義:

public static class TaskExecutionContext
{
...
private readonly static ConcurrentDictionary<string, 
  CancellationTokenSource> _cancellationSources =
  new ConcurrentDictionary<string, CancellationTokenSource>();
...
}

쭮힗 3좬在這一點上,我運行該任務,與我剛才創建的解除標記。 任務之後繼續執行任務。繼續進行任務需要因為必須更新任務協調員,通過進行服務調用,與協力廠商物流任務完成後 (成功或故障) 執行的結果。 作為圖 20 演示,我將封裝在委託協力廠商物流任務、 任務執行要求和任務協調員的回檔實例作為參數接收的任務協調員,更新的過程。

圖 20 封裝更新過程

private static Action<Task<Object>, STask, 
  ITaskUpdateCallback> UpdateStatus = (t, st, cb) =>
  {
    try
    {
      STaskStatus s;
      Object r = null;
      switch (t.Status)
      {
        case TaskStatus.Canceled: s = STaskStatus.Canceled;
          break;
        case TaskStatus.Faulted:
          s = STaskStatus.Faulted;
          r = t.Exception.Flatten();
          break;
        case TaskStatus.RanToCompletion:
          s = STaskStatus.Completed;
          r = t.Result;
          break;
        default:
          s = STaskStatus.Faulted;
          r = new Exception("Invalid Status");
          break;
      }
      CancellationTokenSource cs;
      TaskExecutionContext._cancellationSources.TryRemove(st.Id, out cs);
      cb.UpdateStatus(st.Id, s, r);
    }
    catch (Exception ex)
  {
  // Error handling
  }
};

要求並處理取消通知

TPL 提供為執行任務取消的機制。 為此,委託封裝作為協力廠商物流的任務運行的實際過程需要取消請求作出回應,並終止執行。 任務取消有關的更多資訊,參閱 MSDN 庫文章,"任務取消,"在 bit.ly/NYVTO0

IRunnableTask 介面中的參數之一是解除標記。 任務執行節點將創建的每個任務的標記,這取決於介面的實施者可以確定何時取消請求檢查和優雅地終止該進程。 中的代碼圖 21 顯示了一個簡單的任務,同時檢查是否要求的取消計算的範圍,偶數數。

圖 21 檢查取消

public class MySimpleCTask : IRunnableTask
{
  public Func<Object> Run(Nullable<CancellationToken> ct, 
    params string[] taskArgs)
  {
    var j = int.Parse(taskArgs[0]);
    var z = 0;
    return (() =>
  {
      for (int i = 0; i < j; i++)
      {
        if (i % 2 != 0)
        {
          z++;
          ct.Value.ThrowIfCancellationRequested();
        }
      }
      return z;
   });
  }
}

正如你看到我討論用戶端時,您可以添加與解除標記的請求和內部用戶端執行必要的訂購。 所以當引發取消時,取消要求傳送到任務協調員。 在收到取消請求,任務協調員檢查請求是否已提交任務執行節點,並且發送取消請求。 任務執行節點,然後查找取消源對應的任務要求的用戶端 id。 提交取消請求到任務執行節點是相對簡單 — — 你只需要找到對應于該請求的通道位置的任務協調員最初提交任務執行請求。 這些管道需要保持打開更新執行請求的狀態的回檔。

圖 22 顯示服務行動的執行中的任務協調員。

圖 22 實施服務操作的任務協調員

public class TaskCoordinatorService : ITaskCoordinator
{
...
public bool CancelTask(string Id)
  {
    return CoordinatorContext.CancelTask(Id);
  }
  ...}
public static class CoordinatorContext
{
...
internal static bool CancelTask(string Id)
{
STaskInfo info;
if(_submissionTracker.TryGetValue(
  Id, out info) && info.ExecutionRequestChannel != null)
{
  info.ExecutionRequestChannel.Cancel(Id);
  return true;
}
return false;
}
  ...
}

最後, 圖 23 顯示服務行動的執行任務中執行的節點。

圖 23 在任務執行節點執行服務操作

class CancellationHandler : ICancellationHandler
  {
  public void Cancel(STask stask)
  {
    TaskExecutionContext.CanceTask(stask.Id);
  }
  }
public static class TaskExecutionContext
{
...
internal static void CancelTask(string Id)
  {
    CancellationTokenSource tknSrc;
    if (_cancellationSources.TryGetValue(Id, out tknSrc))
      tknSrc.Cancel(); }
...
}

可伸縮性的任務協調員和其他注意事項

值得注意此實現假定在單個節點上的運行任務協調員,但很有可能以橫向擴展 (這需要,至少,進行以下更改) 的任務協調員:

  • 負載平衡器用於訪問任務協調員將需要引入。
  • 如上文所述,頻寬限制的關鍵辦法有的總數,並按類型正在運行的任務數準確計數。在具有多個節點作為協調員任務運行的情況下,這些計數器將需要集中維護 (例如,在資料庫中) 同時仍能要更新或讀取 (避免競爭條件、 鎖死等) 以同步方式。

最後,讓我注意作為與任何發展方法,風險和價值需要權衡其他替代品,可以滿足您的需要和所提供現成的。例如,您可能需要考慮技術,如微軟 HPC 伺服器作為許多情況下,否則可能會認為解決的合理解決方案基於本文中描述的方法。

優化資源

TPL 提供了必要的基礎設施來實現內單多芯電腦的 CPU 資源的最優利用,也是執行縮放跨電腦邊界的方法非常有用。這能説明工作量自動化和批次處理方案並行凡需要不只是在一個單一的多核伺服器,但是跨多個伺服器以及。

要達到這種水準的可擴充性,需要考慮到幾個結構上的考慮。其中的關鍵:需要有能力將更多的資源添加到現有的農場和油門的語義需要執行的任務的資源的能力時在現有的資源之間平衡負載。微軟開發工具和技術提供了必要的構造塊,以實現體系結構,考慮到這些重要的考慮因素。

Jesus Aguilar 工程方面的開發人員作為高級應用發展經理總理支援微軟。

由於下面的技術專家對本文的審閱:Ryan 漿果、 史蒂夫 · 凱斯、 裡克 · 克洛德 · 歲喬希