Windows и C++

Среда пула потоков

Кенни Керр

Кенни КеррОбъекты, образующие API пула потоков Windows, можно разделить на два лагеря. В первом находятся те, которые представляют работу, таймеры, ввод-вывод, и ожидаемые объекты (waitable objects). Все они потенциально приводят к тому, что в пуле потоков выполняются обратные вызовы. Я уже рассказывал в прошлой статье об объектах работы, а остальные объекты опишу в последующих статьях. Второй лагерь состоит из объектов, управляющих средой, в которой выполняются эти обратные вызовы. Это и будет в центре внимание данной статьи.

Среда пула потоков влияет на то, будут ли обратные вызовы выполняться в пуле по умолчанию или в вашем специфическом пуле, следует ли присваивать приоритеты обратным вызова и т. д. Возможность управления этой средой становится еще важнее, когда вы выходите за рамки набора объектов работы или обратных вызовов. Эта среда также позволяет упростить координацию отмены и освобождения этих объектов (об этом мы поговорим в следующей статье).

Среда пула потоков не является объектом в том же смысле, что и остальные объекты, образующие API пула потоков. Для большей эффективности она объявляется просто как структура, поэтому вы можете напрямую выделять память под нее в своем приложении. Однако вы должны интерпретировать ее так же, как и другие объекты, и не полагаться на знание ее внутреннего устройства, а обращаться к ней только через набор открытых API-функций. Эта структура называется TP_CALLBACK_ENVIRON, и, внимательно посмотрев на нее, вы сразу же заметите, что она уже изменилась со времени своего первого появления в Windows Vista. Это еще одно напоминание о важности использования API-функций. Сами функции просто манипулируют с этой структурой, но защищают вас от любых изменений. Они объявляются подставляемыми, чтобы компилятор мог максимально оптимизировать их, так что пусть вас не терзает соблазн что-то подкрутить.

Функция InitializeThreadpoolEnvironment инициализирует структуру значениями по умолчанию. Функция DestroyThreadpoolEnvironment освобождает любые ресурсы, используемые средой. На момент написания этой статьи она ничего не делала. Но в будущем ситуация может измениться. Поскольку это подставляемая функция (inline function), вреда от ее вызова нет — при компиляции она просто «растворится». На рис. 1 показан класс — оболочка этих функций.

Рис. 1. Обертывание функции InitializeThreadpoolEnvironment

class environment
{
  environment(environment const &);
  environment & operator=(environment const &);

  TP_CALLBACK_ENVIRON m_value;

public:
  environment() throw()
  {
    InitializeThreadpoolEnvironment(&m_value);
  }

  ~environment() throw()
  {
    DestroyThreadpoolEnvironment(&m_value);
  }

  PTP_CALLBACK_ENVIRON get() throw()
  {
    return &m_value;
  }
};

Знакомая функция-член get предоставляется для унификации с шаблоном класса unique_handle, который я представил в первой статье из этой серии (https://msdn.microsoft.com/magazine/hh288076). Дотошные читатели, вероятно, помнят, что у функций CreateThreadpoolWork и TrySubmitThreadpoolCallback (о них шла речь в прошлой статье) есть еще один параметр, о котором я не упомянул. Я просто передавал null в каждом случае. Этот параметр на самом деле является указателем на environment, и именно через него вы сопоставляете различные объекты работы с environment:

environment e;
work w(CreateThreadpoolWork(callback, nullptr, e.get()));
check_bool(w);

Какая от этого польза? Ну, особой пользы нет — пока вам не потребуется специфическая среда.

Закрытые пулы

Без ваших указаний среда будет направлять обратные вызовы для обработки в пул потоков по умолчанию. Это тот самый пул, который и обрабатывал бы обратные вызовы, если бы вы не стали сопоставлять работу со средой. Любой код, выполняемый в процессе, может использовать этот пул. Учтите, что средний процесс явно или неявно загружается десятки DLL. Очевидно, что это может серьезно повлиять на производительность. И не обязательно в худшую сторону. Разделение пула потоков между различными подсистемами в процессе зачастую повышает производительность за счет эффективного совместного использования ограниченного количества физических процессоров в компьютере.

Альтернатива такова: каждая подсистема создает собственный пул потоков, и все они конкурируют за процессорное время при гораздо меньшей координации. С другой стороны, если конкретная подсистема чрезмерно использует пул потоков по умолчанию, вы можете защититься от этого, задействовав для нее закрытый пул. Например, эта подсистема может ставить в очередь длительно выполняемые обратные вызовы или так много обратных вызовов, что время ответа становится неприемлемым. Кроме того, у вас могут быть специфические требования, накладывающие определенные ограничения на количество потоков в пуле. Вот здесь и пригодится свой объект пула (pool object).

Функция CreateThreadpool создает закрытый объект пула, полностью независимый от пула потоков по умолчанию. Если функция завершается успешно, она возвращает непрозрачный указатель, представляющий объект пула. В ином случае возвращается null и более подробная информация предоставляется через функцию GetLastError. Функция CloseThreadpool, которой передан объект пула, сообщает системе, что данный объект может быть освобожден. И вновь шаблон класса unique_handle, с которым я ознакомил вас в первой статье из этой серии, берет на себя все детали, используя класс traits, специфичный для пула:

struct pool_traits
{
  static PTP_POOL invalid() throw()
  {
    return nullptr;
  }
  static void close(PTP_POOL value) throw()
  {
    CloseThreadpool(value);
   }
};

typedef unique_handle<PTP_POOL, pool_traits> pool;

Теперь я могу использовать удобный typedef и создавать объект пула так:

pool p(CreateThreadpool(nullptr));
check_bool(p);

На этот раз я ничего не скрываю. В данном случае этот параметр зарезервирован для будущего использования и должен быть задан как значение null-указателя. Подставляемая функция SetThreadpoolCallbackPool обновляет среду, указывая, какому пулу следует направить обратные вызовы:

SetThreadpoolCallbackPool(e.get(), p.get());

В этом случае объекты работы и любые другие объекты, созданные с помощью этой среды, будут сопоставлены с заданным пулом. Вы могли бы даже создать несколько сред, каждую со своим пулом, чтобы изолировать разные части вашего приложения. Просто будьте аккуратны в балансировке параллельной обработки между пулами, чтобы не планировать чрезмерно много потоков.

Как я намекнул ранее, также можно задавать минимальное и максимальное количества потоков в вашем пуле. Управление пулом потоков по умолчанию в таком стиле не разрешается, так как может повлиять на другие подсистемы и вызвать любые проблемы совместимости. Например, я мог бы создать пул ровно с одним потоком для обработки API с привязкой к конкретному потоку (thread affinity) и еще один пул для завершения ввода-вывода и других релевантных обратных вызовов безо всяких ограничений, позволив системе динамически регулировать количество потоков. Вот как я создал бы пул ровно с одним постоянным потоком:

check_bool(SetThreadpoolThreadMinimum(p.get(), 1));
SetThreadpoolThreadMaximum(p.get(), 1);

Заметьте, что задание минимума может оказаться неудачным, а указание максимума — нет. По умолчанию минимум равен нулю, и, если ему присваивается любое другое число, операция может закончиться неудачей, так как функция реально пытается создать запрошенное число потоков.

Назначение приоритетов обратным вызовам

Еще одна функциональность, которая становится доступной благодаря среде пула потоков, — возможность назначать приоритеты обратным вызовам. Это поддерживается только в Windows 7. Учитывайте это, если вы все еще ориентируетесь на Windows Vista. Обратный вызов с заданным приоритетом (prioritized callback) гарантированно выполняется до любых других обратных вызовов с меньшим приоритетом. Это никак не влияет на приоритеты потоков, а потому не приводит к выполнению обратных вызовов с вытеснением. Приоритеты обратных вызовов влияют лишь на порядок их выполнения.

Существует три уровня приоритета: низкий, обычный и высокий. Приоритет среды задается через функцию SetThreadpoolCallbackPriority:

SetThreadpoolCallbackPriority(e.get(), TP_CALLBACK_PRIORITY_HIGH);

И вновь любые объекты работы и прочие объекты, созданные с помощью этой среды, получат соответствующие приоритеты для своих обратных вызовов.

Последовательный пул

В прошлой статье я приводил пример класса functional_pool, чтобы продемонстрировать различные функции, связанные с объектами работы. На этот раз я намерен показать, как создать простой последовательный пул с назначением приоритетов, использующий все пояснявшиеся в этой статье функции, которые имеют дело со средой пула потоков. Под «последовательным» я имею в виду пул, управляющий ровно одним постоянным потоком. А вводя в него приоритеты, я просто хочу поддерживать передачу функций с обычным или высоким приоритетом. Начать определение класса serial_pool можно, как показано на рис. 2.

рис. 2. Определение класса serial_pool

class serial_pool
{
  typedef concurrent_queue<function<void()>> queue;

  pool m_pool;
  queue m_queue, m_queue_high;
  work m_work, m_work_high;

  static void CALLBACK callback(
    PTP_CALLBACK_INSTANCE, void * context, PTP_WORK)
  {
    auto q = static_cast<queue *>(context);

    function<void()> function;
    q->try_pop(function);

    function();
  }

В отличие от класса functional_pool класс serial_pool действительно управляет объектом пула. Ему также нужны раздельные очереди и объекты работы для обычного и высокого приоритета. Объекты работы можно создавать с разными значениями контекста, указывающими на соответствующую очередь, а затем просто повторно использовать закрытую функцию обратного вызова. Это позволяет избежать любого ветвления в период выполнения. Обратный вызов по-прежнему выталкивает одну функцию из очереди и вызывает ее. Однако конструктор serial_pool (рис. 3) выполняет дополнительную работу.

рис. 3 Конструктор serial_pool

public:
  serial_pool() :
    m_pool(CreateThreadpool(nullptr))
  {
    check_bool(m_pool);
    check_bool(SetThreadpoolThreadMinimum(m_pool.get(), 1));
    SetThreadpoolThreadMaximum(m_pool.get(), 1);

    environment e;
    SetThreadpoolCallbackPool(e.get(), m_pool.get());

    SetThreadpoolCallbackPriority(e.get(),
      TP_CALLBACK_PRIORITY_NORMAL);
    check_bool(m_work.reset(CreateThreadpoolWork(
      callback, &m_queue, e.get())));

    SetThreadpoolCallbackPriority(e.get(),
      TP_CALLBACK_PRIORITY_HIGH);
    check_bool(m_work_high.reset(CreateThreadpoolWork(
      callback, &m_queue_high, e.get())));
  }

Первым делом создается закрытый пул и задаются лимиты его параллелизма, чтобы обеспечить последовательное выполнение любых обратных вызовов. Далее создается среда и указывается пул для приема последующих объектов. Наконец, создаются объекты работы, настраивается приоритет среды для задания соответствующих приоритетов объектам работы и устанавливается связь с закрытым пулом, который они совместно используют. Хотя пул и объекты работы нужно поддерживать в течение жизненного цикла объекта serial_pool, среда создается в стеке, так как она нужно лишь для установления связей между «заинтересованными сторонами».

Деструктор теперь должен ждать завершения всех объектов работы, чтобы гарантировать, что после уничтожения объекта serial_pool не будет попыток выполнения обратных вызовов:

~serial_pool()
{
  WaitForThreadpoolWorkCallbacks(m_work.get(), true);
  WaitForThreadpoolWorkCallbacks(
    m_work_high.get(), true);
}

Наконец, нужны две функции submit для отправки в очередь функций с обычным или высоким приоритетом:

template <typename Function>
void submit(Function const & function)
{
  m_queue.push(function);
  SubmitThreadpoolWork(m_work.get());
}

template <typename Function>
void submit_high(Function const & function)
{
  m_queue_high.push(function);
  SubmitThreadpoolWork(m_work_high.get());
}

В конечном счете все сводится к тому, как были созданы объекты работы, в частности какая информация о нужной среде пула потоков была предоставлена. На рис. 4 показан пример, из которого ясно видно, как работает последовательный пул с поддержкой приоритетов.

Рис. 4. Последовательный пул с поддержкой приоритетов в действии

int main()
{
  serial_pool pool;

  for (int i = 0; i < 10; ++i)
  {
    pool.submit([]
    {
      printf("normal: %d\n", GetCurrentThreadId());
    });

    pool.submit_high([]
    {
      printf("high: %d\n", GetCurrentThreadId());
    });
  }
  getch();
}

В примере на рис. 4 возможно, что один обратный вызов с обычным приоритетом будет выполнен первым — в зависимости от того, насколько быстро реагирует система, — потому что он передан первым. В дальнейшем должны выполняться все обратные вызовы с высоким приоритетом, а потом оставшиеся обратные вызовы с обычным приоритетом. Вы можете поэкспериментировать, добавляя вызовы Sleep и повышая уровень параллелизма, чтобы увидеть, как пул потоков изменяет свое поведение согласно вашим спецификациям.

Присоединяйтесь ко мне в следующем месяце — мы исследуем крайне важные возможности отмены и очистки, предоставляемые API пула потоков Windows.


Кенни Керр (Kenny Kerr)  - высококвалифицированный специалист в области разработки ПО для Windows. С ним можно связаться через kennykerr.ca.

Выражаю благодарность за рецензирование статьи экспертам Стивену Т. Лававей (Stephan T. Lavavej).