Instrukcje: implementowanie wzorca przepływu danych producenta-konsumenta

W tym artykule dowiesz się, jak za pomocą biblioteki przepływu danych TPL zaimplementować wzorzec producenta-konsumenta. W tym wzorcu producent wysyła komunikaty do bloku komunikatów, a użytkownik odczytuje komunikaty z tego bloku.

Uwaga

Biblioteka przepływów danych TPL ( System.Threading.Tasks.Dataflow przestrzeń nazw) nie jest dystrybuowana za pomocą platformy .NET. Aby zainstalować System.Threading.Tasks.Dataflow przestrzeń nazw w programie Visual Studio, otwórz projekt, wybierz pozycję Zarządzaj pakietami NuGet z menu Project i wyszukaj pakiet w trybie online System.Threading.Tasks.Dataflow . Alternatywnie, aby zainstalować go przy użyciu interfejsu wiersza polecenia platformy .NET Core, uruchom polecenie dotnet add package System.Threading.Tasks.Dataflow.

Przykład

W poniższym przykładzie pokazano podstawowy model producent-konsument korzystający z przepływu danych. Metoda Produce zapisuje tablice zawierające losowe bajty danych do System.Threading.Tasks.Dataflow.ITargetBlock<TInput> obiektu, a Consume metoda odczytuje bajty z System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> obiektu. Działając na ISourceBlock<TOutput> interfejsach i ITargetBlock<TInput> , zamiast ich typów pochodnych, można napisać kod wielokrotnego użytku, który może działać na różnych typach bloków przepływu danych. W tym przykładzie użyto BufferBlock<T> klasy . BufferBlock<T> Ponieważ klasa działa zarówno jako blok źródłowy, jak i jako blok docelowy, producent i odbiorca mogą używać obiektu udostępnionego do transferu danych.

Metoda Produce wywołuje metodę Post w pętli, aby synchronicznie zapisywać dane w bloku docelowym. Produce Gdy metoda zapisuje wszystkie dane w bloku docelowym, wywołuje Complete metodę , aby wskazać, że blok nigdy nie będzie miał dostępnych dodatkowych danych. Metoda Consume używa operatorów asynchronicznych i await (Async i Await w Visual Basic), aby asynchronicznie obliczyć całkowitą liczbę bajtów odebranych z ISourceBlock<TOutput> obiektu. Aby działać asynchronicznie, metoda wywołuje OutputAvailableAsync metodę w celu odbierania powiadomienia, Consume gdy blok źródłowy ma dostępne dane, a kiedy blok źródłowy nigdy nie będzie miał dodatkowych danych.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

Niezawodne programowanie

W poprzednim przykładzie użyto tylko jednego konsumenta do przetwarzania danych źródłowych. Jeśli masz wielu użytkowników w aplikacji, użyj TryReceive metody , aby odczytać dane z bloku źródłowego, jak pokazano w poniższym przykładzie.

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

Metoda TryReceive zwraca False wartość, gdy żadne dane nie są dostępne. Gdy wielu użytkowników musi jednocześnie uzyskać dostęp do bloku źródłowego, ten mechanizm gwarantuje, że dane są nadal dostępne po wywołaniu metody OutputAvailableAsync.

Zobacz też