方法: データフロー ブロックとの間でメッセージの読み取りと書き込みを行う

この記事では、タスク並列ライブラリ (TPL) データフロー ライブラリを使用して、データフロー ブロックとの間でメッセージの読み取りと書き込みを行う方法を説明します。 TPL データフロー ライブラリには、データフロー ブロックへのメッセージの書き込みとデータフロー ブロックからのメッセージの読み取りのための、同期と非同期の両方のメソッドが用意されています。 この記事では、System.Threading.Tasks.Dataflow.BufferBlock<T> クラスを使用する方法について説明します。 BufferBlock<T> クラスは、メッセージをバッファーし、メッセージ ソースとメッセージ ターゲットの両方として動作します。

注意

TPL データフロー ライブラリ (System.Threading.Tasks.Dataflow 名前空間) は、.NET と一緒には配布されません。 Visual Studio に System.Threading.Tasks.Dataflow 名前空間をインストールするには、プロジェクトを開き、[プロジェクト] メニューの [NuGet パッケージの管理] をクリックし、System.Threading.Tasks.Dataflow パッケージをオンラインで検索します。 または、.NET Core CLI を使ってインストールするには、dotnet add package System.Threading.Tasks.Dataflow を実行します。

同期的な書き込みと読み取り

次の例では、Post メソッドを使用して BufferBlock<T> データフロー ブロックを書き込み、Receive メソッドを使用して同じオブジェクトから読み取ります。

var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
    bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(bufferBlock.Receive());
}

// Output:
//   0
//   1
//   2
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

' Output:
'   0
'   1
'   2

また、次の例に示すように、TryReceive メソッドを使用してデータフロー ブロックから読み取ることもできます。 TryReceive メソッドは、現在のスレッドをブロックしないため、データを時々ポーリングする場合に便利です。

// Post more messages to the block.
for (int i = 0; i < 3; i++)
{
    bufferBlock.Post(i);
}

// Receive the messages back from the block.
while (bufferBlock.TryReceive(out int value))
{
    Console.WriteLine(value);
}

// Output:
//   0
//   1
//   2
' Post more messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
Dim value As Integer
Do While bufferBlock.TryReceive(value)
    Console.WriteLine(value)
Loop

' Output:
'   0
'   1
'   2

Post メソッドは同期的に動作するため、前の例の BufferBlock<T> オブジェクトは、2 つ目のループがデータを読み取る前にすべてのデータを受け取ります。 次の例では、Task.WhenAll(Task[]) を使用して最初の例を拡張し、メッセージ ブロックからの読み取りとメッセージ ブロックへの書き込みを同時に行います。 WhenAll は同時に実行されているすべての非同期操作を待機するため、値は特定の順序で BufferBlock<T> オブジェクトに書き込まれません。

// Write to and read from the message block concurrently.
var post01 = Task.Run(() =>
{
    bufferBlock.Post(0);
    bufferBlock.Post(1);
});
var receive = Task.Run(() =>
{
    for (int i = 0; i < 3; i++)
    {
        Console.WriteLine(bufferBlock.Receive());
    }
});
var post2 = Task.Run(() =>
{
    bufferBlock.Post(2);
});

await Task.WhenAll(post01, receive, post2);

// Output:
//   0
//   1
//   2
' Write to and read from the message block concurrently.
Dim post01 = Task.Run(Sub()
                          bufferBlock.Post(0)
                          bufferBlock.Post(1)
                      End Sub)
Dim receive = Task.Run(Sub()
                           For i As Integer = 0 To 2
                               Console.WriteLine(bufferBlock.Receive())
                           Next i
                       End Sub)
Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
Task.WaitAll(post01, receive, post2)

' Output:
'   0
'   1
'   2

非同期的な書き込みと読み取り

次の例では、SendAsync メソッドを使用して BufferBlock<T> オブジェクトに非同期的に書き込み、ReceiveAsync メソッドを使用して同じオブジェクトから非同期的に読み取ります。 この例では、async 演算子と await 演算子 (Visual Basic では AsyncAwait) を使用して、ターゲット ブロックへのデータの送信とターゲット ブロックからのデータの読み取りを非同期的に行います。 SendAsync メソッドは、メッセージを延期するデータフロー ブロックを有効にする必要があるときに便利です。 ReceiveAsync メソッドは、データが利用できるようになったときにデータを操作する必要がある場合に便利です。 メッセージ ブロック間でメッセージが伝達されるしくみの詳細については、データフローに関するページの「Message Passing (メッセージ パッシング)」のセクションを参照してください。

// Post more messages to the block asynchronously.
for (int i = 0; i < 3; i++)
{
    await bufferBlock.SendAsync(i);
}

// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(await bufferBlock.ReceiveAsync());
}

// Output:
//   0
//   1
//   2
' Post more messages to the block asynchronously.
For i As Integer = 0 To 2
    await bufferBlock.SendAsync(i)
Next i

' Asynchronously receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(await bufferBlock.ReceiveAsync())
Next i

' Output:
'   0
'   1
'   2

コード例全体

次の例には、この記事のコードがすべて含まれています。

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates a how to write to and read from a dataflow block.
class DataflowReadWrite
{
    // Demonstrates asynchronous dataflow operations.
    static async Task AsyncSendReceive(BufferBlock<int> bufferBlock)
    {
        // Post more messages to the block asynchronously.
        for (int i = 0; i < 3; i++)
        {
            await bufferBlock.SendAsync(i);
        }

        // Asynchronously receive the messages back from the block.
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine(await bufferBlock.ReceiveAsync());
        }

        // Output:
        //   0
        //   1
        //   2
    }

    static async Task Main()
    {
        var bufferBlock = new BufferBlock<int>();

        // Post several messages to the block.
        for (int i = 0; i < 3; i++)
        {
            bufferBlock.Post(i);
        }

        // Receive the messages back from the block.
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine(bufferBlock.Receive());
        }

        // Output:
        //   0
        //   1
        //   2

        // Post more messages to the block.
        for (int i = 0; i < 3; i++)
        {
            bufferBlock.Post(i);
        }

        // Receive the messages back from the block.
        while (bufferBlock.TryReceive(out int value))
        {
            Console.WriteLine(value);
        }

        // Output:
        //   0
        //   1
        //   2

        // Write to and read from the message block concurrently.
        var post01 = Task.Run(() =>
        {
            bufferBlock.Post(0);
            bufferBlock.Post(1);
        });
        var receive = Task.Run(() =>
        {
            for (int i = 0; i < 3; i++)
            {
                Console.WriteLine(bufferBlock.Receive());
            }
        });
        var post2 = Task.Run(() =>
        {
            bufferBlock.Post(2);
        });

        await Task.WhenAll(post01, receive, post2);

        // Output:
        //   0
        //   1
        //   2

        // Demonstrate asynchronous dataflow operations.
        await AsyncSendReceive(bufferBlock);
    }
}
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates a how to write to and read from a dataflow block.
Friend Class DataflowReadWrite
    ' Demonstrates asynchronous dataflow operations.
    Private Shared async Function AsyncSendReceive(ByVal bufferBlock As BufferBlock(Of Integer)) As Task
        ' Post more messages to the block asynchronously.
        For i As Integer = 0 To 2
            await bufferBlock.SendAsync(i)
        Next i

        ' Asynchronously receive the messages back from the block.
        For i As Integer = 0 To 2
            Console.WriteLine(await bufferBlock.ReceiveAsync())
        Next i

        ' Output:
        '   0
        '   1
        '   2
    End Function

    Shared Sub Main(ByVal args() As String)
        Dim bufferBlock = New BufferBlock(Of Integer)()

        ' Post several messages to the block.
        For i As Integer = 0 To 2
            bufferBlock.Post(i)
        Next i

        ' Receive the messages back from the block.
        For i As Integer = 0 To 2
            Console.WriteLine(bufferBlock.Receive())
        Next i

        ' Output:
        '   0
        '   1
        '   2

        ' Post more messages to the block.
        For i As Integer = 0 To 2
            bufferBlock.Post(i)
        Next i

        ' Receive the messages back from the block.
        Dim value As Integer
        Do While bufferBlock.TryReceive(value)
            Console.WriteLine(value)
        Loop

        ' Output:
        '   0
        '   1
        '   2

        ' Write to and read from the message block concurrently.
        Dim post01 = Task.Run(Sub()
                                  bufferBlock.Post(0)
                                  bufferBlock.Post(1)
                              End Sub)
        Dim receive = Task.Run(Sub()
                                   For i As Integer = 0 To 2
                                       Console.WriteLine(bufferBlock.Receive())
                                   Next i
                               End Sub)
        Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
        Task.WaitAll(post01, receive, post2)

        ' Output:
        '   0
        '   1
        '   2

        ' Demonstrate asynchronous dataflow operations.
        AsyncSendReceive(bufferBlock).Wait()
    End Sub

End Class

次のステップ

この例は、メッセージ ブロックからの読み取りとメッセージ ブロックへの書き込みを直接行う方法を示しています。 データフロー ブロックを接続して、データフロー ブロックのリニア シーケンスである "パイプライン" を作成するか、またはデータフロー ブロックのグラフである "ネットワーク" を作成することもできます。 パイプラインまたはネットワークでは、データが使用可能になると、ソースはターゲットに非同期的にデータを伝達します。 基本的なデータフロー パイプラインを作成する例については、「Walkthrough: Creating a Dataflow Pipeline (チュートリアル: データフロー パイプラインの作成)」を参照してください。 さらに複雑なデータフロー ネットワークを作成する例については、「Walkthrough: Using Dataflow in a Windows Forms Application (チュートリアル: Windows フォーム アプリケーションでのデータフローの使用)」を参照してください。

参照