并行算法

并行模式库 (PPL) 提供了对数据集合并行地执行工作的算法。 这些算法类似于 C++ 标准库提供的算法。

并行算法由并发运行时中的现有功能组成。 例如,concurrency::parallel_for 算法使用 concurrency::structured_task_group 对象执行并行循环迭代。 根据可用的计算资源数,parallel_for 算法以最佳的方式对工作进行分区。

章节

parallel_for 算法

concurrency::parallel_for 算法重复地以并行方式执行相同的任务。 其中的每个任务都由迭代值进行参数化。 如果循环体不在该循环的迭代之间共享资源,则此算法非常有用。

parallel_for 算法以最佳方式对任务进行分区,以实现并行执行。 当工作负载不平衡时,此算法还会使用工作窃取算法和范围窃取来平衡这些分区。 当一个循环迭代协同阻塞时,运行时会将分配给当前线程的迭代范围重新分配给其他线程或处理器。 同样地,当一个线程完成一个迭代范围时,运行时会将其他线程的工作重新分配给该线程。 parallel_for 算法还支持嵌套并行。 当一个并行循环包含另一个并行循环时,运行时会以一种有效的方式协调处理循环主体之间的资源,以实现并行执行。

parallel_for 算法有多个重载版本。 第一个版本采用一个起始值、一个结束值和一个工作函数(Lambda 表达式、函数对象或函数指针)。 第二个版本采用一个起始值、一个结束值、一个步长值和一个工作函数。 此函数的第一个版本使用 1 作为步长值。 其余版本采用分区程序对象,使您能够指定 parallel_for 如何在线程之间对范围进行分区。 本文档的分区工作一节中更详细地介绍了分区程序。

可以转换多个 for 循环以使用 parallel_for。 但是,parallel_for 算法在以下方面与 for 语句不同:

  • parallel_for 算法 parallel_for 不会按预先确定的顺序执行任务。

  • parallel_for 算法不支持任意终止条件。 当迭代变量的当前值小于 last 时,算法 parallel_for 将停止。

  • _Index_type 类型参数必须是整型类型。 此整型类型可以有符号,也可以无符号。

  • 循环迭代必须是前向的。 如果 _Step 参数小于 1,算法 parallel_for 将引发 std::invalid_argument 类型的异常。

  • parallel_for 算法的异常处理机制不同于 for 循环的异常处理机制。 如果在并行循环体中同时发生多个异常,运行时只会将其中一个异常传播到 parallel_for 线程。 此外,当一个循环迭代引发异常时,运行时不会立即停止整个循环。 相反,循环被置于取消状态,运行时会放弃尚未启动的任何任务。 有关异常处理和并行算法的详细信息,请参阅异常处理

尽管 parallel_for 算法不支持任意终止条件,但你可以使用取消来停止所有任务。 有关取消的详细信息,请参阅 PPL 中的取消操作

注意

负载均衡和支持取消等功能所带来的计划成本对于并行执行循环体的好处来说可能不是经济高效的,尤其是在循环体相对较小时。 您可以在并行循环中使用分区程序来尽量减少此开销。 有关详细信息,请参阅本文档稍后部分的分区工作

示例

以下示例展示了 parallel_for 算法的基本结构。 本示例并行输出到控制台中范围 [1, 5] 内的每个值。

// parallel-for-structure.cpp
// compile with: /EHsc
#include <ppl.h>
#include <array>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Print each value from 1 to 5 in parallel.
   parallel_for(1, 6, [](int value) {
      wstringstream ss;
      ss << value << L' ';
      wcout << ss.str();
   });
}

此示例产生以下示例输出:

1 2 4 3 5

由于 parallel_for 算法对每个项并行执行操作,因此将值输出到控制台的顺序会有所不同。

有关使用 parallel_for 算法的完整示例,请参阅如何:编写 parallel_for 循环

[返回页首]

parallel_for_each 算法

concurrency::parallel_for_each 算法以并行方式对迭代容器执行任务,例如由 C++ 标准库提供的任务。 它与 parallel_for 算法使用相同的分区逻辑。

parallel_for_each 算法类似于 C++ 标准库 std::for_each 算法,但是 parallel_for_each 算法会并行执行任务。 与其他并行算法一样,parallel_for_each 不会按特定顺序执行任务。

尽管 parallel_for_each 算法同时适用于向前迭代器和随机访问迭代器,但它在使用随机访问迭代器时性能更好。

示例

以下示例展示了 parallel_for_each 算法的基本结构。 本示例将 std::array 对象中的每个值并行输出到控制台中。

// parallel-for-each-structure.cpp
// compile with: /EHsc
#include <ppl.h>
#include <array>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create an array of integer values.
   array<int, 5> values = { 1, 2, 3, 4, 5 };

   // Print each value in the array in parallel.
   parallel_for_each(begin(values), end(values), [](int value) {
      wstringstream ss;
      ss << value << L' ';
      wcout << ss.str();
   });
}
/* Sample output:
   5 4 3 1 2
*/

此示例产生以下示例输出:

4 5 1 2 3

由于 parallel_for_each 算法对每个项并行执行操作,因此将值输出到控制台的顺序会有所不同。

有关使用 parallel_for_each 算法的完整示例,请参阅如何:编写 parallel_for_each 循环

[返回页首]

parallel_invoke 算法

concurrency::parallel_invoke 算法以并行方式执行一组任务。 在每个任务完成之前,它不会返回。 如果你有几个独立的任务需要同时执行,则此算法非常有用。

parallel_invoke 算法采用一系列工作函数(Lambda 函数、函数对象或函数指针)作为其参数。 parallel_invoke 算法将进行重载,以采用两到十个参数。 你传递给 parallel_invoke 的每个函数都必须采用零参数。

与其他并行算法一样,parallel_invoke 不会按特定顺序执行任务。 主题任务并行说明了 parallel_invoke 算法与任务和任务组的关系。

示例

以下示例展示了 parallel_invoke 算法的基本结构。 此示例同时对三个局部变量调用 twice 函数,并将结果输出到控制台。

// parallel-invoke-structure.cpp
// compile with: /EHsc
#include <ppl.h>
#include <string>
#include <iostream>

using namespace concurrency;
using namespace std;

// Returns the result of adding a value to itself.
template <typename T>
T twice(const T& t) {
   return t + t;
}

int wmain()
{
   // Define several values.
   int n = 54;
   double d = 5.6;
   wstring s = L"Hello";

   // Call the twice function on each value concurrently.
   parallel_invoke(
      [&n] { n = twice(n); },
      [&d] { d = twice(d); },
      [&s] { s = twice(s); }
   );

   // Print the values to the console.
   wcout << n << L' ' << d << L' ' << s << endl;
}

该示例产生下面的输出:

108 11.2 HelloHello

有关使用 parallel_invoke 算法的完整示例,请参阅如何:使用 parallel_invoke 来编写并行排序例程如何:使用 parallel_invoke 来执行并行操作

[返回页首]

parallel_transform 和 parallel_reduce 算法

concurrency::parallel_transformconcurrency::parallel_reduce 算法分别是 C++ 标准库算法 std::transformstd::accumulate 的并行版本。 并发运行时版本的行为类似于 C++ 标准库版本,只不过操作顺序不确定,因为它们是并行执行的。 当您使用的集足够大,可从并行处理中获得性能和可扩展性优势时,请使用这些算法。

重要

因为这些迭代器会生成稳定的内存地址,所以 parallel_transform 算法和 parallel_reduce 算法仅支持随机访问、双向和向前迭代器。 而且这些迭代器必须生成非 const 左值。

parallel_transform 算法

您可以使用 parallel transform 算法执行许多数据并行化操作。 例如,可以:

  • 调整图像的亮度,并执行其他图像处理操作。

  • 在两个向量之间求和或取点积,并对向量执行其他数值计算。

  • 执行三维射线跟踪,其中每次迭代引用一个必须呈现的像素。

下面的示例显示用于调用 parallel_transform 算法的基本结构。 此示例以两种方法对 std::vector 对象的每个元素求反。 第一种方法是使用 lambda 表达式。 第二种方法是使用派生自 std::unary_functionstd::negate

// basic-parallel-transform.cpp
// compile with: /EHsc
#include <ppl.h>
#include <random>

using namespace concurrency;
using namespace std;

int wmain()
{
    // Create a large vector that contains random integer data.
    vector<int> values(1250000);
    generate(begin(values), end(values), mt19937(42));

    // Create a vector to hold the results.
    // Depending on your requirements, you can also transform the 
    // vector in-place.
    vector<int> results(values.size());

    // Negate each element in parallel.
    parallel_transform(begin(values), end(values), begin(results), [](int n) {
        return -n;
    });

    // Alternatively, use the negate class to perform the operation.
    parallel_transform(begin(values), end(values), begin(values), negate<int>());
}

警告

本示例演示 parallel_transform 的基本用法。 由于工作函数不会执行大量工作,因此本示例中不会有显著的性能提升。

parallel_transform 算法有两个重载。 第一个重载采用一个输入范围和一个一元函数。 该一元函数可以是采用一个自变量的 Lambda 表达式、一个函数对象或从 unary_function 派生的一个类型。 第二个重载采用两个输入范围和一个二元函数。 该二元函数可以是采用两个自变量的 Lambda 表达式、一个函数对象或从 std::binary_function 派生的一个类型。 下面的示例阐释了这些差异。

//
// Demonstrate use of parallel_transform together with a unary function.

// This example uses a lambda expression.
parallel_transform(begin(values), end(values), 
    begin(results), [](int n) { 
        return -n;
    });

// Alternatively, use the negate class:
parallel_transform(begin(values), end(values), 
    begin(results), negate<int>());

//
// Demonstrate use of parallel_transform together with a binary function.

// This example uses a lambda expression.
parallel_transform(begin(values), end(values), begin(results), 
    begin(results), [](int n, int m) {
        return n * m;
    });

// Alternatively, use the multiplies class:
parallel_transform(begin(values), end(values), begin(results), 
    begin(results), multiplies<int>());

重要

您为 parallel_transform 的输出提供的迭代器必须与输入迭代器完全重叠或根本不重叠。 如果输入迭代器和输出迭代器部分重叠,则此算法的行为是未指定的。

parallel_reduce 算法

当您具有满足关联属性的操作序列时,parallel_reduce 算法很有用。 (此算法不需要可交换属性。)下面是可以使用 parallel_reduce 执行的一些操作:

  • 将矩阵的序列相乘以生成一个矩阵。

  • 用矩阵序列乘以一个向量来生成一个向量。

  • 计算字符串序列的长度。

  • 将一个元素列表(例如字符串)组合为一个元素。

下面的基本示例演示如何使用 parallel_reduce 算法将一个字符串序列组合为一个字符串。 与 parallel_transform 的示例一样,此基本示例中不会有性能提升。

// basic-parallel-reduce.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include <string>
#include <vector>

using namespace concurrency;
using namespace std;

int wmain()
{
  // Create a vector of strings.
  vector<wstring> words{
      L"Lorem ",
      L"ipsum ",
      L"dolor ",
      L"sit ",
      L"amet, ",
      L"consectetur ",
      L"adipiscing ",
      L"elit."};
      
  // Reduce the vector to one string in parallel.
  wcout << parallel_reduce(begin(words), end(words), wstring()) << endl;
}

/* Output:
   Lorem ipsum dolor sit amet, consectetur adipiscing elit.
*/

在许多情况下,你可以将 parallel_reduce 视为 parallel_for_each 算法与 concurrency::combinable 类一起使用的简写形式。

示例:并行执行映射和降低

映射操作会将一个函数应用于序列中的每个值。 化简操作会将一个序列的元素组合为一个值。 可以使用 C++ 标准模板库 std::transformstd::accumulate 函数来执行映射和化简操作。 但是,对于许多问题,您可以使用 parallel_transform 算法并行执行映射操作,并使用 parallel_reduce 算法并行执行化简操作。

下面的示例将按串行方式计算质数和所需的时间与按并行方式计算质数和所需的时间进行比较。 映射阶段会将非质数值转换为 0,而化简阶段将对这些值求和。

// parallel-map-reduce-sum-of-primes.cpp
// compile with: /EHsc
#include <windows.h>
#include <ppl.h>
#include <array>
#include <numeric>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Determines whether the input value is prime.
bool is_prime(int n)
{
   if (n < 2)
      return false;
   for (int i = 2; i < n; ++i)
   {
      if ((n % i) == 0)
         return false;
   }
   return true;
}

int wmain()
{   
   // Create an array object that contains 200000 integers.
   array<int, 200000> a;

   // Initialize the array such that a[i] == i.
   iota(begin(a), end(a), 0);

   int prime_sum;
   __int64 elapsed;

   // Compute the sum of the numbers in the array that are prime.
   elapsed = time_call([&] {
       transform(begin(a), end(a), begin(a), [](int i) { 
         return is_prime(i) ? i : 0; 
      });
      prime_sum = accumulate(begin(a), end(a), 0);
   });   
   wcout << prime_sum << endl;   
   wcout << L"serial time: " << elapsed << L" ms" << endl << endl;

   // Now perform the same task in parallel.
   elapsed = time_call([&] {
      parallel_transform(begin(a), end(a), begin(a), [](int i) { 
         return is_prime(i) ? i : 0; 
      });
      prime_sum = parallel_reduce(begin(a), end(a), 0);
   });
   wcout << prime_sum << endl;
   wcout << L"parallel time: " << elapsed << L" ms" << endl << endl;
}
/* Sample output:
   1709600813
   serial time: 7406 ms
   
   1709600813
   parallel time: 1969 ms
*/

有关并行执行映射和化简操作的示例,请参阅如何:并行执行映射和化简操作

[返回页首]

分区工作

若要对数据源操作进行并行化,一个必要步骤是将源分区为可由多个线程同时访问的多个部分。 分区程序将指定并行算法应如何在线程之间对范围进行分区。 如本文档前面所述,PPL 使用的是默认分区机制,该默认分区机制创建初始工作负荷并在工作负荷不平衡时使用工作窃取算法和范围窃取来平衡这些分区。 例如,当某个循环迭代完成一个迭代范围时,运行时会将其他线程的工作重新分配给该线程。 但是,在某些方案中,你可能希望指定另一个更适用于你的问题的分区机制。

parallel_forparallel_for_eachparallel_transform 算法提供采用一个附加参数 _Partitioner 的重载版本。 此参数定义了用于划分工作的分区程序类型。 以下是 PPL 定义的分区程序种类:

concurrency::affinity_partitioner
将工作划分为一个固定数量的范围(通常是可用于在循环中工作的辅助线程的数量)。 此分区程序类型与 static_partitioner 类似,但通过将范围映射到辅助线程的方式改善了缓存的关联。 当在相同数据集中多次执行一个循环(例如一个循环内的循环)且数据适合缓存时,此分区程序类型可提高性能。 此分区程序不完全参与取消。 它也不使用协作停滞语义,因此不能与具有前向依赖关系的并行循环一起使用。

concurrency::auto_partitioner
将工作划分为一个初始数量的范围(通常是可用于在循环中工作的辅助线程的数量)。 当您不调用采用 _Partitioner 参数的重载的并行算法时,运行时默认使用此类型。 每个范围可以划分为子范围,从而实现负载平衡。 当一个工作范围完成时,运行时会将其他线程工作的子范围重新分配给该线程。 如果您的工作负荷不在另外一个类别下或者您需要完全支持取消或协作停滞,请使用该分区程序。

concurrency::simple_partitioner
将工作划分到范围中,使每个范围至少拥有给定区块大小所指定的迭代的数目。 此分区程序类型加入了负载平衡;然而,运行时未将范围划分为子范围。 对于每个辅助,运行时将在 _Chunk_size 迭代完成后检查取消情况并执行负载平衡。

concurrency::static_partitioner
将工作划分为一个固定数量的范围(通常是可用于在循环中工作的辅助线程的数量)。 此分区程序类型可以提高性能,因为它不使用工作窃取,开销较小。 当一个并行循环的每次迭代执行固定和统一数量的工作而且您不需要支持取消或前向协作停滞时,请使用此分区程序类型。

警告

parallel_for_eachparallel_transform 算法仅支持为静态、简单和关联分区程序使用随机访问迭代器(如 std::vector)的容器。 采用双向和向前迭代器的容器的使用会导致编译时错误。 默认分区程序 auto_partitioner 支持所有这三种迭代器类型。

通常,除 affinity_partitioner 外,这些分区程序的使用方式相同。 大多数分区程序类型不会维持状态,而且不会由运行时进行修改。 因此,如下例所示,您可以在调用站点创建这些分区程序对象。

// static-partitioner.cpp
// compile with: /EHsc
#include <ppl.h>

using namespace concurrency;

void DoWork(int n)
{
    // TODO: Perform a fixed amount of work...
}

int wmain()
{
    // Use a static partitioner to perform a fixed amount of parallel work.
    parallel_for(0, 100000, [](int n) {
        DoWork(n);
    }, static_partitioner());
}

但是,必须将 affinity_partitioner 对象作为非 const 左值引用传递,以便算法可以存储状态,以供未来循环重用。 下面的示例演示对数据集多次并行执行相同操作的基本应用程序。 因为数组有可能适合缓存,使用 affinity_partitioner 可以提高性能。

// affinity-partitioner.cpp
// compile with: /EHsc
#include <ppl.h>
#include <array>

using namespace concurrency;
using namespace std;

int wmain()
{
    // Create an array and fill it with zeroes.
    array<unsigned char, 8 * 1024> data;
    data.fill(0);

    // Use an affinity partitioner to perform parallel work on data
    // that is likely to remain in cache.
    // We use the same affinitiy partitioner throughout so that the 
    // runtime can schedule work to occur at the same location for each 
    // iteration of the outer loop.

    affinity_partitioner ap;
    for (int i = 0; i < 100000; i++)
    {
        parallel_for_each(begin(data), end(data), [](unsigned char& c)
        {
            c++;
        }, ap);
    }
}

注意

在修改依赖于协作停滞语义的现有代码以使用 static_partitioneraffinity_partitioner 时应谨慎。 这些分区程序类型不使用负载平衡或范围窃取,因此可能会更改应用程序的行为。

确定在任何给定方案中是否使用分区程序的最佳方式是:体验并度量操作在有代表性的负载和计算机配置下要花多长时间完成。 例如,如果是只有几个内核的多核计算机,静态分区可以让速度显著提升;但如果是内核相对较多的计算机,静态分区可能会导致速度降低。

[返回页首]

并行排序

PPL 提供三种排序算法:concurrency::parallel_sortconcurrency::parallel_buffered_sortconcurrency::parallel_radixsort。 当您具有可受益于并行排序的数据集时,这些排序算法很有用。 具体而言,当您具有大型数据集时或使用需要消耗大量计算资源的比较操作对数据进行排序时,并行排序很有用。 每种算法都会就地对元素排序。

parallel_sortparallel_buffered_sort 算法都是基于比较的算法。 即,它们按值来比较元素。 parallel_sort 算法没有其他内存要求,适用于通用排序。 parallel_buffered_sort 算法的性能好于 parallel_sort,但是它需要 O(N) 空间。

parallel_radixsort 算法是基于哈希的。 即,它使用整数键来对元素排序。 通过使用键,此算法可以直接计算元素的目标,而不是使用比较。 与 parallel_buffered_sort 类似,此算法需要 O(N) 空间。

下表总结了三种并行排序算法的重要属性。

算法 说明 排序机制 排序稳定性 内存需求 时间复杂性 迭代器访问
parallel_sort 通用的基于比较的排序。 基于比较(升序) 不稳定 O((N/P)log(N/P) + 2N((P-1)/P)) Random
parallel_buffered_sort 需要 O(N) 空间,基于比较的更快的通用排序。 基于比较(升序) 不稳定 需要额外的 O(N) 空间 O((N/P)log(N)) Random
parallel_radixsort 需要 O(N) 空间,基于整数键的排序。 基于哈希 稳定版 需要额外的 O(N) 空间 O(N/P) Random

下图用图形更形象地说明了三种并行排序算法的重要属性。

Comparison of the sorting algorithms.

这些并行排序算法遵循取消和异常处理的规则。 有关并发运行时中的取消和异常处理的详细信息,请参阅取消并行算法异常处理

提示

这些并行排序算法支持移动语义。 可以定义一个移动赋值运算符,以使交换操作的出现更有效。 有关移动语义和移动赋值运算符的详细信息,请参阅 Rvalue 引用声明符:&&移动构造函数和移动赋值运算符 (C++)。 如果您不提供移动赋值运算符或交换函数,排序算法将使用复制构造函数。

下面的基本示例演示如何使用 parallel_sortvector 值的 int 进行排序。 默认情况下,parallel_sort 使用 std::less 来比较值。

// basic-parallel-sort.cpp
// compile with: /EHsc
#include <ppl.h>
#include <random>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    // Create and sort a large vector of random values.
    vector<int> values(25000000);
    generate(begin(values), end(values), mt19937(42));
    parallel_sort(begin(values), end(values));

    // Print a few values.
    wcout << "V(0)        = " << values[0] << endl;
    wcout << "V(12500000) = " << values[12500000] << endl;
    wcout << "V(24999999) = " << values[24999999] << endl;
} 
/* Output:
   V(0)        = -2147483129
   V(12500000) = -427327
   V(24999999) = 2147483311
*/

此示例演示如何提供自定义比较函数。 它使用 std::complex::real 方法按升序对 std::complex<double> 值进行排序。

// For this example, ensure that you add the following #include directive:
// #include <complex>

// Create and sort a large vector of random values.
vector<complex<double>> values(25000000);
generate(begin(values), end(values), mt19937(42));
parallel_sort(begin(values), end(values),
    [](const complex<double>& left, const complex<double>& right) {
        return left.real() < right.real();
    });

// Print a few values.
wcout << "V(0)        = " << values[0] << endl;
wcout << "V(12500000) = " << values[12500000] << endl;
wcout << "V(24999999) = " << values[24999999] << endl;
/* Output:
   V(0)        = (383,0)
   V(12500000) = (2.1479e+009,0)
   V(24999999) = (4.29497e+009,0)
*/

此示例演示如何为 parallel_radixsort 算法提供哈希函数。 此示例对三维点排序。 根据这些点与参考点的距离对它们进行排序。

// parallel-sort-points.cpp
// compile with: /EHsc
#include <ppl.h>
#include <random>
#include <iostream>

using namespace concurrency;
using namespace std;

// Defines a 3-D point.
struct Point
{
    int X;
    int Y;
    int Z;
};

// Computes the Euclidean distance between two points.
size_t euclidean_distance(const Point& p1, const Point& p2)
{
    int dx = p1.X - p2.X;
    int dy = p1.Y - p2.Y;
    int dz = p1.Z - p2.Z;
    return static_cast<size_t>(sqrt((dx*dx) + (dy*dy) + (dz*dz)));
}

int wmain()
{
    // The central point of reference.
    const Point center = { 3, 2, 7 };

    // Create a few random Point values.
    vector<Point> values(7);
    mt19937 random(42);
    generate(begin(values), end(values), [&random] {
        Point p = { random()%10, random()%10, random()%10 };
        return p;
    });

    // Print the values before sorting them.
    wcout << "Before sorting:" << endl;
    for_each(begin(values), end(values), [center](const Point& p) {
        wcout << L'(' << p.X << L"," << p.Y << L"," << p.Z 
              << L") D = " << euclidean_distance(p, center) << endl;
    });
    wcout << endl;

    // Sort the values based on their distances from the reference point.
    parallel_radixsort(begin(values), end(values), 
        [center](const Point& p) -> size_t {
            return euclidean_distance(p, center);
        });

    // Print the values after sorting them.
    wcout << "After sorting:" << endl;
    for_each(begin(values), end(values), [center](const Point& p) {
        wcout << L'(' << p.X << L"," << p.Y << L"," << p.Z 
              << L") D = " << euclidean_distance(p, center) << endl;
    });
    wcout << endl;
} 
/* Output:
    Before sorting:
    (2,7,6) D = 5
    (4,6,5) D = 4
    (0,4,0) D = 7
    (3,8,4) D = 6
    (0,4,1) D = 7
    (2,5,5) D = 3
    (7,6,9) D = 6

    After sorting:
    (2,5,5) D = 3
    (4,6,5) D = 4
    (2,7,6) D = 5
    (3,8,4) D = 6
    (7,6,9) D = 6
    (0,4,0) D = 7
    (0,4,1) D = 7
*/

为了便于演示,本示例使用相对较小的数据集。 您可以增大向量的初始范围,体验较大数据集中性能的提升。

此示例使用 lambda 表达式作为哈希函数。 还可以使用 std::hash 类的一个内置实现,或者定义自己的专用化。 如本示例所示,您还可以使用自定义哈希函数对象:

// Functor class for computing the distance between points.
class hash_distance
{
public:
    hash_distance(const Point& reference)
        : m_reference(reference)
    {
    }

    size_t operator()(const Point& pt) const {
        return euclidean_distance(pt, m_reference);
    }

private:
    Point m_reference;
};

 

// Use hash_distance to compute the distance between points.
parallel_radixsort(begin(values), end(values), hash_distance(center));

哈希函数必须返回整型类型(std::is_integral::value 必须为 true)。 此整型必须可转换为类型 size_t

选择排序算法

在许多情况下,parallel_sort 会提供速度和内存性能的最佳平衡。 但是,当您增加数据集的大小、可用处理器的数量或比较函数的复杂性时,parallel_buffered_sortparallel_radixsort 性能更佳。 确定在任何给定方案中使用哪种排序算法的最佳方式是:体验并度量在有代表性计算机配置下对典型数据排序需要多长时间。 在选择排序策略时请遵循以下准则。

  • 数据集的大小。 在本文档中,小型数据集包含的元素少于 1,000 个,中型数据集包含的元素介于 10,000 和 100,000 个之间,而大型数据集包含的元素多于 100,000 个。

  • 您的比较函数或哈希函数所执行的工作量。

  • 可用计算资源的量。

  • 数据集的特征。 例如,一种算法对已完成近似排序的数据可能执行效果很好,但对完全未排序的数据执行效果就不那么好了。

  • 区块的大小。 可选的 _Chunk_size 参数将指定算法在将整体排序细分成较小工作单元时何时从并行排序实现切换为串行排序实现。 例如,如果提供的是 512,算法会在工作单元包含 512 个或更少元素时切换到串行实现。 串行实现可以提高整体性能,因为它消除了并行处理数据所需的开销。

以并行方式对小型数据集排序可能不值得,即使是在您拥有大量的可用计算资源或您的比较函数或哈希函数执行相对大量的工作时。 可以使用 std::sort 函数对小型数据集排序。 (当你指定的区块大小大于数据集时,parallel_sortparallel_buffered_sort 会调用 sort;但是,parallel_buffered_sort 将必须分配 O(N) 空间,这样会因锁争用或内存分配而花费更多时间。)

如果您必须节省内存或您的内存分配器容易出现锁争用问题,请使用 parallel_sort 对中型数据集排序。 parallel_sort 不需要额外的空间;其他算法需要 O(N) 空间。

当你的应用程序能够满足额外的 O(N) 空间需求时,使用 parallel_buffered_sort 对中型数据集排序。 当您拥有大量的计算资源或高开销的比较函数或哈希函数时,parallel_buffered_sort 尤其有用。

当你的应用程序能够满足额外的 O(N) 空间需求时,使用 parallel_radixsort 对大型数据集排序。 当等效的比较操作开销较大或两种操作开销都很大时,parallel_radixsort 尤其有用。

注意

好的哈希函数的实现要求你知道数据集范围以及数据集中的每个元素如何转换为对应的无符号值。 由于哈希操作会处理无符号值,如果无法生成无符号哈希值,请考虑使用另外的排序策略。

下面的示例针对相同大小的随机数据集对 sortparallel_sortparallel_buffered_sortparallel_radixsort 的性能进行比较。

// choosing-parallel-sort.cpp
// compile with: /EHsc
#include <ppl.h>
#include <random>
#include <iostream>
#include <windows.h>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

const size_t DATASET_SIZE = 10000000;

// Create
// Creates the dataset for this example. Each call
// produces the same predefined sequence of random data.
vector<size_t> GetData()
{
    vector<size_t> data(DATASET_SIZE);
    generate(begin(data), end(data), mt19937(42));
    return data;
}

int wmain()
{
    // Use std::sort to sort the data.
    auto data = GetData();
    wcout << L"Testing std::sort...";
    auto elapsed = time_call([&data] { sort(begin(data), end(data)); });
    wcout << L" took " << elapsed << L" ms." <<endl;

    // Use concurrency::parallel_sort to sort the data.
    data = GetData();
    wcout << L"Testing concurrency::parallel_sort...";
    elapsed = time_call([&data] { parallel_sort(begin(data), end(data)); });
    wcout << L" took " << elapsed << L" ms." <<endl;

    // Use concurrency::parallel_buffered_sort to sort the data.
    data = GetData();
    wcout << L"Testing concurrency::parallel_buffered_sort...";
    elapsed = time_call([&data] { parallel_buffered_sort(begin(data), end(data)); });
    wcout << L" took " << elapsed << L" ms." <<endl;

    // Use concurrency::parallel_radixsort to sort the data.
    data = GetData();
    wcout << L"Testing concurrency::parallel_radixsort...";
    elapsed = time_call([&data] { parallel_radixsort(begin(data), end(data)); });
    wcout << L" took " << elapsed << L" ms." <<endl;
} 
/* Sample output (on a computer that has four cores):
    Testing std::sort... took 2906 ms.
    Testing concurrency::parallel_sort... took 2234 ms.
    Testing concurrency::parallel_buffered_sort... took 1782 ms.
    Testing concurrency::parallel_radixsort... took 907 ms.
*/

本示例中假设在排序期间分配 O(N) 空间是可以接受的,parallel_radixsort 在此计算机配置下对这个数据集表现得最好。

[返回页首]

Title 说明
如何:编写 parallel_for 循环 演示如何使用 parallel_for 算法执行矩阵乘法。
如何:编写 parallel_for_each 循环 演示如何使用 parallel_for_each 算法并行计算 std::array 对象中的质数计数。
如何:使用 parallel_invoke 来编写并行排序例程 演示如何使用 parallel_invoke 算法提高双调排序算法的性能。
如何:使用 parallel_invoke 来执行并行操作 演示如何使用 parallel_invoke 算法提高对共享数据源执行多项操作的程序的性能。
如何:并行执行映射和减少操作 演示如何使用 parallel_transformparallel_reduce 算法执行用于计算文件中单词出现次数的映射和化简操作。
并行模式库 (PPL) 介绍 PPL,它提供命令式编程模型,可以促进开发并发应用程序的可扩展性和易用性。
PPL 中的取消操作 说明 PPL 中的取消操作的角色、如何取消并行工作,以及如何确定取消任务组的时间。
异常处理 介绍并发运行时中异常处理的角色。

参考

parallel_for 函数

parallel_for_each 函数

parallel_invoke 函数

affinity_partitioner 类

auto_partitioner 类

simple_partitioner 类

static_partitioner 类

parallel_sort 函数

parallel_buffered_sort 函数

parallel_radixsort 函数