.NET: вопросы и ответы

Упорядоченное выполнение с использованием ThreadPool

Стивен Тауб (Stephen Toub)

ВМногие компоненты моей системы должны выполнять работу асинхронно, что приводит меня к мысли о том, что Microsoft .NET Framework ThreadPool является подходящим решением. Однако, у меня имеется требование, которое, как я полагаю, является уникальным: каждому компоненту необходимо обеспечить упорядоченную обработку своих элементов, и, в результате, никакая пара его элементов не должна выполняться одновременно. Тем не менее, для многих компонентов естественно выполняться параллельно друг с другом; по существу, это крайне желательно. Можете ли что-нибудь порекомендовать?

О Это не такая уникальная трудность, как вы могли бы полагать, поскольку она возникает во множестве важных сценариев, включая те, которые основаны на передаче сообщений. Рассмотрим реализацию конвейера, который получает преимущества параллелизма, имея одновременно несколько активных стадий.

Например, можно было бы иметь конвейер, который читает данные из файла, сжимает их, шифрует и записывает в новый файл. Сжатие можно выполнять одновременно с шифрованием, но не для одних и тех же данных в один и тот же момент времени, поскольку выходные данные одной обработки являются входными для другой. Предпочтительнее, чтобы процедура сжатия сжимала часть данных и отправляла их для обработки процедуре шифрования, а в этот момент процедура сжатия работала с другой частью данных.

Поскольку многие алгоритмы сжатия и шифрования поддерживают состояние, оказывающее влияние на способ, которым сжимаются и шифруются будущие данные, важно соблюдать определенный порядок. (Не говоря о том, что в этом примере речь идет о файлах, и было бы хорошо, если бы вы могли расшифровывать и распаковывать результаты, чтобы получать исходные данные в правильном порядке.)

У этой задачи есть несколько возможных решений. Первое решение заключается просто в выделении потока каждому компоненту. В рамках этого решения выполняемые рабочие элементы помещаются в очередь FIFO (first-in-first-out — «первым вошел – первым вышел»). Кроме того, есть один поток, обслуживающий эту очередь. Когда у компонента есть работа, которую требуется выполнить, он сбрасывает эту работу в очередь. Со временем поток доберется до этой работы и выполнит ее. Поскольку имеется только один поток, в каждый конкретный момент времени будет выполняться только один элемент. И, поскольку используется очередь FIFO, рабочие элементы будут обрабатываться в том порядке, в котором они создавались.

Так же, как в примере, приведенном в статье рубрики «NET: вопросы и ответы» за январь 2009 г., для представления работы, которую требуется выполнить, я буду использовать простой класс WorkItem, показанный на рис. 1. Реализация DedicatedThread, использующая этот тип WorkItem, показана на рис. 2. Основная часть реализации представляет собой упрощенную реализацию BlockingQueue<T> (в .NET Framework 4.0 входит тип BlockingCollection<T>, который больше подошел бы для реализации такого рода). Конструктор потока DedicatedThread создает экземпляр BlockingQueue<T>, затем запускает поток, который постоянно ожидает прибытия следующего элемента из очереди и выполняет его.

Рис. 1. Запись рабочего элемента

internal class WorkItem {
  public WaitCallback Callback;
  public object State;
  public ExecutionContext Context;

  private static ContextCallback _contextCallback = s => {
    var item = (WorkItem)s;
    item.Callback(item.State);
 };

  public void Execute() {
    if (Context != null) 
      ExecutionContext.Run(Context, _contextCallback, this);
    else Callback(State);
  }
}

Рис. 2. Реализация DedicatedThread

public class DedicatedThread {
  private BlockingQueue<WorkItem> _workItems = 
    new BlockingQueue<WorkItem>();

  public DedicatedThread() {
    new Thread(() => {
      while (true) { workItems.Dequeue().Execute(); }
    }) { IsBackground = true }.Start();
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _workItems.Enqueue(new WorkItem { 
      Callback = callback, State = state, 
      Context = ExecutionContext.Capture() });
  }

  private class BlockingQueue<T> {
    private Queue<T> _queue = new Queue<T>();
    private Semaphore _gate = new Semaphore(0, Int32.MaxValue);

    public void Enqueue(T item) {
      lock (_queue) _queue.Enqueue(item);
      _gate.Release();
    }

    public T Dequeue() {
      _gate.WaitOne();
      lock (_queue) return _queue.Dequeue();
    }
  }
}

Это обеспечивает основные функциональные возможности для вашего случая и может удовлетворить ваши потребности, но здесь имеются некоторые существенные недостатки. Во-первых, для каждого компонента резервируется по потоку. При наличии одного или двух компонентов это может не вызвать осложнений. Но при большом числе компонентов это может привести к значительному росту числа потоков. Что может привести к снижению производительности.

Кроме этого, данная конкретная реализация не слишком надежна. Например, что произойдет, если вам потребуется полностью уничтожить некоторый компонент — как сообщить потоку о необходимости прекратить блокировку? А что случится, если в рабочем элементе возникнет исключение?

В качестве отступления интересно отметить, что это решение подобно тому, что применяет Windows в обычном цикле обработки сообщений. Цикл обработки сообщений ожидает прибытия сообщений, обрабатывает их, затем возвращается в начало и ждет следующих сообщений. Сообщения для конкретного окна обрабатываются одним потоком. Сходство демонстрируется кодом на рис. 3, который должен продемонстрировать поведение, в большой степени похожее на поведение кода из рис. 2. Запускается новый поток, который создает Control, обеспечивает инициализацию его дескриптора и использует Application.Run для выполнения цикла обработки сообщений. Для помещения рабочего элемента в очередь используется метод BeginInvoke класса Control. Отмечу, что я не рекомендую этот подход, а скорее просто обращаю внимание на то, что на высоком уровне это та же концепция, как в уже показанном решении DedicatedThread.

Рис. 3 Сходство с циклом обработки сообщений пользовательского интерфейса

public class WindowsFormsDedicatedThread {
  private Control _control;

  public WindowsFormsDedicatedThread() {
    using (var mre = new ManualResetEvent(false)) {
      new Thread(() => {
        _control = new Control();
        var forceHandleCreation = _control.Handle;
        mre.Set();
        Application.Run();
      }) { IsBackground = true }.Start();
      mre.WaitOne();
    }
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _control.BeginInvoke(callback, state);
  }
} 

Во втором решении для выполнения используется ThreadPool. Вместо того, чтобы запускать новый пользовательский поток для каждого компонента, который обслуживал бы личную очередь, мы оставим по одной очереди для каждого компонента, чтобы никакие два элемента из одной очереди не обслуживались одновременно. Преимущество этого подхода в том, что ThreadPool сам управляет необходимым количеством потоков, обрабатывает пуск потоков и завершение их работы, а также проблемы, связанные с надежностью. Самостоятельный же запуск новых потоков редко оказывается разумным.

Реализация этого решения показана на рис. 4. Класс FifoExecution поддерживает всего два поля: очередь рабочих элементов, которые требуется обработать, и логическое значение, указывающее, выдавался ли запрос к ThreadPool на обработку рабочих элементов. Оба эти поля защищены блокировкой в списке рабочих элементов. Оставшаяся часть реализации состоит всего из двух методов.

Рис. 4. Реализация класса FifoExecution

public class FifoExecution {
  private Queue<WorkItem> _workItems = new Queue<WorkItem>();
  private bool _delegateQueuedOrRunning = false;

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    var item = new WorkItem { 
      Callback = callback, State = state, 
      Context = ExecutionContext.Capture() };
    lock (_workItems) {
      _workItems.Enqueue(item);
      if (!_delegateQueuedOrRunning) {
        _delegateQueuedOrRunning = true;
        ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems, null);
      }
    }
  }

  private void ProcessQueuedItems(object ignored) {
    while (true) {
      WorkItem item;
      lock (_workItems) {
        if (_workItems.Count == 0) {
          _delegateQueuedOrRunning = false;
          break;
        }
        item = _workItems.Dequeue();
      }
      try { item.Execute(); }
      catch {
        ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems,
          null);
        throw;
      }
    }
  }
}

Первый метод — QueueUserWorkItem, сигнатура которого соответствует сигнатуре, предоставляемой классом ThreadPool (кроме этого, у ThreadPool есть удобная перегрузка, которая как раз принимает WaitCallback. Ее можно добавить по своему выбору). Сначала метод создает сохраняемый WorkItem, затем устанавливает блокировку. (При создании WorkItem не осуществляется доступ ни к какому общему состоянию. Т.е., для того чтобы блокировка присутствовала как можно меньшее время, захват данного элемента выполняется до установки блокировки.) После установки блокировки созданный рабочий элемент помещается в очередь рабочих элементов.

Затем метод проверяет, выдавался ли запрос к ThreadPool на обработку находящихся в очереди рабочих элементов, и, если запроса не было, выдает такой запрос (и делает о нем заметку на будущее). Это запрос к ThreadPool на использование одного из потоков ThreadPool для выполнения метода ProcessQueuedItems.

Будучи вызванным потоком ThreadPool, ProcessQueuedItems входит в цикл. В этом цикле он устанавливает блокировку и, сохраняя эту блокировку, проверяет наличие рабочих элементов, требующих обработки. Если таких элементов нет, он сбрасывает флаг запроса (чтобы будущие элементы из очереди снова запрашивали обработку из пула) и завершает работу. Если имеются рабочие элементы, требующие обработки, он извлекает следующий элемент, снимает блокировку, выполняет обработку и запускает все снова, работая до тех пор, пока не будут исчерпаны элементы в очереди.

Это простая, но мощная реализация. Теперь компонент может создавать новый экземпляр FifoExecution и использовать его для планирования рабочих элементов. Для каждого экземпляра FifoExecution в каждый момент времени будет выполняться только один рабочий элемент из очереди, а помещенные в очередь элементы будут выполняться в порядке их попадания в нее. Кроме этого, рабочие элементы из различных экземпляров FifoExecution могут выполняться параллельно. Лучше всего то, что от управления потоками вы теперь освобождаетесь, а заниматься этой тяжелой (но очень важной) работой будет класс ThreadPool.

В исключительных случаях, когда каждый компонент поддерживает пул в состоянии насыщения работой, ThreadPool, вероятно, дойдет до использования отдельного потока для каждого компонента, точно так, как в исходной реализации DedicatedThread. Но это случится только в том случае, если это сочтет разумным класс ThreadPool. Если компоненты не держат пул в перегруженном состоянии, потребуется намного меньше потоков.

Существуют дополнительные преимущества, например предоставление классу ThreadPool возможности выполнять правильные действия в отношении исключений. Что происходит в реализации DedicatedThread, если обработка элемента приводит к исключению? Поток разрушится, но, в зависимости от настройки приложения, процесс может сохраниться. В этом случае рабочие элементы будут выстраиваться в очередь к DedicatedThread, но ни один из них никогда не будет обработан. В случае FifoExecution класс ThreadPool просто станет добавлять потоки, чтобы компенсировать те, которые исчезли.

На рис. 5 показано простое демонстрационное приложение, использующее класс FifoExecution. У этого приложения имеются три стадии обработки в конвейере. На каждой стадии записывается идентификатор текущего фрагмента данных, с которым идет работа (что является просто итерацией цикла). Затем выполняется некоторая работа (представленная здесь методом Thread.SpinWait) и данные передаются (опять же, просто итерация цикла) дальше к следующей ступени. На каждом этапе выводится соответствующая информация с разным числом вкладок для облегчения просмотра распределенных по ним результатов. Как можно наблюдать в результатах, показанных на рис. 6, на каждой стадии (в каждом столбец) поддерживается требуемый порядок выполнения работы.

Рис. 5. Демонстрация класса FifoExecution

static void Main(string[] args) {
  var stage1 = new FifoExecution();
  var stage2 = new FifoExecution();
  var stage3 = new FifoExecution();

  for (int i = 0; i < 100; i++) {
    stage1.QueueUserWorkItem(one => {
      Console.WriteLine("" + one);
      Thread.SpinWait(100000000);

      stage2.QueueUserWorkItem(two => {
        Console.WriteLine("\t\t" + two);
        Thread.SpinWait(100000000);

        stage3.QueueUserWorkItem(three => {
          Console.WriteLine("\t\t\t\t" + three);
          Thread.SpinWait(100000000);
        }, two);
      }, one);
    }, i);
  }

   Console.ReadLine();
}

fig06.gif

Рис. 6. Выходные данные демонстрационного приложения

Интересно также отметить, что между стадиями конвейера нет равноправия. Например, видно, что стадия stage1 на рис. 6 уже находится на 21-ой итерации, в то время как stage2 все еще на 13-ой, а stage3 — на 9-ой. Это, в основном, определяется моей реализацией метода ProcessQueuedItems. В примере приложения 100 рабочих элементов очень быстро попадают на стадию stage1, и поэтому поток из пула, обслуживающего stage1, с большой вероятностью будет находиться в цикле метода ProcessQueuedItems и не будет возвращаться до тех пор, пока не исчерпается работа ступени stage1. Это создает неравноправный сдвиг по отношению к другим стадиям. Если вы наблюдаете подобное поведение в своем приложении, и это создает проблемы, можно повысить равноправие стадий, изменив реализацию метода ProcessQueuedItems на другую, в большей степени похожую на следующую.

private void ProcessQueuedItems(object ignored) {
  WorkItem item;
  lock (_workItems) {
    if (_workItems.Count == 0) {
      _delegateQueuedOrRunning = false;
      return;
    }
    item = _workItems.Dequeue();
  }
  try { item.Execute(); }
  finally {
    ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems,
      null);
  }
}

Теперь, даже если еще есть элементы, которые следует обработать, метод ProcessQueuedItems не будет выполнять цикл, а вместо этого рекурсивным образом поместит себя в очередь к ThreadPool, сообщив тем самым себе приоритет меньший, чем для элементов других стадий. При таком изменении выходные данные приложения из рис. 5 выглядят теперь так, как показано на рис. 7. Из этих новых выходных данных видно, что планирование для стадий stage2 и stage3 действительно стало более справедливым, чем раньше (между стадиями все еще сохраняется сдвиг, но в случае конвейера этого следует ожидать).

fig07.gif

Рис. 7 Новые выходные данные при более равноправном планировании

Безусловно, такое повышение равноправия потребует определенных издержек. Каждый рабочий элемент теперь влечет дополнительный проход через модуль планировщика, что несколько увеличивает затраты. Вам придется принять решение относительно того, можете ли вы позволить себе такую уступку для приложения; например, если в ваших рабочих элементах выполняется значительный объем работы, то такие издержки должны быть пренебрежимо малы, и на них можно не обращать внимания.

Это еще один пример того, как можно создавать системы на основе ThreadPool, добавляющего функциональные возможности, без необходимости самостоятельно создавать пулы пользовательских потоков. Другие примеры см. в предыдущих выпусках рубрики «NET: вопросы и ответы» журнала MSDN Magazine.

Вопросы и комментарии направляйте по адресу netqa@microsoft.com.

Стивен Тауб (Stephen Toub) — старший руководитель программы в рабочей группе платформы параллельных вычислений Майкрософт. Он также является пишущим редактором журнала MSDN Magazine.