TPL

タスクの並列実行を水平方向にスケール変換する

Jesus Aguilar

コード サンプルのダウンロード

アプリケーションの開発者は、Microsoft .NET Framework 4 で導入されたタスク並列ライブラリ (TPL) を使って、マルチコア コンピューターの並列処理能力を活かしたソリューションを作成できるようになりました。ところが、(コアを追加することによって) 垂直方向にスケール変換する能力は、コストやホスティングの制限など、多くの要因によって制約を受けることがよくあります。このような場合にスケーラビリティが必要なときは、多数のサーバーに処理を分散するのが理想です。クラウド ホスティングなどはその一例です。今回は、.NET Framework 4.5 に追加された多くの新機能を使用して処理の分散を実現する概念的なソリューションの重要な側面 (実装など) について説明します。

基本的な前提

今回説明するアプローチには、TPL だけでなく、以下のような複数のテクノロジが必要です。

  • タスク並列ライブラリ (TPL)
  • Windows Communication Foundation (WCF)
  • Managed Extensibility Framework (MEF)

これらのテクノロジは、解決しようとしている問題に関係する場合にのみ説明し、これらのテクノロジに十分知識があることを前提に話を進めます。

リモート タスク クライアント、タスク コーディネーター、タスク実行ノード

リモート タスク クライアントはクライアント側の層で、分散環境を使用することによって生じる複雑なセマンティクスを隠ぺいします。リモート タスク クライアントは、タスク コーディネーターと直接やり取りします。タスク コーディネーターは、基盤となるインフラストラクチャへのエントリ ポイントになります。大まかには、タスク コーディネーターには以下の特性があります。

  1. クライアントとの唯一の接点として機能する
  2. スケーラブルなプラットフォームでのタスクの実行や、特定のタスクのキャンセルを要求するために必要なサービスを公開する
  3. 環境の正常な運用を支える、タスク実行要求の絞込みやキューを処理する

タスク実行ノードは、タスクが実行されるプロセスのホストです。TPL によって実行されるタスクの実際の実装は、タスク実行ノードに常駐します。

以下に、これらの論理層の主な側面と情報の流れを示します。

  1. リモート タスク クライアントから、1 つ以上のタスクの実行を要求します。
  2. タスク コーディネーターから、タスク実行ノードに要求を送信します。
  3. タスク実行ノードがタスクを実行し、タスク コーディネーターで各要求の状態を更新します。
  4. タスク コーディネーターは、各要求の実行結果によって、クライアントを更新します。
  5. タスク実行ノードは、ロード バランサーの管理下におかれているため、必要なだけノードを追加でき、水平方向のスケール変換を実現します。

論理層と情報の流れを図 1 に示します。

Scaling Tasks Horizontally
図 1 タスクの水平方向のスケール変換

タスク実行ノードがタスク コーディネーターを更新し、その後タスク コーディネーターがリモート タスク クライアントを更新している点に注目してください。ここからは、クライアントとタスク コーディネーターの間と、タスク コーディネーターとタスク実行ノードの間の双方向通信に基づく実装について説明します。WCF では、タスク実行のノードからタスク コーディネーターにコールバックして、その後にタスク コーディネーターがクライアントを更新するためにコールバックを行えるようにする、二重チャネルを使用することを意味します。この双方向通信アプローチを実現するために WebSocket を使用する方法を紹介します。WebSocket 転送は、.NET Framework 4.5 で新しいバインドとして実装され、Windows 8 で利用可能です。bit.ly/SOLNiU (英語) で、バインドの詳細について説明されています。

クライアントとタスク コーディネーター

3 つの主要論理層として、リモート タスク クライアント、タスク コーディネーター、タスク実行ノードについて理解したところで、まずはリモート タスク クライアントの実装について説明します。ここからは、リモート タスク クライアントを単に「クライアント」と呼ぶことにします。

前述のように、クライアントの価値は、基盤となるコンポーネントの複雑さを隠ぺいするところにあります。これをクライアントが実現している方法の 1 つが、ローカルではない場所で実行されているにもかかわらず、タスクがローカルで実行されているような印象を与える API を提供することです。図 2 のコードに、RemoteTaskClient クラスのパブリック メソッドを示しています。

図 2 RemoteTaskClient クラスのパブリック メソッド

public class RemoteTaskClient<TResult> : IDisposable
{
  public void AddRequest(string typeName, 
    string[] parameters, CancellationToken tk)
  {...}
  public void AddRequest(string typeName, string[] parameters)
  {...}
  public Task<TResult>[] SubmitRequests()
  {...}
  public RemoteTaskClient(string taskCoodinatorEndpointAddress)
  {...}
  public void Dispose()
  {...}
}

AddRequest メソッドを使用すると、リモート実行に要求を追加することができます。各要求では、typeName (インフラストラクチャが TPL タスクとしてリモートに実行するデリゲートを含む実際の実装の型) と、関連付けられているパラメーターを指定する必要があります。指定したら、SubmitRequest メソッドを呼び出して要求を送信できます。要求を送信すると、TPL タスクの配列が、要求ごとに 1 つ生成されます。このアプローチによって、生成された TPL タスクをまるでローカル タスクのように管理できるようになります。たとえば以下のように、さまざまな要求を送信して、完了を待機することが可能です。

using (var c = new RemoteTaskClient<int>("..."))
  {
    c.AddRequest("...", null);
    c.AddRequest("...", null);
    var ts = c.SubmitRequests();
    Task.WaitAll(ts);
    foreach (var t in ts)
      Console.WriteLine(t.Result);
  }

RemoteTaskClient の実装の詳細に入る前に、タスク コーディネーターが公開する、サービス操作とデータ コントラクトについて見ておきましょう。RemoteTaskClient の実装について確認する前にこれらのコントラクトを理解しておけば、さらにコンテキストを把握できます。クライアントの実装は、これらのサービスに依存しています。

図 3 のコードは、タスク コーディネーターがクライアントに公開するサービス操作です。クライアントは、SubmitRequest 操作によって、1 つ以上の TPL タスクの実行を要求できます。また、CancelTask 操作を使用すれば、完了していない特定の TPL タスクのキャンセルを要求することもできます。UpdateStatus 操作はコールバックであることに注意してください。このコールバック コントラクトのクライアント側の実装によって、タスク コーディネーターが、クライアントで状態を更新します。

図 3 サービス操作

[ServiceContract(CallbackContract = typeof(ITaskUpdateCallback))]
  public interface ITaskCoordinator
  {
    [OperationContract(IsOneWay = true)]
    void SubmitRequest(List<STask> stask);
    [OperationContract]
    bool CancelTask(string Id);       
  }
  public interface ITaskUpdateCallback
  {
    [OperationContract (IsOneWay = true)]
    void UpdateStatus(string id, STaskStatus status, string result);
  }

タスクの実行要求を表すデータ コントラクトについて見てみましょう。これは、クライアントからタスク コーディネーターに送信するデータ エンティティです。タスク コーディネーターはこれを受け、実際の実行が行われるタスク実行ノードに要求を送信します。図 4 の STask クラスは、タスクの実行要求をモデル化しています。クライアントは、STaskTypeName プロパティと STaskParameters プロパティを使用することで、実行するタスクの型を関連パラメーターと共に設定できます。タスク コーディネーターは、一意識別子として Id プロパティを使用します。論理層はこのプロパティを使用して、システムで実行されている実際の TPL タスクと要求を相互に関連付けることができます。

図 4 STask クラス

[DataContract]
  public class STask
  {
    [DataMember]
    public string Id
    { get; set; }
    [DataMember]
    public string STaskTypeName
    { get; set; }
    [DataMember]
    public string[] STaskParameters
    { get; set; }
  }

では、RemoteTaskClient に戻り、ローカルの TPL タスクを、タスク実行ノードの実行結果とどのように関連付けようとしているかについて説明します。TPL には、TaskCompletionSource<TResult> という便利なクラスがあります。これを使えば、TPL タスクを作成し、そのライフサイクルを管理できます。このメカニズムによって、特定のタスクの完了、キャンセル、または失敗が生じたときに通知を行えます。つまり、(タスク コーディネーターによって) タスク実行ノードに送信される各要求は、TaskCompletionSource のインスタンスに関連付ける必要があります。このため、ClientRequestInfo というクラスを実装しました (図 5 参照)。

図 5 ClientRequestInfo クラス

internal class ClientRequestInfo<TResult>
{
  internal STask TaskExecutionRequest
  { get; set; }
  internal TaskCompletionSource<TResult> CompletionSource
  { get; set; }
  internal ClientRequestInfo(string typeName, string[] args)
  {
    TaskExecutionRequest = new STask()
      {Id = Guid.NewGuid().ToString(), STaskTypeName =typeName,
        STaskParameters = args };
    CompletionSource = new TaskCompletionSource<TResult>();
  }
}

図 6 に、このクラスのコンストラクターの実装を示します。

図 6 ClientRequestInfo コンストラクター

ITaskCoordinator _client;           
ConcurrentDictionary<string, ClientRequestInfo<TResult>> 
  _requests = new ConcurrentDictionary<string, 
  ClientRequestInfo<TResult>>();
  public RemoteTaskClient(string taskCoordinatorEndpointAddress)
  {           
    var factory = new DuplexChannelFactory<ITaskCoordinator>
       (new InstanceContext(new CallbackHandler<TResult>(_requests)),
       new NetHttpBinding(),
       new EndpointAddress(taskCoordinatorEndpointAddress));
    _client = factory.CreateChannel();
    ((IClientChannel)_client).Open();
  }

タスク コーディネーターに対して二重チャネルを開き、CallbackHandler 型のコールバック インスタンスを作成していることに注目してください。CallbackHandler は、パラメーターを、ClientRequestInfo のインスタンスを含む _requests として受け取ります。この理由は、_requests ディクショナリが、クライアント要求のアクティブなすべてのインスタンス (および、それらに関連付けられている TaskCompletionSource のインスタンス) を保持していて、CallbackHandler がタスク コーディネーターからの更新を処理するためです。複数のサービス要求が _requests ディクショナリを更新するので、スレッド セーフ性を保証しなければなりません。したがって、_requests を ConcurrentDictionary のインスタンスとして作成する必要があります。

図 7 に、CallbackHandler クラスの実装を示します。

図 7 CallbackHandler クラス

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class CallbackHandler<TResult> : ITaskUpdateCallback
{
  ConcurrentDictionary<string, ClientRequestInfo<TResult>> _requests;
  public void UpdateStatus(string id, STaskStatus status, Object result)
    {
      ClientRequestInfo<TResult> info;
      if (_requests.TryRemove(id, out info))
      {                           
        switch (status)
        {
          case STaskStatus.
            Completed: info.CompletionSource.SetResult(
              (TResult)result);
            break;
          case STaskStatus.Canceled: 
            info.CompletionSource.SetCanceled();
            break;
          case STaskStatus.Faulted: 
            info.CompletionSource.SetException(
              (Exception)result);
            break;
        }
      }
}
  internal CallbackHandler(ConcurrentDictionary<string,
    ClientRequestInfo<TResult>> requests)
  {
    requests = requests;
  }
}

今度は、図 8 に示す AddRequest メソッドと SubmitRequest メソッドの実装を見てみましょう。

図 8 AddRequest メソッドと SubmitRequest メソッド

public void AddRequest(string typeName, string[] parameters, 
  CancellationToken tk)
{
  var info = new ClientRequestInfo<TResult>(typeName, args);
  _buffer.Add(info);
  tk.Register(()=> _client.CancelTask(info.TaskExecutionRequest.Id));
}
public void AddRequest(string typeName, string[] parameters)
  {
    _buffer.Add(new ClientRequestInfo<TResult>(typeName, parameters));
  }
public Task<TResult>[] SubmitRequests()
  {
    if (_buffer.Count == 0)
      return null;
    var req = _buffer.Select((r) =>
    {
      _requests.TryAdd(r.TaskExecutionRequest.Id, r);
      return r.TaskExecutionRequest;                
    });
    _client.SubmitRequest(req.ToList<STask>());
    var ret =  _buffer.Select(r =>
      r.CompletionSource.Task).ToArray<Task<TResult>>();
    _buffer.Clear();
    return ret;
  }

クライアントの要求を追跡する

前述のように、クライアントはタスク コーディネーターとのみやり取りします。クライアントからの要求を処理し、TPL タスクの実行結果によってクライアントを更新するのは、タスク コーディネーターの役割です。クライアントと同様に、コーディネーターでも同じ形式で元の要求を永続化する必要があります。また、実行の状態と結果を追跡することも必要です。さらに、対応するコールバック インスタンス (クライアントとの通信を可能にする)、接続と関連付けられているタスク実行ノードへのチャネル (キャンセルのシナリオで必要、後述)、および、タスク実行ノードの 1 回の呼び出しと関連付けられているすべてのタスク実行要求をまとめる一意識別子 (チャネルが不要になるタイミングを決定する) も追跡する必要があります。図 9 は、この情報を保持するエンティティ (STaskInfo クラス) の定義です。さらに、永続化のメカニズムとして、ConcurrentDictionary<TKey,TValue> の単一のインスタンスを使用します。

図 9 STaskInfo クラスと CoordinatorContext クラス

public class STaskInfo
  {
    public string ExecutionRequestId
    { get; set; }
    public STask ClientRequest
    { get; set; }
    public ITaskUpdateCallback CallbackChannel
    { get; private set; }
    public ITaskExecutionNode ExecutionRequestChannel
    { get; set; }
    public STaskInfo(ITaskUpdateCallback callback)
    {
      CallbackChannel = callback;
    }
  }
public static class CoordinatorContext
{
...
private static readonly ConcurrentDictionary<string, STaskInfo> 
  _submissionTracker =
  new ConcurrentDictionary<string, STaskInfo>();
...
}

最後に、_submissionTracker が CoordinatorContext クラスに含まれていることに注目してください。このクラスを使用して、タスク コーディネーターの主な機能を実装します。

クライアント要求を処理する

タスク コーディネーターは、クライアントの唯一のエントリ ポイントです。つまり、タスク実行ノードが (リソースの点で) 飽和状態になるのを防ぎながら、できる限り多くのクライアント要求を処理する必要があります。これは、言うほど簡単ではありません。どのような課題が存在しているかを理解するため、以下の単純なソリューションを考えてみます。

  1. タスク コーディネーターは、クライアントがタスク実行要求の送信に使用するサービス操作を公開します。
  2. タスク コーディネーターは、これらの要求を実行するためにタスク実行ノードに送信し、その要求を追跡します。つまり、状態を永続化します。

図 10 に、この送信プロセスの基本的な実装を示します。

図 10 送信プロセスの実装

public class TaskCoordinatorService : ITaskCoordinator
{
...
  public void SubmitRequest(List<STask> stasks)
  {    
    CoordinatorContext.SendTasksToTaskHandler(stasks);
  }
...
}
public static class CoordinatorContext
{
...
  internal static void SendTaskRequestToTaskExecutionNode(List<STask> stasks)
  {
  var clientFactory = //Client factory creation logic.. 
  var channel = clientFactory.CreateChannel();
  foreach (var stask in stasks)
    _submissionTracker.TryAdd(stask.Id, stask);
  try
  {
    ((IClientChannel)channel).Open();
    channel.Start(stasks);
  }
  catch (CommunicationException ex)
  {   
    // Error handling and logging ...
  }
  finally
  {
    if (((IClientChannel)channel).State != CommunicationState.Faulted)               
       ((IClientChannel)channel).Close();
  }
}
...
}

ところが、この単純な実装がうまく機能しないシナリオがいくつかあります。以下にそれらを示します。

  • クライアントが 1 つの要求の中で多数のタスクを送信すると、そのすべてが単一のタスク実行ノードになるため、使用可能なリソースが均等に使用されません (使用可能なタスク実行ノードが 1 つより多くあると仮定しています)。
  • ピーク時の負荷のシナリオでは、実行中の TPL タスクの数が多すぎてタスク実行ノード内の使用可能なリソースでは対処できない場合、リソースをシステムが使い果たす場合があります。TPL タスクとして実行されていたものが特定のリソース (メモリなど) を拘束する場合がこれに当てはまります。ピーク時には、これによって、システムが応答しなくなることがあります。

調整

このような課題に対処する方法の 1 つは、タスク実行要求がシステムを経由するときに、なんらかの方法で要求を "管理" することです。このコンテキストでは、タスク コーディネーターを調整コントローラーと見なすことができます。調整のプロセスについて説明する前に、調整のセマンティクスについて確認しましょう。このセマンティクスは、先ほど挙げたリスクを軽減するために、調整プロセスと共に使用します。

1 つ目のシナリオのリスクを軽減するには、タスク コーディネーターが 1 つの要求の中でタスク実行ノードに送信できるタスク実行要求の数を制限します。この調整を、maxSTasksPerRequest と呼びます。このアプローチを使用すれば、ロード バランサー アルゴリズムが、使用可能なタスク実行ノード間で負荷のバランスを取るという役割を遂行できます。

これ以上に対処が困難なのが 2 つ目のシナリオです。妥当なソリューションは、タスク実行ノードで特定の時点に実行するタスク数を制限することです。この調整を maxNumberOfTasks と呼びます。

また、実行されているタスク数を型に基づいて制限する別の調整があれば、ソリューションがさらに便利になります。この理由を説明するために、タスク実行ノードに T1 および T2 という 2 つのタスク型があるシナリオを考えます。T1 は CPU を集中的に使用し、T2 はディスク I/O を集中的に使用します。このシナリオでは、T1 タスクの実行要求を送信するクライアントのスループットは、同じ型の制約に拘束されるアクティブなタスクによって影響を受ける可能性が高いので、T1 タスクの数が多ければ、影響も大きくなります。T2 タスクは別の制約に拘束されるので、T1 タスクに同じ影響は及ぼしません。型によってタスクの実行を制限できれば、特定の時間に実行可能な T1 タスクの数を制御できるので、CPU リソースを最大限に活用することが可能になります。これにより、スループット全体も最大化されます。この調整を maxNumberOfTasksByType と呼びます。

キューと調整

調整のセマンティクスと、タスク実行ノードの正常な操作を維持するのに調整が便利な理由を理解したら、今度は、調整によって指定される制限が限界に達したときの対処方法について考えます。つまり、実際の調整プロセスを考えます。

1 つ目の選択肢は、単に例外をスローすることですが、これは、ソリューションのスループット全体に影響を与えます。というのも、特定のエラーや失敗を確認し、タスク コーディネーターがそれらを適切に処理できるまで要求を再送信するというオーバーヘッドが、クライアントで生じるためです。2 つ目の選択肢は、クライアントからの要求を一時的に保持するためにサーバー側のキューを使用し、定期的にキューから要求を読み取ってタスク実行ノードに送信するモニターのようなプロセス (送信側のプロセス) を使用することです。送信側は、以下の規則を考慮しながら要求キューから読み取りを行うため、ここでは実際の調整を実行するために送信側のプロセスを使用します。

  1. キューから取り出せる要求の数を maxSTasksPerRequest に制限します。
  2. maxNumberOfTasks が限界に達したら、要求をキューから取り出すのを停止します。要求キューはそのままの状態に維持されます。
  3. maxNumberOfTasksByType が限界に達したら、要求をキューから取り出します。その後、要求キューに要求を戻します。要求を再度キューに登録すれば、別の型のタスクの処理を継続できます。この方法によって、キュー内のすべてのタスクに、均等に実行のチャンスを与えます。ただし、優先キューの使用を検討すべき場合もあります。詳細については、bit.ly/NF0xQq を参照してください。

図 11 に、このプロセスを示します。

The Submission Process
図 11 送信プロセス

このプロセスの実装を説明するために、まず、クライアントから要求を受け取るたびに要求キューに要求を登録する SubmitRequest サービス操作のコード (図 12 参照) を示します。

図 12 SubmitRequest サービス操作

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskCoordinatorService : ITaskCoordinator
  {
    public void SubmitRequest(List<STask> stasks)
    {           
      CoordinatorContext.EnqueueRequestsInRequestQ(stasks);
    }   
    ...
}
public static class CoordinatorContext
{
...
internal static void EnqueueRequestsInRequestQ(List<STask> stasks)
  {
    var callback =
      OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
    foreach (var stask in stasks)               
      _requestQ.Enqueue(new STaskInfo(callback) { ClientRequest = stask });
  }
...
}

今度は、図 13 の送信側プロセスの実装を見てみましょう。

図 13 送信側の実装

public static class CoordinatorContext
{
  ...
static CoordinatorContext()
  {
    Submitter(...);
  }
  private static async void Submitter(int interval)
  {
    while (true)
    {
      await Task.Delay(interval);
      SendTaskRequestToTaskExecutionNode(
      GetTasksFromRequestQ());
    }
  }
  ...
}

図 12図 13 では、サービス操作が要求キューへの要求の登録 (書き込み) を行い、送信側のタスクが要求キューから要求の取り出し (読み取り) を行っていることがわかります。このようなシナリオでは、基盤となるデータ構造 (キュー) がスレッド セーフになるようにする必要があります。さいわい、まさにこのためのクラスとして ConcurrentQueue<T> があります。そのため、要求の基盤となるリポジトリとして、この型の単一のインスタンスを使用します。

public static class CoordinatorContext
{
  ...
private static readonly ConcurrentQueue<STaskInfo> _requestQ =
  new ConcurrentQueue<STaskInfo>();
  ...
}

今度は、GetTasksFromRequestQ メソッドの実装について見てみましょう。このメソッドは、実行間隔が経過したときにタスクの読み取りを行います。このメソッドで、調整プロセスが発生し、また、先ほど説明した調整が適用されます。このプロセスの実装を図 14 に示します。

図 14 GetTasksFromRequestQ の実装

public static class CoordinatorContext
{
  ...internal static List<STaskInfo> GetTasksFromRequestQ()
{
  var ret = new List<STaskInfo>();
  var maxSTasksPerRequest = //From a configuration
  var maxNumberOfTasks = //From a configuration
  var count =  // Count of submitted or executing tasks
  var countByType = // Enumerable of count by type
  for (int i = 0; i < maxSTasksPerRequest; i++)
  {
    STaskInfo info;
    if (count + i == maxNumberOfTasks || !_requestQ.TryDequeue(out info))
      return ret;
    var countTT = // Count of submitted or executing tasks of
                  // the type of the current item
    if (countTT == GetMaxNumberOfTasksByType(info.ClientRequest.STaskTypeName))
    { _requestQ.Enqueue(info); }
    else ret.Add(info);
  }
  return ret;
}
}
private static int GetMaxNumberOfTasksByType(string taskTypeName)
{
  // Logic to read from a configuration repository the value by task type name
}
...
}

図 14 の実装の目的は、プロセスが調整の状態を評価できる数値を取得することです。図 15 では、_submissionTracker に対して実行できる妥当な LINQ クエリと、それらの値を取得する戻り値の項目 (ret) を含むリストが示されています。ただし、このアプローチは、パフォーマンスを犠牲にしないと適切に機能しないこともあります。この場合は、送信トラッカー インスタンスから項目が追加、削除されるたびに増加または減少する一連のスレッド セーフのカウンターを実装して、同時実行のディクショナリを直接クエリする代わりにそれらのカウンターを使用します。

図 15 調整の値

var countByType = (from t in _submissionTracker.Values
                   group t by t.ClientRequest.STaskTypeName into g
                   select new
                   {
                      TypeName =  g.Key,
                      Count = g.Count()
                   });
var count = countByType.Sum(c => c.Count);
var countTT = (from tt in countByType
               where tt.TypeName == info.ClientRequest.STaskTypeName
               select tt.Count).SingleOrDefault()+ 
                   ret.Where((rt) => rt.ClientRequest.STaskTypeName == 
                   info.ClientRequest.STaskTypeName)
                   .Count();

タスク実行ノードに要求を送信して結果を処理する

ここまでは、タスク コーディネーターが要求を管理する方法について説明してきました。ここからは、タスク コーディネーターがタスク実行ノードに要求を送信する方法について、調整プロセスを考慮しながら見てみましょう。より適切なコンテキストを提供するために、まず、タスク実行ノードが (ロード バランサーを使用して) 公開するサービス操作を以下に示します。

[ServiceContract( CallbackContract = typeof(ITaskUpdateCallback))]
  public interface ITaskExecutionNode
  {
    [OperationContract]
    void Start(List<STask> stask);
    [OperationContract]
    void Cancel(string Id);
  }

名前が示すとおり、これらの操作の目的は、タスク実行要求のリストを開始することと、特定のタスクのキャンセルを要求することです。サービス コントラクトは、コントラクトの実装を通じてタスク コーディネーターを更新するために、同じコールバック コントラクトを活用します。

図 16 に、SendTaskToTaskExecutionNode メソッドの更新した実装を示します。この実装では、タスク コーディネーターが、STaskInfo インスタンスを _submissionTracker に格納し、タスク実行ノードで Start サービス操作を呼び出します。

図 16 SendTaskToTaskExecutionNode とサポート メソッド

internal static void SendTaskRequestToTaskExecutionNode(List<STaskInfo> staskInfos)
  {
  if (staskInfos.Count() == 0)
    return;
  var channel = new DuplexChannelFactory<ITaskExecutionNode>(
                new InstanceContext(new CallbackHandler()),
                new NetHttpBinding(), new EndpointAddress(“http://.../”))
                .CreateChannel();
  try
  {
    var requestId = Guid.NewGuid().ToString();
    var reqs = staskInfos.Select(s => AddRequestToTracker(requestId,s, channel))
      .Where(s => s != null);
    ((IChannel)channel).Open();
    channel.Start(reqs.ToList<STask>());
  }
  catch (CommunicationException ex)
  {
    foreach (var stask in staskInfos)
      HandleClientUpdate(stask.ClientRequest.Id, STaskStatus.Faulted, ex);
  }
  }
private static STask AddRequestToTracker(string requestId,
  STaskInfo info, ITaskExecutionNode channel)
{
  info.ExecutionRequestId = requestId;
  info.ExecutionRequestChannel = channel;
  if (_submissionTracker.TryAdd(info.ClientRequest.Id, info))
    return info.ClientRequest;
  HandleClientUpdate(info.ClientRequest.Id, STaskStatus.Faulted,
    new Exception(“Failed to add “));
  return null;
}

SendTaskToTaskExecutionNode メソッドは、以下のように、タスク実行ノードのタスク実行結果を処理するために、コールバック インスタンスを作成することに注目してください。

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
  public class CallbackHandler : ITaskUpdateCallback
  {
    public void UpdateStatus(string id, STaskStatus status, string result)
    {
      CoordinatorContext.HandleClientUpdate (id, status, result);
    }
  }

CallbackHandler は、HandleClientUpdate メソッドを呼び出すことで、コールバック操作を処理します。このメソッドは、submitterTracker から、対応する STaskInfo インスタンスを取得および削除して、結果を更新するためにクライアントにコールバックを実行します。さらに、これがグループにおける最後の要求の場合、タスク コーディネーターとタスク実行ノード間のチャネルを閉じます。図 17 に、HandleClientUpdate メソッドの実装を示します。

図 17 HandleClientUpdate とサポート メソッド

internal async static void HandleClientUpdate(
  string staskId, STaskStatus status, object result)
  {
    STaskInfo info;
    if (!_submissionTracker.TryGetValue(staskId, out info))
      throw new Exception(“Could not get task from the tracker”);
try
  {
    await Task.Run(() =>
      info.CallbackChannel.UpdateStatus(info.ClientRequest.Id, status, result));
    RemoveComplete(info.ClientRequest.Id);           
  }
catch(AggregateException ex)
  {
  // ...
  }
  }
private static void RemoveComplete(string staskId)
  {
    STaskInfo info;
    if (!_submissionTracker.TryRemove(staskId, out info))
      throw new Exception(“Failed to be removed from the tracking collection”);
    if (_submissionTracker.Values.Where((t) => t.ExecutionRequestId ==
      info.ExecutionRequestId).Count() == 0)
      CloseTaskRequestChannel((IChannel)info.ExecutionRequestChannel);
  }
  private static void CloseTaskRequestChannel(IChannel channel)
  {
    if (channel != null && channel.State != CommunicationState.Faulted)
      channel.Close();
  }

タスクのインプリメンター

クライアント コードでは、要求を追加する際に必要となるパラメーターの 1 つに typeName があります。この値は、最終的にはタスク実行ノードに送られます。typeName の値は、(並列タスクとして実行されることが想定されている機能をカプセル化し、すべてのタスク実行ノードに存在している) 関数デリゲートを公開するインターフェイスの実装の型名です。このインターフェイスを IRunnableTask と呼びます。このインターフェイスのインプリメンターは、パラメーターを、キャンセル トークン、およびパラメーターの配列としてクライアントから受け取ることを想定します。また、デリゲートもタスクの結果を返します。このインターフェイスを次に示します。

public interface IRunnableTask
{
  Func<Object> Run(CancellationToken ct, params string[] taskArgs );
}

タスク実行ノードでタスクを開始する

大まかに言うと、タスク実行ノードは、TPL が実行できる実際のタスクにタスク実行要求を "変換" する役割、つまり、TPL タスクを開始する役割があります。これから説明するこのプロセスの実装を図 18 に示します。

図 18 タスクの開始

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskExecutionNodeHandler : ITaskExecutionNode
{              
  public void Start(List<STask> stasks)
  {
    var callback =
      OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
    foreach (var t in stasks)
      TaskExecutionContext.Start(t,callback);           
  }
...
}
public static class TaskExecutionContext
{
...
internal static void Start(STask stask, ITaskUpdateCallback callback)
{
  try
  {
    // Step 1.a
    var rtasks = CompositionUtil.ContainerInstance.GetExports<IRunnableTask>();
    // Step 1.b
    var rtask = from t in rtasks
                where t.Value.GetType().FullName == stask.STaskTypeName
                select t.Value;
    // Step 2
    var cs = new CancellationTokenSource();
    var ct = cs.Token;
    TaskExecutionContext._cancellationSources.TryAdd(stask.Id, cs);
    // Step 3 
    Task<Object>
      .Run(rtask.First().Run(ct, stask.STaskParameters), ct)
      .ContinueWith(tes => UpdateStatus(tes, stask, callback));
  }
  catch (Exception ex)
  {
    ...
  }
}
...
}

手順 1 (a と b): タスク実行ノードは、この段階で、IRunnableTask のインスタンスを作成する必要があります。IRunnableTask は、クライアントによって要求される型のタスクとして実行されるデリゲートを返します。このために、属性を使用しない構成アプローチを可能にする、.NET Framework 4.5 の MEF と新機能を活用します。図 19 のコードは、"extensions" というディレクトリにある IRunnableTask の実装をすべてエクスポートする単一のコンテナー インスタンスを作成します。MEF と、属性を使用しない構成アプローチの詳細については、2012 年 6 月の MSDN マガジン記事、「MEF の構成に属性を使用しない方法」(msdn.microsoft.com/magazine/jj133818) を参照してください。

図 19 コンテナーの作成

internal static  class CompositionUtil
{
  private readonly static Lazy<CompositionContainer> _container =
    new Lazy<CompositionContainer>(() =>
    {
      var builder = new RegistrationBuilder();
      builder.ForTypesDerivedFrom<IRunnableTask>()                         
             .Export<IRunnableTask>()
             .SetCreationPolicy(CreationPolicy.NonShared);
      var cat = new DirectoryCatalog(“extensions”, builder);
      return new CompositionContainer(cat, true, null);
    }
  ,true);
  internal static CompositionContainer ContainerInstance
  {
    get { return _container.Value; }
  } 
}

図 18 のコードに戻ります。このコードは、IRunnableTask 型のエクスポートを取得するためにコンテナーを使用して、クライアント要求に一致する型名のあるインスタンスを選択します。クライアントによって要求される型に対応するタスク インスタンスが 1 つしかないという重要な推測を行っていることに注意してください。LINQ クエリが返す 1 つ目のインスタンスを使用しているのはこれが理由です。

手順 2: コードは、TPL タスクを実際に作成する前に、キャンセル トークン ソースとキャンセル トークンを作成します。キャンセル ソースは、ConcurrentDictionary<TKey,TValue> の単一のインスタンスで追跡します。タスク実行ノードは、クライアントがキャンセルを要求するときに、このキャンセル ソースのリストを使用します。このインスタンスの定義を次に示します。

public static class TaskExecutionContext
{
...
private readonly static ConcurrentDictionary<string, 
  CancellationTokenSource> _cancellationSources =
  new ConcurrentDictionary<string, CancellationTokenSource>();
...
}

手順 3: この段階では、作成したばかりのキャンセル トークンと共にタスクを実行します。タスクの後には、継続タスクが続きます。継続タスクが必要になる理由は、(成功したか失敗したかにかかわらず) TPL タスクが完了したら、サービス呼び出しを行うことで、実行結果によってタスク コーディネーターを更新する必要があるためです。図 20 のように、パラメーターを TPL タスク、タスク実行要求、タスク コーディネーターへのコールバック インスタンスとして受け取るデリゲートで、タスク コーディネーターを更新するプロセスをカプセル化します。

図 20 更新プロセスのカプセル化

private static Action<Task<Object>, STask, 
  ITaskUpdateCallback> UpdateStatus = (t, st, cb) =>
  {
    try
    {
      STaskStatus s;
      Object r = null;
      switch (t.Status)
      {
        case TaskStatus.Canceled: s = STaskStatus.Canceled;
          break;
        case TaskStatus.Faulted:
          s = STaskStatus.Faulted;
          r = t.Exception.Flatten();
          break;
        case TaskStatus.RanToCompletion:
          s = STaskStatus.Completed;
          r = t.Result;
          break;
        default:
          s = STaskStatus.Faulted;
          r = new Exception("Invalid Status");
          break;
      }
      CancellationTokenSource cs;
      TaskExecutionContext._cancellationSources.TryRemove(st.Id, out cs);
      cb.UpdateStatus(st.Id, s, r);
    }
    catch (Exception ex)
  {
  // Error handling
  }
};

キャンセルの要求と処理

TPL は、タスクのキャンセルを実装するメカニズムを提供します。このために、TPL タスクとして実行されている実際のプロセスをカプセル化するデリゲートが、キャンセル要求に応答して、実行を終了する必要があります。タスクのキャンセルの詳細については、MSDN ライブラリの記事「タスクのキャンセル」(bit.ly/NYVTO0) を参照してください。

IRunnableTask インターフェイスのパラメーターの 1 つは、キャンセル トークンです。タスク実行ノードは、タスクごとにトークンを作成します。キャンセル要求を確認してプロセスを適切に終了するタイミングを判断するのは、このインターフェイスのインプリメンターです。図 21 のコードは、キャンセルが要求されているかどうかを確認しながら、範囲内の偶数の数を計算する簡単なタスクを示しています。

図 21 キャンセルの確認

public class MySimpleCTask : IRunnableTask
{
  public Func<Object> Run(Nullable<CancellationToken> ct, 
    params string[] taskArgs)
  {
    var j = int.Parse(taskArgs[0]);
    var z = 0;
    return (() =>
  {
      for (int i = 0; i < j; i++)
      {
        if (i % 2 != 0)
        {
          z++;
          ct.Value.ThrowIfCancellationRequested();
        }
      }
      return z;
   });
  }
}

クライアントについて説明したときに確認したように、キャンセル トークンを含む要求を追加でき、クライアントは内部で必要なサブスクリプションを実行します。そのため、キャンセルが発生すると、キャンセル要求はタスク コーディネーターに送信されます。タスク コーディネーターは、キャンセル要求を受け取ったら、タスク実行ノードに要求が送信されたかどうかを確認し、キャンセル要求を送信します。その後、タスク実行ノードは、クライアント ID によって要求されたタスクに対応するキャンセル ソースを探します。タスク実行ノードへのキャンセル要求の送信は、実に簡単です。タスク コーディネーターがタスク実行要求を最初に送信した要求に対応するチャネルを見つけるだけです。これらのチャネルは、実行要求の状態を更新するコールバック用に開かれたままにする必要があります。

図 22 に、タスク コーディネーターにおけるサービス操作の実装を示します。

図 22 タスク コーディネーターでのサービス操作の実装

public class TaskCoordinatorService : ITaskCoordinator
{
...
public bool CancelTask(string Id)
  {
    return CoordinatorContext.CancelTask(Id);
  }
  ...}
public static class CoordinatorContext
{
...
internal static bool CancelTask(string Id)
{
STaskInfo info;
if(_submissionTracker.TryGetValue(
  Id, out info) && info.ExecutionRequestChannel != null)
{
  info.ExecutionRequestChannel.Cancel(Id);
  return true;
}
return false;
}
  ...
}

最後として、図 23 に、タスク実行ノードにおけるサービス操作の実装を示します。

図 23 タスク実行ノードでのサービス操作の実装

class CancellationHandler : ICancellationHandler
  {
  public void Cancel(STask stask)
  {
    TaskExecutionContext.CanceTask(stask.Id);
  }
  }
public static class TaskExecutionContext
{
...
  internal static void CancelTask(string Id)
  {
    CancellationTokenSource tknSrc;
    if (_cancellationSources.TryGetValue(Id, out tknSrc))
      tknSrc.Cancel(); }
...
}

タスク コーディネーターのスケーラビリティとその他の考慮事項

この実装は、タスク コーディネーターが単一のノードで実行されると仮定していますが、タスク コーディネーターをスケール アウトすることも可能です (このためには、少なくとも、以下の変更が必要になります)。

  • タスク コーディネーターにアクセスするためのロード バランサーを導入する必要があります。
  • 前に説明したように、調整アプローチの鍵は、実行中のタスクの総数と型ごとの数の正確なカウンターです。タスク コーディネーターとして実行されているノードが 1 つ以上あるシナリオでは、これらのカウンターは、(競合状態、デッドロックなどを避けながら) 同期方式で更新または読み取りを可能にしながら、(データベースなどで) 集中管理される必要があります。

最後に、どの開発アプローチでも同じように、リスクおよび価値の点から、自分のニーズに合った市販の代替ソリューションとの比較検討を行う必要があります。たとえば、今回説明したアプローチに基づいて対処しない場合、大半は Microsoft HPC サーバーなどのテクノロジを妥当なソリューションとして検討する必要があります。

リソースを最適化する

TPL は、単一のマルチコア コンピューターの CPU リソースを最適に利用するのに必要なインフラストラクチャを提供します。また、コンピューターの境界を越えてスケール変換するアプローチを実装する際に便利です。これは、単一のマルチコア サーバーだけでなく複数のサーバーで並列処理を必要とする、ワークロードの自動化やバッチ処理のシナリオに便利です。

この水平方向のスケーラビリティを実現するために、アーキテクチャに関していくつか考慮すべきことがあります。中でも重要なのが、既存のファームにリソースを追加することが可能でありながらも既存のリソース間に負荷を分散する必要性と、実行する必要があるタスクのセマンティクスに従ってリソースを調整する能力です。マイクロソフトの開発ツールとテクノロジは、こうした重要な考慮事項を踏まえているアーキテクチャの実装に必要なビルディング ブロックを提供します。

Jesus Aguilar は、マイクロソフトで、シニア アプリケーション開発マネージャーとして開発者向けのプレミア サポートに携わっています。

この記事のレビューに協力してくれた技術スタッフの Ryan Berry、Steve Case、Rick Claude、および Piyush Joshi に心より感謝いたします。