C++

Visual C++ 11 中新的并发功能

Diego Dagum

下载代码示例

最新的 C++ 迭代(称为 C++11,在去年通过了国际标准化组织 (ISO) 的审批)形式化了一组新库和一些保留字以处理并发。 许多开发者以前都在 C++ 中使用过并发功能,但都是通过第三方的库,即,通常直接公开 OS API。

Herb Sutter 在 2004 年 12 月宣告“免费的性能午餐”结束,因为禁止 CPU 制造商通过物理能耗和增加碳排放量来生产更快的 CPU。 由此进入了当前主流的多核时代,一种新的实现,而 C++(标准组件)为适应此类变化取得了重要的飞跃。

本文下面的内容将分成两节,另外还有一些小节。 第一节,从并行执行开始,介绍允许应用程序并行运行独立或半独立活动的技术。 第二节,从同步并发执行开始,探讨同步机制,这些活动通过同步方式处理数据,以避免出现争用情况。

本文基于即将推出的 Visual C++ 版本(现在称为 Visual C++ 11)中包括的功能。 当前版本 (Visual C++ 2010) 中已提供其中部分功能。 尽管本文不提供关于为并行算法建模的指南,也不提供关于所有可用选项的详尽文档,但却全面介绍了新的 C++11 并发功能,内容丰富详实。

并行执行

当您对数据建模和设计算法时,很自然地就会按照具有一定顺序的步骤指定这些建模和设计过程。 只要性能位于可接受的范围内,这就是最值得推荐的方案,因为它通常更易于理解,而这符合维护代码的要求。

当性能成为令人担忧的问题时,为处理这种情况通常都会先尝试优化序列算法以减少使用的 CPU 循环。 这种做法始终可行,直到无法再进行优化或难以优化。 这时就需要将连续的一系列步骤拆分为同时进行的多项活动。

在第一节中,您将了解到以下内容:

  • 异步任务: 一小部分原始算法,仅通过它们生成或使用的数据进行链接。
  • 线程: 运行时环境管理的执行单元。 它们与任务相关,因为任务在某种程度上在线程上运行。
  • 线程内部: 线程绑定变量、线程传播的异常等等

异步任务

在本文随附的代码中,您将找到一个名为“顺序案列”的项目,如图 1 所示。

图 1 顺序案例代码

int a, b, c;
int calculateA()
{
  return a+a*b;
}
int calculateB()
{
  return a*(a+a*(a+1));
}
int calculateC()
{
  return b*(b+1)-b;
}
int main(int argc, char *argv[])
{
  getUserData(); // initializes a and b
  c = calculateA() * (calculateB() + calculateC());
  showResult();
}

主函数向用户请求一些数据,然后将该数据提交给三个函数: calculateA、calculateB 和 calculateC。 稍后将组合这些结果,以便为用户生成一些输出信息。

随附材料中计算函数的编码方式在每个函数中引入了 1 到 3 秒的随机延迟。 由于这些步骤是按顺序执行的,因此只要输入数据,就会产生一个在最糟糕情况下为 9 秒的总体执行时间。 您可以通过按 F5 运行本示例来尝试此代码。

因此,我需要修改执行序列和查找并发执行步骤。 由于这些函数都是独立的,因此我可以使用异步函数并行执行它们:

int main(int argc, char *argv[])

{

  getUserData();

  future<int> f1 = async(calculateB), f2 = async(calculateC);

  c = (calculateA() + f1.get()) * f2.get();

  showResult();

}

在这里我引入了两个概念: async 和 future,在 <future> 标头和 std 命名空间中都有定义。 前者接收函数、lambda 或函数对象(即算符)并返回 future。 您可以将 future 的概念理解为事件结果的占位符。 什么结果? 异步调用函数返回的结果。

在某些时候,我将需要这些并行运行函数的结果。 对每个 future 调用 get 方法会阻止执行,直到值可用。

您可以通过运行随附示例中的 AsyncTasks 项目来测试修改后的代码,并将其与顺序案例进行比较。 经过此修改后最糟情况下的延迟大约为 3 秒,与顺序版本的 9 秒相比有很大进步。

此轻型编程模型将开发者从创建线程的任务中解放出来。 然而,您可以指定线程策略,但这里我不介绍此内容。

线程

前面一节介绍的异步任务模型在某些指定的应用场景中可能已经足够了,但如果您需要进一步处理和控制线程的执行,那么 C++11 还提供了线程类,该类在 <thread> 标头中声明并位于 std 命名空间中。

尽管编程模型更为复杂,但线程可以提供更好的同步和协调方法,以允许它们执行其他线程并等待既定的时间长度,或直到其他线程完成后再继续。

在以下随附代码的“线程”项目中提供的示例中,我让 lambda 函数(为其赋予了整数参数)将其小于 100,000 的倍数显示到控制台:

auto multiple_finder = [](int n) {

  for (int i = 0; i < 100000; i++)

    if (i%n==0)

      cout << i << " is a multiple of " << n << endl;

};

int main(int argc, char *argv[])

{

  thread th(multiple_finder, 23456);

  multiple_finder(34567);

  th.join();

}

正如您将在后面的示例中看到的,我视情况将 lambda 传递给线程;一个函数或算符就已足够。

在主函数中,我使用不同的参数在两个线程中运行此函数。 看一下生成的结果(因为运行时机不同,运行产生的结果也不同):

0 is a multiple of 23456
0 is a multiple of 34567
23456 is a multiple of 23456
34567 is a multiple of 34567
46912 is a multiple of 23456
69134 is a multiple of 34567
70368 is a multiple of 23456
93824 is a multiple of 23456

我可以使用线程实现前面一节中有关异步任务的示例。 为此,我需要引入 promise 的概念。 可以将 promise 理解为一个用于放置可用结果的接收器。 将结果放置在其中后又从哪个位置提取该结果呢? 每个 promise 都有一个关联的 future。

图 2 中显示的、示例代码的 Promise 项目中提供的代码将三个线程(而非任务)与 promise 关联并让每个线程调用 calculate 函数。 将这些细节与轻型 AsyncTasks 版本比较。

图 2 关联 Future 和 Promise

typedef int (*calculate)(void);
void func2promise(calculate f, promise<int> &p)
{
  p.set_value(f());
}
int main(int argc, char *argv[])
{
  getUserData();
  promise<int> p1, p2;
  future<int> f1 = p1.get_future(), f2 = p2.get_future();
  thread t1(&func2promise, calculateB, std::ref(p1)),
    t2(&func2promise, calculateC, std::ref(p2));
  c = (calculateA() + f1.get()) * f2.get();
  t1.join(); t2.join();
  showResult();
}

线程绑定变量和异常

在 C++ 中,您可以定义全局变量,它的范围绑定到整个应用程序,包括线程。 但相对于线程,现在有方法定义这些全局变量,以便每个线程保有自己的副本。 此概念称为线程本地存储,其声明如下:

thread_local int subtotal = 0;

如果声明在函数范围内完成,则只有该函数能够看到变量,但每个线程将继续维护自己的静态副本。 也就是说,每个线程的变量的值在函数调用之间将得到保持。

尽管 thread_local 在 Visual C++ 11 中不可用,但可以使用非标准的 Microsoft 扩展对它进行模拟:

#define  thread_local __declspec(thread)

如果线程内引发异常将会发生什么? 有时候可以在线程内的调用堆栈中捕获和处理异常。 但如果线程不处理异常,则需要采用一种方法将异常传输到发起方线程。 C++11 引入了此类机制。

图 3 中,在随附代码的项目 ThreadInternals 中提供了一个 sum_until_element_with_threshold 函数,该函数遍历矢量直至找到特定元素,而在此过程中它会对所有元素求和。 如果总和超过阈值,则引发异常。

图 3 线程本地存储和线程异常

thread_local unsigned sum_total = 0;
void sum_until_element_with_threshold(unsigned element,
  unsigned threshold, exception_ptr& pExc)
{
  try{
    find_if_not(begin(v), end(v), [=](const unsigned i) -> bool {
      bool ret = (i!=element);
      sum_total+= i;
      if (sum_total>threshold)
        throw runtime_error("Sum exceeded threshold.");
      return ret;
    });
    cout << "(Thread #" << this_thread::get_id() << ") " <<
      "Sum of elements until " << element << " is found: " << sum_total << endl;
  } catch (...) {
    pExc = current_exception();
  }
}

如果发生该情况,将通过 current_exception 将异常捕获到 exception_ptr 中。

主函数对 sum_until_element_with_threshold 触发线程,同时使用其他参数调用该相同的函数。 当两个调用(一个在主线程中,另一个在从主线程触发的线程中)都完成后,将对其相应的 exception_ptrs 进行分析:

const unsigned THRESHOLD = 100000;
vector<unsigned> v;
int main(int argc, char *argv[])
{
  exception_ptr pExc1, pExc2;
  scramble_vector(1000);
  thread th(sum_until_element_with_threshold, 0, THRESHOLD, ref(pExc1));
  sum_until_element_with_threshold(100, THRESHOLD, ref(pExc2));
  th.join();
  dealWithExceptionIfAny(pExc1);
  dealWithExceptionIfAny(pExc2);
}

如果其中任何 exception_ptrs 进行了初始化(即,表明出现某些异常),将使用 rethrow_exception 再次触发它们的异常:

void dealWithExceptionIfAny(exception_ptr pExc)
{
  try
  {
    if (!(pExc==exception_ptr()))
      rethrow_exception(pExc);
    } catch (const exception& exc) {
      cout << "(Main thread) Exception received from thread: " <<
        exc.what() << endl;
  }
}

当第二个线程中的总和超过其阈值时,我们将获得以下执行结果:

(Thread #10164) Sum of elements until 0 is found: 94574
(Main thread) Exception received from thread: Sum exceeded threshold.

同步并发执行

最好能够将所有应用程序拆分为 100% 独立的异步任务组。 但实际上这几乎是不可能的,因为各方并发处理的数据都至少具有一定的依赖关系。 本节介绍可避免发生争用情况的新 C++11 技术。

您将了解到以下信息:

  • 原子类型: 与基元数据类型相似,但允许进行线程安全修改。
  • 互斥和锁定: 允许我们定义线程安全临界区的元素。
  • 条件变量: 一种在满足某条件之前停止执行线程的方法。

原子类型

<atomic> 标头引入了一系列可通过连锁操作实现的基元类型(atomic_char、atomic_int 等等)。 因此,这些类型等同于它们的不带 atomic_ 前缀的同音词,但不同的是这些类型的所有赋值运算符(==、++、--、+=、*= 等等)均不受争用情况的影响。 因此,在为这些数据类型赋值期间,其他线程无法在我们完成赋值操作之前中断并更改值。

在下面的示例中,有两个并行线程(其中一个是主线程)在相同矢量中查找不同的元素:

atomic_uint total_iterations;
vector<unsigned> v;
int main(int argc, char *argv[])
{
  total_iterations = 0;
  scramble_vector(1000);
  thread th(find_element, 0);
  find_element(100);
  th.join();
  cout << total_iterations << " total iterations." << endl;
 }

当找到每个元素后,会显示来自线程内部的消息,告知在矢量(或迭代)中的哪个位置找到了该元素。

void find_element(unsigned element)
{
  unsigned iterations = 0;
  find_if(begin(v), end(v), [=, &iterations](const unsigned i) -> bool {
    ++iterations;
    return (i==element);
  });
  total_iterations+= iterations;
  cout << "Thread #" << this_thread::get_id() << ": found after " <<
    iterations << " iterations." << endl;
}

还有一个常用变量 total_iterations,它使用两个线程都应用的总迭代次数进行更新。 因此,total_iterations 必须为原子以防止两个线程同时对其进行更新。 在前面的示例中,即使您不需要在 find_element 中显示部分数量的迭代,您仍然在该本地变量(而非 total_iterations)中累积迭代,以避免争用原子变量。

您可以在随附代码下载的“原子”项目中找到上述示例。 运行该示例,可获得下面的结果:

Thread #8064: found after 168 iterations.
Thread #6948: found after 395 iterations.
563 total iterations.

互斥和锁定

前面一节介绍了在对基元类型进行写访问时发生互斥的特殊情况。 <mutex> 标头定义了一系列用于定义临界区的可锁定类。 这样,您就可以定义互斥以在一系列函数或方法中建立临界区,在这种情况下,一次只能有一个线程可以通过成功锁定系列互斥来访问此系列中的任何成员。

尝试锁定互斥的线程可以保持阻止状态直到互斥可用,或直接放弃尝试。 在这两种极端的做法之间,还可以使 timed_mutex 类保持阻止状态一段时间,然后再放弃尝试。 允许锁定将尝试停止帮助防止死锁。

锁定的互斥必须明确解锁后,其他线程才能对其进行锁定。 无法解锁可能会导致不确定的应用程序行为,继而容易出错,这与忘记释放动态内存相似。 忘记释放锁定实际上更严重,因为它可能意味着如果其他代码继续等待该锁定,那么应用程序将再也无法正常运行。 幸运的是,C++11 还提供锁定类。 虽然针对互斥执行锁定,但其析构函数确保锁定后还会解锁。

代码下载的“互斥”项目中提供的以下代码定义有关互斥 mx 的临界区:

mutex mx;
void funcA();
void funcB();
int main()
{
  thread th(funcA)
  funcB();
  th.join();
}

此互斥用于保证两个函数(funcA 和 funcB)可以并行运行,而不会在临界区中同时出现。

如果需要,函数 funcA 将等待进入临界区。 为了实现此过程,您只需要最简单的锁定机制,即 lock_guard:

void funcA()
{
  for (int i = 0; i<3; ++i)
  {
    this_thread::sleep_for(chrono::seconds(1));
    cout << this_thread::get_id() << ": locking with wait... "
<< endl;
    lock_guard<mutex> lg(mx);
    ...
// Do something in the critical region.
cout << this_thread::get_id() << ": releasing lock." << endl;
  }
}

这样定义后,funcA 应访问临界区三次。 而函数 funcB 将尝试锁定,但如果互斥到那时已锁定,则 funcB 将等待几秒,然后再次尝试访问临界区。 它使用的机制是 unique_lock,另外还有策略 try_to_lock_t,如图 4 所示。

图 4 锁定与等待

void funcB()
{
  int successful_attempts = 0;
  for (int i = 0; i<5; ++i)
  {
    unique_lock<mutex> ul(mx, try_to_lock_t());
    if (ul)
    {
      ++successful_attempts;
      cout << this_thread::get_id() << ": lock attempt successful." <<
        endl;
      ...
// Do something in the critical region
      cout << this_thread::get_id() << ": releasing lock." << endl;
    } else {
      cout << this_thread::get_id() <<
        ": lock attempt unsuccessful.
Hibernating..." << endl;
      this_thread::sleep_for(chrono::seconds(1));
    }
  }
  cout << this_thread::get_id() << ": " << successful_attempts
    << " successful attempts." << endl;
}

这样定义后,funcB 将最多五次尝试进入临界区。 图 5 显示执行的结果。 五次尝试中,funcB 只能进入临界区两次。

图 5 执行示例项目互斥

funcB: lock attempt successful.
funcA: locking with wait ...
funcB: releasing lock.
funcA: lock secured ...
funcB: lock attempt unsuccessful.
Hibernating ...
funcA: releasing lock.
funcB: lock attempt successful.
funcA: locking with wait ...
funcB: releasing lock.
funcA: lock secured ...
funcB: lock attempt unsuccessful.
Hibernating ...
funcB: lock attempt unsuccessful.
Hibernating ...
funcA: releasing lock.
funcB: 2 successful attempts.
funcA: locking with wait ...
funcA: lock secured ...
funcA: releasing lock.

条件变量

标头 <condition_variable> 指出了本文最后的内容,这些内容是当线程之间的协调受制于事件时所出现的各种情况的基础。

在代码下载的 CondVar 项目中提供的以下示例中,producer 函数推送队列中的元素:

mutex mq;
condition_variable cv;
queue<int> q;
void producer()
{
  for (int i = 0;i<3;++i)
  {
    ...
// Produce element
    cout << "Producer: element " << i << " queued." << endl;
    mq.lock();      q.push(i);  mq.unlock();
    cv.
notify_all();
  }
}

标准队列不是线程安全的,所以您必须确保排队时没有其他人正在使用它(即,consumer 没有弹出任何元素)。

consumer 函数尝试在可用时从队列中获取元素,或者它只是针对条件变量等待一会儿,然后再重新尝试;在连续两次尝试失败后,consumer 结束(参见图 6)。

图 6 通过条件变量唤醒线程

void consumer()
{
  unique_lock<mutex> l(m);
  int failed_attempts = 0;
  while (true)
  {
    mq.lock();
    if (q.size())
    {
      int elem = q.front();
      q.pop();
      mq.unlock();
      failed_attempts = 0;
      cout << "Consumer: fetching " << elem << " from queue." << endl;
      ...
// Consume elem
    } else {
      mq.unlock();
      if (++failed_attempts>1)
      {
        cout << "Consumer: too many failed attempts -> Exiting." << endl;
        break;
      } else {
        cout << "Consumer: queue not ready -> going to sleep." << endl;
        cv.wait_for(l, chrono::seconds(5));
      }
    }
  }
}

每次新的元素可用时,producer 都会通过 notify_all 唤醒 consumer。 这样,producer 可以在元素准备就绪的情况下避免 consumer 在整个间隔期内都处于睡眠状态。

图 7 显示相关运行的结果。

图 7 使用条件变量进行同步

Consumer: queue not ready -> going to sleep.
Producer: element 0 queued.
Consumer: fetching 0 from queue.
Consumer: queue not ready -> going to sleep.
Producer: element 1 queued.
Consumer: fetching 1 from queue.
Consumer: queue not ready -> going to sleep.
Producer: element 2 queued.
Producer: element 3 queued.
Consumer: fetching 2 from queue.
Producer: element 4 queued.
Consumer: fetching 3 from queue.
Consumer: fetching 4 from queue.
Consumer: queue not ready -> going to sleep.
Consumer: two consecutive failed attempts -> Exiting.

整体概览

综上所述,本文展示了 C++11 中所引入机制的概念性全景图,这些机制允许在多核环境为主流的时代中并行执行任务。

异步任务允许轻型编程模型并行化执行。 可以通过关联的 future 检索每项任务的结果。

线程可以提供比任务更多的粒度,尽管它们更大一些,并且所提供的机制可保持静态变量的单独副本并在线程之间传输异常。

在对同一数据执行并行线程时,C++11 可提供资源以避免发生争用情况。 原子类型通过受信任的方式来确保一次只有一个线程修改数据。

互斥可帮助我们定义代码中的临界区,即防止线程同时访问的区域。 锁定可包装互斥,以尝试在前者的生命周期内解锁后者。

最后,条件变量提高了线程同步的效率,因为某些线程可以等待其他线程通知的事件。

本文未介绍配置和使用所有这些功能的全部方法,但读者现在对它们有了一个整体了解,并随时可进行深入了解。

Diego Dagum 是一位具有 20 多年经验的软件开发者。 他目前是 Microsoft Visual C++ 社区的项目经理。

衷心感谢以下技术专家对本文的审阅: David CraveyAlon FliessFabio GaluppoMarc Gregoire