.NET 相关问题

排序与 ThreadPool 的执行

Stephen Toub

问: 我的系统中的多个组件需要异步,执行工作这使我认为 Microsoft.NET Framework ThreadPool 是正确的解决方案。但是,我有我认为是唯一的要求: 每个组件需要确保按顺序处理的工作项并且,结果,没有两个它的工作项执行一次。确定,通过,与彼此同时执行多个组件 ; 实际上的需要。是否有任何建议?

这不是为唯一一个困境如您可能认为视为各种包括基于传递的邮件的重要方案 A 。考虑使用一个管道实现通过的管道在任何时刻活动的多个阶段获得并行性的好处。

例如,您可能有数据从文件中读取、 压缩它,其,加密和将它写出到一个新的文件的管道。压缩可以与该的加密同时但不是相同的数据在同一时间后要输入到另一个需要的输出。而,压缩例程可以压缩某些数据,并发送它关闭给加密例程进行处理此时该压缩例程可以处理下一段数据。

由于许多压缩和加密算法维护影响如何将来的数据的状态是压缩和加密,很重要保持顺序。(never mind 本示例处理文件,它会是很好如果未能解密并解压缩结果以得到正确的顺序的回原始数据的所有)。

有几个可能的解决方案。第一个解决方案是只是专用于每个组件的线程。此 DedicatedThread 必须要执行的工作项和服务的队列的单个线程的-先出 (FIFO) 队列。当该组件进行的工作时它转储到在的队列的工时,并选取工作和执行最终将线程获取周围。因为只有一个线程,则只有一项将运行一次。和使用 FIFO 队列,它们生成顺序中处理工作项。

我提供的示例中2008 年 1 月.NET 相关问题列我将使用简单的工作项类来表示工作是执行, 图 1 所示。使用此工作项类型的 DedicatedThread 的实现如 图 2 所示。实现大部分是基本的 BlockingQueue <T> 实现 (.NET Framework 4.0 包含 BlockingCollection <T> 一个更好的类型适合这样的实现)。DedicatedThread 的构造函数只是创建 BlockingQueue <t> 实例,然后 spins 最不断地等待到达队列中的其他项目,然后再执行的线程。

图 1 捕获工作项

internal class WorkItem {
  public WaitCallback Callback;
  public object State;
  public ExecutionContext Context;

  private static ContextCallback _contextCallback = s => {
    var item = (WorkItem)s;
    item.Callback(item.State);
 };

  public void Execute() {
    if (Context != null) 
      ExecutionContext.Run(Context, _contextCallback, this);
    else Callback(State);
  }
}

图 2 DedicatedThread 实现

public class DedicatedThread {
  private BlockingQueue<WorkItem> _workItems = 
    new BlockingQueue<WorkItem>();

  public DedicatedThread() {
    new Thread(() => {
      while (true) { workItems.Dequeue().Execute(); }
    }) { IsBackground = true }.Start();
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _workItems.Enqueue(new WorkItem { 
      Callback = callback, State = state, 
      Context = ExecutionContext.Capture() });
  }

  private class BlockingQueue<T> {
    private Queue<T> _queue = new Queue<T>();
    private Semaphore _gate = new Semaphore(0, Int32.MaxValue);

    public void Enqueue(T item) {
      lock (_queue) _queue.Enqueue(item);
      _gate.Release();
    }

    public T Dequeue() {
      _gate.WaitOne();
      lock (_queue) return _queue.Dequeue();
    }
  }
}

方案这所提供的基本功能,它可能会满足您的需要,但有一些重要的缺点。第一次,线程被保留供每个组件。与一个或两个组件,不能有问题。但组件大量,这可能导致线程的数量的严重分解。导致错误的性能。

此特定的实现不也非常可靠。是例如如果怎样要结束一个组件,您如何判断线程停止阻止?如果从工作项引发异常会怎样?

作为一个备用的有趣,请注意此解决方案是类似于 Windows 使用在典型的消息泵。消息泵是一个循环,等待消息到达,调度它们 (处理它们),然后返回并等待多。该邮件特定窗口处理一个单线程。在 图 3 ,应表现出非常像 图 2 中代码的行为的代码演示在相似之处。新的线程是创建设置,创建一个控件,确保其句柄已初始化并使用 Application.Run 执行消息循环。若要排队到此线程的工作项,只需使用控件的 begin­invoke 方法。请注意我不推荐这种方法,但而只是指针的在较高级别的作为 DedicatedThread 解决方案已显示相同的基本概念。

图 3 相似的 UI 消息循环

public class WindowsFormsDedicatedThread {
  private Control _control;

  public WindowsFormsDedicatedThread() {
    using (var mre = new ManualResetEvent(false)) {
      new Thread(() => {
        _control = new Control();
        var forceHandleCreation = _control.Handle;
        mre.Set();
        Application.Run();
      }) { IsBackground = true }.Start();
      mre.WaitOne();
    }
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _control.BeginInvoke(callback, state);
  }
} 

第二个解决方案包括执行中使用 ThreadPool。而不是创建一个新的自定义的线程,每个服务专用队列的组件的旋转,我们将保留只是队列组件,这样从同一个队列没有两个元素过提供一次。这具有允许控制线程数量所需,处理其注入和处理可靠性问题和超出的最新的线程旋转,这是很少,是为业务获得您退休,本身 ThreadPool 的优点。

该解决方案的实现如 图 4 所示。fifo­execution 类维护只是两个字段: 工作项以进行处理的队列和值一个 Boolean 类型的值,该值指示是否请求已被颁发给 ThreadPool 处理工作项。这两个这些字段受工作项列表中的锁定。实现的其余都只需两个方法。

图 4 实现 FifoExecution

public class FifoExecution {
  private Queue<WorkItem> _workItems = new Queue<WorkItem>();
  private bool _delegateQueuedOrRunning = false;

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    var item = new WorkItem { 
      Callback = callback, State = state, 
      Context = ExecutionContext.Capture() };
    lock (_workItems) {
      _workItems.Enqueue(item);
      if (!_delegateQueuedOrRunning) {
        _delegateQueuedOrRunning = true;
        ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems, null);
      }
    }
  }

  private void ProcessQueuedItems(object ignored) {
    while (true) {
      WorkItem item;
      lock (_workItems) {
        if (_workItems.Count == 0) {
          _delegateQueuedOrRunning = false;
          break;
        }
        item = _workItems.Dequeue();
      }
      try { item.Execute(); }
      catch {
        ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems,
          null);
        throw;
      }
    }
  }
}

第一种方法签名相匹配的 ThreadPool (ThreadPool 还提供接受只是一个 WaitCallback,您可以选择添加一个重载一个便于使用重载) 公开的是 QueueUserWorkItem。该方法首先创建要存储的工作项,然后将锁定。(创建该工作项时访问没有共享的状态。因此,以便使锁定尽可能小,此项目的捕获是之前进行采用锁)。 一旦持有锁,创建的工作项目将为到工作项队列的排队。

该方法然后检查是否已被请求 ThreadPool 处理排队的工作项,并,如果一个没有已成为,使得类的请求 (和为将来说明它)。ThreadPool 此请求是只使用 ThreadPool 的线程之一执行 ProcessQueuedItems 方法。

当 ThreadPool 线程调用 ProcessQueuedItems 将进入循环。此循环中, 花费锁定,并按住锁,它检查是否有任何更多的工作项进行处理。如果没有任何,它复位请求标记 (以便将来的队列的项目将处理从池再次请求),并退出。如果有要处理的工作项,它获取下一个、 释放锁,执行该的处理并启动所有超过再次,运行直到队列中没有更多的项目。

这是简单但功能强大的实现。组件可能会立即创建 FifoExecution 的实例,并用它将日程安排工作项。每个实例的 FifoExecution,只有一个排队的工作项将能够执行一次,并且已排队的工作项将执行它们已排队的订单中。此外,工作实例将能够同时执行的不同 FifoExecution 中的项。最好的一部分您要现在超出线程管理的业务,离开的所有线程管理的硬盘 (但非常重要) 工作到 ThreadPool。

在极端情况下,每个组件保持与 ThreadPool 将可能斜坡达让每个组件,一个线程的工作达到饱和的池就像在原始 dedicated­thread 实现。但是,将只发生如果是认为适当的 ThreadPool。如果组件不保留池饱和,许多较少的线程将需要。

有是如让执行正确的操作与异常有关的 ThreadPool 之类的其他优点。在 DedicatedThread 实现中如果会怎样处理项目将引发异常?该线程将会关闭,崩溃,但数据被,根据应用程序的配置过程可能不会破坏下。在这种情况下工作项将开始到在的 DedicatedThread 的队列,但不是过将获取处理。fifo­execution,ThreadPool 将只是停止添加更多的线程已离开消失的补偿。

图 5 显示了一个利用 FifoExecution 类的简单演示应用程序。此应用程序在管道中有三个阶段。每个阶段写出当前段数据的 ID 它的使用 (它是只有循环迭代)。然后执行某些工作 (由一个 thread.SpinWait 表示在此处) 并将数据传递 (再次,只是循环迭代) 以及下一个阶段。每个步骤将输出其信息具有不同数目的选项卡,以便轻松地查看结果出分隔。如您可以看到在 图 6 所示的输出中,每个阶段 (列) 保持工作正确排序。

FifoExecution 的图 5 演示

static void Main(string[] args) {
  var stage1 = new FifoExecution();
  var stage2 = new FifoExecution();
  var stage3 = new FifoExecution();

  for (int i = 0; i < 100; i++) {
    stage1.QueueUserWorkItem(one => {
      Console.WriteLine("" + one);
      Thread.SpinWait(100000000);

      stage2.QueueUserWorkItem(two => {
        Console.WriteLine("\t\t" + two);
        Thread.SpinWait(100000000);

        stage3.QueueUserWorkItem(three => {
          Console.WriteLine("\t\t\t\t" + three);
          Thread.SpinWait(100000000);
        }, two);
      }, one);
    }, i);
  }

   Console.ReadLine();
}

fig06.gif

图 6 从演示应用程序的输出

此外很有趣要注意的管道的各个阶段之间的公平性不足。您可以看到是例如 图 6 中的 stage1 由已迭代 21,stage2 仍回位于 13 而 stage3 位于 9。这是很大程度上由于为了 ProcessQueuedItems 的实现。从服务 stage1 将可能坐 ProcessQueuedItems 循环中并不返回直到没有更多的 stage1 工作池示例应用程序非常快速地为 100 个工作项按到 stage1,并因此线程。这使它不公平的斜线通过其他阶段。如果您的应用程序中看到类似的行为,并且这是一个问题,可以增加通过修改为更类似于以下的 ProcessQueuedItems 实现阶段之间的公平性:

private void ProcessQueuedItems(object ignored) {
  WorkItem item;
  lock (_workItems) {
    if (_workItems.Count == 0) {
      _delegateQueuedOrRunning = false;
      return;
    }
    item = _workItems.Dequeue();
  }
  try { item.Execute(); }
  finally {
    ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems,
      null);
  }
}

现在,即使有要处理的更多项目,ProcessQueuedItems 不会循环但而不是将以递归方式队列本身在种 ThreadPool 因此确定其自身优先级其他阶段中的项目后面。在此修改, 图 5 中该应用程序现在输出查找中的显示 图 7 。您可以在计划编制确实像 stage2 和 stage3 使用与之前的更多公平性的此新输出中看到 (有仍一些滞后时间之间的阶段但这应该假定这是一个管道)。

fig07.gif

图 7 使用 Fairer 计划的新输出

当然,不会可用来自此增强的公平性。每个工作项现在会通过计划程序中添加一些成本的额外旅行。您需要确定是否可以为您的应用程序进行了权衡 ; 是例如在实际工作您正在您的工作项中的操作是否此开销应该是可以忽略和 unnoticeable。

这是只有一个更多示例就可以构建之上 ThreadPool 而无需自己创建自定义线程池中添加功能的系统执行的方式。有关其他示例,请参阅以前的版本的.NET 问题中的列MSDN 杂志 》 .

将您的问题和提出的意见发送至netqa@Microsoft.com.

Stephen Toub 是 Microsoft 并行计算平台团队在高级程序经理。他还是为 MSDN Magazine 的特约。