データフロー (タスク並列ライブラリ)Dataflow (Task Parallel Library)

タスク並列ライブラリ (TPL) はデータ フロー コンポーネントを提供し、コンカレンシー対応アプリケーションの堅牢性を強化します。The Task Parallel Library (TPL) provides dataflow components to help increase the robustness of concurrency-enabled applications. これらのデータ フロー コンポーネントは TPL データ フロー ライブラリと総称されます。These dataflow components are collectively referred to as the TPL Dataflow Library. データ フロー モデルは、粒度の粗いデータ フローおよびパイプライン処理タスクのためのインプロセス メッセージ パッシングを提供し、アクター ベースのプログラミング モデルを推進します。This dataflow model promotes actor-based programming by providing in-process message passing for coarse-grained dataflow and pipelining tasks. データ フロー コンポーネントは、TPL の種類とスケジュール インフラストラクチャの上でビルドされ、非同期プログラミングをサポートするために C#、Visual Basic、および F# 言語と統合されています。The dataflow components build on the types and scheduling infrastructure of the TPL and integrate with the C#, Visual Basic, and F# language support for asynchronous programming. 相互に非同期通信を行う必要がある複数の操作を行う場合、またはデータが使用可能になったときにデータを処理する場合に、これらのデータ フロー コンポーネントは役立ちます。These dataflow components are useful when you have multiple operations that must communicate with one another asynchronously or when you want to process data as it becomes available. たとえば、Web カメラからのイメージ データを処理するアプリケーションを考えてみます。For example, consider an application that processes image data from a web camera. データ フロー モデルを使用すると、イメージ フレームが使用可能になったときに、それをアプリケーションで処理できます。By using the dataflow model, the application can process image frames as they become available. たとえば、アプリケーションが輝度修正や赤目補正などを実行してイメージ フレームを向上させる場合、データ フロー コンポーネントのパイプラインを作成できます。If the application enhances image frames, for example, by performing light correction or red-eye reduction, you can create a pipeline of dataflow components. パイプラインの各ステージは、イメージを変換するために、TPL が提供する機能のような、粒度の粗い並列機能を使用する場合があります。Each stage of the pipeline might use more coarse-grained parallelism functionality, such as the functionality that is provided by the TPL, to transform the image.

ここでは、TPL データ フロー ライブラリの概要を示します。This document provides an overview of the TPL Dataflow Library. プログラミング モデル、定義済みのデータ フロー ブロックの型、およびアプリケーションの特定の要件を満たすためのデータ フロー ブロックの構成方法を説明します。It describes the programming model, the predefined dataflow block types, and how to configure dataflow blocks to meet the specific requirements of your applications.

注意

TPL データフロー ライブラリ (System.Threading.Tasks.Dataflow 名前空間) は、.NET と一緒には配布されません。The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. Visual Studio に System.Threading.Tasks.Dataflow 名前空間をインストールするには、プロジェクトを開き、[プロジェクト] メニューの [NuGet パッケージの管理] をクリックし、System.Threading.Tasks.Dataflow パッケージをオンラインで検索します。To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. または、.Net Core CLI を使ってインストールするには、dotnet add package System.Threading.Tasks.Dataflow を実行します。Alternatively, to install it using the .Net Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.

このドキュメントは、次のトピックに分かれています。This document contains the following sections:

プログラミング モデルProgramming Model

TPL データ フロー ライブラリはメッセージ パッシングおよび、並列化され、CPU 負荷が高く、I/O 負荷が高く、スループットが高く、待機時間が短いアプリケーションのための基盤を提供します。The TPL Dataflow Library provides a foundation for message passing and parallelizing CPU-intensive and I/O-intensive applications that have high throughput and low latency. また、システム上でのデータのバッファリング方法や移動について、明示的に制御できます。It also gives you explicit control over how data is buffered and moves around the system. データ フロー プログラミング モデルの理解を深めるため、非同期的にディスクからイメージを読み込み、それらの合成イメージを作成するアプリケーションを考えてみます。To better understand the dataflow programming model, consider an application that asynchronously loads images from disk and creates a composite of those images. 従来のプログラミング モデルでは、通常、タスクを協調させ、共有データにアクセスするには、コールバックおよびロックなどの同期オブジェクトを使用する必要があります。Traditional programming models typically require that you use callbacks and synchronization objects, such as locks, to coordinate tasks and access to shared data. データ フロー プログラミング モデルを使用すると、イメージがディスクから読み込まれたときにそれを処理する、データ フロー オブジェクトを作成できます。By using the dataflow programming model, you can create dataflow objects that process images as they are read from disk. データ フロー モデルでは、データが使用可能になったときの処理方法と、データ間の依存関係を宣言します。Under the dataflow model, you declare how data is handled when it becomes available, and also any dependencies between data. ランタイムがデータ間の依存関係を管理するため、通常は共有データへのアクセスの同期要件を回避できます。Because the runtime manages dependencies between data, you can often avoid the requirement to synchronize access to shared data. さらに、ランタイムのスケジュールは非同期のデータの到着に基づいて動作するため、基になるスレッドを効率的に管理することによって、データ フローの応答性とスループットが向上します。In addition, because the runtime schedules work based on the asynchronous arrival of data, dataflow can improve responsiveness and throughput by efficiently managing the underlying threads. Windows フォーム アプリケーションでイメージ処理を実装するためにデータフロー プログラミング モデルを使う例については、「チュートリアル:Windows フォーム アプリケーションでのデータフローの使用」を参照してください。For an example that uses the dataflow programming model to implement image processing in a Windows Forms application, see Walkthrough: Using Dataflow in a Windows Forms Application.

ソースとターゲットSources and Targets

TPL データ フロー ライブラリは、データのバッファリングと処理を行うデータ構造体であるデータ フロー ブロックで構成されます。The TPL Dataflow Library consists of dataflow blocks, which are data structures that buffer and process data. TPL はソース ブロックターゲット ブロック、および伝達子ブロックの 3 種類のデータ フロー ブロックを定義します。The TPL defines three kinds of dataflow blocks: source blocks, target blocks, and propagator blocks. ソース ブロックはデータのソースとして機能し、それを読み取ることができます。A source block acts as a source of data and can be read from. ターゲット ブロックはデータのレシーバーとして機能し、それに書き込むことができます。A target block acts as a receiver of data and can be written to. 伝達子ブロックは、ソース ブロックとターゲット ブロックのどちらとしても機能し、読み取りも書き込みもできます。A propagator block acts as both a source block and a target block, and can be read from and written to. TPL は System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> インターフェイスを定義してソースを表し、System.Threading.Tasks.Dataflow.ITargetBlock<TInput> によってターゲットを表し、System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> によって伝達子を表します。The TPL defines the System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> interface to represent sources, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> to represent targets, and System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> to represent propagators. IPropagatorBlock<TInput,TOutput> は、ISourceBlock<TOutput>ITargetBlock<TInput> の両方から継承します。IPropagatorBlock<TInput,TOutput> inherits from both ISourceBlock<TOutput>, and ITargetBlock<TInput>.

TPL データ フロー ライブラリは、ISourceBlock<TOutput>ITargetBlock<TInput>、および IPropagatorBlock<TInput,TOutput> のインターフェイスを実装する、複数の定義済みのデータ フロー ブロックの型を提供します。The TPL Dataflow Library provides several predefined dataflow block types that implement the ISourceBlock<TOutput>, ITargetBlock<TInput>, and IPropagatorBlock<TInput,TOutput> interfaces. これらのデータ フロー ブロックの型については、このドキュメントの「定義済みのデータ フロー ブロックの型」のセクションで説明します。These dataflow block types are described in this document in the section Predefined Dataflow Block Types.

ブロックの接続Connecting Blocks

データ フロー ブロックを接続して、データ フロー ブロックのリニア シーケンスであるパイプラインを作成するか、またはデータ フロー ブロックのグラフであるネットワークを作成できます。You can connect dataflow blocks to form pipelines, which are linear sequences of dataflow blocks, or networks, which are graphs of dataflow blocks. パイプラインは、ネットワークの 1 つの形態です。A pipeline is one form of network. パイプラインまたはネットワークでは、データが使用可能になると、ソースはターゲットに非同期的にデータを伝達します。In a pipeline or network, sources asynchronously propagate data to targets as that data becomes available. ISourceBlock<TOutput>.LinkTo メソッドは、ターゲット ブロックにソースのデータ フロー ブロックをリンクします。The ISourceBlock<TOutput>.LinkTo method links a source dataflow block to a target block. ソースは、ターゲットにリンクしないか、または複数のターゲットにリンクできます。ターゲットはソースからリンクされないか、または複数のソースからリンクできます。A source can be linked to zero or more targets; targets can be linked from zero or more sources. パイプラインまたはネットワークとの間でデータ フロー ブロックを同時に追加または削除できます。You can add or remove dataflow blocks to or from a pipeline or network concurrently. 定義済みのデータ フロー ブロックの型は、リンクとリンク解除のすべてのスレッド セーフな側面を処理します。The predefined dataflow block types handle all thread-safety aspects of linking and unlinking.

データフロー ブロックを接続して基本的なパイプラインを作成する例については、「チュートリアル:データフロー パイプラインの作成」を参照してください。For an example that connects dataflow blocks to form a basic pipeline, see Walkthrough: Creating a Dataflow Pipeline. データフロー ブロックを接続して、より複雑なネットワークを作成する例については、「チュートリアル:Windows フォーム アプリケーションでのデータフローの使用」を参照してください。For an example that connects dataflow blocks to form a more complex network, see Walkthrough: Using Dataflow in a Windows Forms Application. ソースがターゲットにメッセージを提供した後でソースからターゲットのリンクを解除する例については、「方法:データ フロー ブロックのリンクを解除する」を参照してください。For an example that unlinks a target from a source after the source offers the target a message, see How to: Unlink Dataflow Blocks.

フィルター処理Filtering

ISourceBlock<TOutput>.LinkTo メソッドを呼び出してソースをターゲットにリンクする場合、デリゲートを指定して、ターゲット ブロックがメッセージの値に基づいて、メッセージを受け入れるか拒否するかを決めることができます。When you call the ISourceBlock<TOutput>.LinkTo method to link a source to a target, you can supply a delegate that determines whether the target block accepts or rejects a message based on the value of that message. このフィルター機構は、データ フロー ブロックが特定の値のみを確実に受信するために便利な方法です。This filtering mechanism is a useful way to guarantee that a dataflow block receives only certain values. 定義済みのデータ フロー ブロックの型のほとんどの場合に、ソース ブロックが複数のターゲット ブロックに接続されている場合は、ターゲット ブロックがメッセージを拒否すると、ソースはそのメッセージを次のターゲットに提供します。For most of the predefined dataflow block types, if a source block is connected to multiple target blocks, when a target block rejects a message, the source offers that message to the next target. ソースがターゲットにメッセージを提供する順序はソースによって定義され、ソースの種類によって異なる場合があります。The order in which a source offers messages to targets is defined by the source and can vary according to the type of the source. 1 つのターゲットがメッセージを受け入れると、ほとんどのソース ブロックの型がメッセージの提供を停止します。Most source block types stop offering a message after one target accepts that message. この規則の 1 つの例外は、BroadcastBlock<T> クラスです。いくつかのターゲットがメッセージを拒否した場合でも、各メッセージをすべてのターゲットに提供します。One exception to this rule is the BroadcastBlock<T> class, which offers each message to all targets, even if some targets reject the message. フィルター処理を使って、特定のメッセージのみを処理する例については、「チュートリアル:Windows フォーム アプリケーションでのデータフローの使用」を参照してください。For an example that uses filtering to process only certain messages, see Walkthrough: Using Dataflow in a Windows Forms Application.

重要

それぞれの定義済みのソースのデータ フロー ブロックの型は、メッセージを受信した順にそれが伝達されることを保証するため、ソース ブロックは各メッセージを読み取ってから、次のメッセージを処理する必要があります。Because each predefined source dataflow block type guarantees that messages are propagated out in the order in which they are received, every message must be read from the source block before the source block can process the next message. したがって、フィルター処理を使用して複数のターゲットをソースに接続する場合、各メッセージを少なくとも 1 つのターゲット ブロックが受信するようにします。Therefore, when you use filtering to connect multiple targets to a source, make sure that at least one target block receives each message. そうしない場合、アプリケーションでデッドロックが発生する可能性があります。Otherwise, your application might deadlock.

メッセージ パッシングMessage Passing

データ フロー プログラミング モデルは、プログラムの独立したコンポーネントがメッセージの送信によって相互に通信する、メッセージ パッシングの概念に関連しています。The dataflow programming model is related to the concept of message passing, where independent components of a program communicate with one another by sending messages. アプリケーション コンポーネントの間でメッセージを伝達する方法の 1 つは、Post および DataflowBlock.SendAsync メソッドを呼び出して、ターゲット データ フロー ブロック ポストにメッセージを送信し (Post は同期的に動作し、SendAsync は非同期的に動作します)、ReceiveReceiveAsync、および TryReceive メソッドを呼び出して、ソース ブロックからメッセージを受け取ることです。One way to propagate messages among application components is to call the Post and DataflowBlock.SendAsync methods to send messages to target dataflow blocks post (Post acts synchronously; SendAsync acts asynchronously) and the Receive, ReceiveAsync, and TryReceive methods to receive messages from source blocks. 入力データをヘッド ノード (ターゲット ブロック) に送信し、出力データをパイプラインのターミナル ノードまたはネットワーク (1 つ以上のソース ブロック) のターミナル ノードから受信することにより、これらのメソッドとデータ フロー パイプラインまたはネットワークを結合することができます。You can combine these methods with dataflow pipelines or networks by sending input data to the head node (a target block), and receiving output data from the terminal node of the pipeline or the terminal nodes of the network (one or more source blocks). Choose メソッドを使用して、データが使用可能な、指定されたソースの先頭から読み取り、そのデータにアクションを実行することもできます。You can also use the Choose method to read from the first of the provided sources that has data available and perform action on that data.

ソース ブロックは、ITargetBlock<TInput>.OfferMessage メソッドを呼び出して、ターゲット ブロックにデータを提供します。Source blocks offer data to target blocks by calling the ITargetBlock<TInput>.OfferMessage method. ターゲット ブロックは、提供されたメッセージに、3 つの方法のうちの 1 つで応答します。メッセージを受け入れるか、メッセージを拒否するか、またはメッセージを延期できます。The target block responds to an offered message in one of three ways: it can accept the message, decline the message, or postpone the message. ターゲットがメッセージを受け入れると、OfferMessage メソッドは Accepted を返します。When the target accepts the message, the OfferMessage method returns Accepted. ターゲットがメッセージを拒否すると、OfferMessage メソッドは Declined を返します。When the target declines the message, the OfferMessage method returns Declined. ターゲットがソースからメッセージを受け取らないことを要求すると、OfferMessageDecliningPermanently を返します。When the target requires that it no longer receives any messages from the source, OfferMessage returns DecliningPermanently. その戻り値を受信した後は、定義済みのソース ブロックの型は、リンクされたターゲットにメッセージを提供せず、自動的にそのターゲットからリンク解除します。The predefined source block types do not offer messages to linked targets after such a return value is received, and they automatically unlink from such targets.

後で使用するために、ターゲット ブロックがメッセージを延期すると、OfferMessage メソッドは Postponed を返します。When a target block postpones the message for later use, the OfferMessage method returns Postponed. メッセージを延期したターゲット ブロックは、後から ISourceBlock<TOutput>.ReserveMessage メソッドを呼び出して、提供されたメッセージの予約を試みることができます。A target block that postpones a message can later call the ISourceBlock<TOutput>.ReserveMessage method to try to reserve the offered message. この時点で、メッセージはまだ使用できてターゲット ブロックから使用できるか、または他のターゲットに取得されています。At this point, the message is either still available and can be used by the target block, or the message has been taken by another target. ターゲット ブロックがメッセージを後から必要とする場合、またはメッセージを必要としない場合には、それぞれ ISourceBlock<TOutput>.ConsumeMessage または ReleaseReservation メソッドを呼び出します。When the target block later requires the message or no longer needs the message, it calls the ISourceBlock<TOutput>.ConsumeMessage or ReleaseReservation method, respectively. 通常は、メッセージの予約は、最短一致のモードで動作するデータ フロー ブロックの型で使用されます。Message reservation is typically used by the dataflow block types that operate in non-greedy mode. 最短一致のモードについては、このドキュメントの後で説明します。Non-greedy mode is explained later in this document. 延期されたメッセージを予約せずに、ターゲット ブロックは ISourceBlock<TOutput>.ConsumeMessage メソッドを使用して、直接延期されたメッセージを使用しようとすることもできます。Instead of reserving a postponed message, a target block can also use the ISourceBlock<TOutput>.ConsumeMessage method to attempt to directly consume the postponed message.

データ フロー ブロックの完了Dataflow Block Completion

データ フロー ブロックは、完了の概念をサポートしています。Dataflow blocks also support the concept of completion. 完了した状態にあるデータ フロー ブロックは、処理を実行しません。A dataflow block that is in the completed state does not perform any further work. 各データ フロー ブロックには、ブロックの完了ステータスを表す "完了タスク" と呼ばれる System.Threading.Tasks.Task オブジェクトが関連付けられています。Each dataflow block has an associated System.Threading.Tasks.Task object, known as a completion task, that represents the completion status of the block. Task オブジェクトの終了を待機できるため、完了タスクを使用して、データ フロー ネットワークの 1 つ以上のターミナル ノードが終了するまで待機できます。Because you can wait for a Task object to finish, by using completion tasks, you can wait for one or more terminal nodes of a dataflow network to finish. IDataflowBlock インターフェイスは、データ フロー ブロックに完了の要求を通知する Complete メソッドを定義し、またデータ フロー ブロックの完了タスクを返す Completion プロパティを定義します。The IDataflowBlock interface defines the Complete method, which informs the dataflow block of a request for it to complete, and the Completion property, which returns the completion task for the dataflow block. ISourceBlock<TOutput>ITargetBlock<TInput> はどちらも IDataflowBlock インターフェイスを継承します。Both ISourceBlock<TOutput> and ITargetBlock<TInput> inherit the IDataflowBlock interface.

データ フロー ブロックがエラーなしで完了したか、1 つのエラーが発生したか、複数のエラーが発生したか、取り消されたかを知る 2 つの方法があります。There are two ways to determine whether a dataflow block completed without error, encountered one or more errors, or was canceled. 最初の方法は、try-catch ブロック (Visual Basic では Try-Catch) の完了タスクで Task.Wait メソッドを呼び出すことです。The first way is to call the Task.Wait method on the completion task in a try-catch block (Try-Catch in Visual Basic). 次の例では、入力値が 0 未満の場合に ActionBlock<TInput> をスローする ArgumentOutOfRangeException オブジェクトを作成します。The following example creates an ActionBlock<TInput> object that throws ArgumentOutOfRangeException if its input value is less than zero. この例では、完了タスクで AggregateException を呼び出すと、Wait がスローされます。AggregateException is thrown when this example calls Wait on the completion task. ArgumentOutOfRangeException には InnerExceptions オブジェクトの AggregateException プロパティを使用してアクセスします。The ArgumentOutOfRangeException is accessed through the InnerExceptions property of the AggregateException object.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}", 
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
         ' Create an ActionBlock<int> object that prints its input
         ' and throws ArgumentOutOfRangeException if the input
         ' is less than zero.
         Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
             Console.WriteLine("n = {0}", n)
             If n < 0 Then
                 Throw New ArgumentOutOfRangeException()
             End If
         End Sub)

         ' Post values to the block.
         throwIfNegative.Post(0)
         throwIfNegative.Post(-1)
         throwIfNegative.Post(1)
         throwIfNegative.Post(-2)
         throwIfNegative.Complete()

         ' Wait for completion in a try/catch block.
         Try
            throwIfNegative.Completion.Wait()
         Catch ae As AggregateException
            ' If an unhandled exception occurs during dataflow processing, all
            ' exceptions are propagated through an AggregateException object.
            ae.Handle(Function(e)
                Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                Return True
            End Function)
         End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

この例は、例外が実行データ フロー ブロックのデリゲートで処理されない場合を示します。This example demonstrates the case in which an exception goes unhandled in the delegate of an execution dataflow block. そのようなブロックの本体で例外を処理することをお勧めします。We recommend that you handle exceptions in the bodies of such blocks. そのようにできない場合は、ブロックはそれが取り消されたように動作し、受信メッセージを処理しません。However, if you are unable to do so, the block behaves as though it was canceled and does not process incoming messages.

データ フロー ブロックが明示的に取り消されると、AggregateException オブジェクトには OperationCanceledException プロパティの InnerExceptions が含まれます。When a dataflow block is canceled explicitly, the AggregateException object contains OperationCanceledException in the InnerExceptions property. データ フローの取り消しの詳細については、「取り消しの有効化」セクションを参照してください。For more information about dataflow cancellation, see Enabling Cancellation section.

データ フロー ブロックの完了ステータスを判定する 2 番目の方法は、完了タスクの継続を使用するか、または C# と Visual Basic の非同期言語機能を使用して、完了したタスクを非同期的に待つことです。The second way to determine the completion status of a dataflow block is to use a continuation of the completion task, or to use the asynchronous language features of C# and Visual Basic to asynchronously wait for the completion task. Task.ContinueWith メソッドに提供するデリゲートは、継続元タスクを表す Task オブジェクトを受け取ります。The delegate that you provide to the Task.ContinueWith method takes a Task object that represents the antecedent task. Completion プロパティの場合は、継続のデリゲートは完了タスクを受け取ります。In the case of the Completion property, the delegate for the continuation takes the completion task itself. 次の例は前の例に似ていますが、ContinueWith メソッドを使用してデータ フロー操作全体のステータスを出力する継続タスクも作成している点が異なります。The following example resembles the previous one, except that it also uses the ContinueWith method to create a continuation task that prints the status of the overall dataflow operation.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall 
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine("The status of the completion task is '{0}'.", 
      task.Status);
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
         ' Create an ActionBlock<int> object that prints its input
         ' and throws ArgumentOutOfRangeException if the input
         ' is less than zero.
         Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
             Console.WriteLine("n = {0}", n)
             If n < 0 Then
                 Throw New ArgumentOutOfRangeException()
             End If
         End Sub)

         ' Create a continuation task that prints the overall 
         ' task status to the console when the block finishes.
         throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

         ' Post values to the block.
         throwIfNegative.Post(0)
         throwIfNegative.Post(-1)
         throwIfNegative.Post(1)
         throwIfNegative.Post(-2)
         throwIfNegative.Complete()

         ' Wait for completion in a try/catch block.
         Try
            throwIfNegative.Completion.Wait()
         Catch ae As AggregateException
            ' If an unhandled exception occurs during dataflow processing, all
            ' exceptions are propagated through an AggregateException object.
            ae.Handle(Function(e)
                Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                Return True
            End Function)
         End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

また、継続タスクの本体で IsCanceled などのプロパティを使用して、データ フロー ブロックの完了ステータスに関する追加情報を決定することもできます。You can also use properties such as IsCanceled in the body of the continuation task to determine additional information about the completion status of a dataflow block. 継続タスク、および継続タスクと取り消し処理やエラー処理との関連の詳細については、「継続タスクを使用したタスクの連結」、「タスクのキャンセル」、および例外処理に関するページを参照してください。For more information about continuation tasks and how they relate to cancellation and error handling, see Chaining Tasks by Using Continuation Tasks, Task Cancellation, and Exception Handling.

[ページのトップへ][go to top]

定義済みのデータ フロー ブロックの型Predefined Dataflow Block Types

TPL データ フロー ライブラリは、いくつかの定義済みのデータ フロー ブロックの型を提供します。The TPL Dataflow Library provides several predefined dataflow block types. これらの型は、バッファリング ブロック実行ブロックグループ化ブロックの 3 種類に分けられます。These types are divided into three categories: buffering blocks, execution blocks, and grouping blocks. 次のセクションでは、これらの種類を構成するブロックの型について説明します。The following sections describe the block types that make up these categories.

バッファリング ブロックBuffering Blocks

バッファリング ブロックはデータ コンシューマーが使用するデータを保持します。Buffering blocks hold data for use by data consumers. TPL データ フロー ライブラリは System.Threading.Tasks.Dataflow.BufferBlock<T>System.Threading.Tasks.Dataflow.BroadcastBlock<T>System.Threading.Tasks.Dataflow.WriteOnceBlock<T> の 3 つのバッファリング ブロックの型を提供します。The TPL Dataflow Library provides three buffering block types: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>, and System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock(T)BufferBlock(T)

BufferBlock<T> クラスは、汎用的な非同期メッセージング構造体を表します。The BufferBlock<T> class represents a general-purpose asynchronous messaging structure. このクラスでは、複数のソースが書き込むことができるメッセージ、または複数のターゲットが読み取ることができるメッセージの先入れ先出し (FIFO) のキューを格納します。This class stores a first in, first out (FIFO) queue of messages that can be written to by multiple sources or read from by multiple targets. ターゲットが BufferBlock<T> オブジェクトからメッセージを受信すると、そのメッセージはメッセージ キューから削除されます。When a target receives a message from a BufferBlock<T> object, that message is removed from the message queue. そのため、BufferBlock<T> オブジェクトには複数のターゲットを設定できますが、各メッセージを受信するターゲットは 1 つだけです。Therefore, although a BufferBlock<T> object can have multiple targets, only one target will receive each message. BufferBlock<T> クラスは、複数のメッセージを別のコンポーネントに渡し、そのコンポーネントで各メッセージを受信する必要がある場合に便利です。The BufferBlock<T> class is useful when you want to pass multiple messages to another component, and that component must receive each message.

次の基本的な例は、Int32 オブジェクトに複数の BufferBlock<T> の値をポストし、その値をそのオブジェクトから読み込みます。The following basic example posts several Int32 values to a BufferBlock<T> object and then reads those values back from that object.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
         ' Create a BufferBlock<int> object.
         Dim bufferBlock = New BufferBlock(Of Integer)()

         ' Post several messages to the block.
         For i As Integer = 0 To 2
            bufferBlock.Post(i)
         Next i

         ' Receive the messages back from the block.
         For i As Integer = 0 To 2
            Console.WriteLine(bufferBlock.Receive())
         Next i

'          Output:
'            0
'            1
'            2
'          

BufferBlock<T> オブジェクトに対するメッセージの書き込みと読み取りの方法を示す詳しい例については、「方法:データフロー ブロックに対してメッセージの読み取りと書き込みを行う」を参照してください。For a complete example that demonstrates how to write messages to and read messages from a BufferBlock<T> object, see How to: Write Messages to and Read Messages from a Dataflow Block.

BroadcastBlock(T)BroadcastBlock(T)

BroadcastBlock<T> クラスは、複数のメッセージを別のコンポーネントに渡すときに、そのコンポーネントで必要になるのが最新の値のみである場合に便利です。The BroadcastBlock<T> class is useful when you must pass multiple messages to another component, but that component needs only the most recent value. また、このクラスは、メッセージを複数のコンポーネントにブロードキャストする場合にも便利です。This class is also useful when you want to broadcast a message to multiple components.

次の基本的な例は、Double の値を BroadcastBlock<T> オブジェクトにポストし、その値をそのオブジェクトから複数回読み込みます。The following basic example posts a Double value to a BroadcastBlock<T> object and then reads that value back from that object several times. 値は読み取られた後も BroadcastBlock<T> オブジェクトから削除されないため、毎回同じ値を使用できます。Because values are not removed from BroadcastBlock<T> objects after they are read, the same value is available every time.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
         ' Create a BroadcastBlock<double> object.
         Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

         ' Post a message to the block.
         broadcastBlock.Post(Math.PI)

         ' Receive the messages back from the block several times.
         For i As Integer = 0 To 2
            Console.WriteLine(broadcastBlock.Receive())
         Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

BroadcastBlock<T> を使って複数のターゲット ブロックにメッセージをブロードキャストする方法を示す詳しい例については、「方法:データフロー ブロックのタスク スケジューラを指定する」を参照してください。For a complete example that demonstrates how to use BroadcastBlock<T> to broadcast a message to multiple target blocks, see How to: Specify a Task Scheduler in a Dataflow Block.

WriteOnceBlock(T)WriteOnceBlock(T)

WriteOnceBlock<T> クラスは BroadcastBlock<T> クラスに似ていますが、WriteOnceBlock<T> オブジェクトに 1 回しか書き込むことができない点が異なります。The WriteOnceBlock<T> class resembles the BroadcastBlock<T> class, except that a WriteOnceBlock<T> object can be written to one time only. WriteOnceBlock<T> は C# の readonly (Visual Basic では ReadOnly) キーワードと似ていると考えることができますが、構築時でなく、値を読み取った後は、WriteOnceBlock<T> オブジェクトを変更できなくなる点が異なります。You can think of WriteOnceBlock<T> as being similar to the C# readonly (ReadOnly in Visual Basic) keyword, except that a WriteOnceBlock<T> object becomes immutable after it receives a value instead of at construction. BroadcastBlock<T> クラスと同様に、ターゲットが WriteOnceBlock<T> オブジェクトからメッセージを受信しても、そのメッセージはそのオブジェクトから削除されません。Like the BroadcastBlock<T> class, when a target receives a message from a WriteOnceBlock<T> object, that message is not removed from that object. そのため、複数のターゲットがメッセージのコピーを受信します。Therefore, multiple targets receive a copy of the message. WriteOnceBlock<T> のクラスは、複数のメッセージの最初のメッセージだけを伝達する場合に便利です。The WriteOnceBlock<T> class is useful when you want to propagate only the first of multiple messages.

次の基本的な例は、String オブジェクトに複数の WriteOnceBlock<T> の値をポストし、その値をそのオブジェクトから読み込みます。The following basic example posts multiple String values to a WriteOnceBlock<T> object and then reads the value back from that object. WriteOnceBlock<T> オブジェクトには 1 回だけ書き込むことができるため、WriteOnceBlock<T> オブジェクトは 1 つのメッセージを受信した後は、それ以降のメッセージを破棄します。Because a WriteOnceBlock<T> object can be written to one time only, after a WriteOnceBlock<T> object receives a message, it discards subsequent messages.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first 
// message to be received is written to the block. 
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
         ' Create a WriteOnceBlock<string> object.
         Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

         ' Post several messages to the block in parallel. The first 
         ' message to be received is written to the block. 
         ' Subsequent messages are discarded.
         Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

         ' Receive the message from the block.
         Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

WriteOnceBlock<T> を使って終了した最初の操作の値を受け取る方法を示した詳しい例については、「方法:データフロー ブロックのリンクを解除する」を参照してください。For a complete example that demonstrates how to use WriteOnceBlock<T> to receive the value of the first operation that finishes, see How to: Unlink Dataflow Blocks.

実行ブロックExecution Blocks

実行ブロックは、受け取ったデータのそれぞれに、ユーザーが指定したデリゲートを呼び出します。Execution blocks call a user-provided delegate for each piece of received data. TPL データ フロー ライブラリは ActionBlock<TInput>System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput> の 3 つの実行ブロックの型を提供します。The TPL Dataflow Library provides three execution block types: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>, and System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock(T)ActionBlock(T)

ActionBlock<TInput> クラスは、データを受け取るとデリゲートを呼び出すターゲット ブロックです。The ActionBlock<TInput> class is a target block that calls a delegate when it receives data. ActionBlock<TInput> オブジェクトは、データが使用可能になったときに非同期的に実行できるデリゲートと考えることができます。Think of a ActionBlock<TInput> object as a delegate that runs asynchronously when data becomes available. ActionBlock<TInput> オブジェクトに提供するデリゲートは、Action<T> 型またはSystem.Func<TInput, Task> 型を使用できます。The delegate that you provide to an ActionBlock<TInput> object can be of type Action<T> or type System.Func<TInput, Task>. ActionBlock<TInput> オブジェクトを Action<T> と共に使用すると、各入力要素の処理はデリゲートが返されたときに完了したと見なされます。When you use an ActionBlock<TInput> object with Action<T>, processing of each input element is considered completed when the delegate returns. ActionBlock<TInput> オブジェクトを System.Func<TInput, Task> と共に使用すると、各入力要素の処理は返された Task オブジェクトが終了した場合にのみ、完了したと見なされます。When you use an ActionBlock<TInput> object with System.Func<TInput, Task>, processing of each input element is considered completed only when the returned Task object is completed. この 2 つの方法を使って、ActionBlock<TInput> を使用して各入力要素を同期的にも非同期的にも処理することができます。By using these two mechanisms, you can use ActionBlock<TInput> for both synchronous and asynchronous processing of each input element.

次の基本的な例では、Int32 オブジェクトに複数の ActionBlock<TInput> の値をポストします。The following basic example posts multiple Int32 values to an ActionBlock<TInput> object. ActionBlock<TInput> オブジェクトは、コンソールにそれらの値を出力します。The ActionBlock<TInput> object prints those values to the console. 次にこの例では、ブロックを完了した状態に設定し、すべてのデータ フロー タスクの終了を待機します。This example then sets the block to the completed state and waits for all dataflow tasks to finish.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all 
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
         ' Create an ActionBlock<int> object that prints values
         ' to the console.
         Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

         ' Post several messages to the block.
         For i As Integer = 0 To 2
            actionBlock.Post(i * 10)
         Next i

         ' Set the block to the completed state and wait for all 
         ' tasks to finish.
         actionBlock.Complete()
         actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

ActionBlock<TInput> クラスとデリゲートの使用方法を示した詳しい例については、「方法:データフロー ブロックでデータを受信したときにアクションを実行する」を参照してください。For complete examples that demonstrate how to use delegates with the ActionBlock<TInput> class, see How to: Perform Action When a Dataflow Block Receives Data.

TransformBlock(TInput, TOutput)TransformBlock(TInput, TOutput)

TransformBlock<TInput,TOutput> クラスは ActionBlock<TInput> クラスに似ていますが、ソースとしてもターゲットとしても動作する点が異なります。The TransformBlock<TInput,TOutput> class resembles the ActionBlock<TInput> class, except that it acts as both a source and as a target. TransformBlock<TInput,TOutput> オブジェクトに渡すデリゲートは TOutput 型の値を返します。The delegate that you pass to a TransformBlock<TInput,TOutput> object returns a value of type TOutput. TransformBlock<TInput,TOutput> オブジェクトに提供するデリゲートは、System.Func<TInput, TOutput> 型またはSystem.Func<TInput, Task<TOutput>> 型を使用できます。The delegate that you provide to a TransformBlock<TInput,TOutput> object can be of type System.Func<TInput, TOutput> or type System.Func<TInput, Task<TOutput>>. TransformBlock<TInput,TOutput> オブジェクトを System.Func<TInput, TOutput> と共に使用すると、各入力要素の処理はデリゲートが返されたときに完了したと見なされます。When you use a TransformBlock<TInput,TOutput> object with System.Func<TInput, TOutput>, processing of each input element is considered completed when the delegate returns. TransformBlock<TInput,TOutput> オブジェクトを System.Func<TInput, Task<TOutput>> と共に使用すると、各入力要素の処理は返された Task<TResult> オブジェクトが終了した場合にのみ、完了したと見なされます。When you use a TransformBlock<TInput,TOutput> object used with System.Func<TInput, Task<TOutput>>, processing of each input element is considered completed only when the returned Task<TResult> object is completed. ActionBlock<TInput> と同様に、この 2 つの方法を使って、TransformBlock<TInput,TOutput> を使用して各入力要素を同期的にも非同期的にも処理することができます。As with ActionBlock<TInput>, by using these two mechanisms, you can use TransformBlock<TInput,TOutput> for both synchronous and asynchronous processing of each input element.

次の基本的な例では、入力の平方根を計算する TransformBlock<TInput,TOutput> オブジェクトを作成します。The following basic example creates a TransformBlock<TInput,TOutput> object that computes the square root of its input. TransformBlock<TInput,TOutput> オブジェクトは、入力として Int32 の値を受け取り、出力として Double の値を生成します。The TransformBlock<TInput,TOutput> object takes Int32 values as input and produces Double values as output.

// Create a TransformBlock<int, double> object that 
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
         ' Create a TransformBlock<int, double> object that 
         ' computes the square root of its input.
         Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

         ' Post several messages to the block.
         transformBlock.Post(10)
         transformBlock.Post(20)
         transformBlock.Post(30)

         ' Read the output messages from the block.
         For i As Integer = 0 To 2
            Console.WriteLine(transformBlock.Receive())
         Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Windows フォーム アプリケーションでイメージ処理を実行するデータフロー ブロックのネットワークにおいて TransformBlock<TInput,TOutput> を使用する詳しい例については、「チュートリアル:Windows フォーム アプリケーションでのデータフローの使用」を参照してください。For complete examples that uses TransformBlock<TInput,TOutput> in a network of dataflow blocks that performs image processing in a Windows Forms application, see Walkthrough: Using Dataflow in a Windows Forms Application.

TransformManyBlock(TInput, TOutput)TransformManyBlock(TInput, TOutput)

TransformManyBlock<TInput,TOutput> クラスは TransformBlock<TInput,TOutput> クラスに似ていますが、各入力値に 1 つの出力値を生成するのでなく、TransformManyBlock<TInput,TOutput> は各入力値に出力値を生成しないか、または 1 つ以上の出力値を生成する点が異なります。The TransformManyBlock<TInput,TOutput> class resembles the TransformBlock<TInput,TOutput> class, except that TransformManyBlock<TInput,TOutput> produces zero or more output values for each input value, instead of only one output value for each input value. TransformManyBlock<TInput,TOutput> オブジェクトに提供するデリゲートは、System.Func<TInput, IEnumerable<TOutput>> 型またはSystem.Func<TInput, Task<IEnumerable<TOutput>>> 型を使用できます。The delegate that you provide to a TransformManyBlock<TInput,TOutput> object can be of type System.Func<TInput, IEnumerable<TOutput>> or type System.Func<TInput, Task<IEnumerable<TOutput>>>. TransformManyBlock<TInput,TOutput> オブジェクトを System.Func<TInput, IEnumerable<TOutput>> と共に使用すると、各入力要素の処理はデリゲートが返されたときに完了したと見なされます。When you use a TransformManyBlock<TInput,TOutput> object with System.Func<TInput, IEnumerable<TOutput>>, processing of each input element is considered completed when the delegate returns. TransformManyBlock<TInput,TOutput> オブジェクトを System.Func<TInput, Task<IEnumerable<TOutput>>> と共に使用すると、各入力要素の処理は返された System.Threading.Tasks.Task<IEnumerable<TOutput>> オブジェクトが終了した場合にのみ、完了したと見なされます。When you use a TransformManyBlock<TInput,TOutput> object with System.Func<TInput, Task<IEnumerable<TOutput>>>, processing of each input element is considered complete only when the returned System.Threading.Tasks.Task<IEnumerable<TOutput>> object is completed.

次の基本的な例は、文字列を個別の文字のシーケンスに分割する TransformManyBlock<TInput,TOutput> オブジェクトを作成します。The following basic example creates a TransformManyBlock<TInput,TOutput> object that splits strings into their individual character sequences. TransformManyBlock<TInput,TOutput> オブジェクトは、入力として String の値を受け取り、出力として Char の値を生成します。The TransformManyBlock<TInput,TOutput> object takes String values as input and produces Char values as output.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
         ' Create a TransformManyBlock<string, char> object that splits
         ' a string into its individual characters.
         Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

         ' Post two messages to the first block.
         transformManyBlock.Post("Hello")
         transformManyBlock.Post("World")

         ' Receive all output values from the block.
         For i As Integer = 0 To ("Hello" & "World").Length - 1
            Console.WriteLine(transformManyBlock.Receive())
         Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

TransformManyBlock<TInput,TOutput> を使ってデータフロー パイプラインの各入力に複数の独立した出力を生成する詳しい例については、「チュートリアル:データフロー パイプラインの作成」を参照してください。For complete examples that use TransformManyBlock<TInput,TOutput> to produce multiple independent outputs for each input in a dataflow pipeline, see Walkthrough: Creating a Dataflow Pipeline.

並列化の次数Degree of Parallelism

すべての ActionBlock<TInput>, TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput> オブジェクトは、ブロックが入力メッセージを処理できるようになるまで、入力メッセージをバッファリングします。Every ActionBlock<TInput>, TransformBlock<TInput,TOutput>, and TransformManyBlock<TInput,TOutput> object buffers input messages until the block is ready to process them. 既定では、これらのクラスはメッセージを受信した順序で、一度に 1 つずつ処理します。By default, these classes process messages in the order in which they are received, one message at a time. ActionBlock<TInput>TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput> のオブジェクトを有効化し、並列化の次数を指定して、複数のメッセージを同時に処理することもできます。You can also specify the degree of parallelism to enable ActionBlock<TInput>, TransformBlock<TInput,TOutput> and TransformManyBlock<TInput,TOutput> objects to process multiple messages concurrently. 同時実行に関する詳細については、このドキュメントの後の「並列化の次数の指定」のセクションを参照してください。For more information about concurrent execution, see the section Specifying the Degree of Parallelism later in this document. 並列化の次数を設定して実行データフロー ブロックを有効化し、同時に複数のメッセージを処理する例については、「方法:データ フロー ブロックで並列処理の範囲を指定する」を参照してください。For an example that sets the degree of parallelism to enable an execution dataflow block to process more than one message at a time, see How to: Specify the Degree of Parallelism in a Dataflow Block.

デリゲート型の概要Summary of Delegate Types

次の表に、ActionBlock<TInput>TransformBlock<TInput,TOutput>、および TransformManyBlock<TInput,TOutput> オブジェクトに指定できるデリゲート型の概要を示します。The following table summarizes the delegate types that you can provide to ActionBlock<TInput>, TransformBlock<TInput,TOutput>, and TransformManyBlock<TInput,TOutput> objects. このテーブルでは、デリゲート型が同期的または非同期的に動作するかどうかについても示しています。This table also specifies whether the delegate type operates synchronously or asynchronously.

Type 同期的なデリゲート型Synchronous Delegate Type 非同期的なデリゲート型Asynchronous Delegate Type
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

実行ブロックの型を使用する場合にラムダ式を使用することもできます。You can also use lambda expressions when you work with execution block types. 実行ブロックと共にラムダ式を使用する方法を示した例については、「方法:データフロー ブロックでデータを受信したときにアクションを実行する」を参照してください。For an example that shows how to use a lambda expression with an execution block, see How to: Perform Action When a Dataflow Block Receives Data.

グループ化ブロックGrouping Blocks

グループ化ブロックは、さまざまな制約の下で 1 つ以上のソースからデータをまとめます。Grouping blocks combine data from one or more sources and under various constraints. TPL データ フロー ライブラリは BatchBlock<T>JoinBlock<T1,T2>BatchedJoinBlock<T1,T2> の 3 つの結合ブロックの型を提供します。The TPL Dataflow Library provides three join block types: BatchBlock<T>, JoinBlock<T1,T2>, and BatchedJoinBlock<T1,T2>.

BatchBlock(T)BatchBlock(T)

BatchBlock<T> クラスは、バッチと呼ばれる入力データのセットを、出力データの配列に結合します。The BatchBlock<T> class combines sets of input data, which are known as batches, into arrays of output data. BatchBlock<T> オブジェクトを作成するときに、各バッチのサイズを指定します。You specify the size of each batch when you create a BatchBlock<T> object. BatchBlock<T> オブジェクトは、指定した数の入力要素を受け取ると、その要素を含む配列を非同期的に伝達します。When the BatchBlock<T> object receives the specified count of input elements, it asynchronously propagates out an array that contains those elements. BatchBlock<T> オブジェクトが完了状態に設定されているが、バッチを形成するために十分な要素を含んでいない場合には、残りの入力要素を含む最終的な配列を伝達します。If a BatchBlock<T> object is set to the completed state but does not contain enough elements to form a batch, it propagates out a final array that contains the remaining input elements.

BatchBlock<T> クラスは、"最長一致" モードまたは "最短一致" モードのどちらかで動作します。The BatchBlock<T> class operates in either greedy or non-greedy mode. 既定では最長一致モードで、BatchBlock<T> オブジェクトは提供されたすべてのメッセージを受け取り、指定された数の要素を受け取った後に、配列を伝達します。In greedy mode, which is the default, a BatchBlock<T> object accepts every message that it is offered and propagates out an array after it receives the specified count of elements. 最短一致モードでは、BatchBlock<T> オブジェクトは、バッチを作成するために十分なソースがブロックへのメッセージを提供するまで、すべての受信メッセージを延期します。In non-greedy mode, a BatchBlock<T> object postpones all incoming messages until enough sources have offered messages to the block to form a batch. 最長一致モードでは通常、最短一致モードよりも処理のオーバーヘッドが低いため、パフォーマンスがよくなります。Greedy mode typically performs better than non-greedy mode because it requires less processing overhead. ただし、アトミックな方法で複数のソースの使用量を調整する必要がある場合、最短一致モードを使用できます。However, you can use non-greedy mode when you must coordinate consumption from multiple sources in an atomic fashion. 最短一致モードを指定するには、Greedy コンストラクターの False パラメーターの dataflowBlockOptionsBatchBlock<T> を設定します。Specify non-greedy mode by setting Greedy to False in the dataflowBlockOptions parameter in the BatchBlock<T> constructor.

次の基本的な例では、複数の Int32 の値を、バッチに 10 個の要素を持つ BatchBlock<T> オブジェクトにポストします。The following basic example posts several Int32 values to a BatchBlock<T> object that holds ten elements in a batch. BatchBlock<T> からすべての値が伝達されることを保証するために、この例では Complete メソッドを呼び出します。To guarantee that all values propagate out of the BatchBlock<T>, this example calls the Complete method. Complete メソッドは BatchBlock<T> オブジェクトを完了状態に設定するため、BatchBlock<T> オブジェクトは残りのすべての要素を最終バッチとして伝達します。The Complete method sets the BatchBlock<T> object to the completed state, and therefore, the BatchBlock<T> object propagates out any remaining elements as a final batch.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
   batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
   batchBlock.Receive().Sum());

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
         ' Create a BatchBlock<int> object that holds ten
         ' elements per batch.
         Dim batchBlock = New BatchBlock(Of Integer)(10)

         ' Post several values to the block.
         For i As Integer = 0 To 12
            batchBlock.Post(i)
         Next i
         ' Set the block to the completed state. This causes
         ' the block to propagate out any any remaining
         ' values as a final batch.
         batchBlock.Complete()

         ' Print the sum of both batches.

         Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

         Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

BatchBlock<T> を使用してデータベースの挿入操作の効率を向上させる詳しい例ついては、「チュートリアル:BatchBlock および BatchedJoinBlock を使用した効率の向上」を参照してください。For a complete example that uses BatchBlock<T> to improve the efficiency of database insert operations, see Walkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency.

JoinBlock(T1, T2, ...)JoinBlock(T1, T2, ...)

JoinBlock<T1,T2>JoinBlock<T1,T2,T3> クラスは、入力要素を収集して、それらの要素を含む System.Tuple<T1,T2> または System.Tuple<T1,T2,T3> オブジェクトを伝達します。The JoinBlock<T1,T2> and JoinBlock<T1,T2,T3> classes collect input elements and propagate out System.Tuple<T1,T2> or System.Tuple<T1,T2,T3> objects that contain those elements. JoinBlock<T1,T2> クラスと JoinBlock<T1,T2,T3> クラスは、ITargetBlock<TInput> を継承しません。The JoinBlock<T1,T2> and JoinBlock<T1,T2,T3> classes do not inherit from ITargetBlock<TInput>. 代わりに、Target1 を実装する Target2Target3、および ITargetBlock<TInput> のプロパティを提供します。Instead, they provide properties, Target1, Target2, and Target3, that implement ITargetBlock<TInput>.

BatchBlock<T> と同様に、JoinBlock<T1,T2> および JoinBlock<T1,T2,T3> は、最長一致モードまたは最短一致モードのどちらかで動作します。Like BatchBlock<T>, JoinBlock<T1,T2> and JoinBlock<T1,T2,T3> operate in either greedy or non-greedy mode. 既定では最長一致モードで、JoinBlock<T1,T2> または JoinBlock<T1,T2,T3> オブジェクトは提供されたすべてのメッセージを受け取り、各ターゲットが少なくとも 1 つのメッセージを受け取った後で、タプルを伝達します。In greedy mode, which is the default, a JoinBlock<T1,T2> or JoinBlock<T1,T2,T3> object accepts every message that it is offered and propagates out a tuple after each of its targets receives at least one message. 最短一致モードでは、JoinBlock<T1,T2> または JoinBlock<T1,T2,T3> オブジェクトは、すべてのターゲットがタプルを作成するために必要なデータを提供されるまで、すべての受信メッセージを延期します。In non-greedy mode, a JoinBlock<T1,T2> or JoinBlock<T1,T2,T3> object postpones all incoming messages until all targets have been offered the data that is required to create a tuple. この時点で、ブロックは 2 フェーズ コミット プロトコルによって、ソースからすべての必要な項目をアトミックに取得します。At this point, the block engages in a two-phase commit protocol to atomically retrieve all required items from the sources. この延期により、他のエンティティは当面の間データを使用でき、システム全体が進行できます。This postponement makes it possible for another entity to consume the data in the meantime, to allow the overall system to make forward progress.

次の基本的な例は、値を計算するために JoinBlock<T1,T2,T3> オブジェクトが複数のデータを必要とするケースを示します。The following basic example demonstrates a case in which a JoinBlock<T1,T2,T3> object requires multiple data to compute a value. この例では、算術演算を実行するために 2 つの JoinBlock<T1,T2,T3> の値と 1 つの Int32 の値を必要とする Char オブジェクトを作成します。This example creates a JoinBlock<T1,T2,T3> object that requires two Int32 values and a Char value to perform an arithmetic operation.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine("{0} + {1} = {2}",
            data.Item1, data.Item2, data.Item1 + data.Item2);
         break;
      case '-':
         Console.WriteLine("{0} - {1} = {2}",
            data.Item1, data.Item2, data.Item1 - data.Item2);
         break;
      default:
         Console.WriteLine("Unknown operator '{0}'.", data.Item3);
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
         ' Create a JoinBlock<int, int, char> object that requires
         ' two numbers and an operator.
         Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

         ' Post two values to each target of the join.

         joinBlock.Target1.Post(3)
         joinBlock.Target1.Post(6)

         joinBlock.Target2.Post(5)
         joinBlock.Target2.Post(4)

         joinBlock.Target3.Post("+"c)
         joinBlock.Target3.Post("-"c)

         ' Receive each group of values and apply the operator part
         ' to the number parts.

         For i As Integer = 0 To 1
            Dim data = joinBlock.Receive()
            Select Case data.Item3
               Case "+"c
                  Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
               Case "-"c
                  Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
               Case Else
                  Console.WriteLine("Unknown operator '{0}'.", data.Item3)
            End Select
         Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

最短一致モードで JoinBlock<T1,T2> オブジェクトを使って協調的にリソースを共有する詳しい例については、「方法:JoinBlock を使用して複数のソースからデータを読み込む」を参照してください。For a complete example that uses JoinBlock<T1,T2> objects in non-greedy mode to cooperatively share a resource, see How to: Use JoinBlock to Read Data From Multiple Sources.

BatchedJoinBlock(T1, T2, ...)BatchedJoinBlock(T1, T2, ...)

BatchedJoinBlock<T1,T2>BatchedJoinBlock<T1,T2,T3> クラスは、入力要素のバッチを収集して、それらの要素を含む System.Tuple(IList(T1), IList(T2)) または System.Tuple(IList(T1), IList(T2), IList(T3)) オブジェクトを伝達します。The BatchedJoinBlock<T1,T2> and BatchedJoinBlock<T1,T2,T3> classes collect batches of input elements and propagate out System.Tuple(IList(T1), IList(T2)) or System.Tuple(IList(T1), IList(T2), IList(T3)) objects that contain those elements. BatchedJoinBlock<T1,T2>BatchBlock<T>JoinBlock<T1,T2> の組み合わせであると考えることができます。Think of BatchedJoinBlock<T1,T2> as a combination of BatchBlock<T> and JoinBlock<T1,T2>. BatchedJoinBlock<T1,T2> オブジェクトを作成するときに、各バッチのサイズを指定します。Specify the size of each batch when you create a BatchedJoinBlock<T1,T2> object. BatchedJoinBlock<T1,T2> は、Target1 を実装する Target2 および ITargetBlock<TInput> のプロパティを提供します。BatchedJoinBlock<T1,T2> also provides properties, Target1 and Target2, that implement ITargetBlock<TInput>. すべてのターゲットから指定した数の入力要素を受け取ると、BatchedJoinBlock<T1,T2> オブジェクトは、それらの要素を含む System.Tuple(IList(T1), IList(T2)) オブジェクトを非同期的に伝達します。When the specified count of input elements are received from across all targets, the BatchedJoinBlock<T1,T2> object asynchronously propagates out a System.Tuple(IList(T1), IList(T2)) object that contains those elements.

次の基本的な例では、結果を保持する BatchedJoinBlock<T1,T2> オブジェクト、Int32 の値、エラーである Exception オブジェクトを作成します。The following basic example creates a BatchedJoinBlock<T1,T2> object that holds results, Int32 values, and errors that are Exception objects. この例では複数の操作を実行し、結果を Target1 オブジェクトの Target2 プロパティに書き込み、エラーを BatchedJoinBlock<T1,T2> プロパティに書き込みます。This example performs multiple operations and writes results to the Target1 property, and errors to the Target2 property, of the BatchedJoinBlock<T1,T2> object. 操作の成功と失敗の数はあらかじめ不明であるため、IList<T> オブジェクトは、各ターゲット値を受け取らないことも、複数の値を受け取ることも可能です。Because the count of successful and failed operations is unknown in advance, the IList<T> objects enable each target to receive zero or more values.

 // For demonstration, create a Func<int, int> that 
 // returns its argument, or throws ArgumentOutOfRangeException
 // if the argument is less than zero.
 Func<int, int> DoWork = n =>
 {
    if (n < 0)
       throw new ArgumentOutOfRangeException();
    return n;
 };

 // Create a BatchedJoinBlock<int, Exception> object that holds 
 // seven elements per batch.
 var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

 // Post several items to the block.
 foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
 {
    try
    {
       // Post the result of the worker to the 
       // first target of the block.
       batchedJoinBlock.Target1.Post(DoWork(i));
    }
    catch (ArgumentOutOfRangeException e) 
    {
       // If an error occurred, post the Exception to the 
       // second target of the block.
       batchedJoinBlock.Target2.Post(e); 
    }
 }

 // Read the results from the block.
 var results = batchedJoinBlock.Receive();

 // Print the results to the console.

 // Print the results.
 foreach (int n in results.Item1)
 {
    Console.WriteLine(n);
 }
 // Print failures.
 foreach (Exception e in results.Item2)
 {
    Console.WriteLine(e.Message);
 }

 /* Output:
    5
    6
    13
    55
    0
    Specified argument was out of the range of valid values.
    Specified argument was out of the range of valid values.
  */
         ' For demonstration, create a Func<int, int> that 
         ' returns its argument, or throws ArgumentOutOfRangeException
         ' if the argument is less than zero.
         Dim DoWork As Func(Of Integer, Integer) = Function(n)
            If n < 0 Then
               Throw New ArgumentOutOfRangeException()
            End If
            Return n
         End Function

         ' Create a BatchedJoinBlock<int, Exception> object that holds 
         ' seven elements per batch.
         Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

         ' Post several items to the block.
         For Each i As Integer In New Integer() { 5, 6, -7, -22, 13, 55, 0 }
            Try
               ' Post the result of the worker to the 
               ' first target of the block.
               batchedJoinBlock.Target1.Post(DoWork(i))
            Catch e As ArgumentOutOfRangeException
               ' If an error occurred, post the Exception to the 
               ' second target of the block.
               batchedJoinBlock.Target2.Post(e)
            End Try
         Next i

         ' Read the results from the block.
         Dim results = batchedJoinBlock.Receive()

         ' Print the results to the console.

         ' Print the results.
         For Each n As Integer In results.Item1
            Console.WriteLine(n)
         Next n
         ' Print failures.
         For Each e As Exception In results.Item2
            Console.WriteLine(e.Message)
         Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

BatchedJoinBlock<T1,T2> を使って、プログラムがデータベースを読み取る間にその結果と発生する例外の両方をキャプチャする詳しい例については、「チュートリアル:BatchBlock および BatchedJoinBlock を使用した効率の向上」を参照してください。For a complete example that uses BatchedJoinBlock<T1,T2> to capture both the results and any exceptions that occur while the program reads from a database, see Walkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency.

[ページのトップへ][go to top]

データフロー ブロックの動作を構成するConfiguring Dataflow Block Behavior

データ フロー ブロックの型のコンストラクターに System.Threading.Tasks.Dataflow.DataflowBlockOptions のオブジェクトを提供することによって、追加のオプションを有効にできます。You can enable additional options by providing a System.Threading.Tasks.Dataflow.DataflowBlockOptions object to the constructor of dataflow block types. これらのオプションは、基になるタスクと並列化の次数を管理するスケジューラなどの動作を制御します。These options control behavior such the scheduler that manages the underlying task and the degree of parallelism. DataflowBlockOptions には、特定のデータ フロー ブロックの型に固有の動作を指定する派生型があります。The DataflowBlockOptions also has derived types that specify behavior that is specific to certain dataflow block types. 各データ フロー ブロックの型に関連付けられているオプション型の概要を次の表に示します。The following table summarizes which options type is associated with each dataflow block type.

データ フロー ブロックの型Dataflow Block Type DataflowBlockOptionsDataflowBlockOptions type
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

以下のセクションでは、System.Threading.Tasks.Dataflow.DataflowBlockOptionsSystem.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptionsSystem.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions クラスを通じて利用できる、重要なデータ フロー ブロックのオプションに関する追加情報を提供します。The following sections provide additional information about the important kinds of dataflow block options that are available through the System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions, and System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions classes.

タスク スケジューラの指定Specifying the Task Scheduler

すべての定義済みのデータ フロー ブロックは TPL タスク スケジューリング メカニズムを使って、ターゲットへのデータの伝達、ソースからのデータの受け取り、データが使用可能になったときのユーザー定義のデリゲートの実行、などのアクティビティを実行します。Every predefined dataflow block uses the TPL task scheduling mechanism to perform activities such as propagating data to a target, receiving data from a source, and running user-defined delegates when data becomes available. TaskScheduler は、タスクをスレッドのキューに置くタスク スケジューラを表す抽象クラスです。TaskScheduler is an abstract class that represents a task scheduler that queues tasks onto threads. 既定のタスク スケジューラである Default は、ThreadPool クラスを使用して作業をキューに置き、実行します。The default task scheduler, Default, uses the ThreadPool class to queue and execute work. データ フロー ブロック オブジェクトを構成する場合、TaskScheduler プロパティを設定して、既定のタスク スケジューラをオーバーライドできます。You can override the default task scheduler by setting the TaskScheduler property when you construct a dataflow block object.

同じタスク スケジューラが複数のデータ フロー ブロックを管理する場合、それらにポリシーを強制的に適用できます。When the same task scheduler manages multiple dataflow blocks, it can enforce policies across them. たとえば、複数のデータ フロー ブロックが、それぞれ同じ ConcurrentExclusiveSchedulerPair オブジェクトの排他スケジューラを対象とするように構成されている場合、これらのブロックにわたって実行されるすべての操作がシリアル化されます。For example, if multiple dataflow blocks are each configured to target the exclusive scheduler of the same ConcurrentExclusiveSchedulerPair object, all work that runs across these blocks is serialized. 同様に、これらのブロックが同じ ConcurrentExclusiveSchedulerPair オブジェクトのコンカレント スケジューラをターゲットとして構成され、そのスケジューラが最大コンカレンシー レベルに構成されている場合、これらのブロックのすべての操作は、そのコンカレント操作の数に制限されます。Similarly, if these blocks are configured to target the concurrent scheduler of the same ConcurrentExclusiveSchedulerPair object, and that scheduler is configured to have a maximum concurrency level, all work from these blocks is limited to that number of concurrent operations. ConcurrentExclusiveSchedulerPair クラスを使って、読み取り操作は並列に行うが、書き込み操作は他のすべての操作で排他的に行う例については、「方法:データフロー ブロックのタスク スケジューラを指定する」を参照してください。For an example that uses the ConcurrentExclusiveSchedulerPair class to enable read operations to occur in parallel, but write operations to occur exclusively of all other operations, see How to: Specify a Task Scheduler in a Dataflow Block. TPL のタスク スケジューラの詳細については、TaskScheduler クラスに関するトピックを参照してください。For more information about task schedulers in the TPL, see the TaskScheduler class topic.

並列化の次数の指定Specifying the Degree of Parallelism

既定では、TPL データ フロー ライブラリが提供する 3 つの実行ブロックの型、ActionBlock<TInput>TransformBlock<TInput,TOutput> および TransformManyBlock<TInput,TOutput> は、一度に 1 つのメッセージを処理します。By default, the three execution block types that the TPL Dataflow Library provides, ActionBlock<TInput>, TransformBlock<TInput,TOutput>, and TransformManyBlock<TInput,TOutput>, process one message at a time. これらのデータ フロー ブロックの型は、メッセージを受け取った順番でそれを処理します。These dataflow block types also process messages in the order in which they are received. データ フロー ブロックのオブジェクトを構築するときに、これらのデータ フロー ブロックがメッセージを同時に操作できるようにするには、ExecutionDataflowBlockOptions.MaxDegreeOfParallelism のプロパティを設定します。To enable these dataflow blocks to process messages concurrently, set the ExecutionDataflowBlockOptions.MaxDegreeOfParallelism property when you construct the dataflow block object.

MaxDegreeOfParallelism の既定値は 1 で、これはデータ フロー ブロックが一度に 1 つのメッセージを処理することを保証します。The default value of MaxDegreeOfParallelism is 1, which guarantees that the dataflow block processes one message at a time. このプロパティに 1 を超える値に設定すると、データ フロー ブロックは複数のメッセージを同時に処理できます。Setting this property to a value that is larger than 1 enables the dataflow block to process multiple messages concurrently. このプロパティを DataflowBlockOptions.Unbounded に設定すると、基になるタスク スケジューラは最大のコンカレンシーの程度を管理することができます。Setting this property to DataflowBlockOptions.Unbounded enables the underlying task scheduler to manage the maximum degree of concurrency.

重要

並列処理の最大範囲に 1 を超える数を指定すると、複数のメッセージを同時に処理するため、メッセージが受信した順序で処理されない場合があります。When you specify a maximum degree of parallelism that is larger than 1, multiple messages are processed simultaneously, and therefore messages might not be processed in the order in which they are received. ただし、ブロックからのメッセージが出力される順序は、メッセージが受信された順序と同じです。The order in which the messages are output from the block is, however, the same one in which they are received.

MaxDegreeOfParallelism プロパティは並列処理の最大範囲を表すため、データ フロー ブロックは、指定より低い並列化の度合いで実行される場合があります。Because the MaxDegreeOfParallelism property represents the maximum degree of parallelism, the dataflow block might execute with a lesser degree of parallelism than you specify. 機能要件を満たすため、または使用可能なシステム リソースの不足のため、データ フロー ブロックは、より低い並列化の度合いを使う場合があります。The dataflow block might use a lesser degree of parallelism to meet its functional requirements or because there is a lack of available system resources. データ フロー ブロックは、指定より大きな並列化を選択することはありません。A dataflow block never chooses more parallelism than you specify.

MaxDegreeOfParallelism プロパティの値は、各データ フロー ブロックのオブジェクトに排他的です。The value of the MaxDegreeOfParallelism property is exclusive to each dataflow block object. たとえば、4 つのデータ フロー ブロックのオブジェクトが並列処理の最大範囲にそれぞれ 1 を指定した場合、4 つすべてのデータ フロー ブロック オブジェクトを並列に実行できる場合もあります。For example, if four dataflow block objects each specify 1 for the maximum degree of parallelism, all four dataflow block objects can potentially run in parallel.

並列処理の最大範囲を設定して時間のかかる操作を並列に行う例については、「方法:データ フロー ブロックで並列処理の範囲を指定する」を参照してください。For an example that sets the maximum degree of parallelism to enable lengthy operations to occur in parallel, see How to: Specify the Degree of Parallelism in a Dataflow Block.

タスクごとのメッセージ数の指定Specifying the Number of Messages per Task

定義済みのデータ フロー ブロックの型は、複数の入力要素を処理するタスクを使用します。The predefined dataflow block types use tasks to process multiple input elements. これによって、データを処理するために必要なタスクのオブジェクトの数を最小限に抑えることができ、アプリケーションを効率的に実行できます。This helps minimize the number of task objects that are required to process data, which enables applications to run more efficiently. ただし、一連のデータ フロー ブロックのタスクがデータを処理する場合、他のデータ フロー ブロックのタスクはメッセージをキューに置いて処理の時間を待機する必要がある場合があります。However, when the tasks from one set of dataflow blocks are processing data, the tasks from other dataflow blocks might need to wait for processing time by queuing messages. データ フロー タスク間での高い公平性を有効にするには、MaxMessagesPerTask プロパティを設定します。To enable better fairness among dataflow tasks, set the MaxMessagesPerTask property. MaxMessagesPerTask が既定値である DataflowBlockOptions.Unbounded に設定されている場合、データ フロー ブロックが使用するタスクは、可能な限り多くのメッセージを処理します。When MaxMessagesPerTask is set to DataflowBlockOptions.Unbounded, which is the default, the task used by a dataflow block processes as many messages as are available. MaxMessagesPerTaskUnbounded 以外の値に設定されている場合、データ フロー ブロックは、Task オブジェクトごとに最大でこの数のメッセージを処理します。When MaxMessagesPerTask is set to a value other than Unbounded, the dataflow block processes at most this number of messages per Task object. MaxMessagesPerTask プロパティを設定すると、タスク間の公平性を向上できますが、システムが必要以上のタスクを作成する場合があり、その場合にはパフォーマンスが低下する場合があります。Although setting the MaxMessagesPerTask property can increase fairness among tasks, it can cause the system to create more tasks than are necessary, which can decrease performance.

取り消しの有効化Enabling Cancellation

TPL はタスクが協調的な方法によって取り消しの調整をできる機構を提供します。The TPL provides a mechanism that enables tasks to coordinate cancellation in a cooperative manner. データ フロー ブロックがこの取り消し機構に参加するためには、CancellationToken プロパティを設定します。To enable dataflow blocks to participate in this cancellation mechanism, set the CancellationToken property. CancellationToken オブジェクトを取り消し状態に設定すると、このトークンを監視するすべてのデータ フロー ブロックは現在の項目の実行を終了しますが、後続の項目を開始しません。When this CancellationToken object is set to the canceled state, all dataflow blocks that monitor this token finish execution of their current item but do not start processing subsequent items. また、これらのデータ フロー ブロックはすべてのバッファリングされたメッセージをクリアし、すべてのソースとターゲット ブロックへの接続を解放し、取り消し状態に遷移します。These dataflow blocks also clear any buffered messages, release connections to any source and target blocks, and transition to the canceled state. 取り消し状態に遷移すると、処理中に例外が発生しない限り、Completion プロパティは Status プロパティを Canceled に設定します。By transitioning to the canceled state, the Completion property has the Status property set to Canceled, unless an exception occurred during processing. この場合、StatusFaulted に設定されます。In that case, Status is set to Faulted.

Windows フォーム アプリケーションで取り消しを使う方法を示した例については、「方法:データフロー ブロックをキャンセルする」をご覧ください。For an example that demonstrates how to use cancellation in a Windows Forms application, see How to: Cancel a Dataflow Block. TPL での取り消し処理の詳細については、「タスクのキャンセル」を参照してください。For more information about cancellation in the TPL, see Task Cancellation.

最長一致と最短一致の動作の指定Specifying Greedy Versus Non-Greedy Behavior

複数のグループ化データ フロー ブロックの型は最長一致モードまたは最短一致モードのどちらかで動作します。Several grouping dataflow block types can operate in either greedy or non-greedy mode. 既定では、定義済みのデータ フロー ブロックの型は、最長一致モードで動作します。By default, the predefined dataflow block types operate in greedy mode.

JoinBlock<T1,T2> などの結合ブロックでは、最長一致モードは、対応する結合データが使用できない場合でも、ブロックが直ちにデータを受け入れることを意味します。For join block types such as JoinBlock<T1,T2>, greedy mode means that the block immediately accepts data even if the corresponding data with which to join is not yet available. 最短一致モードは、ターゲットのそれぞれで結合が完了できるように使用可能になるまで、ブロックがすべての受信メッセージを延期することを意味します。Non-greedy mode means that the block postpones all incoming messages until one is available on each of its targets to complete the join. 延期されたメッセージのいずれかが使用できなくなった場合、結合ブロックは延期されたすべてのメッセージを解放し、プロセスを再起動します。If any of the postponed messages are no longer available, the join block releases all postponed messages and restarts the process. BatchBlock<T> クラスにおいては、最長一致と最短一致の動作は似ていますが、最短一致の場合には、バッチを完了するために十分なメッセージを別のソースから使用できるようになるまで BatchBlock<T> オブジェクトがすべての受信メッセージを延期する点が異なります。For the BatchBlock<T> class, greedy and non-greedy behavior is similar, except that under non-greedy mode, a BatchBlock<T> object postpones all incoming messages until enough are available from distinct sources to complete a batch.

データ フロー ブロックに最短一致モードを指定するには、GreedyFalse に設定します。To specify non-greedy mode for a dataflow block, set Greedy to False. 最短一致モードを使い、複数の結合ブロックを有効にして、データ ソースをより効率的に共有する例については、「方法:JoinBlock を使用して複数のソースからデータを読み込む」を参照してください。For an example that demonstrates how to use non-greedy mode to enable multiple join blocks to share a data source more efficiently, see How to: Use JoinBlock to Read Data From Multiple Sources.

[ページのトップへ][go to top]

カスタム データ フロー ブロックCustom Dataflow Blocks

TPL データ フロー ライブラリは多くの定義済みブロックの型を提供しますが、カスタム動作を実行する追加のブロックの型を作成できます。Although the TPL Dataflow Library provides many predefined block types, you can create additional block types that perform custom behavior. ISourceBlock<TOutput> または ITargetBlock<TInput> インターフェイスを直接実装するか、または Encapsulate メソッドを使用して、既存のブロックの型の動作をカプセル化する複雑なブロックをビルドします。Implement the ISourceBlock<TOutput> or ITargetBlock<TInput> interfaces directly or use the Encapsulate method to build a complex block that encapsulates the behavior of existing block types. カスタム データフロー ブロック機能の実装方法を示した例については、「チュートリアル:カスタム データフロー ブロックの型の作成」を参照してください。For examples that show how to implement custom dataflow block functionality, see Walkthrough: Creating a Custom Dataflow Block Type.

[ページのトップへ][go to top]

TitleTitle 説明Description
方法: データフロー ブロックに対してメッセージの読み取りと書き込みを行うHow to: Write Messages to and Read Messages from a Dataflow Block BufferBlock<T> オブジェクトにメッセージを書き込む方法とメッセージを読み取る方法を示します。Demonstrates how to write messages to and read messages from a BufferBlock<T> object.
方法: プロデューサー/コンシューマーのデータフロー パターンを実装するHow to: Implement a Producer-Consumer Dataflow Pattern データ フロー モデルを使ってプロデューサー/コンシューマー パターンを実装し、プロデューサーがデータ フロー ブロックにメッセージを送信して、コンシューマーがそのブロックからメッセージを読み取る方法を示します。Describes how to use the dataflow model to implement a producer-consumer pattern, where the producer sends messages to a dataflow block, and the consumer reads messages from that block.
方法: データフロー ブロックでデータを受信したときにアクションを実行するHow to: Perform Action When a Dataflow Block Receives Data デリゲートを実行データ フロー ブロックの型である、ActionBlock<TInput>TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput> に提供する方法について説明します。Describes how to provide delegates to the execution dataflow block types, ActionBlock<TInput>, TransformBlock<TInput,TOutput>, and TransformManyBlock<TInput,TOutput>.
チュートリアル: データフロー パイプラインの作成Walkthrough: Creating a Dataflow Pipeline Web からテキストをダウンロードし、そのテキストの操作を実行する、データ フロー パイプラインを作成する方法について説明します。Describes how to create a dataflow pipeline that downloads text from the web and performs operations on that text.
方法: データフロー ブロックのリンクを解除するHow to: Unlink Dataflow Blocks LinkTo メソッドを使って、ソースがターゲットにメッセージを提供した後に、ターゲット ブロックをソースからリンク解除する方法を示します。Demonstrates how to use the LinkTo method to unlink a target block from its source after the source offers a message to the target.
チュートリアル: Windows フォーム アプリケーションでのデータフローの使用Walkthrough: Using Dataflow in a Windows Forms Application Windows フォーム アプリケーションでイメージ処理を実行する、データ フロー ブロックのネットワークを作成する方法を示します。Demonstrates how to create a network of dataflow blocks that perform image processing in a Windows Forms application.
方法: データフロー ブロックをキャンセルするHow to: Cancel a Dataflow Block Windows フォーム アプリケーションで取り消しを使う方法を説明します。Demonstrates how to use cancellation in a Windows Forms application.
方法: JoinBlock を使用して複数のソースからデータを読み込むHow to: Use JoinBlock to Read Data From Multiple Sources データが複数のソースから使用できるようになった場合に JoinBlock<T1,T2> クラスを使って操作を実行する方法、および最短一致モードを使って、複数の結合ブロックがデータ ソースをより効率的に共有できる方法について説明します。Explains how to use the JoinBlock<T1,T2> class to perform an operation when data is available from multiple sources, and how to use non-greedy mode to enable multiple join blocks to share a data source more efficiently.
方法: データフロー ブロックで並列処理の範囲を指定するHow to: Specify the Degree of Parallelism in a Dataflow Block MaxDegreeOfParallelism プロパティを設定して、実行データ フロー ブロックが一度に 1 つ以上のメッセージを処理できる方法について説明します。Describes how to set the MaxDegreeOfParallelism property to enable an execution dataflow block to process more than one message at a time.
方法: データフロー ブロックのタスク スケジューラを指定するHow to: Specify a Task Scheduler in a Dataflow Block アプリケーションでデータ フローを使用する場合に特定のタスク スケジューラを関連付ける方法を示します。Demonstrates how to associate a specific task scheduler when you use dataflow in your application.
チュートリアル: BatchBlock および BatchedJoinBlock を使用した効率の向上Walkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency BatchBlock<T> クラスを使用してデータベースの挿入操作の効率を向上する方法、および BatchedJoinBlock<T1,T2> クラスを使用して、プログラムがデータベースを読み取る間にその結果と発生する例外の両方をキャプチャする方法について説明します。Describes how to use the BatchBlock<T> class to improve the efficiency of database insert operations, and how to use the BatchedJoinBlock<T1,T2> class to capture both the results and any exceptions that occur while the program reads from a database.
チュートリアル: カスタム データフロー ブロックの型の作成Walkthrough: Creating a Custom Dataflow Block Type カスタム動作を実装するデータ フロー ブロックの型を作成する 2 とおりの方法を示します。Demonstrates two ways to create a dataflow block type that implements custom behavior.
タスク並列ライブラリ (TPL)Task Parallel Library (TPL) .NET Framework アプリケーションの並列処理と並列プログラミングを簡略化するライブラリである TPL を紹介します。Introduces the TPL, a library that simplifies parallel and concurrent programming in .NET Framework applications.