借助 C++ 进行 Windows 开发

Windows 中的线程和 I/O 的演变

Kenny Kerr

 

Kenny Kerr当开始一个新项目时,您是否会自问一下,您的程序将是计算密集型的还是 I/O 密集型的?应该问一下。我发现,在大多数情况下,程序要么是计算密集型的,要么是 I/O 密集型的。您可能正在处理拥有大量数据的分析库,并让一组处理器保持忙碌状态,同时将数据分解为一系列聚合。此外,您的代码可能将其大多数时间花在等待发生事件、等待数据通过网络到达、等待用户单击某项内容或执行某种复杂的六指手势等方面。在这种情况下,程序中的线程派不上大用场。当然,也存在程序同时属于 I/O 密集型和计算密集型的情况。SQL Server 数据库引擎此时可发挥作用,但它对于现今的计算机编程并不太常用。您的程序往往要执行协调他人工作的任务。这可能是 Web 服务器或客户端与 SQL 数据库通信、将一些计算推送到 GPU 或提供某些内容供用户进行交互。考虑到所有这些不同情况,您如何决定您的程序需要什么样的线程功能,以及什么样的并发性构造块是必需或有用的呢?当然,这通常是很难回答的问题,当您着手新项目时,您需要进行某些分析。但它有助于我们理解线程在 Windows 和 C++ 中的演变过程,以便您能够基于所提供的实用选择做出明智的决定。

当然,对于用户而言,无论如何线程也不提供任何直接值。如果您使用的线程数量是另一个程序的两倍,则您的程序将不再令人感到麻烦。原因就是这些线程发挥了作用。为了阐述这些想法以及线程随时间推移演化的方式,我现在以从某个文件中读取一些数据为例进行说明。我将跳过 C 和 C++ 库,因为它们对于 I/O 的支持主要是为了适应同步 I/O 或阻塞 I/O;这通常没什么意义,除非您构建简单的控制台程序。当然,这也没什么不妥的。我所喜欢的程序中就有一些是控制台程序,这些程序就执行一项任务,而且效果确实不错。但这确实没什么让人真正感兴趣的,因此我就继续介绍其他内容了。

一个线程

首先,我介绍 Windows API 以及老掉牙但好用且名称非常贴切的 ReadFile 函数。在可以开始读取文件的内容之前,我需要一个指向此文件的句柄,此句柄由功能极其强大的 CreateFile 函数提供:

auto fn = L"C:\\data\\greeting.txt";
auto f = CreateFile(fn, GENERIC_READ, 
  FILE_SHARE_READ, nullptr,
  OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, nullptr);
ASSERT(f);

为了让示例保持简洁,我只使用 ASSERT 和 VERIFY 宏作为占位符,以指示您需要在何处添加一些错误处理,以管理由各种 API 函数报告的任何故障。 在此代码段中,CreateFile 函数用于打开文件,而不是创建文件。 此同一个函数用于执行这两个操作。 名称中的 Create(创建)更多地强调创建内核文件对象这一事实,而不侧重说明是否在文件系统中创建了文件。 参数非常容易理解,与此处的谈论没多大关系,但倒数第二个参数例外,通过此参数可以指定一组标记和属性,以指示内核中您需要的 I/O 行为类型。 在此情况下,我使用了 FILE_ATTRIBUTE_NORMAL 常量,该常量仅仅指示应打开此文件以实现正常的同步 I/O。 当您准备就绪后,记得调用 CloseHandle 函数,以释放此文件上的内核锁定。 句柄包装类(例如我在 2011 年 7 月的专栏“C++ 和 Windows API”中介绍的一个句柄包装类,文章网址为 msdn.microsoft.com/magazine/hh288076)将获得成功。

现在我们可以继续,调用 ReadFile 函数以将文件的内容读取到内存中:

char b[64];
DWORD c;
VERIFY(ReadFile(f, b, sizeof(b), &c, nullptr));
printf("> %.*s\n", c, b);

正如您所预期的那样,第一个参数指定指向此文件的句柄。 接下来的两个参数描述应将文件的内容读取到其中的内存。 如果可用字节数少于所请求的字节数,则 ReadFile 还将返回复制的实际字节数。 最后一个参数仅用于异步 I/O,稍后我将回过头来介绍此参数。 在这个简单的示例中,我仅仅输出了从文件中实际读取的字符。 当然,如果需要,您可能需要多次调用 ReadFile。

两个线程

这种 I/O 模型很容易掌握,当然对于许多小型程序非常有用,尤其适用于基于控制台的程序。 但它无法很好地进行扩展。 如果您需要同时读取两个单独的文件(可能是为了支持多个用户),您将需要两个线程。 没问题,CreateThread 函数可派上用场了。 下面是一个简单的示例:

auto t = CreateThread(nullptr, 0, 
  [] (void *) -> DWORD
{
  CreateFile/ReadFile/CloseHandle
  return 0;
},
nullptr, 0, nullptr);
ASSERT(t);
WaitForSingleObject(t, INFINITE);

在此,我使用一个无状态的 lambda(而非回调函数)来表示线程过程。 Visual C++ 2012 编译器符合 C++11 语言规范,因为无状态 lambda 必须可隐式转换为函数指针。 这样就很方便了,Visual C++ 编译器通过在 x86 体系结构(此体系结构支持各种调用约定)上自动生成适当的调用约定,效果将更胜一筹。

CreateThread 函数返回一个表示线程的句柄,然后我使用 WaitForSingleObject 函数进行等待。 当读取文件时,线程自身就会受阻。 通过这种方法,我可以让多个线程协同执行不同的 I/O 操作。 然后,我可以调用 WaitForMultipleObjects 进行等待,直到所有线程都已完成。 也请记得调用 CloseHandle,以释放内核中与线程相关的资源。

但是,这种技术只有在用户或文件较少的情况下才具有可扩展性,但无论如何,可扩展性向量对于您的程序而言都至关重要。 很明显,这并不是多个未完成的读取操作无法扩展。 恰恰相反! 而正是线程和同步开销终止了程序的可扩展性。

回到一个线程

此问题的一种解决方案是通过异步过程调用 (APC) 使用称为可报警 I/O 的概念。 在此模型中,程序依赖于内核将其与每个线程关联的 APC 的队列。 APC 有两种变体:内核模式和用户模式。 也就是说,队列中的过程或函数可能属于用户模式下的程序,或者属于某个内核模式驱动程序。 后者对于内核而言是一种简单的方法,使驱动程序能够在线程的用户模式地址空间的上下文中执行某些代码,以便它能够访问其虚拟内存。 但这种方法也可供用户模式编程人员使用。 因为从根本上讲,I/O 在硬件上(因此在内核中)无论如何都是异步的,所以,应该开始读取文件的内容,当 I/O 最终结束时,应让内核将 APC 排入队列中。

首先,传递到 CreateFile 函数的标记和属性必须更新以提供重叠的 I/O,这样,内核就不会对针对此文件的操作进行序列化。 术语“异步”和“重叠”在 Windows API 中可互换使用,它们具有相同的含义。 不管怎样,当创建文件句柄时,必须使用 FILE_FLAG_OVERLAPPED 常量:

auto f = CreateFile(fn, GENERIC_READ, FILE_SHARE_READ, nullptr,
  OPEN_EXISTING, FILE_FLAG_OVERLAPPED, nullptr);

再次说明,此代码段中的唯一差别是我将 FILE_ATTRIBUTE_NORMAL 常量替换为 FILE_FLAG_OVERLAPPED 常量,但运行时的差异非常大。 为了实际提供内核可在 I/O 完成时排队的 APC,我需要使用可选的 ReadFileEx 函数。 尽管可以使用 ReadFile 发起异步 I/O,但只有 ReadFileEx 能让您在 I/O 结束时提供要调用的 APC。 然后,线程可以继续执行其他有用的工作,可能是启动其他异步操作,而 I/O 在后台完成。

再次说明,由于采用 C++11 和 Visual C++,因此可以使用 lambda 来表示 APC。 问题在于:APC 可能想要访问新填充的缓冲区,但这不是 APC 的参数之一,并且由于只允许使用无状态的 lambda,所以,无法使用 lambda 来捕获缓冲区变量。 解决方案是让缓冲区在 OVERLAPPED 结构中挂起(可以这样说)。 因为指向 OVERLAPPED 结构的指针可用于 APC,所以您只需将结果强制转换为您选择的结构。 图 1 提供了一个简单的示例。

图 1 可警报 I/O 以及 APC

struct overlapped_buffer
{
  OVERLAPPED o;
  char b[64];
};
overlapped_buffer ob = {};
VERIFY(ReadFileEx(f, ob.b, sizeof(ob.b), 
  &ob.o, [] (DWORD e, DWORD c,
  OVERLAPPED * o)
{
  ASSERT(ERROR_SUCCESS == e);
  auto ob = reinterpret_cast<overlapped_buffer *>(o);
  printf("> %.*s\n", c, ob->b);
}));
SleepEx(INFINITE, true);

除了 OVERLAPPED 指针之外,APC 还提供错误代码作为其第一个参数,并提供复制的字节数作为其第二个参数。 在某一时间,I/O 结束,但为了使 APC 运行,必须将同一个线程放入可警报的状态中。 为此,最简单的方法是使用 SleepEx 函数,只要 APC 排入队列,此函数就会唤醒线程,并在返回控制权之前执行任何 APC。 当然,如果队列中已经有 APC,则线程可能根本不会挂起。 还可以检查 SleepEx 的返回值,以找出是什么导致线程得到恢复。 您甚至可以使用零值(而非 INFINITE)来刷新 APC 队列,然后继续操作而不延迟。

但是,使用 SleepEx 并非在所有情况下都有用,并容易导致不择手段的编程人员轮询 APC,这从来就不是一个好主意。 很可能发生的情况是:如果您从单个线程使用异步 I/O,则此线程也是您程序的消息循环。 再者,您还可以使用 MsgWaitForMultipleObjectsEx 函数来等待除 APC 之外的其他对象,并为您的程序构建更具有吸引力的单线程运行时。 APC 的潜在缺点是它们可能引入一些复杂的重入错误,因此请务必记住这一点。

每个处理器一个线程

当您查找程序要执行的更多任务时,您可能会注意到:运行程序线程的处理器变得越来越忙,而计算机上的其他处理器却无所事事正在等待任务。 尽管 APC 是最高效地执行异步 I/O 的方法,但它们具有明显的缺点:它们只在启动操作的同一个线程上完成。 此时的问题是要制订一个解决方案,将此工作扩展到所有可用的处理器上。 您可能会构思出您自己的独特设计,或许是通过可警报的消息循环在若干线程间协调工作,但您所做的将无法实现 I/O 完成端口的全部性能和可扩展性,这在很大程度上是因为它与内核的不同部分深度集成。

尽管 APC 使异步 I/O 操作能够在单一线程上完成,但完成端口允许任何线程开始 I/O 操作并让任意线程处理结果。 完成端口是您创建的一个内核对象,创建后,您可以将其与任何数量的文件对象、套接字、管道等相关联。 完成端口公开一个排队接口,通过此接口,内核可以在 I/O 完成时向队列中推送一个完成包,而程序可以在任意可用线程上取消该包排队,并根据需要处理此包。 如果需要,您甚至可以将自己的完成包排入队列。 主要难题在于容易产生混淆的 API。 图 2 提供了完成端口的一个简单包装类,同时明确了如何使用函数以及函数如何相关。

图 2 完成端口包装

class completion_port
{
  HANDLE h;
  completion_port(completion_port const &);
  completion_port & operator=(completion_port const &);
public:
  explicit completion_port(DWORD tc = 0) :
    h(CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, tc))
  {
    ASSERT(h);
  }
  ~completion_port()
  {
    VERIFY(CloseHandle(h));
  }
  void add_file(HANDLE f, ULONG_PTR k = 0)
  {
    VERIFY(CreateIoCompletionPort(f, h, k, 0));
  }
  void queue(DWORD c, ULONG_PTR k, OVERLAPPED * o)
  {
    VERIFY(PostQueuedCompletionStatus(h, c, k, o));
  }
  void dequeue(DWORD & c, ULONG_PTR & k, OVERLAPPED *& o)
  {
    VERIFY(GetQueuedCompletionStatus(h, &c, &k, &o, INFINITE));
  }
};

主要混淆围绕着 Create­IoCompletionPort 函数执行的双重职责:首先是实际创建一个完成端口对象,然后将其与重叠的文件对象相关联。 完成端口只创建一次,然后与任何数量的文件关联。 从技术上来说,您可以在单一调用中同时执行这两个步骤,但这仅当您将完成端口用于单一文件时才有用,那这样又有什么意义呢?

当创建完成端口时,唯一的注意事项是指示线程计数的最后一个参数。 这是允许用来并发对完成包取消排队的最大线程数。 如果将此参数设置为零,则意味着内核将允许每个处理器一个线程。

添加文件在技术上称作关联;要注意的主要事项是一个参数,此参数指示要与文件关联的键。 因为您无法在句柄结束时挂起额外信息(但使用 OVERLAPPED 结构时可以挂起),所以该键提供了一种方法,使您能够将某些程序特定的信息与文件相关联。 只要内核将与此文件相关的完成包排入队列,也就将包括此键。 这一点特别重要,因为文件句柄甚至不包括在完成包中。

正如前面所述,您可以将自己的完成包排入队列。 在这种情况下,您提供的值完全由您决定。 内核不关注这些值,也不试图以任何方式解释它们。 这样,您可以提供一个伪 OVERLAPPED 指针,完全相同的地址将存储在完成包中。

但是,在大多数情况下,一旦异步 I/O 操作完成,您将等待内核将完成包排入队列。 通常,程序会对每个处理器创建一个或多个线程,并在无限循环中调用 GetQueuedCompletionStatus 或我的取消队列包装函数。 当程序需要结束并且您希望这些线程终止时,您可能要将一个特殊的控制完成包排入队列(每个线程一个包)。 对于 APC,您可以在 OVERLAPPED 结构中挂起更多信息,以便将额外信息与每个 I/O 操作关联:

completion_port p;
p.add_file(f);
overlapped_buffer ob = {};
ReadFile(f, ob.b, sizeof(ob.b), nullptr, &ob.o);

此处,我再次使用最初的 ReadFile 函数,但在此情况下,我提供一个指向 OVERLAPPED 结构的指针作为其最后一个参数。 一个等待线程可能会取消完成包的排队,如下所示:

DWORD c;
ULONG_PTR k;
OVERLAPPED * o;
p.dequeue(c, k, o);
auto ob = reinterpret_cast<overlapped_buffer *>(o);

池线程

如果您关注我的专栏已经有一段时间了,您会记得我去年花了五个月时间详细介绍了 Windows 线程池。 此同一个线程池 API 是使用 I/O 完成端口实现的,同时提供了这一相同的工作队列模型但无需您自行管理线程,这一点对您来说也不会觉得奇怪。 它还提供了一系列功能和便利性,这使其成为一种具有吸引力的替代方法,使您不必直接使用完成端口对象。 如果您尚未采用上述方法,我建议您阅读这些专栏,以尽快采用 Windows 线程池 API。 可通过 bit.ly/StHJtH 获得我的在线专栏列表。

至少,您可以使用 TrySubmitThreadpoolCallback 函数来获取线程池,以便在内部创建其工作对象之一,并将其回调立即提交以供执行。 这一过程非常简单,如下所示:

TrySubmitThreadpoolCallback([](PTP_CALLBACK_INSTANCE, void *)
{
  // Work goes here!
},
nullptr, nullptr);

如果您需要多一些控制权,您当然可以直接创建一个工作对象,并将其与线程池环境和清理组相关联。 这种方法还可以向您提供最佳性能。

当然,此处讨论的内容是关于重叠 I/O 的,线程池只是为这种情况提供 I/O 对象。 我不想在这方面花费很多时间,因为我已经在我的 2011 年 12 月的专栏“线程池计时器和 I/O”(msdn.microsoft.com/magazine/hh580731) 中探讨了这一内容,但图 3 提供了一个新示例。

图 3 线程池 I/O

OVERLAPPED o = {};
char b[64];
auto io = CreateThreadpoolIo(f, [] (PTP_CALLBACK_INSTANCE, 
  void * b,   void *, ULONG e, ULONG_PTR c, PTP_IO)
{
  ASSERT(ERROR_SUCCESS == e);
  printf("> %.*s\n", c, static_cast<char *>(b));
},
b, nullptr);
ASSERT(io);
StartThreadpoolIo(io);
auto r = ReadFile(f, b, sizeof(b), nullptr, &o);
if (!r && ERROR_IO_PENDING != GetLastError())
{
  CancelThreadpoolIo(io);
}
WaitForThreadpoolIoCallbacks(io, false);
CloseThreadpoolIo(io);

考虑到 CreateThreadpoolIo 让我向排队的回调传递一个附加的上下文参数,因此我不需要将缓冲区从 OVERLAPPED 结构中挂起,尽管需要时我完全可以这么做。 要记住的主要事项是必须在开始异步 I/O 操作之前调用 StartThreadpoolIo,并且如果 I/O 操作失败或内联完成,则必须调用 CancelThreadpoolIo。

快速、流畅的线程

在将线程池的概念提到新的高度后,适用于 Windows 应用商店应用程序的新 Windows API 还提供了线程池抽象,但这种抽象要简单得多并且其功能也少得多。 幸运的是,没有什么可以阻止您使用适合您的编译器和库的备选线程池。 无论您过去是怎样获得这一线程池的,友好的 Windows 应用商店管理者都是另一回事。 然而,Windows 应用商店应用程序的线程池还是值得一提,它集成了由适用于 Windows 应用商店应用程序的 Windows API 所体现的异步模式。

通过使用灵活的 C++/CX 扩展,可以提供相对简单的 API 来以异步方式运行一些代码:

ThreadPool::RunAsync(ref new WorkItemHandler([] (IAsyncAction ^)
{
  // Work goes here!
}));

从语法上讲,这是十分简单的。 我们甚至希望,如果编译器可以自动从 lambda 生成 C++/CX 委托(至少在概念上是这样),这与它目前针对函数指针生成此类委托一样,则上述代码在将来的 Visual C++ 版本中将变得更简单。

然而,这种比较简单的语法掩盖了大量的复杂性。 从较高层次来说,ThreadPool 是一个静态类(这是从 C# 语言中借来的术语),因此无法创建。 它提供了静态 RunAsync 方法的一些开销,仅此而已。 每个线程池都至少将一个委托作为其第一个参数。 此处我使用 lambda 构造此委托。 RunAsync 方法还返回一个 IAsyncAction 接口,同时提供对异步操作的访问。

为方便起见,假设这种方法效果非常好,并与适用于 Windows 应用商店应用程序的 Windows API 中普遍采用的异步编程模型完美集成。 例如,您可以在一个并行模式库 (PPL) 任务中包装由 RunAsync 方法返回的 IAsyncAction 接口并实现可组合性级别,这一可组合性级别与我在以下文章中介绍的级别相似:我的九月份和十月份专栏“追求高效的可组合异步系统”(msdn.microsoft.com/magazine/jj618294) 和“回到可恢复函数构筑的未来”(msdn.microsoft.com/magazine/jj658968)。

但是,认识到这些看上去乏味的代码真正表示什么,这是非常有用的并且在一定程度上会使您冷静下来。 C++/CX 扩展的核心是基于 COM 的运行时及其 IUnknown 接口。 此类基于接口的对象模型不可能提供静态方法。 此时必须有一个可成为接口的对象,并且必须有某种类工厂来创建该对象,而实际上确实就有。

Windows 运行时定义了称为运行时类的对象,它与传统的 COM 类非常相似。 如果您是守旧派,您甚至可以在 IDL 文件中定义该类,并通过专门针对此任务的新版 MIDL 编译器来运行该类,此时它将生成 .winmd 元数据文件和适当的标头。

运行时类可以同时具有实例方法和静态方法。 它们是使用单独的接口定义的。 在生成的元数据中,包含实例方法的接口成为类的默认接口,而包含静态方法的接口归属于运行时类。 在这种情况下,ThreadPool 运行时类缺少可激活的属性且没有默认接口,但一旦创建,就可以对静态接口进行查询,然后可以调用那些不是那么静态的方法。 图 4 提供了其中可能包含的内容的示例。 请记住,其中大部分内容是编译器生成的,但它会让您很好地了解如下这一点:进行这种简单的静态方法调用来以异步方式运行委托,真正的价值体现在哪些方面。

图 4 WinRT 线程池

class WorkItemHandler :
  public RuntimeClass<RuntimeClassFlags<ClassicCom>,
  IWorkItemHandler>
{
  virtual HRESULT __stdcall Invoke(IAsyncAction *)
  {
    // Work goes here!
return S_OK;
  }
};
auto handler = Make<WorkItemHandler>();
HSTRING_HEADER header;
HSTRING clsid;
auto hr = WindowsCreateStringReference(
  RuntimeClass_Windows_System_Threading_ThreadPool, 
  _countof(RuntimeClass_Windows_System_Threading_ThreadPool)
  - 1, &header, &clsid);
ASSERT(S_OK == hr);
ComPtr<IThreadPoolStatics> tp;
hr = RoGetActivationFactory(
  clsid, __uuidof(IThreadPoolStatics),
  reinterpret_cast<void **>(tp.GetAddressOf()));
ASSERT(S_OK == hr);
ComPtr<IAsyncAction> a;
hr = tp->RunAsync(handler.Get(), a.GetAddressOf());
ASSERT(S_OK == hr);

毫无疑问,要实现调用 TrySubmitThreadpoolCallback 函数的相对简单性和高效率,还有很长一段路要走。 了解您所使用的抽象的代价很有帮助,即使您已经通过某种效率度量手段证明了这种代价是物有所值的也不例外。 我简短地展开介绍一下。

WorkItemHandler 委托实际上是一个基于 IUnknown 并带有一个 Invoke 方法的 IWorkItemHandler 接口。 此接口的实现不是由 API 而是由编译器提供的。 这样就合情合理了,因为它为 lambda 捕获的任何变量提供了一个便利的容器,并且 lambda 的主体自然位于编译器生成的 Invoke 方法中。 在本例中,我仅仅依赖 Windows 运行时库 (WRL) RuntimeClass 模板类来实现 IUnknown。 然后,我可以使用便利的 Make 模板函数来创建我的 WorkItemHandler 的实例。 对于无状态的 lambda 和函数指针,我进一步预期编译器将生成静态实现以及 IUnknown 的 no-op 实现,以避免动态分配开销。

为了创建运行时类的实例,我需要调用 RoGet­ActivationFactory 函数。 然而,它需要类 ID。 注意,这不是传统 COM 的 CLSID,而是类型(在这种情况下为 Windows.System.Threading.ThreadPool)的完全限定名称。 此处我使用 MIDL 编译器生成的常量数组,以避免不得不在运行时对字符串计数。 似乎这还不够,我还需要创建此类 ID 的 HSTRING 版本。 此处,我使用 WindowsCreateStringReference 函数,此函数与常规 WindowsCreateString 函数不同,它不创建源字符串的副本。 为方便起见,WRL 还提供了包装此功能的 HStringReference 类。 现在我可以调用 RoGetActivationFactory 函数,同时直接请求 IThreadPoolStatics 接口并将生成的指针存储在 WRL 提供的智能指针中。

现在,我最终可以对此接口调用 RunAsync 方法,向其提供我的 IWorkItemHandler 实现以及表示所生成的操作对象的 IAsyncAction 智能指针的地址。

此线程池 API 提供的功能和灵活性与核心 Windows 线程池 API 或并发运行时提供的功能和灵活性无法相提并论,这一点可能也并不奇怪。 然而,C++/CX 和运行时类的优势是沿着程序与运行时自身之间的边界实现的。 作为 C++ 编程人员,您可能对 Windows 8 不是全新平台以及传统的 Windows API 仍可在您需要时供您随意使用而感到欣慰。

Kenny Kerr 是一位热衷于本机 Windows 开发的软件专家。 您可以通过 kennykerr.ca 与他联系。

衷心感谢以下技术专家对本文的审阅: James P. McNellis