Практическое руководство. Реализация шаблона потока данных производителя-потребителя

Из этой статьи вы узнаете, как использовать библиотеки потоков данных TPL для реализации шаблона "производитель-получатель". В этом шаблоне производитель отправляет сообщения в блок сообщений, а потребитель считывает сообщения из этого блока.

Примечание.

Библиотека потоков данных TPL (пространство имен System.Threading.Tasks.Dataflow) не поставляется с .NET. Чтобы установить пространство имен System.Threading.Tasks.Dataflow в Visual Studio, откройте проект, выберите Управление пакетами NuGet в меню Проект и выполните поиск пакета System.Threading.Tasks.Dataflow в Интернете. Вы также можете установить его, выполнив в .NET Core CLI команду dotnet add package System.Threading.Tasks.Dataflow.

Пример

В следующем примере показана базовая модель производителя-потребителя, использующая поток данных. Метод Produce записывает массивы, содержащие случайные байты данных, в объект System.Threading.Tasks.Dataflow.ITargetBlock<TInput>, а метод Consume выполняет чтение байтов из объекта System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>. Используя интерфейсы ISourceBlock<TOutput> и ITargetBlock<TInput> вместо их производных типов, можно создавать пригодный для повторного использования код, который может работать с различными типами блоков потока данных. В этом примере используется класс BufferBlock<T>. Поскольку класс BufferBlock<T> действует и как блок источника, и как целевой блок, потребитель и производитель могут использовать общий объект для передачи данных.

Метод Produce вызывает метод Post в цикле для синхронной записи данных в целевой блок. После того, как метод Produce записывает все данные в целевой блок, он вызывает метод Complete, чтобы указать, что у этого блока никогда не будет дополнительных доступных данных. Метод Consume использует операторы async и await (Async и Await в Visual Basic) для асинхронного вычисления общего числа байтов, полученных от объекта ISourceBlock<TOutput>. Для асинхронной работы метод Consume вызывает метод OutputAvailableAsync, чтобы получать уведомления, если блок источника получит доступные данные и если у блока источника никогда не будет дополнительных доступных данных.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

Отказоустойчивость

В предыдущем примере используется только один получатель для обработки исходных данных. При наличии нескольких получателей в приложении следует использовать метод TryReceive для чтения данных из блока источника, как показано в следующем примере.

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

Метод TryReceive возвращает False, когда нет доступных данных. Когда несколько потребителей должны использовать блок источника параллельно, этот механизм гарантирует, что данные все еще будут доступны после вызова OutputAvailableAsync.

См. также