.imgright1 { float: right; margin: 5px 0px 10px 10px; width:353px; font-weight: normal; font-size: 14px; color: #003399; font-family: "Segoe UI", Arial; }

.NET 相关问题

并行使用依赖项的操作

Stephen Toub

问: 我有一个系统的多个组件需要在执行,但是这些组件可能上相互有依赖项。 我希望能够尽可能并行运行这些组件。 我如何可以转有关执行此操作?

串行或并行,这是一个相当常见的问题,在开发软件, A 。 在连续的实现常见的方法是为图形模型依赖项。 每个组件由在的图形中的一个节点代表,每个依赖项表示由共享依赖项在两个节点之间的定向边缘。 这将创建内容的已知作为一个定向非循环关系图 (DAG),假设没有周期 (是例如组件 1 取决组件 2,依赖组件 3,回依赖组件 1 的)。 拓扑的排序可以然后用于创建一个部分排序这些组件之间,以便如果执行了组件按顺序的所有组件的依赖项的顺序将已都完成时将运行该组件。

fig01.gif

图 1 A 定向非循环图

假设您有有以下依赖项 (我将引用本示例在此专栏) 之类的 8 个组件:

  • 1、 2 和 3 的组件依赖于没有其他
  • 组件 4 依赖组件 1
  • 组件 5 依赖 1、 2 和 3 的组件
  • 组件 6 依赖 3 和 4 的组件
  • 组件 7 依赖第 5 步和第 6 的组件
  • 组件 8 依赖组件 5

对于此设置的依赖项的 DAG 如 图 1 所示。 有序的应用程序中通过运行拓扑排序通过此图,您可能会几个部分 orderings,包括只运行它们按升序排列 (1,2,3,4,5,6,7,8),这正好满足此处列出依赖项之一。 其他 orderings 当然是可能如 3,2,1,5,8,4,6,7。

能够并行执行的某些组件如下所示的图中可能产生速度有价值的增加。 假设,只的图,每个组件采用执行,一秒,这是每个组件的本身在本质上与无限号为处理内核,在计算机上运行应用程序的顺序和没有所需的以外执行组件 (没有开销) 零处理。 如果您是按顺序运行的所有组件,它将需要八秒钟。 但您可以更好地并行。 您可以运行组件 1、 2 和同时,3 采用总共一秒钟。 您可以再运行 4 和 5 同时,采用另一第二个。 这些完成时,您可以运行 6 和 8 同时,采用另一第二个。 并且最后运行用于的四个秒总计的 7。 使用并行性,已经因此剪切示例问题的运行时间一半。

提到,这种问题弹出很多不同的软件问题域中。 MSBuild 为例。 Microsoft.NET Framework–based 项目通常引用这样设置这些两个项目之间的依赖项在同一解决方案中其他.NET 项目。 在 N 的 MSBuild 启动提供 / m 开关并行化在生成过程在计算机中使用可用的处理器利用像提到的项目引用的依赖项信息。

本专栏的其余部分,我将构建一个小的 DependencyManagement 类,以帮助解决这些问题。 下面是公共 API 的分级显示:

public class DependencyManager 
{
  public void AddOperation(
    int id, Action operation, params int [] dependencies);
  public event EventHandler<OperationCompletedEventArgs> 
    OperationCompleted;
  public void Execute();
}

主入口点到类型是在 AddOperation 为方法接受您提供给表示操作执行的操作,委托的唯一 ID 它并且其完成此操作所依赖的依赖项组成。 如果操作不有任何依赖项,参数数组将为空。

一构成的操作完成,提供的操作以及起始 ID 和结束时间执行的任何时间,将引发 OperationCompleted 事件。

和最后,Execute 方法开始关闭过程,并等待所有操作完成。 DependencyManager 类将并行运行操作时, API 本身并不旨在线程安全这意味着应将只有一个线程使用以访问 AddOperation,并执行一次。

此类允许我 Express 以前的示例问题,以下列方式:

Action oneSecond = () => { ComputeForOneSecond(); };
DependencyManager dm = new DependencyManager();
dm.AddOperation(1, oneSecond);
dm.AddOperation(2, oneSecond);
dm.AddOperation(3, oneSecond);
dm.AddOperation(4, oneSecond, 1);
dm.AddOperation(5, oneSecond, 1, 2, 3);
dm.AddOperation(6, oneSecond, 3, 4);
dm.AddOperation(7, oneSecond, 5, 6);
dm.AddOperation(8, oneSecond, 5);
dm.Execute();

内部,有关每个已注册的操作的信息存储在 OperationData 类中:

private class OperationData {
  internal int Id;
  internal Action Operation;
  internal int [] Dependencies;
  internal ExecutionContext Context;
  internal int NumRemainingDependencies;
  internal DateTimeOffset Start, End;
}

ID、 操作和依存关系字段存储在相应的数据,作为参数,传递到 AddOperation 方法和上下文域存储 AddOperation 被调用时捕获的 ExecutionContext (有关为什么这一点非常重要的详细信息,请参阅我最后的几个.NET 问题列)。 此外,OperationData 存储需要完成,然后才可以运行的剩余依赖项的数量。 最后,它跟踪的开始,并此操作,并将这些的结束时间将报告给 DependencyManager 的使用者,当对此特定的操作中引发 OperationCompleted 事件。

两个集合用于维护的一组添加到 DependencyManager 实例的操作:

private Dictionary<int, OperationData> _operations  =
  new Dictionary<int, OperationData>();private Dictionary<int, List<int>> _dependenciesFromTo;

在以前的字典包含对于每个添加的操作的一项,并将操作的 ID 映射到其 OperationData 实例 ; 这使快速且轻松地查找数据操作分配的 ID。 而该第一个词典不包含所有您需要做工作的信息,不包含这些信息使工作,简单或有效的格式。 后者词典将修复此通过一个操作 ID 的所有直接依赖于它的操作 ID 提供一个映射。 对于是实例在示例问题前面所示此词典中的条目 ID 为 1 会将映射包含 ID 4 和 5,因为这些两种操作直接依赖 ID 为 1 的完成列表 (以。 此字典使得快速和方便完成操作将在完成通知发送给所有依赖它的其他操作。

DependencyManager 类包含三个更专用的字段,除了词典:

private object _stateLock = new object();
private ManualResetEvent _doneEvent;
private int _remainingCount;

_stateLock 字段用于异步执行已注册的操作过程中才保护通过处理这些操作即词典和所包含的数据访问的共享的状态。 在 ManualResetEvent 只作为 Execute 方法的机制等待完成的所有操作。 上次操作以完成将设置允许 Execute 方法以退出在该事件。 操作知道它的最后完成,因为它们完成时的所有操作减少 _remainingCount 字段。 在这种情况下,如果现在零值和它的查找操作就减少,操作知道它的最后一个,并可以将设置该 _doneEvent。 (作为一个备用,这种倒计时逻辑通常位于派生/联结的并行性问题和.NET Framework 4.0 中新的 CountdownEvent 使这类问题更易于实现。

AddOperation 方法的代码很简单。

public void AddOperation(
    int id, Action operation, params int[] dependencies) {
  if (operation == null) 
    throw new ArgumentNullException("operation");
  if (dependencies == null) 
    throw new ArgumentNullException("dependencies");

  var data = new OperationData {
    Context = ExecutionContext.Capture(),
    Id = id, Operation = operation, 
      Dependencies = dependencies
  };
  _operations.Add(id, data);
}

后参数是否有效,该方法将所有数据捕获到在 OperationData 类的实例以及该实例存储到跟踪所有操作的字典。

在所有的操作已添加,可以运行了 图 2 ,所示将过程将该 Execute 方法。 该方法开头设置即加转置依赖项为中每个 OperationData 存储,以便可以轻松地查找目标操作分配该目标的 ID。 具有依赖项的操作的信息的运行的相关性信息

图 2 启动过程

public void Execute() {
  // TODO: verification will go here later
  // Fill dependency data structures
  _dependenciesFromTo = new Dictionary<int, List<int>>();
  foreach (var op in _operations.Values) {
    op.NumRemainingDependencies = op.Dependencies.Length;

    foreach (var from in op.Dependencies) {
      List<int> toList;
      if (!_dependenciesFromTo.TryGetValue(from, out toList)) {
        toList = new List<int>();
        _dependenciesFromTo.Add(from, toList);
      }
      toList.Add(op.Id);
    }
  }

  // Launch and wait
  _remainingCount = _operations.Count;
  using (_doneEvent = new ManualResetEvent(false)) {
    lock (_stateLock) {
      foreach (var op in _operations.Values) {
        if (op.NumRemainingDependencies == 0) 
          QueueOperation(op);
      }
    }
    _doneEvent.WaitOne();
  }
}

private void QueueOperation(OperationData data) {
  ThreadPool.UnsafeQueueUserWorkItem(state => 
    ProcessOperation((OperationData)state), data);
}

本方法将通过查找所有具有零个依赖项的操作,并为.NET ThreadPool 执行队列它们的工作的其余部分。

以前,我注意到 Execute 方法和在 AddOperation 不相对于彼此,线程安全表示这些方法应按顺序调用,首先使用 AddOperation 添加的所有操作执行,然后调用执行。 因此,注意 AddOperation 访问任何锁或其他机制用于共享的状态同步词典。 在锁定,但是,执行 does 换行属于其主体。 这是因为一次就在排队等候将第一个工作项运行,该工作项可能会访问,和修改同时与执行正文的其余部分词典。 要防止发生这样的争用,受执行的主体的其余部分。 所有的相关工作已经被排队,Execute 方法将阻止等待的所有工作完成。

QueueOperation 只是队列在的 ThreadPool ProcessOperation 方法,并 ProcessOperation 运行指定的操作。 此方法如 图 3 所示。 方法的第一部分只执行提供 OperationData 中存储的委托,这样在提供的 ExecutionContext (是否一未能被捕获),且超存储结果的运行备份到更高版本的报告的数据结构。 (有关更多详细的计时信息 System.Diagnostics.stopwatch 可以使用而不是 DateTimeOffset)。

图 3 处理的操作

private void ProcessOperation(OperationData data) {
  // Time and run the operation's delegate
  data.Start = DateTimeOffset.Now;
  if (data.Context != null) {
    ExecutionContext.Run(data.Context.CreateCopy(), 
      op => ((OperationData)op).Operation(), data);
  }
  else data.Operation();
  data.End = DateTimeOffset.Now;


  // Raise the operation completed event
  OnOperationCompleted(data);

  // Signal to all that depend on this operation of its
  // completion, and potentially launch newly available
  lock (_stateLock) 
{
    List<int> toList;
    if (_dependenciesFromTo.TryGetValue(data.Id, out toList)) {
      foreach (var targetId in toList) {
        OperationData targetData = _operations[targetId];
        if (--targetData.NumRemainingDependencies == 0) 
          QueueOperation(targetData);
      }
    }
    _dependenciesFromTo.Remove(data.Id);


    if (--_remainingCount == 0) _doneEvent.Set();
  }
}

private void OnOperationCompleted(OperationData data) {
  var handler = OperationCompleted;
  if (handler != null)
    handler(this, new OperationCompletedEventArgs(
      data.Id, data.Start, data.End));
}

public class OperationCompletedEventArgs : EventArgs {
  internal OperationCompletedEventArgs(
    int id, DateTimeOffset start, DateTimeOffset end) {
    Id = id; Start = start; End = end;
  }
  public int Id { get; private set; }
  public DateTimeOffset Start { get; private set; }
  public DateTimeOffset End { get; private set; }
}

该操作完成后,您需要通知任何相关的操作。 下面是 handy_dependenciesFromTo 词典进入播放。 您只是枚举的所有依赖此特定的项,查找它们 OperationData 实例并减少其 NumRemainingDependencies 一个操作。 如果剩余的依赖其数变得零,请现在适于执行,并 QueueOperation 来启动每个符合条件的操作。

还可以执行一些 housecleaning: 现在在操作完成不需要的项 in_dependenciesToFrom 再次,以便您可以删除它。

最后,您通知在 DependencyManager 已完成一项的更多操作,可能设置 the_doneEvent (将唤醒 Execute 方法),如果是最后一次操作。

没有异常处理所示这种实现,但添加一些可能是重要的具体取决于如何使用类。 例如,如果一个 OperationCompleted 事件处理程序引发异常,该异常将传播的 OnOperationCompleted 只引发该事件),阻止任何依赖此操作被通知,可能会挂起系统中。 获取一个需要解决的强大实现。

您已了解到现在为止所代表实现大部分。 但是,仍有一些有趣的方法可以。 是例如实现假定调用其他操作的依赖项已注册的时间执行的所有操作。 如果它们不相关的完成通知将不会引发,并执行将永远不会返回,因为某些依赖项将已得到满足,并因此某些操作不会过运行。 若要解决此问题,可以实施将捕获这样误用并将对其调用放在 Execute 方法的开头的 VerifyThatAllOperationsHaveBeenRegistered 方法:

private void VerifyThatAllOperationsHaveBeenRegistered() {
  foreach (var op in _operations.Values) {
    foreach (var dependency in op.Dependencies) {
      if (!_operations.ContainsKey(dependency)) {
        throw new InvalidOperationException(
          "Missing operation: " + dependency);
      }
    }
  }
}

同样,可以验证有实际上没有周期在图形中。 如果存在周期,Execute 方法将永远不会完成,因为某些依赖项将保持 unsatisfied。 例如,假设错误地输入组件 2 依赖组件 8。 这将导致循环 (2 = 5 > = > = 2 > 的 8),将防止 2、 5、 7 和 8 的组件执行。 可以检测类周期方法 VerifyThereAreNoCycles,重新放置 (之后 VerifyThatAllOperationsHaveBeenRegistered) 开始处执行的对此方法的调用。

有几个用于查找在图中的周期的适当算法。 一个是从 (在中的情况下此文章,在我的示例中具有任何依赖项如 1、 2 和 3 的组件操作的) 图表中每个项节点执行深度优先搜索在图表上。 深度优先搜索方式,当您访问它时将标记一个节点。 如果您访问已被标记的节点,您已找到一个循环。

另一种解决方案基于拓扑排序此列的开头提到。 如果将失败拓扑排序关系图将具有一个周期。 由于具有拓扑排序是也很有用一般情况下 (例如,如果想要 (可选) 使用 DependencyManager 按顺序执行),我选择的方法。

图 4 显示了返回操作 ID 的部分有序的列表 CreateTopologicalSort 方法的实现。 (该代码还显示则 VerifyThereAreNoCycles 为方法这只需调用 CreateTopologicalSort 并引发异常,如果该方法返回空指示排序失败)。 首先进行是临时的字典排序操作的数据结构中转换操作词典。 这些临时字典映射在到词典您以前看到的概念。 实际上,dependenciesToFrom 会将操作 ID 映射到的所有操作这取决于 (入站边缘) 的 ID。 相反,dependenciesFromTo 会将操作 ID 映射到的所有依赖它 (出站边缘) 的操作的 ID。 该算法运行,您将缓慢 pare 下直到,这些字典的内容很有可能,没什么左。

图 4 拓扑排序和循环检测

private void VerifyThereAreNoCycles() {
  if (CreateTopologicalSort() == null) 
    throw new InvalidOperationException("Cycle detected");
}

private List<int> CreateTopologicalSort() {
  // Build up the dependencies graph
  var dependenciesToFrom = new Dictionary<int, List<int>>();
  var dependenciesFromTo = new Dictionary<int, List<int>>();
  foreach (var op in _operations.Values) {
    // Note that op.Id depends on each of op.Dependencies
    dependenciesToFrom.Add(op.Id, new List<int>(op.Dependencies));

    // Note that each of op.Dependencies is relied on by op.Id
    foreach (var depId in op.Dependencies) {
      List<int> ids;
      if (!dependenciesFromTo.TryGetValue(depId, out ids)) {
        ids = new List<int>();
        dependenciesFromTo.Add(depId, ids);
      }
      ids.Add(op.Id);
    }
  }

  // Create the sorted list
  var overallPartialOrderingIds = new List<int>(dependenciesToFrom.Count);
  var thisIterationIds = new List<int>(dependenciesToFrom.Count);
  while (dependenciesToFrom.Count > 0) {
    thisIterationIds.Clear();
    foreach (var item in dependenciesToFrom) {
      // If an item has zero input operations, remove it.
      if (item.Value.Count == 0) {
        thisIterationIds.Add(item.Key);

        // Remove all outbound edges
        List<int> depIds;
        if (dependenciesFromTo.TryGetValue(item.Key, out depIds)) {
          foreach (var depId in depIds) 
{
            dependenciesToFrom[depId].Remove(item.Key);
          }
        }
      }
    }

    // If nothing was found to remove, there's no valid sort.
    if (thisIterationIds.Count == 0) return null;

    // Remove the found items from the dictionary and 
    // add them to the overall ordering
    foreach (var id in thisIterationIds) dependenciesToFrom.Remove(id);
    overallPartialOrderingIds.AddRange(thisIterationIds);
  }

  return overallPartialOrderingIds;
}

一旦这些词典进行填充,您开始运行该算法。 其理念是您 cull 关系图中有零个入站的边缘的所有节点在本文的上下文,这意味着对其他操作的任何依赖项的操作。 找到后此类操作,将它们添加到已排序的结果 (用在其末尾添加的顺序无关紧要,因此"部分排序"术语) 的末尾,删除关系图。 您还需要从这些操作它您可以轻松地删除所有出站边缘使用 dependenciesFromTo 字典的查找。 关系图包含零个节点,此时算法完成并且之前您返回您建立排序,您反复执行此过程。 如果在您要的收到但是,其中的所有节点在图表中具有入站的点边缘,图表包含一个周期,并且任何更多可以执行。

与,实现已完成。 本专栏开头向我显示运行该示例问题,我已被引用整个未能与 DependencyManager 类使用该代码。 如果您将记住我建议您未能获得到四个秒运行的时间。 实际上,我的便携式计算机上运行该示例代码时, 得到悬停几毫秒时间为 4 秒内的排练时间。

但是这是部分幸运。 我 postulated 在假想环境包括核心无限数目的计算机。 遗憾的是,我的便携式计算机未在这种情况下,配置,且而是仅限于使用两个核心。 但是,指定正确的顺序,问题可以仍执行与两个处理单元的四个秒。 如果操作 1 和 2 开始运行,然后第二个的更高版本 3 和 4 可以开始运行,然后第二个的更高版本 5 和 6 可以启动运行,第二个更高版本 7 和 8 可以启动运行。

但是,如果 2 和 3 的操作会运行第一个而不是 1 和 2 的操作会怎样呢? 这当然可能是,所有操作 1、 2 和 3 有任何依赖项中。 如果操作 2 和 3 运行第一个,然后下迭代操作 1 和 5 将可以运行。 进入后,4 和第 8 步的操作将能够转。 但在下一次的迭代仅操作 6 可,因为操作 7 取决于操作 6,完成并因此整个计算将会大约五秒而不是四个。

在我的示例应用程序在更改,我调用的顺序 AddOperation 第一个注册操作 3 和 2 时, 在输出时确认本期件箱与只超过 5 秒的运行时间。 如果作为一名开发人员必须在 priori 了解每个操作将需要多长时间,可以使用的构造一个更好地排序方式启动操作。 为您保留为练习。

当然有很多其他有趣事情您可以构建的这种实现使用它作为起始点。 对于任何依赖项的操作,您可能需要到能够启动它们将其添加该的 DependencyManager 到作为运行而不是等待,直到调用执行。 您可能希望能够异步执行,DependencyManager,和更高版本等待它的结果或来做其他它完成时。 您可能希望能够建立 DependencyManagers 的图形。 您可能需要能够返回以及传递到依赖项的数据的操作因此生成最大的数据流网络。 等。 有很多有趣且有用的可能性。 享受 !

有关.NET 相关问题到请将您的问题和意见 netqa@Microsoft.com.

Stephen Toub 是 Microsoft 并行计算平台团队的高级项目经理领导。 他还是为 MSDN Magazine 的特约。