方法:プロデューサー/コンシューマーのデータフロー パターンを実装する

この記事では、TPL データフロー ライブラリを使用して、プロデューサー/コンシューマー パターンを実装する方法について説明します。 このパターンでは、"プロデューサー" がメッセージをメッセージ ブロックに送信し、"コンシューマー" がそのブロックからメッセージを読み取ります。

注意

TPL データフロー ライブラリ (System.Threading.Tasks.Dataflow 名前空間) は、.NET と一緒には配布されません。 Visual Studio に System.Threading.Tasks.Dataflow 名前空間をインストールするには、プロジェクトを開き、[プロジェクト] メニューの [NuGet パッケージの管理] をクリックし、System.Threading.Tasks.Dataflow パッケージをオンラインで検索します。 または、.NET Core CLI を使ってインストールするには、dotnet add package System.Threading.Tasks.Dataflow を実行します。

次の例では、データフローを使用する基本的なプロデューサー/コンシューマー モデルを示します。 Produce メソッドは、データのランダム バイトを含む配列を System.Threading.Tasks.Dataflow.ITargetBlock<TInput> オブジェクトに書き込み、Consume メソッドは System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> オブジェクトからバイトを読み取ります。 派生型ではなく、ISourceBlock<TOutput>ITargetBlock<TInput> のインターフェイス上で動作することで、さまざまなデータフロー ブロック型上で動作する再利用可能なコードを記述できます。 この例では、BufferBlock<T> クラスを使用します。 BufferBlock<T> クラスはソース ブロックとターゲット ブロックの両方として機能するため、プロデューサーおよびコンシューマーは共有されたオブジェクトを使用してデータを転送できます。

Produce メソッドはループ内で Post メソッドを呼び出し、ターゲット ブロックにデータを同期的に書き込みます。 Produce メソッドはターゲット ブロックにすべてのデータを書き込んだ後、Complete メソッドを呼び出して、このブロックに使用可能なデータが追加されないように指示します。 Consume メソッドでは、async 演算子と await 演算子 (Visual Basic では AsyncAwait) を使用して、ISourceBlock<TOutput> オブジェクトから受信した合計バイト数を非同期的に計算します。 非同期的に動作するために、Consume メソッドは OutputAvailableAsync メソッドを呼び出して、ソース ブロックに使用可能なデータがあるときとソース ブロックに使用可能なデータが追加されなくなったときに、通知を受信します。

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

信頼性の高いプログラミング

上記の例では、1 つのコンシューマーのみを使用してソース データを処理します。 アプリケーションに複数のコンシューマーがある場合は、次の例に示すように、TryReceive メソッドを使用してソース ブロックからデータを読み取ります。

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

TryReceive メソッドは、データを使用できないときに False を返します。 複数のコンシューマーがソース ブロックに同時にアクセスする必要がある場合、この仕組みが、OutputAvailableAsync の呼び出し後もデータを使用可能なことを保証します。

関連項目