Parallel Extensions:使用 .NET 构建多核应用程序
概述
现代计算机在处理器和可供系统使用的内核数量方面取得了举世瞩目的突破。系统开发人员可以在他们的软件中以各种形式利用这些强大特性,特别是在处理复杂算法或较大的数据集时。
微软的并行计算平台 (Parallel Computing Platform, PCP) 所提供的工具支持开发人员以有效、可维护和可伸缩的方式利用这种强大特性。并行扩展在 .NET Framework 工具集中引入了一些重要的概念:任务并行库 (Task Parallel Library, TPL) 和并行 LINQ (Parallel LINQ, PLINQ) 提供了命令和任务并行机制,允许开发人员采用声明的方式处理数据并行机制。
目标
在本次动手实验中,您将学习如何:
• 通过使用 Parallel 帮助程序类以及自动处理并发表达式,并行化已有算法。
• 创建并运行能够在运行过程中取消运行的任务。
• 使用 Task<T> 类创建和运行可返回值的任务。
• 使用并行 LINQ (PLINQ) 优化 LINQ 查询,以便在并行环境中执行。
系统要求
您必须拥有以下内容才能完成本实验:
• Microsoft Visual Studio 2010 Beta 2
• .Net Framework 4
安装
使用 Configuration Wizard 验证本实验的所有先决条件。要确保正确配置所有内容,请按照以下步骤进行。
注意:要执行安装步骤,您需要使用管理员权限在命令行窗口中运行脚本。
1. 如果之前没有执行,运行 Training Kit 的 Configuration Wizard。为此,运行位于 %TrainingKitInstallationFolder%\Labs\IntroToWF\Setup 文件夹下的 CheckDependencies.cmd 脚本。安装先决条件中没有安装的软件(如有必要请重新扫描),并完成向导。
注意:为了方便,本实验中管理的许多代码都可用于 Visual Studio 代码片段。CheckDependencies.cmd 文件启动 Visual Studio 安装程序文件安装该代码片段。
练习
本次动手实验由以下练习组成:
• 使用静态 Parallel 类来并行化已有算法。
• 创建和运行并行化任务。
• 使用 Task<T> 类创建和运行可返回值的任务。
• 使用 PLINQ 并行化 LINQ 查询。
完成本实验的估计时间:60 分钟。
下一步:
练习 1:使用静态 Parallel 帮助程序类并行化已有算法
练习 1:使用静态 Parallel 帮助程序类并行化已有算法
在本练习中,您将学习如何使用静态 Parallel 帮助程序类并行化已有算法。这允许我们将 for() 替换为 Parallel.For()。
注意:要验证每个步骤是否正确执行,建议在每次任务结束时构建解决方案。
任务 1 –并行化持续运行的服务
在此任务中,您将编写一些简单的示例代码,模拟持续运行的服务调用。
您将使用本练习的初始解决方案提供的 PayrollServices.GetPayrollDeduction() 方法。这种持续运行的代码非常符合您对并行的实际需求。
1. 从 Start | All Programs | Microsoft Visual Studio 2010 | Microsoft Visual Studio 2010 打开 Microsoft Visual Studio 2010。
2. 打开 %TrainingKitInstallationFolder%\Labs\IntroToParallelExtensions\Source\Ex01-UsingStaticParallelHelper\begin 下的解决方案文件 ParallelExtLab.sln。
注意:此解决方案为您的工作提供了一个起始点,其中的帮助程序类 EmployeeList 包含您需要处理的数据。
3. 在 Visual Studio 中,打开 Program 类并导航到它的 Main() 方法。首先,我们需要创建一个员工列表,因此添加一个类变量并在 Main() 中对它执行初始化:
(代码片段 – 并行扩展库简介实验 - 练习 1 创建员工列表)
C#
class Program
{
private static EmployeeList employeeData;
static void Main(string[] args)
{
employeeData = new EmployeeList();
Console.WriteLine("Payroll process started at {0}", DateTime.Now);
var sw = Stopwatch.StartNew();
// Methods to call
Console.WriteLine("Payroll finished at {0} and took {1}",
DateTime.Now, sw.Elapsed.TotalSeconds);
Console.WriteLine();
Console.ReadLine();
}
}
4. 现在,将以下方法添加到 Program 类中。该方法将使用一个标准的 for 循环来迭代 Employees 列表(由预建代码提供)并调用持续运行的 PayrollServices.GetPayrollDeduction() 方法。代码应如下所示:
(代码片段 – 并行扩展库简介实验 - 练习 1 Ex1Task1_ParallelizeLongRunningService)
C#
private static void Ex1Task1_ParallelizeLongRunningService()
{
Console.WriteLine("Non-parallelized for loop");
for (int i = 0; i < employeeData.Count; i++)
{
Console.WriteLine("Starting process for employee id {0}",
employeeData[i].EmployeeID);
decimal span =
PayrollServices.GetPayrollDeduction(employeeData[i]);
Console.WriteLine("Completed process for employee id {0}" +
"process took {1} seconds",
employeeData[i].EmployeeID, span);
Console.WriteLine();
}
}
5. 从 Main() 中调用 Ex1Task1_ParallelizeLongRunningService 方法。
C#
static void Main(string[] args)
{
...
// Methods to call
Ex1Task1_ParallelizeLongRunningService();
...
}
6. 编译并运行应用程序。
7. 您应该可以看到全部员工按 ID 顺序进行处理,类似于下图(完成处理的准确时间可能不同):
图 1
非并行调用持续运行服务的输出
8. 要使用并行特性,将以下方法添加到 Program 类中。该代码使用静态 Parallel 对象中的 For() 方法:
(代码片段 – 并行扩展库简介实验 - 练习 1 Ex1Task1_UseParallelForMethod)
C#
private static void Ex1Task1_UseParallelForMethod()
{
Parallel.For(0, employeeData.Count, i =>
{
Console.WriteLine("Starting process for employee id {0}",
employeeData[i].EmployeeID);
decimal span =
PayrollServices.GetPayrollDeduction(employeeData[i]);
Console.WriteLine("Completed process for employee id {0}",
employeeData[i].EmployeeID);
Console.WriteLine();
});
}
9. 将 Main() 中的当前方法调用替换为对 Ex1Task1_UseParallelForMethod() 方法的调
用。
C#
static void Main(string[] args)
{
...
// Methods to call
Ex1Task1_UseParallelForMethod();
...
}
注意:在以上代码片段中,您可能看到了一些不熟悉的内容:
Parallel.ForEach(employeeData, ed => { /* …interesting code here… */ });
这些代码是一种 C# 3.0 语言特性,即 lambda 语句,它起源于 lambda 运算。简单来讲,lambdas 是匿名代理和/或闭包的简洁表示。
10. 编译并运行应用程序。
11. 您应该可以看到不一定要按员工的 ID 顺序来进行处理。您还可以注意到,在第一个调用返回之前调用了多次 GetPayrollDeduction() 方法。最后,您应该可以看到,与串行方式相比,并行运行调用将整个任务的完成速度提高了很多。
图 2
并行调用持续运行服务的输出
注意:由于循环采用并行方式执行,因此每次迭代都将按计划在可用的内核上运行。这意味着并不一定要按顺序来处理列表,从而避免对代码造成显著影响。您在设计代码的时候应该确保循环中的各个迭代在彼此之间都是完全独立的。任何迭代的正确运行都不能依赖于其他迭代。
12. 并行扩展库还提供了并行版本的 foreach 结构。以下代码演示了实现此结构的并行方式。将以下代码添加到 Program 类中。
(代码片段 – 并行扩展库简介实验 - 练习 1 Ex1Task1_StandardForEach)
C#
private static void Ex1Task1_StandardForEach()
{
foreach (Employee employee in employeeData)
{
Console.WriteLine("Starting process for employee id {0}",
employee.EmployeeID);
decimal span =
PayrollServices.GetPayrollDeduction(employee);
Console.WriteLine("Completed process for employee id {0}",
employee.EmployeeID);
Console.WriteLine();
}
}
13. 在 Main() 方法中,将 Parallel.For(…) 循环替换为以下代码:
C#
static void Main(string[] args)
{
...
// Methods to call
Ex1Task1_StandardForEach();
...
}
14. 编译并运行应用程序。
注意:您应该可以注意到,程序还是按 ID 顺序处理员工。还需注意完成任务所花费的总时间(准确时间会有所不同)
图 3
非并行 for…each 实现的输出
15. 要利用 for…each 结构的并行扩展实现,您需要让代码使用静态 Parallel 类中的 ForEach() 方法:
(代码片段 – 并行扩展库简介实验 - 练习 1 Ex1Task1_ParallelForEach)
C#
private static void Ex1Task1_ParallelForEach()
{
Parallel.ForEach(employeeData, ed =>
{
Console.WriteLine("Starting process for employee id {0}",
ed.EmployeeID);
decimal span = PayrollServices.GetPayrollDeduction(ed);
Console.WriteLine("Completed process for employee id {0}",
ed.EmployeeID);
Console.WriteLine();
});
}
16. 将 Main() 中的当前代码替换为一个 Ex1Task1_ParallelForEach 方法调用。
C#
static void Main(string[] args)
{
...
// Methods to call
Ex1Task1_ParallelForEach();
...
}
17. 编译并运行应用程序。
18. 您会再次发现,并不一定要按 ID 顺序来处理员工,因为各循环是并行运行的,循环中的每次迭代都是单独在可用的内核上运行的。另外,由于应用程序利用了所有可用的内核,因此完成任务的速度要比串行方式更快。
图 4
并行 for…each 实现的输出
注意:并行扩展库还提供了一个实用的 Invoke() 方法,开发人员可以通过静态 Parallel 类访问它,从而能够并行执行匿名方法或 lambda 表达式。为了演示如何使用 Invoke 方法,我们将分析一个常用的树遍历算法,然后展示如何通过并行方式实现它,从而缩短遍历整个树所需的总时间。
在本例中,我们将遍历一个员工层次结构,并对我们遇到的各个员工调用 GetPayrollDeduction() 方法。
19. 将 Main() 中的当前方法调用替换为一个 Ex1Task1_WalkTree() 方法调用。此代码将实例化员工层次结构并调用树遍历方法。
C#
static void Main(string[] args)
{
...
// Methods to call
Ex1Task1_WalkTree();
...
}
(代码片段 – 并行扩展库简介实验 - 练习 1 Ex1Task1_WalkTree)
C#
private static void Ex1Task1_WalkTree()
{
EmployeeHierarchy employeeHierarchy = new EmployeeHierarchy();
WalkTree(employeeHierarchy);
}
20. 将以下方法添加到 Program 类中:
(代码片段 – 并行扩展库简介实验 - 练习 1 WalkTree)
C#
private static void WalkTree(Tree<Employee> node)
{
if (node == null)
return;
if (node.Data != null)
{
Employee emp = node.Data;
Console.WriteLine("Starting process for employee id {0}",
emp.EmployeeID);
decimal span = PayrollServices.GetPayrollDeduction(emp);
Console.WriteLine("Completed process for employee id {0}",
emp.EmployeeID);
Console.WriteLine();
}
WalkTree(node.Left);
WalkTree(node.Right);
}
21. 编译并运行应用程序。
22. 您会注意到员工按 ID 顺序进行处理。还需注意完成遍历树花费的总时间(准确时间会有所不同)
图 5
非并行树遍历的输出
注意:使用上述非并行算法遍历树时,树的结构必须能让数据按 ID 顺序写出。
23. 要采用并行方式遍历树,需要将 WalkTree() 方法结束部分的 WalkTree() 调用替换为静态 Parallel 类的 Invoke() 方法调用。
C#
private static void WalkTree(Tree<Employee> node)
{
if (node == null)
return;
if (node.Data != null)
{
Employee emp = node.Data;
Console.WriteLine("Starting process for employee id {0}",
emp.EmployeeID);
decimal span = PayrollServices.GetPayrollDeduction(emp);
Console.WriteLine("Completed process for employee id {0}",
emp.EmployeeID);
Console.WriteLine();
}
Parallel.Invoke(delegate { WalkTree(node.Left); }, delegate { WalkTree(node.Right); });
}
24. 编译并运行应用程序。
25. 您会注意到,树中的员工不再采用相同处理方式,一些节点在其他节点还没有完成时就开始处理了。还需要注意它遍历整个树所花的时间更少。
图 6
并行树遍历的输出
注意:Invoke() 方法根据内核可用性单独计划各个 WalkTree() 调用。这意味着不一定要以可预测的方式来遍历树。在设计代码时需要记住这一点。
下一步:
练习 2:创建和运行并行化任务
练习 2:创建和运行并行化任务
并行扩展库提供了一个 Task 类,可用于跨多个内核并行执行工作项。基本上,您可以将 Task 对象看作一个轻量级的工作单元,TaskManager 可以决定是否并行计划它的运行。
当 Task 对象创建之后,您需要为它们提供一个包含要执行的逻辑的代理或 lambda 语句。然后,并行扩展库的实际核心 TaskManager 将调度 Task,在不同内核的不同线程上运行。
注意:要验证每个步骤是否正确执行,建议在每次任务结束时构建解决方案。
任务 1 –本机运行并行化任务
1. 从 Start | All Programs | Microsoft Visual Studio 2010 | Microsoft Visual Studio 2010 打开 Microsoft Visual Studio 2010。
2. 打开 %TrainingKitInstallationFolder%\Labs\IntroToParallelExtensions\Source\Ex02-CreateAndRunParallelizedTasks\begin 下的解决方案文件 ParallelExtLab.sln。您也可以继续使用上一个练习完成时获得的解决方案。
3. 将 Main() 中的当前方法调用替换为 Ex2Task1_NativeParallelTasks() 方法调用。
C#
static void Main(string[] args)
{
...
// Methods to call
Ex2Task1_NativeParallelTasks();
...
}
(代码片段 – 并行扩展库简介实验 - 练习 2 Ex2Task1_NativeParallelTasks)
C#
private static void Ex2Task1_NativeParallelTasks()
{
Task task1 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[0]); });
Task task2 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[1]); });
Task task3 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[2]); });
}
4. 编译并运行应用程序。
5. 您应该会注意到,在并行运行时,一些任务可能必须在 Ex2Task1_NativeParallelTasks 方法退出并且控制权返回到 Main 之后才会完成。鉴于此,输出时间也无法反映总处理时间,因为任务在控制权返回到 Main 之前并未完成。
图 7
并行运行多个任务的输出
任务 2 –使用 Wait() 和 WaitAll() 方法
并行执行任务的优势是速度更快,并且可以利用多核处理器。但是,您还应该注意到在当前实现中,主应用程序可能在处理任务的线程结束之前退出。
我们可以通过对各个 Task 对象调用 Wait() 方法来解决此可能情形。这会造成主线程等待指定任务完成之后才继续下一条指令。
1. 将 Main() 中的当前方法替换为 Ex2Task2_WaitHandling() 调用。此代码将为我们示例添加等待处理。
C#
static void Main(string[] args)
{
...
// Methods to call
Ex2Task2_WaitHandling();
...
}
(代码片段 – 并行扩展库简介实验 - 练习 2 Ex2Task2_WaitHandling)
C#
private static void Ex2Task2_WaitHandling()
{
Task task1 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[0]); });
Task task2 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[1]); });
Task task3 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[2]); });
task1.Wait();
task2.Wait();
task3.Wait();
}
2. 编译并运行应用程序。
3. 您应该注意到,这次所有三个任务都在报告最终时间之前完成了,这意味着主线程已结束。
图 8
使用单独的 Wait() 条件并行运行任务的输出
注意:主线程等待 Task 对象完成之后才会继续操作。这种方法比使用 ThreadPool.QueueUserWorkItem 更加简单直观,后者涉及创建和管理手动重置事件,并且可能还需要添加互锁操作。
4. 除了各个 Task 对象上的 Wait() 方法之外,静态 Task 类还提供了一个 WaitAll() 方法,可允许您通过一次调用来等待指定的任务列表。要实际运行此方法,将任务 1、任务 2 和任务 3 的 Wait() 调用替换为以下内容:
C#
static void Main(string[] args)
{
...
// Methods to call
Ex2Task2_WaitHandlingWaitAll();
...
}
(代码片段 – 并行扩展库简介实验 - 练习 2 Ex2Task2_WaitHandlingWaitAll)
C#
private static void Ex2Task2_WaitHandlingWaitAll()
{
Task task1 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[0]); });
Task task2 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[1]); });
Task task3 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[2]); });
Task.WaitAll(task1, task2, task3);
}
5. 编译并运行应用程序。
6. 您应该注意到主应用程序等待所有单独的任务完成之后才会继续执行。
图 9
使用 WaitAll() 方法并行运行任务的输出
任务 3 –使用 IsCompleted 属性
您有时可能需要检查某个 Task 的完成状态,然后再执行其他任务(比如说,您可能需要运行另一个独立于第一个任务的任务),但是您并不希望使用 Wait() 方法,因为 Wait() 会阻塞 Task 所依托线程的执行。对于这些情形,Task 类公开了一个 IsCompleted 属性。这使我们能够在继续其他处理之前检查 Task 对象是否完成了自己的任务。
1. 将 Main() 中的当前方法替换为 Ex2Task3_TaskIsCompleted() 调用。
C#
static void Main(string[] args)
{
...
// Methods to call
Ex2Task3_TaskIsCompleted();
...
}
(代码片段 – 并行扩展库简介实验 - 练习 2 Ex2Task3_TaskIsCompleted)
C#
private static void Ex2Task3_TaskIsCompleted()
{
Task task1 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[0]); });
while (!task1.IsCompleted)
{
Thread.Sleep(1000);
Console.WriteLine("Waiting on task 1");
}
Task task2 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[1]); });
while (!task2.IsCompleted)
{
Thread.Sleep(1000);
Console.WriteLine("Waiting on task 2");
}
Task task3 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[2]); });
while (!task3.IsCompleted)
{
Thread.Sleep(1000);
Console.WriteLine("Waiting on task 3");
}
}
2. 编译并运行应用程序。
3. 您应该注意到,任务 2 和任务 3 直到之前任务的 IsCompleted 属性变为 true 时才开始启
动。
图 10
并行运行任务并使用 IsCompleted 属性的输出
任务 4 –使用 ContinueWith() 方法
虽然 IsCompleted 属性非常适用于轮询 Task 以检查它是否完成,以便能够触发更多工作,但 Task 类提供了一种更加便捷的方法。使用 ContinueWith() 方法可以更加轻松地将任务以特定的顺序串接起来。
作为参数传递给 ContinueWith() 方法的功能将在 Task 对象的逻辑继续执行时执行。
1. 将 Main() 中的当前方法替换为 Ex2Task4_ContinueWith() 调用。
C#
static void Main(string[] args)
{
...
// Methods to call
Ex2Task4_ContinueWith();
...
}
(代码片段 – 并行扩展库简介实验 - 练习 2 Ex2Task4_ContinueWith)
C#
private static void Ex2Task4_ContinueWith()
{
Task task3 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[0]); })
.ContinueWith(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[1]); })
.ContinueWith(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[2]); });
task3.Wait();
}
注意:此处,我们像往常一样创建了第一个 Task,但我们使用 ContinueWith() 方法让运行时按顺序执行后续调用。
2. 编译并运行应用程序。
3. 您应该注意到任务是按顺序执行的——员工 1、员工 2、员工 3 依次执行。
图 11
并行运行任务并使用 ContinueWith 确保其顺序和等待条件时的输出。
下一步:
练习 3:使用 Task<T> 类创建和运行可返回值的任务
练习 3:使用 Task<T> 类创建和运行可返回值的任务
如您所见,Tasks 可用于在并行环境中启动功能单元;它们还提供了一种在执行功能单元后返回值的机制。
为了演示此机制,我们将创建一个新的 Task<decimal> 实例,然后使用静态 Task.Factory.StartNew() 方法,以便以允许获取返回值的方式来执行 GetPayrollDeduction() 方法。
任务 1 –捕获任务的返回值
1. 从 Start | All Programs | Microsoft Visual Studio 2010 | Microsoft Visual Studio 2010 打开 Microsoft Visual Studio 2010。
2. 打开 %TrainingKitInstallationFolder%\Labs\IntroToParallelExtensions\Source\Ex03-UseTaskResult\begin 下的解决方案文件 ParallelExtLab.sln。您也可以继续使用上一个练习完成时获得的解决方案。
3. 将 Main() 中的当前方法替换为 Ex3Task1_TaskReturnValue() 调用。
C#
static void Main(string[] args)
{
...
// Methods to call
Ex3Task1_TaskReturnValue();
...
}
(代码片段 – 并行扩展库简介实验 - 练习3 Ex3Task1_TaskReturnValue)
C#
private static void Ex3Task1_TaskReturnValue()
{
Console.WriteLine("Calling parallel task with return value");
var data = Task.Factory.StartNew(() =>
PayrollServices.GetPayrollDeduction(employeeData[0]));
Console.WriteLine("Parallel task returned with value of {0}",
data.Result);
}
注意:我们将通过检查 data.Result 属性来获取值。如果任务在 Result 属性被调用时已完成,那么它会立即返回所捕获的值,否则它会阻塞代码执行,直到任务完成并且可以检索值。在上面的示例中,我们刚才调用了 Result 属性,但它并不是理想的情形。Task<T> 最适合的情形是:在触发工作单元时,需要在稍后的某个时刻检索返回值。
4. 编译并运行应用程序。
5. 您会注意到任务已完成并且提供了一个返回值。
图 12
运行任务以捕获返回值的输出
下一步:
练习 4:使用 PLINQ 并行化 LINQ 查询
练习 4:使用 PLINQ 并行化 LINQ 查询
开发人员可以使用并行 LINQ (PLINQ) 优化他们的 LINQ 查询,使它们能够并行执行。
并行扩展库提供了许多不同的方式来为 LINQ 查询实现并行机制。PLINQ 为我们提供了一个 System.Linq.ParallelEnumerable 类,它提供了与 System.Linq.Enumerable 类相似的功能。
任务 1 –使用 ParallelEnumerable 类的静态方法并行化 LINQ
在此任务中,您将继续使用之前练习中的解决方案。
1. 从 Start | All Programs | Microsoft Visual Studio 2010 | Microsoft Visual Studio 2010 打开 Microsoft Visual Studio 2010。
2. 打开%TrainingKitInstallationFolder%\Labs\IntroToParallelExtensions\Source\Ex04-PLINQ\begin 下的解决方案文件 ParallelExtLab.sln。您也可以继续使用上一个练习完成时获得的解决方案。
3. 将 Main() 中的当前方法替换为 Ex4Task1_PLINQ() 调用。
C#
static void Main(string[] args)
{
...
// Methods to call
Ex4Task1_PLINQ();
...
}
(代码片段 – 并行扩展库简介实验 - 练习 4 Ex4Task1_PLINQ)
C#
static void Ex4Task1_PLINQ()
{
var q = Enumerable.Select(
Enumerable.OrderBy(
Enumerable.Where(employeeData,
x => x.EmployeeID % 2 == 0),
x => x.EmployeeID),
x => PayrollServices.GetEmployeeInfo(x))
.ToList();
foreach (var e in q)
{
Console.WriteLine(e);
}
}
注意:Select()、OrderBy() 和 Where() 方法是对 IEnumerable<T> 类的扩展方法,但我们在此处将以静态方式访问它们。我们稍后将演示一种更加简单的用法。
ToList() 调用供演示之用,而在生产代码中通常是不需要的。此处使用它的原因是,我们希望立即触发 LINQ 查询来收集所有 Employee Info 字符串,并在稍后将它们输出到屏幕。
如果将 ToList() 删除,则查询仍然可以按 Employee ID 的顺序触发,但各个 GetEmployeeInfo() 调用必须在 foreach 循环迭代了 IEnumerable<T> 之后才会触
发。这称作延迟执行。
有关更多信息,请参阅 Scott Wisniewski 的文章:https://msdn.microsoft.com/en-us/magazine/cc163378.aspx。
4. 编译并运行应用程序。
5. 您应该会注意到,LINQ 查询是按员工 ID 的顺序执行的。还需注意完成任务花费的总时间(准确时间会有所不同):
图 13
非并行 LINQ 查询的输出
6. 可以使用静态 ParallelEnumerable 类中的相同 LINQ 方法来并行化此查询。另外,您需要向查询的数据源添加一个 AsParallel() 调用。修改 Main() 方法,让它只调用 PLINQAsParallel 方法。
C#
static void Main(string[] args)
{
...
// Methods to call
Ex4Task1_PLINQAsParallel();
...
}
(代码片段 – 并行扩展库简介实验 - 练习 4 Ex4Task1_PLINQAsParallel)
C#
static void Ex4Task1_PLINQAsParallel()
{
var q = ParallelEnumerable.Select(
ParallelEnumerable.OrderBy(
ParallelEnumerable.Where(employeeData.AsParallel(),
x => x.EmployeeID % 2 == 0),
x => x.EmployeeID),
x => PayrollServices.GetEmployeeInfo(x))
.ToList();
foreach (var e in q)
{
Console.WriteLine(e);
}
}
注意:Select()、OrderBy() 和 Where() 方法(之前由 Enumerable 静态类调用)现在将由 ParallelEnumerable 类调用。还需注意,数据源中添加了一个 AsParallel() 调用。
7. 编译并运行应用程序。
8. 您应该注意到 LINQ 查询不再以特定顺序执行操作。还需注意,在本例中,并行版本的程序要比非并行版本的程序运行更快(在双核机器上,运行时间大概减少了一半;您的结果因所使用的硬件而异)。
图 14
并行 LINQ 查询的输出
注意:操作将以物理内核允许的最大并行操作数量并行执行。
任务 2 –使用 ParallelEnumerable 类的扩展方法并行化 LINQ
如前所述,使用 Enumerable 和 ParallelEnumerable 类的静态 LINQ 的一种更加简洁的方法是将它们作为扩展方法使用。
1. 将使用扩展方法实现的非并行 LINQ 查询转换成 PLINQ 查询非常简单。将 Main() 方法中的 PLINQ 查询替换为以下 LINQ 查询:
C#
static void Main(string[] args)
{
...
// Methods to call
Ex4Task2_Extensions();
...
}
(代码片段 – 并行扩展库简介实验 - 练习 4 Ex4Task2_Extensions)
C#
private static void Ex4Task2_Extensions()
{
var q = employeeData.
Where(x => x.EmployeeID % 2 == 0).OrderBy(x => x.EmployeeID)
.Select(x => PayrollServices.GetEmployeeInfo(x))
.ToList();
foreach (var e in q)
{
Console.WriteLine(e);
}
}
注意: ToList() 仍然用于立即执行 LINQ 查询,而不是等待 foreach 在稍后迭代执行 Select() 返回的 IEnumerable<T> 时才执行。我们将避免延迟执行。
2. 编译并运行应用程序。
3. 您应该会注意到,LINQ 查询是按员工 ID 的顺序执行的。还需注意完成任务花费的总时间(准确时间会有所不同):
图 15
使用扩展方法的非并行 LINQ 查询的输出
4. 要并行化此 LINQ 查询,只需要将 Main() 中的当前方法替换为 AsParallel() 方法。
C#
static void Main(string[] args)
{
...
// Methods to call
Ex4Task2_ConvertToParallelExtensions();
...
}
(代码片段 – 并行扩展库简介实验 - 练习 4 Ex4Task2_ConvertToParallelExtensions)
C#
private static void Ex4Task2_ConvertToParallelExtensions()
{
var q = employeeData.AsParallel()
.Where(x => x.EmployeeID % 2 == 0).OrderBy(x => x.EmployeeID)
.Select(x => PayrollServices.GetEmployeeInfo(x))
.ToList();
foreach (var e in q)
{
Console.WriteLine(e);
}
}
5. 编译并运行应用程序。
6. 您应该会注意到,与基于 ParallelEnumerable 静态类的 LINQ 查询类似,作为扩展方法实现的 PLINQ 查询不会再按 EmployeeID 的顺序执行操作。操作将以物理内核允许的最大并行操作数量并行执行。还需注意,与之前的并行化 LINQ 示例相同,并行程序完成所需的时间大约只有非并行程序的一半。
图 16
使用扩展方法的并行 LINQ 查询的输出
任务 3 –使用支持查询理解语法 (Query Comprehension Syntax) 的 AsParallel()
在此任务中,您将使用并行扩展库和 AsParallel() 方法创建使用查询理解语法的并行 LINQ 查询。
1. 将 Main() 方法中的 PLINQ 查询替换为以下查询理解语法:
C#
static void Main(string[] args)
{
...
// Methods to call
Ex4Task3_PLINQComprehensionSyntax();
...
}
(代码片段 – 并行扩展库简介实验 - 练习 4 Ex4Task3_PLINQComprehensionSyntax)
C#
private static void Ex4Task3_PLINQComprehensionSyntax()
{
var q = from e in employeeData.AsParallel()
where e.EmployeeID % 2 == 0
orderby e.EmployeeID
select PayrollServices.GetEmployeeInfo(e);
foreach (var e in q)
{
Console.WriteLine(e);
}
}
2. 编译并运行应用程序。
3. 您应该会注意到,虽然 LINQ 语法发生了变化,但数据仍然采用与 ParallelEnumerable 扩展方法中相同的并行方式执行。
图 17
使用查询理解语法的并行 LINQ 查询的输出
下一步:
总结
总结
在本实验中,您通过操作并行扩展库理解了它的各种特性,从而能以简单、可控的方式执行并行任务。您已经了解了如何使用 Parallel 和 Task 等并行扩展类来管理工作单元。您使用了
Wait()、WaitAll()、IsComplete() 和 ContinueWith() 等并行扩展特性来控制执行流程。您还了解了如何使用 PLINQ 来处理并行化查询。
本实验帮助您很好地掌握了并行扩展库以及它的优势。有关更多信息,我们建议您访问以下站点:
• MSDN 上的并行扩展博客:https://blogs.msdn.com/pfxteam/
• MSDN 上的并行计算论坛:https://forums.microsoft.com/MSDN/default.aspx?ForumGroupID=551&SiteID=1
并行计算开发人员中心:https://msdn.microsoft.com/en-us/library/ms792872.aspx