并行编程

并行 .NET 应用程序的过去、现在和未来

Stephen Toub

并行编程的过去

以往,开发人员在尝试实现响应及时的客户端应用程序、并行算法和可伸缩服务器时,一直都采用直接线程操作。 然而,这类方法也一直为开发人员制造着麻烦,比如死锁、活锁、锁保护、两步舞曲、争用条件、过度订阅和应用程序中的许多其他缺陷。 从诞生之初,Microsoft .NET Framework 便提供了大量较低级别的工具,用于构建并发应用程序,包括专为这项工作提供的整个命名空间:System.Threading。 借助 .NET Framework 3.5 核心程序集在此命名空间中提供的大约 50 种类型(包括 Thread、ThreadPool、Timer、Monitor、ManualResetEvent、ReaderWriterLock 和 Interlocked 等类型),人们不应责怪 .NET Framework 的线程支持太少。 不过,在我看来,.NET Framework 的以往版本对开发人员提供的真正支持仍有欠缺,不足以让他们在任何位置都能成功构建可伸缩且高度并行化的应用程序。 我十分欣慰和兴奋地告诉大家,这个问题在 .NET Framework 4 中得到了纠正,并且对未来 .NET Framework 版本会继续有大量投资。

有些人可能会质疑托管语言中的丰富子系统对于编写并行代码的价值。 毕竟,并行性和并发性关乎性能,而注重性能的开发人员应寻求本机语言,因为这些语言可提供对硬件的充分访问以及对每个位调整、缓存行操作和联锁操作的完全控制… 对吧? 如果情况确实如此,我会为我们行业的现状感到担忧。 托管语言(如 C#、Visual Basic 和 F#)的存在是为了向所有开发人员 — 无论是无名小卒还是超级英雄 — 提供一个安全且富有成效的环境,用于快速开发强大而高效的代码。 开发人员有成千上万的预生成库类可供使用,还有包含我们所期待的所有先进服务的成熟语言,并且仍设法在几乎最繁重的数字处理和浮点密集型工作负载下实现卓越的性能。 所有这些意味着,托管语言及其关联框架针对构建高性能并发应用程序提供了深层支持,使得使用最新硬件的开发人员也可以从中受益。

我一直认为模式学习是一种很好的学习方法,因此对于当前的主题,我们最好也通过了解一种模式来开始我们的探索。 无论是“进退两难的并行”还是“令人愉快的并行”模式,最常需要的一种“派生-联结”构造都是并行循环,该循环旨在并行处理循环中的每个独立迭代。 了解如何使用前面提到的较低级别基元进行这类处理颇具指导意义,为此我们将演练一下使用 C# 实现的简单并行循环的基本实现。 考虑下面的典型 for 循环:

for (int i=0; i<N; i++) {
  ...
// Process i here
}

我们可以使用线程直接实现此循环的并行化,如图 1 所示。

图 1 并行化 For 循环

int lowerBound = 0, upperBound = N;
int numThreads = Environment.ProcessorCount;
int chunkSize = (upperBound - lowerBound) / numThreads;

var threads = new Thread[numThreads];
for (int t = 0; t < threads.Length; t++) {
  int start = (chunkSize * t) + lowerBound;
  int end = t < threads.Length - 1 ?
start + chunkSize : upperBound;
  threads[t] = new Thread(delegate() {
    for (int i = start; i < end; i++) {
      ...
// Process i here
    }
  });
}

foreach (Thread t in threads) t.Start(); // fork
foreach (Thread t in threads) t.Join();  // join

当然,此并行化方法存在许多问题。 我们将启动专用于该循环的新线程,这不仅会增加开销(特别是循环主体在待完成工作中非常琐细时),而且还可能会导致在同时进行其他工作的进程中出现严重的过度订阅。 我们使用静态分区在线程间划分工作,如果工作负载在迭代空间中分布不均匀,则可能会导致严重的负载不平衡(更不用提迭代数目不能由所用线程数均匀分割,导致最后一个线程承受溢出的情况)。 然而,最糟糕的可能是开发人员可能一开始便被强迫编写此代码。 我们尝试并行化的每个算法都需要类似的代码(十分脆弱的代码)。

当我们认识到并行循环只是并行程序中存在的众多模式中的一个模式时,会进一步放大通过上面代码举例说明的问题。 强迫开发人员以此低级别编码表示所有这类并行模式不会形成良好的编程模型,不会让世界上需要利用高度并行硬件的众多开发人员取得成功。

并行编程的现在

让我们进入 .NET Framework 4。 此版本 .NET Framework 扩充了大量功能,显著降低了开发人员在应用程序中表示并行性的难度,并提高了并行执行的效率。 这远远超出了并行循环的范畴,尽管如此,我们仍将从这里开始。

System.Threading 命名空间在 .NET Framework 4 中通过一个新的子命名空间得到了增强:System.Threading.Tasks。 此命名空间包含一个新类型 Parallel,该类型公开了丰富的静态方法,用以实现并行循环和结构化“派生-联结”模式。 作为其用法的示例,请考虑前面的 for 循环:

for (int i=0; i<N; i++) {
  ...
// Process i here
}

借助 Parallel 类,可以如下所示轻松实现并行化:

Parallel.For(0, N, i => {
  ...
// Process i here
});

在这里,开发人员仍负责确保循环的每个迭代实际上都是独立的,但除此之外,Parallel.For 构造会处理此循环的并行化的所有方面。 该构造跨计算中涉及的所有基础线程动态地对输入范围进行分区,同时仍尽可能减小分区的开销,使其接近于静态分区实现的开销。 该构造动态地增加和减少计算中涉及的线程数,以便发现最适合给定工作负载的最佳线程数(与惯常认知不同的是,最佳线程数并不总是等于硬件线程数)。 该构造还提供前面演示的简单实现中不存在的异常处理功能,等等。 最重要的是,该构造使开发人员不必在线程的较低级别操作系统抽象层考虑并行性,并且无需为工作负载分区、卸载到多个核心以及高效地联接结果而一次又一次地编写谨小慎微的解决方案。 如此,开发人员便可集中时间处理更重要的工作:使开发人员的工作更富收益的业务逻辑。

Parallel.For 还为需要更加精细地控制循环操作的开发人员提供了便利工具。 通过为 For 方法提供的一个选项包,开发人员可以控制循环运行所在的基础计划程序、要使用的最大并行度,以及循环外部实体用于请求在循环方便时尽早正常终止的取消标记:

var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.For(0, N, options, i=> {
  ...
// Process i here
});

此自定义功能突出了 .NET Framework 中此并行化工作的一个目标:在不使编程复杂化的情况下显著降低开发人员利用并行性的难度,同时为更加高级的开发人员提供了对处理和执行进行微调所需的工具。 出于此目的,还支持其他调整。 Parallel.For 的其他重载使开发人员可以尽早中断循环:

Parallel.For(0, N, (i,loop) => {
  ...
// Process i here
  if (SomeCondition()) loop.Break();
});

还有其他重载使开发人员能够使状态流过最终在同一基础线程上运行的迭代,从而显著提高算法实现(如缩减)的效率,例如:

static int SumComputations(int [] inputs, Func<int,int> computeFunc) {
  int total = 0;
  Parallel.For(0, inputs.Length, () => 0, (i,loop,partial)=> {
    return partial + computeFunc(inputs[i]);
  }, 
  partial => Interlocked.Add(ref total, partial));
}

Parallel 类不仅为整数范围提供支持,还为任意 IEnumerable<T> 源(可枚举序列的 .NET Framework 表示形式)提供支持:代码可以对枚举器连续地调用 MoveNext,以便检索下一个 Current 值。 通过这种使用任意可枚举内容的能力可实现任意数据集的并行处理,而无论数据在内存中的表示形式如何;数据源甚至可以根据需要具体化,并在 MoveNext 调用到达源数据的尚未具体化部分时分页:

IEnumerable<string> lines = File.ReadLines("data.txt");
Parallel.ForEach(lines, line => {
  ...
// Process line here
});

与 Parallel.For 一样,Parallel.ForEach 也采用许多自定义功能,但提供比 Parallel.For 更大的控制能力。 例如,ForEach 使开发人员可以自定义对输入数据集进行分区的方式。 这通过一组侧重于分区的抽象类完成,这些抽象类使并行化构造可以请求固定或可变数量的分区,从而允许分区程序将这些分区抽象分派给输入数据集,并根据需要以静态或动态方式将数据分配给这些分区:

Graph<T> graph = ...;
Partitioner<T> data = new GraphPartitioner<T>(graph);
Parallel.ForEach(data, vertex => {
  ...
// Process vertex here
});

Parallel.For 和 Parallel.ForEach 在 Parallel 类上补充提供了一个 Invoke 方法,该方法接受任意数量的待调用操作,并可实现基础系统可以支持的最大并行度。 通过此经典的“派生-联结”构造可以轻松地并行化递归的“分割-解决”算法,如常用的 QuickSort 示例:

static void QuickSort<T>(T [] data, int lower, int upper) {
  if (upper – lower < THRESHOLD) {
    Array.Sort(data, index:lower, length:upper-lower);
  }
  else {
    int pivotPos = Partition(data, lower, upper);
    Parallel.Invoke(
      () => QuickSort(data, lower, pivotPos),
      () => QuickSort(data, pivotPos, upper));
  }
}

尽管有了很大进步,但是 Parallel 类只是可用功能的一小部分。 .NET Framework 4 中实现的更重大的并行化进步之一是引入了并行 LINQ,人们将其亲切地称为 PLINQ(发音为“Pee-link”)。 LINQ(即语言集成查询)是在 .NET Framework 版本 3.5 中引入的。 LINQ 实际包含两方面内容:对一组公开为数据集操作方法的运算符的描述,以及 C# 和 Visual Basic 中用于直接在语言中表示这些查询的上下文关键字。 LINQ 中包含的许多运算符都基于数据库社区多年以来所了解的等效运算,包括 Select、SelectMany、Where、Join、GroupBy 以及大约 50 个其他运算。 .NET Framework 标准查询运算符 API 为这些方法定义了模式,但是未定义这些运算应针对的确切数据集,也未确切定义应如何实现这些运算。 各种“LINQ 提供程序”随后为许多不同数据源和目标环境(内存中集合、SQL 数据库、对象/关系映射系统、HPC Server 计算群集、临时和流数据源等等)实现此模式。 最常用的提供程序之一名为 LINQ to Objects,它提供以 IEnumerable<T> 为基础实现的全套 LINQ 运算符。 这样便可在 C# 和 Visual Basic 中实现查询,如下面的代码段所示,该代码段从文件逐行读取所有数据,从而仅筛选出包含“secret”一词的行并对这些行进行加密。 最终结果是字节数组构成的可枚举内容:

IEnumerable<byte[]> encryptedLines = 
  from line in File.ReadLines("data.txt")
  where line.Contains("secret")
  select DataEncryptor.Encrypt(line);

对于需要大量计算的查询,甚至只对于涉及大量长延迟 I/O 的查询,PLINQ 提供自动并行化功能,从而实现利用端到端并行算法的完整 LINQ 运算符集。 因此,开发人员只需为数据源附加“.AsParallel()”,即可并行化前面的查询:

IEnumerable<byte[]> encryptedLines = 
  from line in File.ReadLines("data.txt").AsParallel()
  where line.Contains("secret")
  select DataEncryptor.Encrypt(line);

与 Parallel 类一样,此模型也选择强迫开发人员评估并行运行计算的后果。 但是,一旦做出并行选择,系统便会处理实际并行化、分区、线程限制等较低级别的细节。 此外,与 Parallel 一样,这些 PLINQ 查询也可通过各种方式进行自定义。 开发人员可以控制如何实现分区、实际使用的并行度、同步与延迟之间的权衡等:

IEnumerable<byte[]> encryptedLines = 
  from line in new OneAtATimePartitioner<string>(
    File.ReadLines("data.txt"))
    .AsParallel()
    .AsOrdered()
    .WithCancellation(someExternalToken)
    .WithDegreeOfParallelism(4)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
  where line.Contains("secret")
  select DataEncryptor.Encrypt(line);

这些用于循环和查询的编程模型功能强大,级别更高,它们构建于同样强大但是级别较低的一组基于任务的 API 之上,以 System.Threading.Tasks 命名空间中的 Task 和 Task<TResult> 类型为中心。 实际上,并行循环和查询引擎属于任务生成器,依靠基础任务基础结构将表示的并行性映射到基础系统中提供的资源。 在其核心,Task 是工作单元(或者,更宽泛地说,是异步单元,即可能生成并在以后通过各种方式进行联接的工作项)的表示形式。 Task 提供 Wait、WaitAll 和 WaitAny 方法,这些方法允许同步阻止向前推进,直至目标任务完成,或直至向这些方法的重载提供的其他约束得到满足(例如,超时或取消标记)。 Task 通过其 IsCompleted 属性支持轮询任务是否完成,更宽泛地说,通过其 Status 属性支持轮询其生命周期处理中的更改。 可能最重要的是,它提供 ContinueWith、ContinueWhenAll 和 ContinueWhenAny 方法,通过这些方法可以创建仅当完成一组特定先行任务时才安排的任务。 通过此续接支持可以轻松实现许多方案,从而可以在计算之间表示依赖关系,以便系统可以基于这些依赖关系的满足情况安排工作:

Task t1 = Task.Factory.StartNew(() => BuildProject(1));
Task t2 = Task.Factory.StartNew(() => BuildProject(2));
Task t3 = Task.Factory.StartNew(() => BuildProject(3));
Task t4 = Task.Factory.ContinueWhenAll(
  new [] { t1, t2 }, _ => BuildProject(4));
Task t5 = Task.Factory.ContinueWhenAll(
  new [] { t2, t3 }, _ => BuildProject(5));
Task t6 = Task.Factory.ContinueWhenAll(
  new [] { t4, t5 }, _ => BuildProject(6));
t6.ContinueWith(_ => Console.WriteLine("Solution build completed."));

通过从 Task 派生的 Task<TResult> 类可以从完成的操作传出结果,从而向 .NET Framework 提供核心“未来”实现:

int SumTree<T>(Node<T> root, Func<T,int> computeFunc) {
  if (root == null) return 0;
  Task<int> left  = Task.Factory.StartNew(() => SumTree(root.Left));
  Task<int> right = Task.Factory.StartNew(() => SumTree(root.Right));
  return computeFunc(root.Data) + left.Result + right.Result;
}

在所有这些模型(循环、查询和任务等)之下,.NET Framework 使用工作窃取方法提供对专门工作负载的更高效处理,并且在默认情况下,它使用爬山试探法随时间推移改变使用的线程数,以便找到最佳处理级别。 试探法也内置在这些组件的各个部分之中,以便在系统认为任何并行化尝试都会导致慢于顺序结果时间时自动回退到顺序处理,不过,与前面讨论的其他默认设置一样,这些试探法也可被覆盖。

Task<TResult> 无需仅表示计算密集型操作。 它还可以用于表示任意异步操作。 请考虑 .NET Framework System.IO.Stream 类,该类提供 Read 方法用于从流中提取数据:

NetworkStream source = ...;
byte [] buffer = new byte[0x1000];
int numBytesRead = source.Read(buffer, 0, buffer.Length);

此 Read 操作是同步阻塞操作,这样便不会将进行 Read 调用的线程用于其他工作,直至基于 I/O 的 Read 操作完成。 为了实现更好的可伸缩性,Stream 类以两个方法的形式为 Read 方法提供了异步对应项:BeginRead 和 EndRead。 从 .NET Framework 诞生之初,这些方法便遵循其中提供的模式,该模式称为 APM(即异步编程模型)。 下面是前面代码示例的异步版本:

NetworkStream source = …;
byte [] buffer = new byte[0x1000];
source.BeginRead(buffer, 0, buffer.Length, delegate(IAsyncResult iar) {
  int numBytesRead = source.EndRead(iar);
}, null);

但是,这种方法会导致可组合性较差。 TaskCompletionSource<TResult> 类型通过使这类异步读取操作可以公开为任务,解决了此问题:

public static Task<int> ReadAsync(
  this Stream source, byte [] buffer, int offset, int count) 
{
  var tcs = new TaskCompletionSource<int>();
  source.BeginRead(buffer, 0, buffer.Length, iar => {
    try { tcs.SetResult(source.EndRead(iar)); }
    catch(Exception exc) { tcs.SetException(exc); }
  }, null);
  return tcs.Task;
}

这样便允许组合多个异步操作,正如计算密集型示例中一样。 下面的示例同时从所有源数据流读取数据,仅当所有操作完成后才写出到控制台:

NetworkStream [] sources = ...;
byte [] buffers = ...;
Task.Factory.ContinueWhenAll(
  (from i in Enumerable.Range(0, sources.Length)
   select sources[i].ReadAsync(buffers[i], 0, buffers[i].Length))
  .ToArray(), 
  _ => Console.WriteLine("All reads completed"));

除了用于启动并行化和并发处理的机制之外,.NET Framework 4 还为任务与线程之间的进一步协调工作提供了基元。 这包括一组线程安全且可伸缩的集合类型,这些类型大体消除了开发人员手动同步对共享集合的访问的需要。 ConcurrentQueue<T> 提供一个线程安全、无锁定、先进先出的集合,该集合可以由任意数量的生产者和任意数量的使用者同时使用。 此外,它还支持并发枚举器的快照语义,以便代码即使在其他线程操作实例时也可以检查队列的状态。 ConcurrentStack<T> 也类似,只是它提供后进先出语义。 ConcurrentDictionary<T> 使用无锁定和精细锁定方法提供线程安全的字典,该字典也支持任意数量的并发读取器、写入器和枚举器。 它还提供几个多步骤操作(如 GetOrAdd 和 AddOrUpdate)的原子实现。 另一种类型 ConcurrentBag<T> 提供使用工作窃取队列的无序集合。

.NET Framework 不会停止开发集合类型。 Lazy<T> 通过采用可配置的方法实现线程安全,提供变量的延迟初始化。 ThreadLocal<T> 提供每线程、每实例数据,这些数据也可以在第一次访问时延迟初始化。 Barrier 类型实现阶段化操作,以便可以通过保持同步的算法进行多个任务或线程。 该列表在不断扩充,并且所有内容都源自一个指导原则:开发人员应无需关注其算法并行化的低级及初级方面,而是让 .NET Framework 为其处理功能机制和效率细节。

并行编程的未来

与 Dickens 不同,.NET Framework 的并行性和并发性未来构建于 .NET Framework 4 中的基础之上,让人满怀期待。 除了提高现有编程模型的性能之外,未来版本 .NET Framework 的一个侧重点是扩展较高级别模型集,这些模型旨在实现更多的并行工作负载模式。 其中一个增强功能是一个新库,用以基于数据流实现并行系统以及使用基于代理的模型构建应用程序。 新的 System.Threading.Tasks.Dataflow 库提供许多“数据流块”,这些块用作数据的缓冲区、处理器和传播器。 数据可以发布到这些块,并且这些数据会基于源块的语义进行处理并自动转发到任何链接的目标。 数据流库也构建于任务之上,包含在内部运行任务以处理和传播数据的块。

站在模式的角度,该库特别适用于处理组成生产者和使用者链的数据流网络。 假设您需要压缩数据然后将其加密并写出到文件中,数据流要到达并流过应用程序。 可以通过配置小型数据流块网络来实现此功能,如下所示:

static byte [] Compress(byte [] data) { ...
}
static byte [] Encrypt(byte [] data) { ...
}
...
var compressor = new TransformBlock<byte[],byte[]>(Compress);
var encryptor = new TransformBlock<byte[],byte[]>(Encrypt);
var saver = new ActionBlock<byte[]>(AppendToFile);
compressor.LinkTo(encryptor);
encryptor.LinkTo(saver);
...
// As data arrives
compressor.Post(byteArray);

然而,除了数据流库,.NET Framework 中用于并行性和并发性的最重要功能可能是 C# 和 Visual Basic 中为生成和异步等待任务而提供的一流语言支持了。 这些语言新增了基于状态机的重写功能,这些功能允许利用所有语言的顺序控制流构造,同时能够异步等待任务完成(Visual Studio 2010 中的 F# 支持相关形式的异步作为其异步工作流功能的一部分,该功能也与任务集成)。 看看下面的方法,该方法将数据从一个 Stream 同步复制到另一个 Stream,并返回复制的字节数:

static long CopyStreamToStream(Stream src, Stream dst) {
  long numCopied = 0;
  byte [] buffer = new byte[0x1000];
  int numRead;
  while((numRead = src.Read(buffer,0,buffer.Length)) > 0) {
    dst.Write(buffer, 0, numRead);
    numCopied += numRead;
  }
  return numCopied;
}

如果通过前面所示 Stream 上的 BeginRead/EndRead 方法之类的支持实现此函数(包括其条件和循环),会导致容易出错和极度难以调试的回调和逻辑噩梦。 请考虑使用 ReadAsync 方法(如前所示),该方法返回 Task<int> 和对应的 WriteAsync 方法(该方法返回 Task)。 使用新的 C# 功能,可以重写前面的方法,如下所示:

static async Task<long> CopyStreamToStreamAsync(Stream src, Stream dst) {
  long numCopied = 0;
  byte [] buffer = new byte[0x1000];
  int numRead;
  while((numRead = await src.ReadAsync(buffer,0,buffer.Length)) > 0) {
    await dst.WriteAsync(buffer, 0, numRead);
    numCopied += numRead;
  }
  return numCopied;
}

请注意将同步方法转换为异步方法的几处小改动。 该函数现在批注为“async”,以告知编译器它应执行函数重写。 这样,只要对 Task 或 Task<TResult> 请求“等待”操作,函数剩余部分的执行实际上便挂接到该任务作为续接操作:在该任务完成之前,此方法调用不会占用线程。 Read 方法调用已转换为 ReadAsync 调用,以便 await 上下文关键字可以用于表示剩余部分应转换为续接操作的生成点;对于 WriteAsync 也是如此。 当此异步方法最终完成时,会使用如前所示用于 TaskCompletionSource<TResult> 的机制,将返回的 long 值提升到返回到 CopyStreamToStreamAsync 的初始调用方的 Task<long>。 我现在可以如同处理任何 Task 一样使用来自 CopyStreamToStreamAsync 的返回值,在它之上等待、为之挂接续接操作、在其他任务上与它组合、甚至是等待它。 借助 ContinueWhenAll 和 WaitAll 等功能,我可以启动多个异步操作并在稍后与之联接,以便实现更高级别的并发并提高应用程序的总体吞吐量。

这种对异步的语言支持不仅改进了 I/O 密集型操作,还改进了 CPU 密集型操作,特别是开发人员能够构建响应及时的客户端应用程序(不会阻塞 UI 线程并使应用程序陷入无响应状态的应用程序),同时仍获得大规模并行处理的好处。 开发人员长期以来的做法都是先离开某个 UI 线程,执行任何处理,然后再返回该 UI 线程以更新 UI 元素并与用户交互,这样做十分麻烦。 对异步的语言支持可与 .NET Framework 的重要组件交互,这样,在默认情况下,可在等待操作完成时自动使操作返回其原始上下文(例如,如果从 UI 线程发出等待操作,则挂接的续接操作会返回 UI 线程继续执行)。 这意味着,可以启动某个任务以在后台运行计算密集型工作,并且开发人员只需等待该任务检索结果并将其存储到 UI 元素中,如下所示:

async void button1_Click(object sender, EventArgs e) {
  string filePath = txtFilePath.Text;
  txtOutput.Text = await Task.Factory.StartNew(() => {
    return ProcessFile(filePath);
  });
}

该后台任务自身也可能启动多个任务,以便并行化后台计算,如通过使用 PLINQ 查询进行:

async void button1_Click(object sender, EventArgs e) {
  string filePath = txtFilePath.Text;
  txtOutput.Text = await Task.Factory.StartNew(() => {
    return File.ReadLines(filePath).AsParallel()
      .SelectMany(line => ParseWords(line))
      .Distinct()
      .Count()
      .ToString();
  });
}

该语言支持还可以与数据流库结合使用,以简化异步生产者/使用者方案的自然表达式。 假设您要实现一组受限制的生产者,其中每个生产者都生成一些数据发送给若干个使用者。 这可以使用 BlockingCollection<T> 之类的类型同步进行(请参阅图 2),该类型在 .NET Framework 4 中引入。

图 2 使用 BlockingCollection

static BlockingCollection<Datum> s_data = 
  new BlockingCollection<Datum>(boundedCapacity:100);
...
static void Producer() {
  for(int i=0; i<N; i++) {
    Datum d = GenerateData();
    s_data.Add(d);
  }
  s_data.CompleteAdding();
}

static void Consumer() {
  foreach(Datum d in s_data.GetConsumingEnumerable()) {
    Process(d);
  }
}
...
var workers = new Task[3];
workers[0] = Task.Factory.StartNew(Producer);
workers[1] = Task.Factory.StartNew(Consumer);
workers[2] = Task.Factory.StartNew(Consumer);
Task.WaitAll(workers);

这是一个很好的模式,前提是其满足应用程序的目标,即生产者和使用者同时阻塞线程。 如果这不可接受,则您可以编写异步实现,从而利用另一个数据流块 BufferBlock<T> 以及与块之间进行异步发送和接收的能力,如图 3 所示。

图 3 使用 BufferBlock

static BufferBlock<Datum> s_data = new BufferBlock<Datum>(
  new DataflowBlockOptions { BoundedCapacity=100 });
...
static async Task ProducerAsync() {
  for(int i=0; i<N; i++) {
    Datum d = GenerateData();
    await s_data.SendAsync(d);
  }
  s_data.Complete();
}

static async Task ConsumerAsync() {
  Datum d;
  while(await s_data.OutputAvailableAsync()) {
    while(s_data.TryReceive(out d)) {
      Process(d);
    }
  }
}
...
var workers = new Task[3];
workers[0] = ProducerAsync();
workers[1] = ConsumerAsync();
workers[2] = ConsumerAsync();
await Task.WhenAll(workers);

这里,SendAsync 和 OutputAvailableAsync 方法都返回任务,从而使编译器可以挂接续接操作并使整个进程可异步运行。

不再稀缺

并行编程长期以来都是开发专家(精通将代码扩展到多个核心的独一无二的合格人员)的领地。 这些专家拥有多年的培训及实际工作经验。 他们十分宝贵,属于稀缺资源。 在无处不在的多核和众核美丽新世界中,这种仅由专家来实现并行性的模型已不再够用。 无论应用程序或组件是旨在作为公开提供的软件包、仅用于内部使用还是仅作为完成更重要工作的工具,并行性现在都已成为每个开发人员至少必须考虑的功能,以及成千上万使用托管语言的开发人员必须能够利用的功能,即使他们通过自己封装并行性的组件实现也是如此。 为了实现这一美好未来,我们需要 .NET Framework 4 中公开的以及未来版本 .NET Framework 中提供的并行编程模型。

Stephen Toub 是 Microsoft 并行计算平台团队的首席架构师。

衷心感谢以下技术专家对本文的审阅:Joe HoagDanny Shih