Postupy: Implementace vzoru toku dat producenta a příjemce

V tomto článku se dozvíte, jak použít knihovnu toku dat TPL k implementaci vzoru producenta a příjemce. V tomto vzoru producent odesílá zprávy do bloku zpráv a příjemce čte zprávy z tohoto bloku.

Poznámka:

Knihovna toku dat TPL ( System.Threading.Tasks.Dataflow obor názvů) není distribuována s .NET. Pokud chcete nainstalovat System.Threading.Tasks.Dataflow obor názvů v sadě Visual Studio, otevřete projekt, v nabídce Projekt zvolte Spravovat balíčky NuGet a vyhledejte System.Threading.Tasks.Dataflow balíček online. Pokud ho chcete nainstalovat pomocí rozhraní příkazového řádku .NET Core, spusťte dotnet add package System.Threading.Tasks.Dataflow.

Příklad

Následující příklad ukazuje základní model producent-příjemce, který používá tok dat. Metoda Produce zapisuje pole obsahující náhodné bajty dat do System.Threading.Tasks.Dataflow.ITargetBlock<TInput> objektu a Consume metoda čte bajty z objektu System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> . Díky tomu, že místo jejich odvozených typů funguje na ISourceBlock<TOutput>ITargetBlock<TInput> rozhraních, můžete napsat opakovaně použitelný kód, který může fungovat na různých typech bloků toku dat. Tento příklad používá BufferBlock<T> třídu. Vzhledem k tomu, že BufferBlock<T> třída funguje jako zdrojový blok i jako cílový blok, může producent a příjemce k přenosu dat použít sdílený objekt.

Metoda Produce volá metodu Post ve smyčce pro synchronní zápis dat do cílového bloku. Jakmile Produce metoda zapíše všechna data do cílového bloku, zavolá metodu Complete , která označuje, že blok nebude mít nikdy k dispozici další data. Metoda používá asynchronní operátory a operátory await (Async a Await v jazyce Visual Basic) k asynchronnímu výpočtu celkového počtu bajtů přijatých z objektuISourceBlock<TOutput>.Consume Aby fungovala asynchronně, Consume metoda volá metodu OutputAvailableAsync , aby obdržela oznámení, když zdrojový blok obsahuje data k dispozici a kdy zdrojový blok nikdy nebude mít k dispozici další data.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

Robustní programování

Předchozí příklad používá ke zpracování zdrojových dat pouze jednoho příjemce. Pokud máte ve své aplikaci více příjemců, použijte metodu TryReceive ke čtení dat ze zdrojového bloku, jak je znázorněno v následujícím příkladu.

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

Metoda TryReceive vrátí False , pokud nejsou k dispozici žádná data. Pokud více příjemců musí přistupovat ke zdrojovému bloku souběžně, tento mechanismus zaručuje, že data jsou stále k dispozici po volání OutputAvailableAsync.

Viz také