Instrukcje: zapisywanie i odczytywanie komunikatów z bloku przepływu danych

W tym artykule opisano, jak używać biblioteki przepływu danych biblioteki równoległych zadań (TPL) do zapisywania komunikatów i odczytywania komunikatów z bloku przepływu danych. Biblioteka przepływów danych TPL udostępnia zarówno metody synchroniczne, jak i asynchroniczne do zapisywania komunikatów w bloku przepływu danych i odczytywania ich z bloku przepływu danych. W tym artykule pokazano, jak używać System.Threading.Tasks.Dataflow.BufferBlock<T> klasy . Klasa BufferBlock<T> buforuje komunikaty i zachowuje się zarówno jako źródło komunikatu, jak i element docelowy komunikatu.

Uwaga

Biblioteka przepływów danych TPL ( System.Threading.Tasks.Dataflow przestrzeń nazw) nie jest dystrybuowana za pomocą platformy .NET. Aby zainstalować System.Threading.Tasks.Dataflow przestrzeń nazw w programie Visual Studio, otwórz projekt, wybierz pozycję Zarządzaj pakietami NuGet z menu Project i wyszukaj pakiet w trybie online System.Threading.Tasks.Dataflow . Alternatywnie, aby zainstalować go przy użyciu interfejsu wiersza polecenia platformy .NET Core, uruchom polecenie dotnet add package System.Threading.Tasks.Dataflow.

Zapisywanie i odczytywanie synchronicznie

W poniższym przykładzie użyto Post metody do zapisu w BufferBlock<T> bloku przepływu danych i Receive metody odczytywania z tego samego obiektu.

var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
    bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(bufferBlock.Receive());
}

// Output:
//   0
//   1
//   2
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

' Output:
'   0
'   1
'   2

Możesz również użyć TryReceive metody do odczytu z bloku przepływu danych, jak pokazano w poniższym przykładzie. Metoda TryReceive nie blokuje bieżącego wątku i jest przydatna, gdy od czasu do czasu sondujesz dane.

// Post more messages to the block.
for (int i = 0; i < 3; i++)
{
    bufferBlock.Post(i);
}

// Receive the messages back from the block.
while (bufferBlock.TryReceive(out int value))
{
    Console.WriteLine(value);
}

// Output:
//   0
//   1
//   2
' Post more messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
Dim value As Integer
Do While bufferBlock.TryReceive(value)
    Console.WriteLine(value)
Loop

' Output:
'   0
'   1
'   2

Post Ponieważ metoda działa synchronicznie, BufferBlock<T> obiekt w poprzednich przykładach odbiera wszystkie dane, zanim druga pętla odczytuje dane. Poniższy przykład rozszerza pierwszy przykład, używając polecenia Task.WhenAll(Task[]) w celu jednoczesnego odczytywania i zapisywania w bloku komunikatów. Ponieważ WhenAll oczekuje na wszystkie operacje asynchroniczne, które są wykonywane współbieżnie, wartości nie są zapisywane w BufferBlock<T> obiekcie w żadnej określonej kolejności.

// Write to and read from the message block concurrently.
var post01 = Task.Run(() =>
{
    bufferBlock.Post(0);
    bufferBlock.Post(1);
});
var receive = Task.Run(() =>
{
    for (int i = 0; i < 3; i++)
    {
        Console.WriteLine(bufferBlock.Receive());
    }
});
var post2 = Task.Run(() =>
{
    bufferBlock.Post(2);
});

await Task.WhenAll(post01, receive, post2);

// Output:
//   0
//   1
//   2
' Write to and read from the message block concurrently.
Dim post01 = Task.Run(Sub()
                          bufferBlock.Post(0)
                          bufferBlock.Post(1)
                      End Sub)
Dim receive = Task.Run(Sub()
                           For i As Integer = 0 To 2
                               Console.WriteLine(bufferBlock.Receive())
                           Next i
                       End Sub)
Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
Task.WaitAll(post01, receive, post2)

' Output:
'   0
'   1
'   2

Zapisywanie i odczytywanie asynchroniczne

W poniższym przykładzie SendAsync użyto metody , aby asynchronicznie zapisywać w BufferBlock<T> obiekcie i ReceiveAsync metodę asynchronicznie odczytywać z tego samego obiektu. W tym przykładzie użyto operatorów asynchronicznych i await (Async i Await w Visual Basic), aby asynchronicznie wysyłać dane do bloku docelowego i odczytywać je. Metoda jest przydatna SendAsync , gdy należy włączyć blok przepływu danych w celu odroczenia komunikatów. Metoda jest przydatna ReceiveAsync , gdy chcesz wykonywać działania na danych, gdy te dane staną się dostępne. Aby uzyskać więcej informacji na temat propagacji komunikatów między blokami komunikatów, zobacz sekcję Message Passing in Dataflow (Przekazywanie komunikatów w przepływie danych).

// Post more messages to the block asynchronously.
for (int i = 0; i < 3; i++)
{
    await bufferBlock.SendAsync(i);
}

// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(await bufferBlock.ReceiveAsync());
}

// Output:
//   0
//   1
//   2
' Post more messages to the block asynchronously.
For i As Integer = 0 To 2
    await bufferBlock.SendAsync(i)
Next i

' Asynchronously receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(await bufferBlock.ReceiveAsync())
Next i

' Output:
'   0
'   1
'   2

Kompletny przykład

Poniższy przykład przedstawia cały kod tego artykułu.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates a how to write to and read from a dataflow block.
class DataflowReadWrite
{
    // Demonstrates asynchronous dataflow operations.
    static async Task AsyncSendReceive(BufferBlock<int> bufferBlock)
    {
        // Post more messages to the block asynchronously.
        for (int i = 0; i < 3; i++)
        {
            await bufferBlock.SendAsync(i);
        }

        // Asynchronously receive the messages back from the block.
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine(await bufferBlock.ReceiveAsync());
        }

        // Output:
        //   0
        //   1
        //   2
    }

    static async Task Main()
    {
        var bufferBlock = new BufferBlock<int>();

        // Post several messages to the block.
        for (int i = 0; i < 3; i++)
        {
            bufferBlock.Post(i);
        }

        // Receive the messages back from the block.
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine(bufferBlock.Receive());
        }

        // Output:
        //   0
        //   1
        //   2

        // Post more messages to the block.
        for (int i = 0; i < 3; i++)
        {
            bufferBlock.Post(i);
        }

        // Receive the messages back from the block.
        while (bufferBlock.TryReceive(out int value))
        {
            Console.WriteLine(value);
        }

        // Output:
        //   0
        //   1
        //   2

        // Write to and read from the message block concurrently.
        var post01 = Task.Run(() =>
        {
            bufferBlock.Post(0);
            bufferBlock.Post(1);
        });
        var receive = Task.Run(() =>
        {
            for (int i = 0; i < 3; i++)
            {
                Console.WriteLine(bufferBlock.Receive());
            }
        });
        var post2 = Task.Run(() =>
        {
            bufferBlock.Post(2);
        });

        await Task.WhenAll(post01, receive, post2);

        // Output:
        //   0
        //   1
        //   2

        // Demonstrate asynchronous dataflow operations.
        await AsyncSendReceive(bufferBlock);
    }
}
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates a how to write to and read from a dataflow block.
Friend Class DataflowReadWrite
    ' Demonstrates asynchronous dataflow operations.
    Private Shared async Function AsyncSendReceive(ByVal bufferBlock As BufferBlock(Of Integer)) As Task
        ' Post more messages to the block asynchronously.
        For i As Integer = 0 To 2
            await bufferBlock.SendAsync(i)
        Next i

        ' Asynchronously receive the messages back from the block.
        For i As Integer = 0 To 2
            Console.WriteLine(await bufferBlock.ReceiveAsync())
        Next i

        ' Output:
        '   0
        '   1
        '   2
    End Function

    Shared Sub Main(ByVal args() As String)
        Dim bufferBlock = New BufferBlock(Of Integer)()

        ' Post several messages to the block.
        For i As Integer = 0 To 2
            bufferBlock.Post(i)
        Next i

        ' Receive the messages back from the block.
        For i As Integer = 0 To 2
            Console.WriteLine(bufferBlock.Receive())
        Next i

        ' Output:
        '   0
        '   1
        '   2

        ' Post more messages to the block.
        For i As Integer = 0 To 2
            bufferBlock.Post(i)
        Next i

        ' Receive the messages back from the block.
        Dim value As Integer
        Do While bufferBlock.TryReceive(value)
            Console.WriteLine(value)
        Loop

        ' Output:
        '   0
        '   1
        '   2

        ' Write to and read from the message block concurrently.
        Dim post01 = Task.Run(Sub()
                                  bufferBlock.Post(0)
                                  bufferBlock.Post(1)
                              End Sub)
        Dim receive = Task.Run(Sub()
                                   For i As Integer = 0 To 2
                                       Console.WriteLine(bufferBlock.Receive())
                                   Next i
                               End Sub)
        Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
        Task.WaitAll(post01, receive, post2)

        ' Output:
        '   0
        '   1
        '   2

        ' Demonstrate asynchronous dataflow operations.
        AsyncSendReceive(bufferBlock).Wait()
    End Sub

End Class

Następne kroki

W tym przykładzie pokazano, jak bezpośrednio odczytywać dane z bloku komunikatów i zapisywać je w bloku komunikatów. Bloki przepływu danych można również połączyć z potokami formularzy, które są liniowymi sekwencjami bloków przepływu danych lub sieciami, które są grafami bloków przepływu danych. W potoku lub sieci źródła asynchronicznie propagują dane do obiektów docelowych, gdy te dane staną się dostępne. Aby zapoznać się z przykładem tworzenia podstawowego potoku przepływu danych, zobacz Przewodnik: tworzenie potoku przepływu danych. Aby zapoznać się z przykładem tworzenia bardziej złożonej sieci przepływu danych, zobacz Przewodnik: używanie przepływu danych w aplikacji Windows Forms.

Zobacz też