如何:使用计划组影响执行顺序

在并发运行时,计划任务的顺序是不确定的。 但是,可以使用计划策略来影响任务运行的顺序。 本主题说明如何将计划组与 concurrency::SchedulingProtocol 计划程序策略结合使用,以影响任务运行的顺序。

此示例运行一组任务两次,每次都使用不同的计划策略。 这两种策略都将处理资源的最大数量限制为 2。 第一次运行使用默认的 EnhanceScheduleGroupLocality 策略,第二次运行使用 EnhanceForwardProgress 策略。 在 EnhanceScheduleGroupLocality 策略中,计划程序在一个计划组中运行所有任务,直到每个任务完成或生成结果。 在 EnhanceForwardProgress 策略中,计划程序在一个任务完成或生成结果后,以轮循机制移至下一个计划组。

当每个计划组包含相关任务时,EnhanceScheduleGroupLocality 策略通常会提高性能,因为在任务之间保留了缓存区域。 EnhanceForwardProgress 策略使任务能够向前推进,当你需要跨计划组公平计划时非常有用。

示例

此示例定义派生自 concurrency::agentwork_yield_agent 类。 work_yield_agent 类执行一个工作单元,生成当前上下文,然后执行另一个工作单元。 代理使用 concurrency::wait 函数协同生成当前上下文,以便其他上下文可以运行。

此示例创建四个 work_yield_agent 对象。 为了说明如何设置计划程序策略以影响代理运行的顺序,此示例将前两个代理与一个计划组相关联,将另外两个代理与另一个计划组相关联。 此示例使用 concurrency::CurrentScheduler::CreateScheduleGroup 方法创建 concurrency::ScheduleGroup 对象。 此示例运行所有四个代理两次,每次都使用不同的计划策略。

// scheduling-protocol.cpp
// compile with: /EHsc
#include <agents.h>
#include <vector>
#include <algorithm>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

#pragma optimize( "", off )
// Simulates work by performing a long spin loop.
void spin_loop()
{
   for (int i = 0; i < 500000000; ++i)
   {
   }
}
#pragma optimize( "", on )

// Agent that performs some work and then yields the current context.
class work_yield_agent : public agent
{
public:
   explicit work_yield_agent(
      unsigned int group_number, unsigned int task_number)
      : _group_number(group_number)
      , _task_number(task_number)
   {
   }

   explicit work_yield_agent(Scheduler& scheduler,
      unsigned int group_number, unsigned int task_number)
      : agent(scheduler)
      , _group_number(group_number)
      , _task_number(task_number)
   {
   }

   explicit work_yield_agent(ScheduleGroup& group,
      unsigned int group_number, unsigned int task_number)
      : agent(group)       
      , _group_number(group_number)
      , _task_number(task_number)
   {
   }
   
protected:
   // Performs the work of the agent.   
   void run()
   {
      wstringstream header, ss;

      // Create a string that is prepended to each message.
      header << L"group " << _group_number 
             << L",task " << _task_number << L": ";

      // Perform work.
      ss << header.str() << L"first loop..." << endl;
      wcout << ss.str();
      spin_loop();

      // Cooperatively yield the current context. 
      // The task scheduler will then run all blocked contexts.
      ss = wstringstream();
      ss << header.str() << L"waiting..." << endl;
      wcout << ss.str();
      concurrency::wait(0);

      // Perform more work.
      ss = wstringstream();
      ss << header.str() << L"second loop..." << endl;
      wcout << ss.str();
      spin_loop();

      // Print a final message and then set the agent to the 
      // finished state.
      ss = wstringstream();
      ss << header.str() << L"finished..." << endl;
      wcout << ss.str();

      done();
   }  

private:
   // The group number that the agent belongs to.
   unsigned int _group_number;
   // A task number that is associated with the agent.
   unsigned int _task_number;
};

// Creates and runs several groups of agents. Each group of agents is associated 
// with a different schedule group.
void run_agents()
{
   // The number of schedule groups to create.
   const unsigned int group_count = 2;
   // The number of agent to create per schedule group.
   const unsigned int tasks_per_group = 2;

   // A collection of schedule groups.
   vector<ScheduleGroup*> groups;
   // A collection of agents.
   vector<agent*> agents;

   // Create a series of schedule groups. 
   for (unsigned int group = 0; group < group_count; ++group)
   {
      groups.push_back(CurrentScheduler::CreateScheduleGroup());

      // For each schedule group, create a series of agents.
      for (unsigned int task = 0; task < tasks_per_group; ++task)
      {
         // Add an agent to the collection. Pass the current schedule 
         // group to the work_yield_agent constructor to schedule the agent
         // in this group.
         agents.push_back(new work_yield_agent(*groups.back(), group, task));
      }
   }

   // Start each agent.
   for_each(begin(agents), end(agents), [](agent* a) {
      a->start();
   });

   // Wait for all agents to finsih.
   agent::wait_for_all(agents.size(), &agents[0]);

   // Free the memory that was allocated for each agent.
   for_each(begin(agents), end(agents), [](agent* a) {
      delete a;
   });

   // Release each schedule group.
   for_each(begin(groups), end(groups), [](ScheduleGroup* group) {
      group->Release();
   });
}

int wmain()
{
   // Run the agents two times. Each run uses a scheduler
   // policy that limits the maximum number of processing resources to two.

   // The first run uses the EnhanceScheduleGroupLocality 
   // scheduling protocol. 
   wcout << L"Using EnhanceScheduleGroupLocality..." << endl;
   CurrentScheduler::Create(SchedulerPolicy(3, 
      MinConcurrency, 1,
      MaxConcurrency, 2,
      SchedulingProtocol, EnhanceScheduleGroupLocality));

   run_agents();
   CurrentScheduler::Detach();

   wcout << endl << endl;

   // The second run uses the EnhanceForwardProgress 
   // scheduling protocol. 
   wcout << L"Using EnhanceForwardProgress..." << endl;
   CurrentScheduler::Create(SchedulerPolicy(3, 
      MinConcurrency, 1,
      MaxConcurrency, 2,
      SchedulingProtocol, EnhanceForwardProgress));

   run_agents();
   CurrentScheduler::Detach();
}

本示例生成以下输出。

Using EnhanceScheduleGroupLocality...
group 0,
    task 0: first loop...
group 0,
    task 1: first loop...
group 0,
    task 0: waiting...
group 1,
    task 0: first loop...
group 0,
    task 1: waiting...
group 1,
    task 1: first loop...
group 1,
    task 0: waiting...
group 0,
    task 0: second loop...
group 1,
    task 1: waiting...
group 0,
    task 1: second loop...
group 0,
    task 0: finished...
group 1,
    task 0: second loop...
group 0,
    task 1: finished...
group 1,
    task 1: second loop...
group 1,
    task 0: finished...
group 1,
    task 1: finished...

Using EnhanceForwardProgress...
group 0,
    task 0: first loop...
group 1,
    task 0: first loop...
group 0,
    task 0: waiting...
group 0,
    task 1: first loop...
group 1,
    task 0: waiting...
group 1,
    task 1: first loop...
group 0,
    task 1: waiting...
group 0,
    task 0: second loop...
group 1,
    task 1: waiting...
group 1,
    task 0: second loop...
group 0,
    task 0: finished...
group 0,
    task 1: second loop...
group 1,
    task 0: finished...
group 1,
    task 1: second loop...
group 0,
    task 1: finished...
group 1,
    task 1: finished...

这两种策略都生成相同的事件序列。 但是,使用 EnhanceScheduleGroupLocality 的策略会先启动属于第一个计划组的两个代理,然后再启动属于第二个组的代理。 使用 EnhanceForwardProgress 的策略先启动第一个组中的一个代理,然后启动第二个组中的第一个代理。

编译代码

复制示例代码,并将它粘贴到 Visual Studio 项目中,或粘贴到名为 scheduling-protocol.cpp 的文件中,再在 Visual Studio 命令提示符窗口中运行以下命令。

cl.exe /EHsc scheduling-protocol.cpp

另请参阅

计划组
异步代理