TPL

Horizontale Skalierbarkeit für die parallele Ausführung von Aufgaben

Jesus Aguilar

Die Task Parallel Library (TPL) wurde mit Microsoft .NET Framework 4 eingeführt. Durch die TPL können die Anwendungsentwickler Lösungen erstellen, die die Möglichkeiten der parallelen Verarbeitung auf einem Mehrkerncomputer nutzen. In vielen Szenarios ist die vertikale Skalierung, d. h. das Hinzufügen weiterer Kerne, jedoch durch eine Reihe von Faktoren eingeschränkt, beispielsweise Kosten- und Hostingbeschränkungen. Wenn in solchen Fällen Skalierbarkeit erforderlich ist, empfiehlt sich die Verteilung der Verarbeitung auf ein Array von Servern. Ein Beispiel hierfür ist das Cloud-Hosting. In diesem Artikel beschreibe ich die wichtigsten Aspekte einer konzeptionellen Lösung hierfür, einschließlich der Implementierung. Dabei kommen viele der neuen Features von .NET Framework 4.5 zum Einsatz.

Grundlegende Annahmen

Für den beschriebenen Ansatz sind die TPL und verschiedene weitere Technologien erforderlich, unter anderem:

  • Task Parallel Library (TPL)
  • Windows Communication Foundation (WCF)
  • Managed Extensibility Framework (MEF)

Ich gehe auf diese Technologien nur im Zusammenhang mit dem Problem ein, das ich lösen möchte. Ich setze voraus, dass Sie mit ihnen vertraut sind.

Remotetaskclient, Taskkoordinator und Taskausführungsknoten

Der Remotetaskclient ist eine clientseitige Ebene. Hinter dieser Ebene verbirgt sich die komplexe Semantik, die bei der Verwendung einer verteilten Umgebung entsteht. Der Remotetaskclient interagiert direkt mit dem Taskkoordinator, welcher dann zum Einstiegspunkt für die zugrunde liegende Infrastruktur wird. Die Merkmale des Taskkoordinators im Überblick:

  1. Er ist der einzige Kontaktpunkt mit den Clients.
  2. Er macht die Dienste verfügbar, die zum Anfordern der Ausführung von Tasks auf der skalierbaren Plattform sowie zum Abbrechen eines bestimmten Tasks notwendig sind.
  3. Er verarbeitet die Drosselung und die Einreihung von Taskausführungsanforderungen in Warteschlangen und unterstützt so die reibungslose Ausführung der Umgebung.

Die Taskausführungsknoten sind die Hosts der Prozesse, in denen die Tasks ausgeführt werden. Die tatsächlichen Implementierungen der Tasks, die von der TPL ausgeführt werden, befinden sich in den Taskausführungsknoten.

Die wichtigsten Aspekte dieser logischen Ebenen und der Informationsfluss:

  1. Der Remotetaskclient fordert die Ausführung von einem oder mehreren Tasks an.
  2. Der Taskkoordinator übermittelt die Anforderung an die Taskausführungsknoten.
  3. Die Taskausführungsknoten führen die Tasks aus und aktualisieren den Status der jeweiligen Anforderung im Taskkoordinator.
  4. Der Taskkoordinator aktualisiert den Client mit dem jeweiligen Ergebnis der Ausführung der Anforderung.
  5. Die Taskausführungsknoten befinden sich hinter einem Lastenausgleich. Dadurch können nach Bedarf weitere Knoten hinzugefügt werden, um eine horizontale Skalierung zu ermöglichen.

In Abbildung 1 sind die logischen Ebenen und der Informationsfluss dargestellt.

Scaling Tasks HorizontallyAbbildung 1: Horizontale Skalierung von Tasks

Die Taskausführungsknoten aktualisieren den Taskkoordinator, welcher wiederum den Remotetaskclient aktualisiert. Ich beschreibe eine Implementierung, die auf bidirektionaler Kommunikation zwischen dem Client und dem Taskkoordinator sowie zwischen dem Taskkoordinator und den Taskausführungsknoten basiert. In Bezug auf WCF impliziert dies die Verwendung eines Duplexkanals, damit die Taskausführungsknoten einen Rückruf an den Taskkoordinator ausführen können und anschließend der Taskkoordinator auf dieselbe Weise den Client aktualisieren kann. Ich erläutere dazu, wie Sie diesen bidirektionalen Kommunikationsansatz mit WebSockets erreichen. Die WebSockets-Übertragung ist als neue Bindung in .NET Framework 4.5 implementiert und für Windows 8 verfügbar. Weitere Informationen über die Bindung finden Sie unter bit.ly/SOLNiU.

Client und Taskkoordinator

Sie kennen jetzt die drei wesentlichen logischen Ebenen: Remotetaskclient, Taskkoordinator und Taskausführungsknoten. Lassen Sie uns nun mit der Implementierung vom Remotetaskclient beginnen. Wenn ich in diesem Artikel den Begriff „Client“ verwende, beziehe ich mich auf den Remotetaskclient.

Wie bereits erwähnt, bietet der Client den Vorteil, die Komplexität der zugrunde liegenden Komponenten verbergen zu können. Um dieses Ziel zu erreichen, können Sie eine API bereitstellen, die den Eindruck einer lokalen Taskausführung vermittelt. Tatsächlich können sie jedoch auch andernorts ausgeführt werden. Der Code in Abbildung 2 zeigt die öffentlichen Methoden der RemoteTaskClient-Klasse.

Abbildung 2: Öffentliche Methoden der RemoteTaskClient-Klasse

public class RemoteTaskClient<TResult> : IDisposable
{
  public void AddRequest(string typeName, 
    string[] parameters, CancellationToken tk)
  {...}
  public void AddRequest(string typeName, string[] parameters)
  {...}
  public Task<TResult>[] SubmitRequests()
  {...}
  public RemoteTaskClient(string taskCoodinatorEndpointAddress)
  {...}
  public void Dispose()
  {...}
}

Mit der AddRequest-Methode können Sie Anforderungen zur Remoteausführung hinzufügen. Für jede Anforderung müssen Sie typeName angeben. Dies ist der Typ der tatsächlichen Implementierung, die den Delegaten enthält, der von der Infrastruktur remote als TPL-Task ausgeführt wird. Ebenfalls erforderlich sind die zugehörigen Parameter. Anschließend übermitteln Sie die Anforderungen mithilfe der SubmitRequest-Methode. Das Ergebnis der übermittelten Anforderung ist ein Array von TPL-Tasks, einer für jede Anforderung. Durch diesen Ansatz können Sie die entstandenen TPL-Tasks so verwalten, als ob sie lokal wären. Sie können beispielsweise verschiedene Anforderungen übermitteln und darauf warten, dass sie abgeschlossen werden, wie in folgendem Beispielcode:

using (var c = new RemoteTaskClient<int>("..."))
  {
    c.AddRequest("...", null);
    c.AddRequest("...", null);
    var ts = c.SubmitRequests();
    Task.WaitAll(ts);
    foreach (var t in ts)
      Console.WriteLine(t.Result);
  }

Bevor wir zu den Details der RemoteTaskClient-Implementierung kommen, betrachten wir die Dienstvorgänge und Datenverträge, die der Taskkoordinator verfügbar macht. Da diese Dienste der Clientimplementierung zugrunde liegen, erhalten Sie durch das Verständnis der Verträge zusätzliche Kontextinformationen, bevor wir zur RemoteTaskClient-Implementierung kommen.

Der Code in Abbildung 3 zeigt die Dienstvorgänge, die der Taskkoordinator für den Client verfügbar macht. Durch den SubmitRequest-Vorgang kann der Client die Ausführung von einem oder mehreren TPL-Tasks anfordern. Durch den CancelTask-Vorgang kann der Client auch den Abbruch eines bestimmten, nicht abgeschlossenen TPL-Tasks anfordern. Bei dem UpdateStatus-Vorgang handelt es sich um einen Rückruf. Durch eine clientseitige Implementierung dieses Rückrufvertrags aktualisiert der Taskkoordinator den Status im Client.

Abbildung 3: Dienstvorgänge

[ServiceContract(CallbackContract = typeof(ITaskUpdateCallback))]
  public interface ITaskCoordinator
  {
    [OperationContract(IsOneWay = true)]
    void SubmitRequest(List<STask> stask);
    [OperationContract]
    bool CancelTask(string Id);       
  }
  public interface ITaskUpdateCallback
  {
    [OperationContract (IsOneWay = true)]
    void UpdateStatus(string id, STaskStatus status, string result);
  }

Sehen wir uns den Datenvertrag an, der die Taskausführungsanforderung darstellt. Dies ist die Datenentität, die der Client an den Taskkoordinator sendet, der wiederum die Anforderung an den Taskausführungsknoten übermittelt, auf dem die eigentliche Ausführung stattfindet. Die in Abbildung 4 gezeigte STask-Klasse erzeugt eine Taskausführungsanforderung. Mithilfe der STaskTypeName- und STaskParameters-Eigenschaften kann der Client den auszuführenden Tasktyp sowie die erforderlichen Parameter festlegen. Der Taskkoordinator verwendet die Eigenschafts-ID als eindeutigen Bezeichner, durch den die logischen Ebenen die Anforderung auf den tatsächlich im System ausgeführten TPL-Task beziehen können.

Abbildung 4: Die STask-Klasse

[DataContract]
  public class STask
  {
    [DataMember]
    public string Id
    { get; set; }
    [DataMember]
    public string STaskTypeName
    { get; set; }
    [DataMember]
    public string[] STaskParameters
    { get; set; }
  }

Zurück zu RemoteTaskClient und dazu, wie ich den lokalen TPL-Task auf das Ergebnis der Ausführung auf den Taskausführungsknoten beziehen möchte. Die TPL hat die nützliche TaskCompletionSource<TResult>-Klasse, mit der ich einen TPL-Task erstellen und dessen Lebenszyklus steuern kann. Durch diesen Mechanismus kann ich signalisieren, wenn ein bestimmter Task abgeschlossen, abgebrochen oder fehlerhaft ist. Dies impliziert, dass jede Anforderung, die (über den Taskkoordinator) an einen Taskausführungsknoten gesendet wird, auf eine TaskCompletionSource-Instanz bezogen sein muss. Zu diesem Zweck habe ich die ClientRequestInfo-Klasse implementiert, wie in Abbildung 5 gezeigt.

Abbildung 5: Die ClientRequestInfo-Klasse

internal class ClientRequestInfo<TResult>
{
  internal STask TaskExecutionRequest
  { get; set; }
  internal TaskCompletionSource<TResult> CompletionSource
  { get; set; }
  internal ClientRequestInfo(string typeName, string[] args)
  {
    TaskExecutionRequest = new STask()
      {Id = Guid.NewGuid().ToString(), STaskTypeName =typeName,
        STaskParameters = args };
    CompletionSource = new TaskCompletionSource<TResult>();
  }
}

In Abbildung 6 wird die Implementierung des Konstruktors dieser Klasse dargestellt.

Abbildung 6: Der ClientRequestInfo-Konstruktor

ITaskCoordinator _client;           
ConcurrentDictionary<string, ClientRequestInfo<TResult>> 
  _requests = new ConcurrentDictionary<string, 
  ClientRequestInfo<TResult>>();
  public RemoteTaskClient(string taskCoordinatorEndpointAddress)
  {           
    var factory = new DuplexChannelFactory<ITaskCoordinator>
       (new InstanceContext(new CallbackHandler<TResult>(_requests)),
       new NetHttpBinding(),
       new EndpointAddress(taskCoordinatorEndpointAddress));
    _client = factory.CreateChannel();
    ((IClientChannel)_client).Open();
  }

Ich öffne einen Duplexkanal zum Taskkoordinator und erstelle eine Rückrufinstanz vom Typ CallbackHandler. Die CallbackHandler-Instanz erhält einen _requests-Parameter, der ClientRequestInfo-Instanzen enthält. Der Grund ist, dass das _requests-Wörterbuch alle aktiven Instanzen der Clientanforderungen (und die TaskCompletionSource-Instanzen, die mit diesen verknüpft sind) enthält, und CallbackHandler die Aktualisierungen vom Taskkoordinator verarbeitet. Da mehrere Dienstanforderungen das _requests-Wörterbuch aktualisieren, muss ich Threadsicherheit gewährleisten: Deshalb muss ich dies als eine Instanz von ConcurrentDictionary erstellen.

Abbildung 7 zeigt die Implementierung der CallbackHandler-Klasse.

Abbildung 7: Die CallbackHandler-Klasse

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class CallbackHandler<TResult> : ITaskUpdateCallback
{
  ConcurrentDictionary<string, ClientRequestInfo<TResult>> _requests;
  public void UpdateStatus(string id, STaskStatus status, Object result)
    {
      ClientRequestInfo<TResult> info;
      if (_requests.TryRemove(id, out info))
      {                           
        switch (status)
        {
          case STaskStatus.
            Completed: info.CompletionSource.SetResult(
              (TResult)result);
            break;
          case STaskStatus.Canceled: 
            info.CompletionSource.SetCanceled();
            break;
          case STaskStatus.Faulted: 
            info.CompletionSource.SetException(
              (Exception)result);
            break;
        }
      }
}
  internal CallbackHandler(ConcurrentDictionary<string,
    ClientRequestInfo<TResult>> requests)
  {
    requests = requests;
  }
}

Betrachten wir nun die Implementierung der AddRequest- und SubmitRequest-Methoden, wie in Abbildung 8 gezeigt.

Abbildung 8: Die AddRequest- und SubmitRequest-Methoden

public void AddRequest(string typeName, string[] parameters, 
  CancellationToken tk)
{
  var info = new ClientRequestInfo<TResult>(typeName, args);
  _buffer.Add(info);
  tk.Register(()=> _client.CancelTask(info.TaskExecutionRequest.Id));
}
public void AddRequest(string typeName, string[] parameters)
  {
    _buffer.Add(new ClientRequestInfo<TResult>(typeName, parameters));
  }
public Task<TResult>[] SubmitRequests()
  {
    if (_buffer.Count == 0)
      return null;
    var req = _buffer.Select((r) =>
    {
      _requests.TryAdd(r.TaskExecutionRequest.Id, r);
      return r.TaskExecutionRequest;                
    });
    _client.SubmitRequest(req.ToList<STask>());
    var ret =  _buffer.Select(r =>
      r.CompletionSource.Task).ToArray<Task<TResult>>();
    _buffer.Clear();
    return ret;
  }

Nachverfolgen von Clientanforderungen

Im letzten Abschnitt haben Sie gesehen, dass der Client ausschließlich mit dem Taskkoordinator interagiert. Dieser ist dafür zuständig, die Clientanforderungen zu verarbeiten und den Client anschließend mit den Ergebnissen der Ausführung des TPL-Tasks zu aktualisieren. Wie beim Client muss dazu die ursprüngliche Anforderung in irgendeiner Form beibehalten werden. Außerdem muss Verschiedenes nachverfolgt werden: die entsprechende Rückrufinstanz, die die Kommunikation mit dem Client ermöglicht; der Kanal zu den Taskausführungsknoten, die mit der Verbindung verknüpft sind (wie später beschrieben, erforderlich für Abbruchszenarios); ein eindeutiger Bezeichner, der alle Taskausführungsanforderungen gruppiert, die mit einem einzelnen Aufruf eines Taskausführungsknotens verknüpft sind (um zu bestimmen, wann der Kanal nicht mehr benötigt wird) sowie der Status und das Ergebnis der Ausführung. In Abbildung 9 wird die Definition der STaskInfo-Klasse gezeigt, der Entität, die diese Informationen enthält. Ich verwende außerdem eine einzelne Instanz von ConcurrentDictionary<TKey,TValue> als Mechanismus für die Dauerhaftigkeit.

Abbildung 9: Die STaskInfo- und CoordinatorContext-Klassen

public class STaskInfo
  {
    public string ExecutionRequestId
    { get; set; }
    public STask ClientRequest
    { get; set; }
    public ITaskUpdateCallback CallbackChannel
    { get; private set; }
    public ITaskExecutionNode ExecutionRequestChannel
    { get; set; }
    public STaskInfo(ITaskUpdateCallback callback)
    {
      CallbackChannel = callback;
    }
  }
public static class CoordinatorContext
{
...
private static readonly ConcurrentDictionary<string, STaskInfo> 
  _submissionTracker =
  new ConcurrentDictionary<string, STaskInfo>();
...
}

Und schließlich ist _submissionTracker in der CoordinatorContext-Klasse enthalten. Ich verwende diese Klasse, um die Hauptfunktionalität des Taskkoordinators zu implementieren.

Verarbeiten von Clientanforderungen

Der Taskkoordinator ist der einzige Einstiegspunkt für die Clients. Daher muss er so viele Clientanforderungen wie möglich verarbeiten können und gleichzeitig dafür Sorge tragen, dass die Taskausführungsknoten im Hinblick auf die Ressourcen nicht ausgelastet werden. Das ist nicht so einfach, wie es scheint. Eine vereinfachte Lösung verdeutlicht die möglichen Schwierigkeiten:

  1. Der Taskkoordinator macht den Dienstvorgang verfügbar, über den die Clients Taskausführungsanforderungen übermitteln.
  2. Der Taskkoordinator übermittelt diese Anforderungen an die Taskausführungsknoten, damit sie ausgeführt werden, und verfolgt die Anforderungen, d. h. er behält ihren Status bei.

Abbildung 10 zeigt eine Basisimplementierung dieses Übermittlungsprozesses.

Abbildung 10: Implementierung des Übermittlungsprozesses

public class TaskCoordinatorService : ITaskCoordinator
{
...
  public void SubmitRequest(List<STask> stasks)
  {    
    CoordinatorContext.SendTasksToTaskHandler(stasks);
  }
...
}
public static class CoordinatorContext
{
...
  internal static void SendTaskRequestToTaskExecutionNode(List<STask> stasks)
  {
  var clientFactory = //Client factory creation logic.. 
  var channel = clientFactory.CreateChannel();
  foreach (var stask in stasks)
    _submissionTracker.TryAdd(stask.Id, stask);
  try
  {
    ((IClientChannel)channel).Open();
    channel.Start(stasks);
  }
  catch (CommunicationException ex)
  {   
    // Error handling and logging ...
  }
  finally
  {
    if (((IClientChannel)channel).State != CommunicationState.Faulted)               
       ((IClientChannel)channel).Close();
  }
}
...
}

Diese vereinfachte Implementierung würde in einigen Szenarios jedoch nicht besonders gut funktionieren:

  • Wenn der Client eine große Anzahl von Tasks in einer einzelnen Anforderung sendet, kommen alle in einem einzelnen Taskausführungsknoten an. Die Verfügbarkeit von mehr als einem Taskausführungsknoten angenommen, führt dies zu einer ungleichmäßigen Nutzung der verfügbaren Ressourcen.
  • Möglicherweise werden die verfügbaren Ressourcen in den Taskausführungsknoten vom System erschöpft, wenn in Szenarios mit Spitzenauslastung die Anzahl der ausgeführten TPL-Tasks so groß ist, dass diese Ressourcen sie nicht mehr verarbeiten können. Dieser Fall kann bei einer Bindung der als TPL-Task ausgeführten Aufgabe an eine bestimmte Ressource (beispielsweise Arbeitsspeicher) eintreten, die in Spitzenzeiten das Risiko vergrößert, zu einem System zu führen, das nicht mehr antwortet.

Drosselung

Eine Methode zum Bewältigen solcher Schwierigkeiten ist, die Taskausführungsanforderungen sozusagen zu verwalten, während sie das System passieren. In diesem Zusammenhang können Sie sich den Taskkoordinator als eine Drosselungssteuerung vorstellen. Bevor ich den Drosselungsprozess erläutere, möchte ich auf die Semantik der Drosselungen eingehen, die ich zusammen mit dem Drosselungsprozess verwende, um diese Risiken zu minimieren.

Das Problem im ersten Szenario ist zu bewältigen, indem die Anzahl der Taskausführungsanforderungen begrenzt wird, die der Taskkoordinator in einer einzelnen Anforderung an die Taskausführungsknoten übermitteln kann. Diese Drosselung nenne ich maxSTasks­PerRequest. Durch diesen Ansatz kann der Lastenausgleichsalgorithmus die Last ordnungsgemäß auf die verfügbaren Taskausführungsknoten verteilen.

Das zweite Szenario stellt eine größere Herausforderung dar. Eine einleuchtende Lösung ist, bei einer bestimmten Anzahl die Zahl der von den Taskausführungsknoten ausgeführten Tasks zu begrenzen. Diese Drosselung bezeichne ich als maxNumberOfTasks.

Zusätzlich dazu kann die Lösung von einer weiteren Drosselung profitieren, die die Anzahl der ausgeführten Tasks anhand von deren Typ begrenzt. Den Nutzen dieser Drosselung demonstriere ich mithilfe eines Szenarios, in welchem die Taskausführungsknoten zwei Typen von Tasks bereitstellt haben, T1 und T2. T1 ist CPU-gebunden und T2 ist Datenträger-E/A-gebunden. In diesem Szenario wird der Durchsatz einer Clientübermittlungsanforderung für die Ausführung von T1-Tasks stärker von aktiven Tasks beeinflusst, die an denselben Typ Einschränkung gebunden sind. Je höher also die Anzahl von T1-Tasks, desto größer die Auswirkung. Da T2-Tasks an eine andere Einschränkung gebunden sind, haben sie nicht die gleiche Auswirkung auf T1-Tasks. Wenn ich die Ausführung von Tasks nach Typ begrenzen kann, bedeutet das, dass ich die Anzahl der zu einer bestimmten Zeit ausgeführten T1-Tasks steuern kann. Dadurch kann ich die CPU-Ressourcen und als Ergebnis den Gesamtdurchsatz maximieren. Diese Drosselung bezeichne ich als maxNumberOfTasksByType.

Warteschlangen und Drosselung

Sie kennen jetzt die Semantik der Drosselungen und wissen, wie Drosselungen dazu beitragen, dass Taskausführungsknoten ordnungsgemäß ausgeführt werden. Betrachten wir nun, was passiert, wenn der von der Drosselung bestimmte Grenzwert erreicht wird, also den eigentlichen Drosselungsprozess.

Eine Möglichkeit besteht darin, einfach eine Ausnahme auszulösen. Das würde sich aber auf den Gesamtdurchsatz der Lösung auswirken, da der Client die Prüfung auf einen bestimmten Fehler veranlassen und dann die Anforderungen erneut übermitteln würde, bis der Taskkoordinator diese verarbeiten kann. Eine Alternative ist die serverseitige Verwendung von Warteschlangen, um die Anforderungen vom Client vorübergehend zu halten, und eine Art Überwachungsprozess (ein Übermittlungsprozess), der in regelmäßigen Abständen die Anforderungen der Warteschlange liest und sie an die Taskausführungsknoten übermittelt. Ich verwende den Übermittlungsprozess, um die eigentliche Drosselung auszuführen, da dieser beim Lesen in der Warteschlange die folgenden Regeln berücksichtigt:

  1. Die Anzahl der Anforderungen, die aus der Warteschlange genommen werden können, richtet sich nach maxSTasksPerRequest.
  2. Bei Erreichen der maxNumberOfTasks-Drosselung werden keine weiteren Anforderungen aus der Warteschlange genommen, und die Anforderungswarteschlange bleibt so, wie sie ist.
  3. Bei Erreichen der maxNumberOfTasksByType-Drosselung wird die Anforderung aus der Warteschlange genommen und anschließend wieder in die Anforderungswarteschlange eingereiht. Durch das erneute Einreihen der Anforderung kann die Verarbeitung von Tasks anderer Typen fortgesetzt werden. Diese Strategie bietet allen Tasks in der Warteschlange die gleiche Möglichkeit, ausgeführt zu werden. In einigen Fällen können Sie allerdings überlegen, stattdessen eine Prioritätswarteschlange zu verwenden. Nützliche Informationen dazu finden Sie unter bit.ly/NF0xQq.

In Abbildung 11 wird der Prozess dargestellt.

The Submission Process
Abbildung 11: Der Übermittlungsprozess

Um mit der Beschreibung der Implementierung dieses Prozesses zu beginnen, zeige ich zunächst den Code (siehe Abbildung 12) für den SubmitRequest-Dienstvorgang, der die Anforderungen vom Client in die Anforderungswarteschlange einreiht.

Abbildung 12: Der SubmitRequest-Dienstvorgang

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskCoordinatorService : ITaskCoordinator
  {
    public void SubmitRequest(List<STask> stasks)
    {           
      CoordinatorContext.EnqueueRequestsInRequestQ(stasks);
    }   
    ...
}
public static class CoordinatorContext
{
...
internal static void EnqueueRequestsInRequestQ(List<STask> stasks)
  {
    var callback =
      OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
    foreach (var stask in stasks)               
      _requestQ.Enqueue(new STaskInfo(callback) { ClientRequest = stask });
  }
...
}

Betrachten wir nun die Implementierung des Übermittlungsprozesses, wie in Abbildung 13 gezeigt.

Abbildung 13: Implementierung der Übermittlung

public static class CoordinatorContext
{
  ...
static CoordinatorContext()
  {
    Submitter(...);
  }
  private static async void Submitter(int interval)
  {
    while (true)
    {
      await Task.Delay(interval);
      SendTaskRequestToTaskExecutionNode(
      GetTasksFromRequestQ());
    }
  }
  ...
}

Sie können in den Abbildungen 12 und 13 sehen, wie der Dienstvorgang eine Anforderung in die Anforderungswarteschlange einreiht (schreibt) und der Übermittlungstask eine Anforderung aus der Anforderungswarteschlange entfernt (liest). In diesem Szenario müssen Sie Threadsicherheit für die zugrunde liegende Datenstruktur – die Warteschlange – gewährleisten. Glücklicherweise gibt es die ConcurrentQueue<T>-Klasse, die genau dazu dient. Daher verwende ich eine einzelne Instanz von diesem Typ als zugrunde liegendes Repository für die Anforderungen.

public static class CoordinatorContext
{
  ...
private static readonly ConcurrentQueue<STaskInfo> _requestQ =
  new ConcurrentQueue<STaskInfo>();
  ...
}

Kommen wir zur Implementierung der GetTasksFromRequestQ-Methode, welche die Tasks liest, wenn das Ausführungsintervall abläuft. In dieser Methode wird der Drosselungsprozess ausgeführt. Hier werden die Drosselungen angewendet, die ich zuvor beschrieben habe. Abbildung 14 zeigt eine Implementierung dieses Prozesses.

Abbildung 14: GetTasksFromRequestQ-Implementierung

public static class CoordinatorContext
{
  ...internal static List<STaskInfo> GetTasksFromRequestQ()
{
  var ret = new List<STaskInfo>();
  var maxSTasksPerRequest = //From a configuration
  var maxNumberOfTasks = //From a configuration
  var count =  // Count of submitted or executing tasks
  var countByType = // Enumerable of count by type
  for (int i = 0; i < maxSTasksPerRequest; i++)
  {
    STaskInfo info;
    if (count + i == maxNumberOfTasks || !_requestQ.TryDequeue(out info))
      return ret;
    var countTT = // Count of submitted or executing tasks of
                  // the type of the current item
    if (countTT == GetMaxNumberOfTasksByType(info.ClientRequest.STaskTypeName))
    { _requestQ.Enqueue(info); }
    else ret.Add(info);
  }
  return ret;
}
}
private static int GetMaxNumberOfTasksByType(string taskTypeName)
{
  // Logic to read from a configuration repository the value by task type name
}
...
}

Ziel der Implementierung in Abbildung 14 ist, die Zahlen zu erreichen, mit denen der Prozess die Drosselungsbedingungen prüfen kann. Abbildung 15 zeigt die plausiblen LINQ-Abfragen, die für _submissionTracker ausgeführt werden können, und eine Liste mit den Rückgabeelementen (ret), um diese Werte zu erhalten. Dieser Ansatz kann zu Lasten der Leistung gehen. Falls dies der Fall ist, können Sie alternativ eine Reihe threadsicherer Indikatoren implementieren, die sich erhöhen oder vermindern, wenn Elemente zur Übermittlungsprotokollierungsinstanz hinzugefügt oder daraus entfernt werden. Diese Indikatoren können Sie verwenden, anstatt das parallele Wörterbuch direkt abzufragen.

Abbildung 15: Drosselungswerte

var countByType = (from t in _submissionTracker.Values
                   group t by t.ClientRequest.STaskTypeName into g
                   select new
                   {
                      TypeName =  g.Key,
                      Count = g.Count()
                   });
var count = countByType.Sum(c => c.Count);
var countTT = (from tt in countByType
               where tt.TypeName == info.ClientRequest.STaskTypeName
               select tt.Count).SingleOrDefault()+ 
                   ret.Where((rt) => rt.ClientRequest.STaskTypeName == 
                   info.ClientRequest.STaskTypeName)
                   .Count();

Senden von Anforderungen an die Taskausführungsknoten und Verarbeitungsergebnisse

Bisher habe ich beschrieben, wie der Taskkoordinator die Anforderungen verwaltet. Sehen wir uns unter Berücksichtigung des Drosselungsprozesses an, wie der Taskkoordinator die Anforderungen an die Taskausführungsknoten übermittelt. Um den Kontext besser darzustellen, betrachten wir zunächst die von den Taskausführungsknoten über den Lastenausgleich verfügbar gemachten Dienstvorgänge:

[ServiceContract( CallbackContract = typeof(ITaskUpdateCallback))]
  public interface ITaskExecutionNode
  {
    [OperationContract]
    void Start(List<STask> stask);
    [OperationContract]
    void Cancel(string Id);
  }

Wie die Namen schon sagen, dienen diese Vorgänge zum Starten einer Liste von Taskausführungsanforderungen und zum Anfordern des Abbruchs von einem bestimmten Task. Der Dienstvertrag nutzt denselben Rückrufvertrag, um den Taskkoordinator über eine Vertragsimplementierung zu aktualisieren.

In Abbildung 16 wird eine aktualisierte Implementierung der SendTaskToTaskExecutionNode-Methode gezeigt. Der Taskkoordinator speichert darin die STaskInfo-Instanzen in _submissionTracker und ruft die Start-Dienstvorgänge auf einem Taskausführungsknoten auf.

Abbildung 16: Die SendTaskToTaskExecutionNode-Methode und unterstützende Methoden

internal static void SendTaskRequestToTaskExecutionNode(List<STaskInfo> staskInfos)
  {
  if (staskInfos.Count() == 0)
    return;
  var channel = new DuplexChannelFactory<ITaskExecutionNode>(
                new InstanceContext(new CallbackHandler()),
                new NetHttpBinding(), new EndpointAddress(“http://.../”))
                .CreateChannel();
  try
  {
    var requestId = Guid.NewGuid().ToString();
    var reqs = staskInfos.Select(s => AddRequestToTracker(requestId,s, channel))
      .Where(s => s != null);
    ((IChannel)channel).Open();
    channel.Start(reqs.ToList<STask>());
  }
  catch (CommunicationException ex)
  {
    foreach (var stask in staskInfos)
      HandleClientUpdate(stask.ClientRequest.Id, STaskStatus.Faulted, ex);
  }
  }
private static STask AddRequestToTracker(string requestId,
  STaskInfo info, ITaskExecutionNode channel)
{
  info.ExecutionRequestId = requestId;
  info.ExecutionRequestChannel = channel;
  if (_submissionTracker.TryAdd(info.ClientRequest.Id, info))
    return info.ClientRequest;
  HandleClientUpdate(info.ClientRequest.Id, STaskStatus.Faulted,
    new Exception(“Failed to add “));
  return null;
}

Die SendTaskToTaskExecutionNode-Methode erstellt eine Rückrufinstanz, um das Ergebnis der Taskausführung in einem Taskausführungsknoten zu verarbeiten:

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
  public class CallbackHandler : ITaskUpdateCallback
  {
    public void UpdateStatus(string id, STaskStatus status, string result)
    {
      CoordinatorContext.HandleClientUpdate (id, status, result);
    }
  }

CallbackHandler verarbeitet den Rückrufvorgang durch Aufrufen der HandleClientUpdate-Methode. Diese Methode ruft die entsprechende STaskInfo-Instanz ab und entfernt sie aus submitterTracker. Zur Aktualisierung des Ergebnisses führt sie einen Rückruf an den Client aus. Wenn dies die letzte Anforderung in der Gruppe ist, schließt die Methode außerdem den Kanal zwischen dem Taskkoordinator und dem Taskausführungsknoten. In Abbildung 17 wird die Implementierung der HandleClientUpdate-Methode dargestellt.

Abbildung 17: HandleClientUpdate-Methode und unterstützende Methoden

internal async static void HandleClientUpdate(
  string staskId, STaskStatus status, object result)
  {
    STaskInfo info;
    if (!_submissionTracker.TryGetValue(staskId, out info))
      throw new Exception(“Could not get task from the tracker”);
try
  {
    await Task.Run(() =>
      info.CallbackChannel.UpdateStatus(info.ClientRequest.Id, status, result));
    RemoveComplete(info.ClientRequest.Id);           
  }
catch(AggregateException ex)
  {
  // ...
  }
  }
private static void RemoveComplete(string staskId)
  {
    STaskInfo info;
    if (!_submissionTracker.TryRemove(staskId, out info))
      throw new Exception(“Failed to be removed from the tracking collection”);
    if (_submissionTracker.Values.Where((t) => t.ExecutionRequestId ==
      info.ExecutionRequestId).Count() == 0)
      CloseTaskRequestChannel((IChannel)info.ExecutionRequestChannel);
  }
  private static void CloseTaskRequestChannel(IChannel channel)
  {
    if (channel != null && channel.State != CommunicationState.Faulted)
      channel.Close();
  }

Taskimplementierung

Im Clientcode ist typeName einer der erforderlichen Parameter beim Hinzufügen von Anforderungen. Dieser Wert erreicht schließlich den Taskausführungsknoten. Der Wert von typeName ist der Typname der Implementierung einer Schnittstelle, die einen Funktionsdelegaten verfügbar macht. Dieser kapselt die Funktionalität ein, die als paralleler Task ausgeführt werden soll, und befindet sich in allen Taskausführungsknoten. Ich nenne diese Schnittstelle IRunnableTask. Die Implementierungen dieser Schnittstelle sollten als Parameter ein Abbruchtoken und ein Parameterarray vom Client erhalten. Der Delegat sollte auch das Taskergebnis zurückgeben. So sieht die Schnittstelle aus:

public interface IRunnableTask
{
  Func<Object> Run(CancellationToken ct, params string[] taskArgs );
}

Starten eines Tasks in einem Taskausführungsknoten

Im Wesentlichen ist ein Taskausführungsknoten für die Umwandlung einer Taskausführungsanforderung in einen von der TPL ausführbaren Task zuständig. Kurz gesagt, das Starten eines TPL-Tasks. Abbildung 18 zeigt eine Implementierung dieses Prozesses, den ich anschließend erläutere.

Abbildung 18: Starten eines Tasks

[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class TaskExecutionNodeHandler : ITaskExecutionNode
{              
  public void Start(List<STask> stasks)
  {
    var callback =
      OperationContext.Current.GetCallbackChannel<ITaskUpdateCallback>();
    foreach (var t in stasks)
      TaskExecutionContext.Start(t,callback);           
  }
...
}
public static class TaskExecutionContext
{
...
internal static void Start(STask stask, ITaskUpdateCallback callback)
{
  try
  {
    // Step 1.a
    var rtasks = CompositionUtil.ContainerInstance.GetExports<IRunnableTask>();
    // Step 1.b
    var rtask = from t in rtasks
                where t.Value.GetType().FullName == stask.STaskTypeName
                select t.Value;
    // Step 2
    var cs = new CancellationTokenSource();
    var ct = cs.Token;
    TaskExecutionContext._cancellationSources.TryAdd(stask.Id, cs);
    // Step 3 
    Task<Object>
      .Run(rtask.First().Run(ct, stask.STaskParameters), ct)
      .ContinueWith(tes => UpdateStatus(tes, stask, callback));
  }
  catch (Exception ex)
  {
    ...
  }
}
...
}

Schritt 1 (a und b): In diesem Stadium muss der Taskausführungsknoten eine IRunnableTask-Instanz erstellen, die einen Delegaten zurückgibt. Dieser wird als Task des Typs ausgeführt, der vom Client angefordert wurde. Dazu nutze ich MEF und ein neues Feature in .NET Framework 4.5, das einen attributfreien Konfigurationsansatz ermöglicht. Der Code in Abbildung 19 erstellt eine einzelne Containerinstanz, die alle Implementierungen von IRunnableTask, die sich im Verzeichnis „extensions“ befinden, exportiert. Weitere Informationen über MEF und den attributfreien Konfigurationsansatz finden Sie im MSDN Magazin-Artikel „Ein attributfreier Ansatz für die Konfiguration von MEF“ vom Juni 2012 unter msdn.microsoft.com/magazine/jj133818.

Abbildung 19: Erstellen eines Containers

internal static  class CompositionUtil
{
  private readonly static Lazy<CompositionContainer> _container =
    new Lazy<CompositionContainer>(() =>
    {
      var builder = new RegistrationBuilder();
      builder.ForTypesDerivedFrom<IRunnableTask>()                         
             .Export<IRunnableTask>()
             .SetCreationPolicy(CreationPolicy.NonShared);
      var cat = new DirectoryCatalog(“extensions”, builder);
      return new CompositionContainer(cat, true, null);
    }
  ,true);
  internal static CompositionContainer ContainerInstance
  {
    get { return _container.Value; }
  } 
}

Zurück zum Code in Abbildung 18. Der Code erhält mithilfe des Containers den Export vom Typ IRunnableTask. Anschließend wählt der Code die Instanz mit dem Typnamen aus, der mit der Clientanforderung übereinstimmt. Ich bin dabei von der wichtigen Annahme ausgegangen, dass es nur eine Taskinstanz gibt, die dem vom Client angeforderten Typ entspricht. Aus diesem Grund verwende ich die erste von der LINQ-Abfrage zurückgegebene Instanz.

Schritt 2: Vor der eigentlichen Erstellung des TPL-Tasks erstellt der Code eine Quelle für ein Abbruchtoken und ein Abbruchtoken. Ich verwende zur Nachverfolgung der Abbruchquelle eine einzelne Instanz von ConcurrentDictionary<TKey,TValue>. Der Taskausführungsknoten verwendet diese Liste mit Abbruchquellen, wenn ein Client einen Abbruch anfordert. Hier ist die Definition dieser Instanz:

public static class TaskExecutionContext
{
...
private readonly static ConcurrentDictionary<string, 
  CancellationTokenSource> _cancellationSources =
  new ConcurrentDictionary<string, CancellationTokenSource>();
...
}

Schritt 3: An dieser Stelle führe ich den Task mit dem soeben erstellten Abbruchtoken aus. Dem Task folgt eine Fortsetzungsaufgabe. Diese ist notwendig, da der Taskkoordinator durch einen Dienstaufruf mit dem Ausführungsergebnis aktualisiert werden muss, sobald der TPL-Task abgeschlossen ist (erfolgreich oder fehlerhaft). Wie in Abbildung 20 gezeigt, kapsele ich den Prozess zur Aktualisierung des Taskkoordinators in einen Delegaten ein. Dieser empfängt den TPL-Task als einen Parameter, die Taskausführungsanforderung und eine Rückrufinstanz für den Taskkoordinator.

Abbildung 20: Einkapseln des Aktualisierungsprozesses

private static Action<Task<Object>, STask, 
  ITaskUpdateCallback> UpdateStatus = (t, st, cb) =>
  {
    try
    {
      STaskStatus s;
      Object r = null;
      switch (t.Status)
      {
        case TaskStatus.Canceled: s = STaskStatus.Canceled;
          break;
        case TaskStatus.Faulted:
          s = STaskStatus.Faulted;
          r = t.Exception.Flatten();
          break;
        case TaskStatus.RanToCompletion:
          s = STaskStatus.Completed;
          r = t.Result;
          break;
        default:
          s = STaskStatus.Faulted;
          r = new Exception("Invalid Status");
          break;
      }
      CancellationTokenSource cs;
      TaskExecutionContext._cancellationSources.TryRemove(st.Id, out cs);
      cb.UpdateStatus(st.Id, s, r);
    }
    catch (Exception ex)
  {
  // Error handling
  }
};

Anfordern und Verarbeiten eines Abbruchs

Die TPL stellt einen Mechanismus zur Implementierung eines Taskabbruchs bereit. Dazu kapselt der Delegat den tatsächlich ausgeführten Prozess ein, da ein TPL-Task auf die Abbruchanforderung antworten und die Ausführung beenden muss. Weitere Informationen zum Taskabbruch finden Sie im MSDN-Bibliotheksartikel „Aufgabenabbruch“ unter bit.ly/NYVTO0.

Einer der Parameter der IRunnableTask-Schnittstelle ist ein Abbruchtoken. Der Taskausführungsknoten erstellt für jeden Task ein Token. Die Implementierung der Schnittstelle bestimmt, wann auf eine Abbruchanforderung geprüft und der Prozess ordentlich beendet wird. Der Code in Abbildung 21 zeigt einen einfachen Task zum Berechnen der Anzahl von geraden Zahlen in einem Bereich. Dabei wird geprüft, ob ein Abbruch angefordert wurde.

Abbildung 21: Prüfung auf einen Abbruch

public class MySimpleCTask : IRunnableTask
{
  public Func<Object> Run(Nullable<CancellationToken> ct, 
    params string[] taskArgs)
  {
    var j = int.Parse(taskArgs[0]);
    var z = 0;
    return (() =>
  {
      for (int i = 0; i < j; i++)
      {
        if (i % 2 != 0)
        {
          z++;
          ct.Value.ThrowIfCancellationRequested();
        }
      }
      return z;
   });
  }
}

Bei der Beschreibung des Clients haben Sie bereits gesehen, dass Sie eine Anforderung mit einem Abbruchtoken hinzufügen können, und der Client das erforderliche Abonnement intern ausführt. Bei Auslösen eines Abbruchs wird also eine Abbruchanforderung an den Taskkoordinator gesendet. Nachdem der Taskkoordinator die Abbruchanforderung empfangen hat, prüft er, ob die Anforderung an einen Taskausführungsknoten übermittelt wurde, und sendet eine Abbruchanforderung. Der Taskausführungsknoten sucht dann nach der Abbruchquelle, die dem von der Client-ID angeforderten Task entspricht. Die Übermittlung der Abbruchanforderung an den Taskausführungsknoten ist vergleichsweise einfach. Sie müssen dazu nur den entsprechenden Kanal für die Anforderung finden, mit der der Taskkoordinator die Taskausführungsanforderung ursprünglich übermittelt hat. Diese Kanäle müssen für die Rückrufe geöffnet bleiben, mit denen der Status der Ausführungsanforderung aktualisiert wird.

Abbildung 22 zeigt die Implementierung der Dienstvorgänge im Taskkoordinator.

Abbildung 22: Implementierung der Dienstvorgänge im Taskkoordinator

public class TaskCoordinatorService : ITaskCoordinator
{
...
public bool CancelTask(string Id)
  {
    return CoordinatorContext.CancelTask(Id);
  }
  ...}
public static class CoordinatorContext
{
...
internal static bool CancelTask(string Id)
{
STaskInfo info;
if(_submissionTracker.TryGetValue(
  Id, out info) && info.ExecutionRequestChannel != null)
{
  info.ExecutionRequestChannel.Cancel(Id);
  return true;
}
return false;
}
  ...
}

Abbildung 23 zeigt schließlich die Implementierung der Dienstvorgänge in den Taskausführungsknoten.

Abbildung 23: Implementierung der Dienstvorgänge in den Taskausführungsknoten

class CancellationHandler : ICancellationHandler
  {
  public void Cancel(STask stask)
  {
    TaskExecutionContext.CanceTask(stask.Id);
  }
  }
public static class TaskExecutionContext
{
...
  internal static void CancelTask(string Id)
  {
    CancellationTokenSource tknSrc;
    if (_cancellationSources.TryGetValue(Id, out tknSrc))
      tknSrc.Cancel(); }
...
}

Skalierbarkeit des Taskkoordinators und weitere Aspekte

Diese Implementierung geht von der Ausführung des Taskkoordinators auf einem einzelnen Knoten aus. Es ist aber auch gut möglich, den Umfang des Taskkoordinators zu vergrößern. Das erfordert mindestens die folgenden Änderungen:

  • Für den Zugriff auf den Taskkoordinator muss ein Lastenausgleich hinzugefügt werden.
  • Wie bereits beschrieben, ist der wesentliche Aspekt bei Verwendung der Drosselung eine genaue Anzahl der ausgeführten Tasks, insgesamt und nach Typ. In einem Szenario, in dem mindestens zwei Knoten als Taskkoordinatoren ausgeführt werden, müssen diese Indikatoren zentral verwaltet werden, zum Beispiel in einer Datenbank. Dabei müssen sie weiterhin aktualisiert oder synchron gelesen werden können (u. a. sind Racebedingungen und Deadlocks zu vermeiden).

Zum Schluss möchte ich anmerken, dass wie bei jedem Entwicklungsansatz der Nutzen und die Risiken mit denen anderer Alternativen verglichen werden müssen, die möglicherweise Ihrem Bedarf entsprechen und als Standardversionen verfügbar sind. Technologien wie Microsoft HPC Server können für viele Szenarios eine überzeugende Lösung darstellen, bei denen Sie sonst den hier beschriebenen Ansatz verwendet hätten.

Optimieren von Ressourcen

Die TPL stellt die notwendige Infrastruktur bereit, um die CPU-Ressourcen eines einzelnen Mehrkerncomputers optimal zu nutzen. Sie ist außerdem nützlich, wenn ein computerübergreifender Skalierungsansatz implementiert wird. Das kann in Szenarios mit Automatisierung der Arbeitsauslastung und Batchverarbeitung hilfreich sein, in denen Parallelität nicht nur auf einem einzelnen Mehrkernserver, sondern auch serverübergreifend erforderlich ist.

Zum Erzielen dieser horizontalen Skalierbarkeit müssen verschiedene Architekturaspekte berücksichtigt werden. Die wichtigsten dieser Aspekte: die Notwendigkeit des Lastenausgleichs für die vorhandenen Ressourcen, wobei der bestehenden Farm weitere Ressourcen hinzugefügt werden können, und die Möglichkeit zur Drosselung der Ressourcen gemäß der Semantik der auszuführenden Tasks. Die Entwicklungstools und -technologien von Microsoft bieten Ihnen die notwendigen Bausteine zur Implementierung einer Architektur, die diese entscheidenden Aspekte berücksichtigt.

Jesus Aguilar ist Senior Application Development Manager im Premier Support for Developers-Bereich bei Microsoft.

Unser Dank gilt den folgenden technischen Experten für die Durchsicht dieses Artikels: Ryan Berry, Steve Case, Rick Claude und Piyush Joshi.