方法:プロデューサー/コンシューマーのデータフロー パターンを実装する
この記事では、TPL データフロー ライブラリを使用して、プロデューサー/コンシューマー パターンを実装する方法について説明します。 このパターンでは、"プロデューサー" がメッセージをメッセージ ブロックに送信し、"コンシューマー" がそのブロックからメッセージを読み取ります。
注意
TPL データフロー ライブラリ (System.Threading.Tasks.Dataflow 名前空間) は、.NET と一緒には配布されません。 Visual Studio に System.Threading.Tasks.Dataflow 名前空間をインストールするには、プロジェクトを開き、[プロジェクト] メニューの [NuGet パッケージの管理] をクリックし、System.Threading.Tasks.Dataflow
パッケージをオンラインで検索します。 または、.NET Core CLI を使ってインストールするには、dotnet add package System.Threading.Tasks.Dataflow
を実行します。
例
次の例では、データフローを使用する基本的なプロデューサー/コンシューマー モデルを示します。 Produce
メソッドは、データのランダム バイトを含む配列を System.Threading.Tasks.Dataflow.ITargetBlock<TInput> オブジェクトに書き込み、Consume
メソッドは System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> オブジェクトからバイトを読み取ります。 派生型ではなく、ISourceBlock<TOutput> と ITargetBlock<TInput> のインターフェイス上で動作することで、さまざまなデータフロー ブロック型上で動作する再利用可能なコードを記述できます。 この例では、BufferBlock<T> クラスを使用します。 BufferBlock<T> クラスはソース ブロックとターゲット ブロックの両方として機能するため、プロデューサーおよびコンシューマーは共有されたオブジェクトを使用してデータを転送できます。
Produce
メソッドはループ内で Post メソッドを呼び出し、ターゲット ブロックにデータを同期的に書き込みます。 Produce
メソッドはターゲット ブロックにすべてのデータを書き込んだ後、Complete メソッドを呼び出して、このブロックに使用可能なデータが追加されないように指示します。 Consume
メソッドでは、async 演算子と await 演算子 (Visual Basic では Async と Await) を使用して、ISourceBlock<TOutput> オブジェクトから受信した合計バイト数を非同期的に計算します。 非同期的に動作するために、Consume
メソッドは OutputAvailableAsync メソッドを呼び出して、ソース ブロックに使用可能なデータがあるときとソース ブロックに使用可能なデータが追加されなくなったときに、通知を受信します。
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class DataflowProducerConsumer
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; ++ i)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
}
// Sample output:
// Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Friend Class DataflowProducerConsumer
Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
Dim rand As New Random()
For i As Integer = 0 To 99
Dim buffer(1023) As Byte
rand.NextBytes(buffer)
target.Post(buffer)
Next i
target.Complete()
End Sub
Private Shared Async Function ConsumeAsync(
ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte = Await source.ReceiveAsync()
bytesProcessed += data.Length
Loop
Return bytesProcessed
End Function
Shared Sub Main()
Dim buffer = New BufferBlock(Of Byte())()
Dim consumer = ConsumeAsync(buffer)
Produce(buffer)
Dim result = consumer.GetAwaiter().GetResult()
Console.WriteLine($"Processed {result:#,#} bytes.")
End Sub
End Class
' Sample output:
' Processed 102,400 bytes.
信頼性の高いプログラミング
上記の例では、1 つのコンシューマーのみを使用してソース データを処理します。 アプリケーションに複数のコンシューマーがある場合は、次の例に示すように、TryReceive メソッドを使用してソース ブロックからデータを読み取ります。
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(out byte[] data))
{
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte
Do While source.TryReceive(data)
bytesProcessed += data.Length
Loop
Loop
Return bytesProcessed
End Function
TryReceive メソッドは、データを使用できないときに False
を返します。 複数のコンシューマーがソース ブロックに同時にアクセスする必要がある場合、この仕組みが、OutputAvailableAsync の呼び出し後もデータを使用可能なことを保証します。
関連項目
.NET
フィードバック
https://aka.ms/ContentUserFeedback」を参照してください。
以下は間もなく提供いたします。2024 年を通じて、コンテンツのフィードバック メカニズムとして GitHub の issue を段階的に廃止し、新しいフィードバック システムに置き換えます。 詳細については、「フィードバックの送信と表示