数据流(任务并行库)

任务并行库 (TPL) 提供数据流组件,可帮助提高启用并发的应用程序的可靠性。 这些数据流组件统称为 TPL 数据流库。 这种数据流模型通过向粗粒度的数据流和管道任务提供进程内消息传递来促进基于角色的编程。 数据流组件基于 TPL 的类型和计划基础结构,并集成了 C#、Visual Basic 和 F# 语言的异步编程支持。 当您有必须相互异步沟通的多个操作或者想要在数据可用时对其处理时,这些数据流组件就非常有用。 例如,请考虑一个处理网络摄像机图像数据的应用程序。 通过使用数据流模型,当图像帧可用时,应用程序就可以处理它们。 如果应用程序增强图像帧(例如执行灯光修正或消除红眼),则可以创建数据流组件的管道。 管道的每个阶段可以使用更粗粒度的并行功能(例如 TPL 提供的功能)来转换图像。

本文档对 TPL 数据流库进行了概述。 它介绍编程模型,预定义的数据流块类型,以及如何配置数据流块来满足应用程序的特定要求。

注意

TPL 数据流库(System.Threading.Tasks.Dataflow 命名空间)不随 .NET 一起分发。 若要在 Visual Studio 中安装 System.Threading.Tasks.Dataflow 命名空间,请打开项目,选择“项目”菜单中的“管理 NuGet 包”,再在线搜索 System.Threading.Tasks.Dataflow 包。 或者,若要使用 .NET Core CLI 进行安装,请运行 dotnet add package System.Threading.Tasks.Dataflow

编程模型

TPL 数据流库向具有高吞吐量和低滞后时间的占用大量 CPU 和 I/O 操作的应用程序的并行化和消息传递提供了基础。 它还能显式控制缓存数据的方式以及在系统中移动的方式。 为了更好地了解数据流编程模型,请考虑一个以异步方式从磁盘加载图像并创建复合图像的应用程序。 传统编程模型通常需要使用回调和同步对象(例如锁)来协调任务和访问共享数据。 通过使用数据流编程模型,您可以从磁盘读取时创建处理图像的数据流对象。 在数据流模型下,您可以声明当数据可用时的处理方式,以及数据之间的所有依赖项。 由于运行时管理数据之间的依赖项,因此通常可以避免这种要求来同步访问共享数据。 此外,因为运行时计划基于数据的异步到达,所以数据流可以通过有效管理基础线程提高响应能力和吞吐量。 有关在 Windows 窗体应用程序中使用数据流编程模型实现图像处理的示例,请参阅演练:在 Windows 窗体应用程序中使用数据流

源和目标

TPL 数据流库包括数据流块,它是缓冲并处理数据的数据结构。 TPL 定义了三种数据流块:源块、目标块和传播器块。 源块作为数据源,可以读取。 目标块作为数据接收方,可以写入。 传播器块作为源块和目标块,可以读取和写入。 TPL 定义 System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> 接口来表示源,System.Threading.Tasks.Dataflow.ITargetBlock<TInput> 表示目标以及 System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> 表示传播器。 IPropagatorBlock<TInput,TOutput> 继承自 ISourceBlock<TOutput>ITargetBlock<TInput>

TPL 数据流库提供了多个预定义的数据流块类型,可以实现 ISourceBlock<TOutput>ITargetBlock<TInput>IPropagatorBlock<TInput,TOutput> 接口。 这些数据流块类型在本文档的预定义的数据流块类型部分进行了说明。

连接块

可以连接数据流块来形成管道(这是数据流块的线性序列),或网络(这是数据流块的图形)。 管道是网络的一种形式。 在管道或网络中,当数据可用时源向目标异步传播数据。 ISourceBlock<TOutput>.LinkTo 方法将源数据流块链接到目标块。 源可以链接到零个或多个目标;目标可以从零个或多个源进行链接。 您可以同时向管道或网络中添加或从其移除数据流块。 预定义的数据流块类型处理所有的建立或释放链接的线程安全性。

有关连接数据流块以形成基本管道的示例,请参阅演练:创建数据流管道。 有关连接数据流块以形成更复杂网络的示例,请参阅演练:在 Windows 窗体应用程序中使用数据流。 有关在源向目标传递消息后从源取消目标链接的示例,请参阅如何:取消链接数据流块

筛选

当您调用 ISourceBlock<TOutput>.LinkTo 方法将源链接到目标时,您可以根据消息的值提供一个委托来决定目标块是接受还是拒绝该消息。 这种筛选机制很有用,它可以保证数据流块只接收特定值。 对于大多数预定义的数据流块类型,如果源块连接到多个目标块,那么当目标块拒绝消息时,源将向下一个目标提供该消息。 源向目标提供消息的顺序是按源定义的,可以根据源类型的不同而不同。 一个目标接受消息后,大多数源块类型会停止提供该消息。 此规则的例外情况是 BroadcastBlock<T> 类,这个类向所有目标提供每条消息,即使某些目标拒绝消息。 有关使用筛选功能来仅处理特定消息的示例,请参阅演练:在 Windows 窗体应用程序中使用数据流

重要

由于每个预定义源数据流块类型确保了消息是按照它们接收的顺序来传播的,因此每一条消息都必须在源块可以处理下一条消息之前从源块读取。 因此,当您使用筛选向一个源连接多个目标时,请确保至少一个目标块能够接收每一条消息。 否则,您的应用程序可能发生死锁。

消息传递

数据流编程模型与消息传递这一概念相关,其中程序的独立组件通过发送消息相互通信。 在应用组件间传播消息的一种方法是,调用 PostDataflowBlock.SendAsync 方法,向目标数据流块发送消息(Post 同步运行,SendAsync 异步运行),再调用 ReceiveReceiveAsyncTryReceive 方法接收源数据流块发送的消息。 您可以通过向头节点(目标块)发送输入数据,从管道的终端节点或网络的终端节点(一个或多个源块)接收输出数据来使用数据流管道或网络组合使用这些方法。 您还可以使用 Choose 方法从提供的第一个拥有可用数据的源读取数据,并对该数据执行操作。

源数据流块通过调用方法 ITargetBlock<TInput>.OfferMessage 向目标数据流块提供数据。 目标块通过以下三种方式之一来回应提供的消息:它可以接受消息,拒绝消息或推迟消息。 当目标接受消息时,OfferMessage 方法会返回 Accepted。 当目标拒绝消息时,OfferMessage 方法会返回 Declined。 当目标要求它不再接收来自源的任何消息时,OfferMessage 会返回 DecliningPermanently。 预定义的源块类型在这些返回值接收后不会向链接的目标提供消息,并且它们会自动取消这些目标的链接。

当目标块推迟消息以备日后使用时,OfferMessage 方法会返回 Postponed。 推迟消息的目标块可以稍后调用 ISourceBlock<TOutput>.ReserveMessage 方法,以尝试暂留所提供的消息。 此时,消息仍可用,并且可由该目标块使用,否则表明该消息已由另一个目标接收。 如果目标数据流块稍后需要消息或不再需要消息,它会分别调用 ISourceBlock<TOutput>.ConsumeMessageReleaseReservation 方法。 消息预留通常由以非贪婪模式运行的数据流块类型使用。 非贪婪模式将在本文档的后面详细介绍。 除了保留推迟的消息,目标块也可以使用 ISourceBlock<TOutput>.ConsumeMessage 方法来尝试直接使用推迟的消息。

数据流块完成

数据流块也支持完成概念。 完成状态的数据流块不执行任何进一步的工作。 每个数据流块都有相关的 System.Threading.Tasks.Task 对象(称为“完成任务”),表示数据流块的完成状态。 因为您可以使用完成任务等待 Task 对象完成,所以您可以等待数据流网络的一个或更多终端节点来完成任务。 IDataflowBlock 接口定义 Complete 方法(该方法向数据流块通知它完成的请求)和 Completion 属性(该属性返回数据流块的完成任务)。 ISourceBlock<TOutput>ITargetBlock<TInput> 都继承 IDataflowBlock 接口。

有两种方法来确定数据流块完成时是否没有出错、遇到一个或多个错误或已取消。 第一种方法是在 try-catch 块(在 Visual Basic 中为 Try-Catch)中对完成任务调用 Task.Wait 方法。 下面的示例创建一个 ActionBlock<TInput> 对象,该对象在其输入值小于零时会引发 ArgumentOutOfRangeException。 当此示例在完成任务后调用 AggregateException 时,将引发 Wait。 通过 ArgumentOutOfRangeException 对象的 InnerExceptions 属性来访问 AggregateException

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

此示例演示在执行数据流块的委托中异常变成不可处理的情况。 建议您在这样的块主体中处理异常。 然而,你如果没能这么做,块就表现得好像是它被取消了,而且不会处理传入消息。

当显式取消数据流块时,AggregateException 对象在 OperationCanceledException 属性中包含 InnerExceptions。 有关数据流取消的详细信息,请参阅启用取消部分。

第二种确定数据流块的完成状态的方法是使用延续执行完成任务,或者使用 C# 和 Visual Basic 的异步语言功能以异步方式等待完成任务。 您提供给 Task.ContinueWith 方法的委托采用表示前面任务的 Task 对象。 就 Completion 属性来说,延续的委托自行采用完成任务。 下面的示例与前一个示例相似,不同之处在于它也使用 ContinueWith 方法创建输出整个数据流操作状态的延续任务。

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine("The status of the completion task is '{0}'.",
      task.Status);
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

您也可以使用类似延续任务主体中的属性(例如 IsCanceled)来确定有关数据流块的完成状态的其他信息。 若要深入了解延续任务及其与取消和错误处理如何相关,请参阅使用延续任务链接任务任务取消异常处理

预定义的数据流块类型

TPL 数据流库提供了多个预定义的数据流块类型。 这些类型分为三个类别:缓冲块、执行块和分组块。 以下部分描述了组成这些类别的块类型。

缓冲块

缓冲块存放的数据供数据使用者使用。 TPL 数据流库提供三种缓冲块类型:System.Threading.Tasks.Dataflow.BufferBlock<T>System.Threading.Tasks.Dataflow.BroadcastBlock<T>System.Threading.Tasks.Dataflow.WriteOnceBlock<T>

BufferBlock(T)

BufferBlock<T> 类表示一般用途的异步消息结构。 此类存储先进先出 (FIFO) 消息队列,此消息队列可由多个源写入或从多个目标读取。 在目标收到来自 BufferBlock<T> 对象的消息时,将从消息队列中删除此消息。 因此,虽然一个 BufferBlock<T> 对象可以具有多个目标,但只有一个目标将接收每条消息。 需将多条消息传递给另一个组件,且该组件必须接收每条消息时,BufferBlock<T> 类十分有用。

下面的基本示例将多个 Int32 值发送到 BufferBlock<T> 对象,然后从该对象读回这些值。

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

有关展示了如何对 BufferBlock<T> 对象执行消息写入和读取操作的完整示例,请参阅如何:对数据流块执行消息写入和读取操作

BroadcastBlock(T)

若您必须将多条消息传递给另一个组件,而该组件只需要最新的值,则 BroadcastBlock<T> 类很有用。 需向多个组件广播消息时,此类也很有用。

下面的基本示例将 Double 值发送给 BroadcastBlock<T> 对象,然后多次从该对象读回该值。 由于值在被读取之后不会从 BroadcastBlock<T> 对象中移除,因此每一次的可用值都相同。

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

有关展示了如何使用 BroadcastBlock<T> 将消息广播给多个目标数据流块的完整示例,请参阅如何:在数据流块中指定任务计划程序

WriteOnceBlock(T)

WriteOnceBlock<T> 类与 BroadcastBlock<T> 类相似,不同之处在于 WriteOnceBlock<T> 对象仅可被写入一次。 可以将 WriteOnceBlock<T> 视作类似于 C# 中的 readonly(Visual Basic 中的 ReadOnly)关键字,不同之处在于 WriteOnceBlock<T> 对象在收到值后(而不是在构造时)成为不可变对象。 与 BroadcastBlock<T> 类相似,在目标收到来自 WriteOnceBlock<T> 对象的消息时,不会从该目标删除此消息。 因此,多个目标将接收到该消息的副本。 当您想要仅传播多条消息中的第一条时,WriteOnceBlock<T> 类很有用。

下面的基本示例将多个 String 值发送给 WriteOnceBlock<T> 对象,然后从该对象读回该值。 由于 WriteOnceBlock<T> 对象在 WriteOnceBlock<T> 对象接收消息后只能写入一次,因此它放弃后续消息。

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

有关展示了如何使用 WriteOnceBlock<T> 接收完成的第一项操作值的完整示例,请参阅如何:取消链接数据流块

执行块

执行块为每条接收数据调用用户提供的委托。 TPL 数据流库提供三种执行块类型:ActionBlock<TInput>System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>

ActionBlock(T)

ActionBlock<TInput> 类在接收数据时是调用委托的目标块。 将 ActionBlock<TInput> 对象视为数据可用时异步运行的委托。 您提供给 ActionBlock<TInput> 对象的委托可以是类型 Action<T> 或类型 System.Func<TInput, Task>。 当通过 ActionBlock<TInput> 使用 Action<T> 对象时,每个输入元素的处理在委托返回时视为已完成。 当您通过 ActionBlock<TInput> 使用 System.Func<TInput, Task> 对象时,只有当返回的 Task 对象完成时,每个输入元素的处理才可以视为已完成。 使用这两种机制,您可使用 ActionBlock<TInput> 同步和异步处理每个输入元素。

下面的基本示例将多个 Int32 值发送给 ActionBlock<TInput> 对象。 ActionBlock<TInput> 对象将这些值输出到控制台中。 然后此示例将该块设置为已完成状态,并等待所有数据流任务完成。

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

有关展示了如何结合使用委托和 ActionBlock<TInput> 类的完整示例,请参阅如何:在数据流块收到数据时执行操作

TransformBlock(TInput, TOutput)

TransformBlock<TInput,TOutput> 类与 ActionBlock<TInput> 类相似,不同之处在于它可以同时充当源和目标。 传递给 TransformBlock<TInput,TOutput> 对象的委托返回类型为 TOutput 的值。 您提供给 TransformBlock<TInput,TOutput> 对象的委托可以是类型 System.Func<TInput, TOutput> 或类型 System.Func<TInput, Task<TOutput>>。 当您搭配使用 TransformBlock<TInput,TOutput>System.Func<TInput, TOutput> 对象时,每个输入元素的处理在委托返回时视为已完成。 当您搭配使用 TransformBlock<TInput,TOutput>System.Func<TInput, Task<TOutput>> 对象时,只有当返回的 Task<TResult> 对象完成时,每个输入元素的处理才可以视为已完成。 像 ActionBlock<TInput> 一样,通过使用这两种机制,您可使用 TransformBlock<TInput,TOutput> 同步和异步处理每个输入元素。

下面的基本示例所创建的 TransformBlock<TInput,TOutput> 对象用于计算输入的平方根。 TransformBlock<TInput,TOutput> 对象采用 Int32 值作为输入并生成 Double 值作为输出。

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

有关展示了如何在数据流块网络中使用 TransformBlock<TInput,TOutput> 在 Windows 窗体应用中执行图像处理的完整示例,请参阅演练:在 Windows 窗体应用中使用数据流

TransformManyBlock(TInput, TOutput)

TransformManyBlock<TInput,TOutput> 类与 TransformBlock<TInput,TOutput> 类相似,不同之处在于 TransformManyBlock<TInput,TOutput> 为每一个输入值生成零个或多个输出值,而不是为每个输入值仅生成一个输出值。 您提供给 TransformManyBlock<TInput,TOutput> 对象的委托可以是类型 System.Func<TInput, IEnumerable<TOutput>> 或类型 System.Func<TInput, Task<IEnumerable<TOutput>>>。 当您搭配使用 TransformManyBlock<TInput,TOutput>System.Func<TInput, IEnumerable<TOutput>> 对象时,每个输入元素的处理在委托返回时视为已完成。 当您搭配使用 TransformManyBlock<TInput,TOutput>System.Func<TInput, Task<IEnumerable<TOutput>>> 对象时,只有当返回的 System.Threading.Tasks.Task<IEnumerable<TOutput>> 对象完成时,每个输入元素的处理才可以视为已完成。

下面的基本示例所创建的 TransformManyBlock<TInput,TOutput> 对象将字符串拆分为单个字符序列。 TransformManyBlock<TInput,TOutput> 对象采用 String 值作为输入并生成 Char 值作为输出。

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

有关展示了如何使用 TransformManyBlock<TInput,TOutput> 在数据流管道中为每个输入生成多个独立输出的完整示例,请参阅演练:创建数据流管道

并行度

每个 ActionBlock<TInput>TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput> 对象都缓冲输入消息,直到块准备处理它们。 默认情况下,这些类以接收消息的顺序处理消息,一次处理一条消息。 您还可以指定并行度,使 ActionBlock<TInput>TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput> 对象同时处理多条消息。 有关并行执行的详细信息,请参阅本文档后面的“指定并行度”部分。 有关设置并行度使执行数据流块能够一次处理多条消息的示例,请参阅如何:指定数据流块中的并行度

委托类型摘要

下表汇总了可提供给 ActionBlock<TInput>TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput> 对象的委托类型。 此表还指出委托类型是同步执行还是异步执行。

类型 同步委托类型 异步委托类型
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

当处理执行块类型时,还可以使用 lambda 表达式。 有关演示如何使用 lambda 表达式处理执行块的示例,请参阅如何:在数据流块收到数据时执行操作

分组块

分组块在各种约束下合并一个或多个源的数据。 TPL 数据流库提供三种联接块类型:BatchBlock<T>JoinBlock<T1,T2>BatchedJoinBlock<T1,T2>

BatchBlock(T)

BatchBlock<T> 类将一系列输入数据合并到输出数据数组,即批处理。 在创建 BatchBlock<T> 对象时,指定每个批的大小。 当 BatchBlock<T> 对象接收指定数量的输入元素时,它会异步传播含这些元素的数组。 如果 BatchBlock<T> 对象设置为已完成状态,但不包含足够的元素形成批,则会传播包含其余输入元素的最终数组。

BatchBlock<T> 类可以在贪婪或非贪婪模式下运行。 在默认贪婪模式下,BatchBlock<T> 对象接受它提供的每条消息,并在接收指定数量的元素后传播数组。 在非贪婪模式下,BatchBlock<T> 对象推迟所有传入的消息,直到足够的源给块提供消息来形成批。 贪婪模式处理开销较少,所以通常比非贪婪模式执行得更有效。 但是,当您必须以基本方式协调来自多个源的消耗时,可以使用非贪婪模式。 在 Greedy 构造函数的 False 参数中,通过将 dataflowBlockOptions 设置为 BatchBlock<T> 来指定非贪婪模式。

下面的基本示例将多个 Int32 值发送给一批中能容纳十个元素的 BatchBlock<T> 对象。 为了确保所有值从 BatchBlock<T> 传播,此示例调用 Complete 方法。 Complete 方法将 BatchBlock<T> 对象设置为已完成状态,因此,BatchBlock<T> 对象作为最终批传播剩余的所有元素。

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
   batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
   batchBlock.Receive().Sum());

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

有关展示了如何使用 BatchBlock<T> 提高数据库插入操作效率的完整示例,请参阅演练:使用 BatchBlock 和 BatchedJoinBlock 提高效率

JoinBlock(T1, T2, ...)

JoinBlock<T1,T2>JoinBlock<T1,T2,T3> 类收集输入元素并传播包含这些元素的 System.Tuple<T1,T2>System.Tuple<T1,T2,T3> 对象。 JoinBlock<T1,T2>JoinBlock<T1,T2,T3> 类不能从 ITargetBlock<TInput> 继承。 而是提供属性 Target1Target2Target3 来实现 ITargetBlock<TInput>

像在贪婪或非贪婪模式下运行的 BatchBlock<T>JoinBlock<T1,T2>JoinBlock<T1,T2,T3> 一样。 在默认贪婪模式下,JoinBlock<T1,T2>JoinBlock<T1,T2,T3> 对象在其每个目标接收至少一条消息之后接受提供的每条消息并传播元组。 在非贪婪模式下,JoinBlock<T1,T2>JoinBlock<T1,T2,T3> 对象推迟所有传入的消息,直到向任何目标提供了创建元组所需的数据。 此时,块参与两阶段提交协议,以原子方式从源中检索所有必需的项。 此延迟使得其他实体可以同时使用数据,这使整个系统取得进展。

下面的基本示例演示 JoinBlock<T1,T2,T3> 对象需要多个数据来计算值的情况。 此示例创建了需要两个 JoinBlock<T1,T2,T3> 值和一个 Int32 值来执行算术运算的 Char 对象。

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine("{0} + {1} = {2}",
            data.Item1, data.Item2, data.Item1 + data.Item2);
         break;
      case '-':
         Console.WriteLine("{0} - {1} = {2}",
            data.Item1, data.Item2, data.Item1 - data.Item2);
         break;
      default:
         Console.WriteLine("Unknown operator '{0}'.", data.Item3);
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

有关展示了如何在非贪婪模式下使用 JoinBlock<T1,T2> 对象合作共享资源的完整示例,请参阅如何:使用 JoinBlock 读取多个源的数据

BatchedJoinBlock(T1, T2, ...)

BatchedJoinBlock<T1,T2>BatchedJoinBlock<T1,T2,T3> 类收集各批输入元素,并传播包含这些元素的 System.Tuple(IList(T1), IList(T2))System.Tuple(IList(T1), IList(T2), IList(T3)) 对象。 将 BatchedJoinBlock<T1,T2> 视为 BatchBlock<T>JoinBlock<T1,T2> 的组合。 在创建 BatchedJoinBlock<T1,T2> 对象时,指定每个批的大小。 BatchedJoinBlock<T1,T2> 还提供了属性 Target1Target2 来实现 ITargetBlock<TInput>。 当从所有目标收到指定数量的输入元素时,BatchedJoinBlock<T1,T2> 对象会异步传播包含这些元素的 System.Tuple(IList(T1), IList(T2)) 对象。

下面的基本示例创建了一个包含结果、BatchedJoinBlock<T1,T2> 值和 Int32 对象错误的 Exception 对象。 此示例执行多个操作,然后将结果写入 Target1 属性,将错误写入 Target2 对象的 BatchedJoinBlock<T1,T2> 属性。 由于成功和失败操作的计数事先是未知的,因此 IList<T> 对象使每个目标都能收到零个或多个值。

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

有关展示了如何在程序从数据库读取数据时使用 BatchedJoinBlock<T1,T2> 捕获结果和任何异常的完整示例,请参阅演练:使用 BatchBlock 和 BatchedJoinBlock 提高效率

配置数据流块行为

可以通过给数据流块类型的构造函数提供 System.Threading.Tasks.Dataflow.DataflowBlockOptions 对象来启用其他选项。 这些选项控制这类管理基础任务和并行度的调度程序的行为。 DataflowBlockOptions 还包含派生类型,用以指定特定于某些数据流块类型的行为。 下表汇总了与每个数据流块类型相关的选项类型。

数据流块类型 DataflowBlockOptions 类型
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

以下各节提供了有关重要数据流块选项(通过 System.Threading.Tasks.Dataflow.DataflowBlockOptionsSystem.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptionsSystem.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions 类提供)的其他信息。

指定任务计划程序

每个预定义的数据流块在数据可用时使用 TPL 任务计划机制执行一些活动,例如,将数据传播到目标、接收来自源的数据并运行用户定义的委托。 TaskScheduler 是抽象类,表示将任务排队成线程的任务计划程序。 默认任务计划程序 Default 使用 ThreadPool 类进行排队并执行工作。 构造数据流块对象时,您可以通过设置 TaskScheduler 属性重写默认任务计划程序。

当同一个任务计划程序管理多个数据流块时,它可在它们之间强制实施策略。 例如,如果多个数据流块分别配置为面向同一 ConcurrentExclusiveSchedulerPair 对象的独占计划程序,则会序列化这些块间运行的所有工作。 同样,如果这些块配置为面向同一 ConcurrentExclusiveSchedulerPair 对象的并发计划程序,而该计划程序配置为具有最大并发级,则这些块中所有的工作都会受到并发操作数的限制。 有关展示了如何使用 ConcurrentExclusiveSchedulerPair 类让读取操作并行执行(但写入操作独立于其他所有操作)的示例,请参阅如何:在数据流块中指定任务计划程序。 有关 TPL 中的任务计划程序的详细信息,请参阅 TaskScheduler 类主题。

指定并行度

默认情况下,TPL 数据流库提供三种执行块类型(ActionBlock<TInput>TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput>),一次处理一条消息。 这些数据流块类型也会按照接收消息的顺序对消息进行处理。 若要使这些数据流块同时处理该消息,请在构造数据流对象块时设置 ExecutionDataflowBlockOptions.MaxDegreeOfParallelism 属性。

MaxDegreeOfParallelism 的默认值为 1,这保证了数据流块一次处理一条消息。 将该属性设置为大于 1 的值将使数据流块可以同时处理多条消息。 将该属性设置为 DataflowBlockOptions.Unbounded 将使基础任务计划程序管理最大并发程度。

重要

当指定大于 1 的最大并行度时,会同时处理多条消息,因此,消息可能不会按照接收的顺序进行处理。 然而,从块输出消息的顺序与接收消息的顺序相同。

由于 MaxDegreeOfParallelism 属性表示最大并行度,因此数据流块执行时的并行度可能小于指定的值。 为了达到功能要求或因为缺少可用的系统资源,数据流块可能使用较小的并行度。 数据流块选择的并行度不会超过您指定的值。

MaxDegreeOfParallelism 属性的值对于每个数据流块对象而言,都是特有的。 例如,如果四个数据流对象块中的每一个都指定 1 作为最大并行度,则所有四个数据流对象块可以并行运行。

有关设置最大并行度以启用并行冗长操作的示例,请参阅如何:指定数据流块中的并行度

指定每个任务的消息数

预定义的数据流块类型使用任务来处理多个输入元素。 这有助于最大限度地减少需要处理数据的任务对象数,从而使应用程序可以更有效地运行。 但是,当一个数据流块集合中的任务处理数据时,其他数据流块的任务可能需要按照队列消息等待处理时间。 若要使数据流任务更加公平,请设置 MaxMessagesPerTask 属性。 当 MaxMessagesPerTask 设置为 DataflowBlockOptions.Unbounded 默认值时,数据流块使用的任务会处理尽可能多的消息。 当 MaxMessagesPerTask 设置为 Unbounded 以外的值时,数据流块为每个 Task 对象至多处理这个数量的消息。 虽然设置 MaxMessagesPerTask 属性可以提高任务间的公平性,但它可能会导致该系统创建多个非必要的任务,这会降低性能。

启用取消

TPL 提供了一种机制,能使任务以一种合作的方式协调取消。 若要启用数据流块参与此取消机制,请设置 CancellationToken 属性。 当此 CancellationToken 对象设置为已取消状态时,所有监视该标记的数据流块都会完成当前项目的执行,但不会开始处理后续项。 这些数据流块也会清除所有缓冲的消息,释放所有源和目标块的连接,并转换为已取消状态。 通过转换为已取消状态,Completion 属性具有设置为 StatusCanceled 属性,除非在处理过程中出现异常。 在这种情况下,Status 会设置为 Faulted

有关演示如何在 Windows 窗体应用程序中使用取消的示例,请参阅如何:取消数据流块。 若要深入了解 TPL 中的取消,请参阅任务取消

指定贪婪与非贪婪行为

几个分组数据流块类型可以在贪婪或非贪婪模式下运行。 默认情况下,预定义的数据流块类型在贪婪模式下运行。

对于联接块类型(如 JoinBlock<T1,T2>),贪婪模式意味着块立即接受数据,即使相应的数据联接不可用。 非贪婪模式意味着块推迟所有传入的消息,直到在其每个目标上有一个可完成联接。 如果任何推迟的消息不再可用,则联接块会释放所有推迟的消息并重新启动该过程。 对于 BatchBlock<T> 类,贪婪和非贪婪行为非常相似,不同之处在于在非贪婪模式下,BatchBlock<T> 对象推迟所有传入的消息,直到不同源中有足够消息可用于完成批作业。

若要为数据流块指定非贪婪模式,请将 Greedy 设置为 False。 有关演示如何使用非贪婪模式使多个联接块更高效地共享数据源的示例,请参阅如何:使用 JoinBlock 从多个源读取数据

自定义数据流块

尽管 TPL 数据流库提供了许多预定义块类型,但是您还是可以创建执行自定义行为的其他块类型。 直接实现 ISourceBlock<TOutput>ITargetBlock<TInput> 接口或使用 Encapsulate 方法生成封装现有块类型行为的复杂块。 有关演示如何实现自定义数据流块功能的示例,请参阅演练:创建自定义数据流块类型

Title 描述
如何:将消息写入数据流块和从数据流块读取消息 演示如何向 BufferBlock<T> 对象写入和读取消息。
如何:实现制造者-使用者数据流模式 描述如何使用数据流模型实现制造者-使用方模式,在这个模型中制造者向数据流块发送消息,而使用方从该块中读取消息。
如何:在数据流块收到数据时执行操作 描述如何向执行数据流块类型(ActionBlock<TInput>TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput>)提供委托。
演练:创建数据流管道 描述如何创建从 Web 下载文本并对该文本执行操作的数据流管道。
如何:取消链接数据流块 展示了如何在源向目标提供消息后,使用 LinkTo 方法取消链接源数据流块和目标数据流块。
演练:在 Windows 窗体应用程序中使用数据流 演示如何创建在 Windows 窗体应用程序中执行图像处理的数据流块网络。
如何:取消数据流块 演示如何在 Windows 窗体应用程序中使用取消。
如何:使用 JoinBlock 从多个源读取数据 解释如何在多个源的数据可用时使用 JoinBlock<T1,T2> 类执行操作,以及如何使用非贪婪模式使多个联接块更有效地共享数据源。
如何:指定数据流块中的并行度 描述如何设置 MaxDegreeOfParallelism 属性使执行数据流块一次处理多条消息。
如何:在数据流块中指定任务计划程序 演示在应用程序中使用数据流时如何关联特定任务计划程序。
演练:使用 BatchBlock 和 BatchedJoinBlock 提高效率 描述如何使用 BatchBlock<T> 类改进数据库插入操作的效率,以及如何使用 BatchedJoinBlock<T1,T2> 类获取程序从数据库中读取数据时产生的结果和发生的任何异常。
演练:创建自定义数据流块类型 演示创建实现自定义行为的数据流块类型的两种方法。
任务并行库 (TPL) 介绍 TPL,一个可在 .NET Framework 应用程序里简化并行和并发编程的库。