Nasıl yapılır: Veri Akışı bloğundan ileti yazma ve okuma

Bu makalede, bir veri akışı bloğuna ileti yazmak ve bu bloktan iletileri okumak için Görev Paralel Kitaplığı (TPL) Veri Akışı Kitaplığı'nın nasıl kullanılacağı açıklanmaktadır. TPL Veri Akışı Kitaplığı, bir veri akışı bloğuna ileti yazmak ve bu bloktan iletileri okumak için hem zaman uyumlu hem de zaman uyumsuz yöntemler sağlar. Bu makalede sınıfın nasıl kullanıldığı gösterilmektedir System.Threading.Tasks.Dataflow.BufferBlock<T> . sınıfı iletileri BufferBlock<T> arabelleğe alır ve hem ileti kaynağı hem de ileti hedefi olarak davranır.

Not

TPL Veri Akışı Kitaplığı ( System.Threading.Tasks.Dataflow ad alanı) .NET ile dağıtılmaz. Ad alanını System.Threading.Tasks.Dataflow Visual Studio'ya yüklemek için projenizi açın, Proje menüsünden NuGet Paketlerini Yönet'i seçin ve çevrimiçi ortamda System.Threading.Tasks.Dataflow paketi arayın. Alternatif olarak, .NET Core CLI kullanarak yüklemek için komutunu çalıştırındotnet add package System.Threading.Tasks.Dataflow.

Zaman uyumlu yazma ve okuma

Aşağıdaki örnek, bir BufferBlock<T> veri akışı bloğuna yazmak için yöntemini ve Receive aynı nesneden okuma yöntemini kullanırPost.

var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
    bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(bufferBlock.Receive());
}

// Output:
//   0
//   1
//   2
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

' Output:
'   0
'   1
'   2

Aşağıdaki örnekte gösterildiği gibi bir veri akışı bloğundan okumak için yöntemini de kullanabilirsiniz TryReceive . TryReceive yöntemi geçerli iş parçacığını engellemez ve verileri ara sıra yokladığınızda kullanışlıdır.

// Post more messages to the block.
for (int i = 0; i < 3; i++)
{
    bufferBlock.Post(i);
}

// Receive the messages back from the block.
while (bufferBlock.TryReceive(out int value))
{
    Console.WriteLine(value);
}

// Output:
//   0
//   1
//   2
' Post more messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
Dim value As Integer
Do While bufferBlock.TryReceive(value)
    Console.WriteLine(value)
Loop

' Output:
'   0
'   1
'   2

Post yöntemi zaman uyumlu olarak davrandığından, BufferBlock<T> önceki örneklerdeki nesne ikinci döngü verileri okumadan önce tüm verileri alır. Aşağıdaki örnek, ileti bloğundan eşzamanlı olarak okumak ve ileti bloğuna yazmak için kullanarak Task.WhenAll(Task[]) ilk örneği genişletir. WhenAll Eşzamanlı olarak yürütülen tüm zaman uyumsuz işlemleri beklediğinden, değerler nesneye BufferBlock<T> belirli bir sırada yazılmaz.

// Write to and read from the message block concurrently.
var post01 = Task.Run(() =>
{
    bufferBlock.Post(0);
    bufferBlock.Post(1);
});
var receive = Task.Run(() =>
{
    for (int i = 0; i < 3; i++)
    {
        Console.WriteLine(bufferBlock.Receive());
    }
});
var post2 = Task.Run(() =>
{
    bufferBlock.Post(2);
});

await Task.WhenAll(post01, receive, post2);

// Output:
//   0
//   1
//   2
' Write to and read from the message block concurrently.
Dim post01 = Task.Run(Sub()
                          bufferBlock.Post(0)
                          bufferBlock.Post(1)
                      End Sub)
Dim receive = Task.Run(Sub()
                           For i As Integer = 0 To 2
                               Console.WriteLine(bufferBlock.Receive())
                           Next i
                       End Sub)
Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
Task.WaitAll(post01, receive, post2)

' Output:
'   0
'   1
'   2

Zaman uyumsuz yazma ve okuma

Aşağıdaki örnek, bir BufferBlock<T> nesneye zaman uyumsuz olarak yazmak için yöntemini ve ReceiveAsync aynı nesneden zaman uyumsuz olarak okumak için yöntemini kullanırSendAsync. Bu örnek, zaman uyumsuz olarak hedef bloka veri göndermek ve hedef bloktan veri okumak için zaman uyumsuz ve await işleçlerini (Async ve Await in Visual Basic) kullanır. yöntemi SendAsync , iletileri ertelemek için bir veri akışı bloğunu etkinleştirmeniz gerektiğinde kullanışlıdır. Bu ReceiveAsync yöntem, veriler kullanılabilir olduğunda veriler üzerinde işlem yapmak istediğinizde kullanışlıdır. İletilerin ileti blokları arasında nasıl yayılma hakkında daha fazla bilgi için Veri Akışında İleti Geçirme bölümüne bakın.

// Post more messages to the block asynchronously.
for (int i = 0; i < 3; i++)
{
    await bufferBlock.SendAsync(i);
}

// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(await bufferBlock.ReceiveAsync());
}

// Output:
//   0
//   1
//   2
' Post more messages to the block asynchronously.
For i As Integer = 0 To 2
    await bufferBlock.SendAsync(i)
Next i

' Asynchronously receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(await bufferBlock.ReceiveAsync())
Next i

' Output:
'   0
'   1
'   2

Tam bir örnek

Aşağıdaki örnekte bu makalenin tüm kodları gösterilmektedir.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates a how to write to and read from a dataflow block.
class DataflowReadWrite
{
    // Demonstrates asynchronous dataflow operations.
    static async Task AsyncSendReceive(BufferBlock<int> bufferBlock)
    {
        // Post more messages to the block asynchronously.
        for (int i = 0; i < 3; i++)
        {
            await bufferBlock.SendAsync(i);
        }

        // Asynchronously receive the messages back from the block.
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine(await bufferBlock.ReceiveAsync());
        }

        // Output:
        //   0
        //   1
        //   2
    }

    static async Task Main()
    {
        var bufferBlock = new BufferBlock<int>();

        // Post several messages to the block.
        for (int i = 0; i < 3; i++)
        {
            bufferBlock.Post(i);
        }

        // Receive the messages back from the block.
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine(bufferBlock.Receive());
        }

        // Output:
        //   0
        //   1
        //   2

        // Post more messages to the block.
        for (int i = 0; i < 3; i++)
        {
            bufferBlock.Post(i);
        }

        // Receive the messages back from the block.
        while (bufferBlock.TryReceive(out int value))
        {
            Console.WriteLine(value);
        }

        // Output:
        //   0
        //   1
        //   2

        // Write to and read from the message block concurrently.
        var post01 = Task.Run(() =>
        {
            bufferBlock.Post(0);
            bufferBlock.Post(1);
        });
        var receive = Task.Run(() =>
        {
            for (int i = 0; i < 3; i++)
            {
                Console.WriteLine(bufferBlock.Receive());
            }
        });
        var post2 = Task.Run(() =>
        {
            bufferBlock.Post(2);
        });

        await Task.WhenAll(post01, receive, post2);

        // Output:
        //   0
        //   1
        //   2

        // Demonstrate asynchronous dataflow operations.
        await AsyncSendReceive(bufferBlock);
    }
}
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates a how to write to and read from a dataflow block.
Friend Class DataflowReadWrite
    ' Demonstrates asynchronous dataflow operations.
    Private Shared async Function AsyncSendReceive(ByVal bufferBlock As BufferBlock(Of Integer)) As Task
        ' Post more messages to the block asynchronously.
        For i As Integer = 0 To 2
            await bufferBlock.SendAsync(i)
        Next i

        ' Asynchronously receive the messages back from the block.
        For i As Integer = 0 To 2
            Console.WriteLine(await bufferBlock.ReceiveAsync())
        Next i

        ' Output:
        '   0
        '   1
        '   2
    End Function

    Shared Sub Main(ByVal args() As String)
        Dim bufferBlock = New BufferBlock(Of Integer)()

        ' Post several messages to the block.
        For i As Integer = 0 To 2
            bufferBlock.Post(i)
        Next i

        ' Receive the messages back from the block.
        For i As Integer = 0 To 2
            Console.WriteLine(bufferBlock.Receive())
        Next i

        ' Output:
        '   0
        '   1
        '   2

        ' Post more messages to the block.
        For i As Integer = 0 To 2
            bufferBlock.Post(i)
        Next i

        ' Receive the messages back from the block.
        Dim value As Integer
        Do While bufferBlock.TryReceive(value)
            Console.WriteLine(value)
        Loop

        ' Output:
        '   0
        '   1
        '   2

        ' Write to and read from the message block concurrently.
        Dim post01 = Task.Run(Sub()
                                  bufferBlock.Post(0)
                                  bufferBlock.Post(1)
                              End Sub)
        Dim receive = Task.Run(Sub()
                                   For i As Integer = 0 To 2
                                       Console.WriteLine(bufferBlock.Receive())
                                   Next i
                               End Sub)
        Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
        Task.WaitAll(post01, receive, post2)

        ' Output:
        '   0
        '   1
        '   2

        ' Demonstrate asynchronous dataflow operations.
        AsyncSendReceive(bufferBlock).Wait()
    End Sub

End Class

Sonraki adımlar

Bu örnekte doğrudan ileti bloğundan okuma ve ileti bloğuna yazma işlemi gösterilmektedir. Veri akışı bloklarını, veri akışı bloklarının doğrusal dizileri olan işlem hatlarına veya veri akışı bloklarının grafikleri olan ağlara da bağlayabilirsiniz. Bir ardışık düzen veya ağda, kaynaklar, veriler kullanılabilir oldukça zaman uyumsuz olarak hedeflere yayarlar. Temel bir veri akışı işlem hattı oluşturan bir örnek için bkz . İzlenecek Yol: Veri Akışı İşlem Hattı Oluşturma. Daha karmaşık bir veri akışı ağı oluşturan bir örnek için bkz . İzlenecek Yol: Windows Forms Uygulamasında Veri Akışını Kullanma.

Ayrıca bkz.