How to: Write Messages to and Read Messages from a Dataflow Block

This document describes how to use the TPL Dataflow Library to write messages to and read messages from a dataflow block. The TPL Dataflow Library provides both synchronous and asynchronous methods for writing messages to and reading messages from a dataflow block. This document uses the System.Threading.Tasks.Dataflow.BufferBlock<T> class. The BufferBlock<T> class buffers messages and behaves as both a message source and as a message target.

Tip

The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with the .NET Framework. To install the package that provides the System.Threading.Tasks.Dataflow namespace, open your project in Visual Studio, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package.

Writing to and Reading from a Dataflow Block Synchronously

The following example uses the Post method to write to a BufferBlock<T> dataflow block and the Receive method to read from the same object.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */

Visual Basic

      ' Create a BufferBlock(Of Integer) object.
      Dim bufferBlock = New BufferBlock(Of Integer)()

      ' Post several messages to the block.
      For i As Integer = 0 To 2
         bufferBlock.Post(i)
      Next i

      ' Receive the messages back from the block.
      For i As Integer = 0 To 2
         Console.WriteLine(bufferBlock.Receive())
      Next i

'       Output:
'         0
'         1
'         2
'       

You can also use the TryReceive method to read from a dataflow block, as shown in the following example. The TryReceive method does not block the current thread and is useful when you occasionally poll for data.

// Post more messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
int value;
while (bufferBlock.TryReceive(out value))
{
   Console.WriteLine(value);
}

/* Output:
   0
   1
   2
 */

Visual Basic

      ' Post more messages to the block.
      For i As Integer = 0 To 2
         bufferBlock.Post(i)
      Next i

      ' Receive the messages back from the block.
      Dim value As Integer
      Do While bufferBlock.TryReceive(value)
         Console.WriteLine(value)
      Loop

'       Output:
'         0
'         1
'         2
'       
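
The polling pattern above can be wrapped in a small helper. The following is only a sketch: the class name TryReceiveSketch and the method DrainAvailable are hypothetical names chosen for illustration and are not part of the TPL Dataflow Library. It shows that TryReceive returns false immediately when the block is empty, rather than waiting the way Receive does.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class TryReceiveSketch
{
   // Hypothetical helper: returns every message that is buffered right now,
   // without blocking the calling thread.
   public static List<int> DrainAvailable(BufferBlock<int> block)
   {
      var items = new List<int>();
      int value;
      while (block.TryReceive(out value))
      {
         items.Add(value);
      }
      return items;
   }

   public static void Main()
   {
      var bufferBlock = new BufferBlock<int>();

      // The block is empty, so the helper returns at once with no items.
      Console.WriteLine(DrainAvailable(bufferBlock).Count);

      bufferBlock.Post(10);
      bufferBlock.Post(20);

      // Both buffered messages come back in the order they were posted.
      Console.WriteLine(string.Join(", ", DrainAvailable(bufferBlock)));
   }
}

/* Output:
   0
   10, 20
 */
```

A helper like this suits a loop that checks for work between other tasks; if the caller has nothing else to do, Receive or ReceiveAsync is usually the simpler choice.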

Because the Post method acts synchronously, the BufferBlock<T> object in the previous examples receives all data before the second loop reads data. The following example extends the first example by using Task.Run to write to and read from the message block concurrently, and Task.WaitAll to wait for all of the tasks to finish. Because the tasks run concurrently, the values are not written to the BufferBlock<T> object in any specific order.

// Write to and read from the message block concurrently.
var post01 = Task.Run(() =>
   {
      bufferBlock.Post(0);
      bufferBlock.Post(1);
   });
var receive = Task.Run(() => 
   {
      for (int i = 0; i < 3; i++)
      {
         Console.WriteLine(bufferBlock.Receive());
      }
   });
var post2 = Task.Run(() => 
   {
      bufferBlock.Post(2);
   });
Task.WaitAll(post01, receive, post2);

/* Sample output:
   2
   0
   1
 */

Visual Basic

      ' Write to and read from the message block concurrently.
      Dim post01 = Task.Run(Sub()
          bufferBlock.Post(0)
          bufferBlock.Post(1)
      End Sub)
      Dim receive = Task.Run(Sub()
            For i As Integer = 0 To 2
               Console.WriteLine(bufferBlock.Receive())
            Next i
      End Sub)
      Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
      Task.WaitAll(post01, receive, post2)

'       Sample output:
'         2
'         0
'         1
'       

Writing to and Reading from a Dataflow Block Asynchronously

The following example uses the SendAsync method to asynchronously write to a BufferBlock<T> object and the ReceiveAsync method to asynchronously read from the same object. This example uses the async and await keywords (Async and Await in Visual Basic) to asynchronously send data to and read data from the block. The SendAsync method is useful when you must enable a dataflow block to postpone messages, for example when the block has reached its bounded capacity. The ReceiveAsync method is useful when you want to act on data when that data becomes available. For more information about how messages propagate among message blocks, see the section Message Passing in Dataflow.

// Post more messages to the block asynchronously.
for (int i = 0; i < 3; i++)
{
   await bufferBlock.SendAsync(i);
}

// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(await bufferBlock.ReceiveAsync());
}

/* Output:
   0
   1
   2
 */

Visual Basic

      ' Post more messages to the block asynchronously.
      For i As Integer = 0 To 2
         Await bufferBlock.SendAsync(i)
      Next i

      ' Asynchronously receive the messages back from the block.
      For i As Integer = 0 To 2
         Console.WriteLine(Await bufferBlock.ReceiveAsync())
      Next i

'       Output:
'         0
'         1
'         2
'       
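
To make the difference between Post and SendAsync concrete, the following sketch uses a block that can hold only one message. The capacity of 1 and the class name BoundedSendSketch are assumptions made for illustration; the example in this document uses an unbounded block, which never postpones. With a full block, Post returns false right away, whereas the task returned by SendAsync stays incomplete until the block has room.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedSendSketch
{
   public static async Task RunAsync()
   {
      // Assumption for illustration: a capacity of 1, so one message fills the block.
      var bounded = new BufferBlock<int>(
         new DataflowBlockOptions { BoundedCapacity = 1 });

      Console.WriteLine(bounded.Post(0));   // The block accepts the message.
      Console.WriteLine(bounded.Post(1));   // The block is full and declines.

      // SendAsync lets the full block postpone the message instead of declining it.
      Task<bool> pendingSend = bounded.SendAsync(2);
      Console.WriteLine(pendingSend.IsCompleted);   // Still waiting for room.

      Console.WriteLine(await bounded.ReceiveAsync());   // Frees the only slot.
      Console.WriteLine(await pendingSend);              // The block took the message.
      Console.WriteLine(await bounded.ReceiveAsync());
   }

   public static void Main()
   {
      RunAsync().Wait();
   }
}

/* Output:
   True
   False
   False
   0
   True
   2
 */
```

Note that the declined value 1 is never delivered. Code that posts to a bounded block should check the return value of Post, or use SendAsync when it cannot afford to drop messages.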

A Complete Example

The following example shows the complete code for this document.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to write to and read from a dataflow block.
class DataflowReadWrite
{
   // Demonstrates asynchronous dataflow operations.
   static async Task AsyncSendReceive(BufferBlock<int> bufferBlock)
   {
      // Post more messages to the block asynchronously.
      for (int i = 0; i < 3; i++)
      {
         await bufferBlock.SendAsync(i);
      }

      // Asynchronously receive the messages back from the block.
      for (int i = 0; i < 3; i++)
      {
         Console.WriteLine(await bufferBlock.ReceiveAsync());
      }

      /* Output:
         0
         1
         2
       */
   }

   static void Main(string[] args)
   {
      // Create a BufferBlock<int> object.
      var bufferBlock = new BufferBlock<int>();

      // Post several messages to the block.
      for (int i = 0; i < 3; i++)
      {
         bufferBlock.Post(i);
      }

      // Receive the messages back from the block.
      for (int i = 0; i < 3; i++)
      {
         Console.WriteLine(bufferBlock.Receive());
      }

      /* Output:
         0
         1
         2
       */

      // Post more messages to the block.
      for (int i = 0; i < 3; i++)
      {
         bufferBlock.Post(i);
      }

      // Receive the messages back from the block.
      int value;
      while (bufferBlock.TryReceive(out value))
      {
         Console.WriteLine(value);
      }

      /* Output:
         0
         1
         2
       */

      // Write to and read from the message block concurrently.
      var post01 = Task.Run(() =>
         {
            bufferBlock.Post(0);
            bufferBlock.Post(1);
         });
      var receive = Task.Run(() => 
         {
            for (int i = 0; i < 3; i++)
            {
               Console.WriteLine(bufferBlock.Receive());
            }
         });
      var post2 = Task.Run(() => 
         {
            bufferBlock.Post(2);
         });
      Task.WaitAll(post01, receive, post2);

      /* Sample output:
         2
         0
         1
       */

      // Demonstrate asynchronous dataflow operations.
      AsyncSendReceive(bufferBlock).Wait();
   }
}

Visual Basic

Imports System
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to write to and read from a dataflow block.
Friend Class DataflowReadWrite
   ' Demonstrates asynchronous dataflow operations.
   Private Shared Async Function AsyncSendReceive(ByVal bufferBlock As BufferBlock(Of Integer)) As Task
      ' Post more messages to the block asynchronously.
      For i As Integer = 0 To 2
         Await bufferBlock.SendAsync(i)
      Next i

      ' Asynchronously receive the messages back from the block.
      For i As Integer = 0 To 2
         Console.WriteLine(Await bufferBlock.ReceiveAsync())
      Next i

'       Output:
'         0
'         1
'         2
'       
   End Function

   Shared Sub Main(ByVal args() As String)
      ' Create a BufferBlock(Of Integer) object.
      Dim bufferBlock = New BufferBlock(Of Integer)()

      ' Post several messages to the block.
      For i As Integer = 0 To 2
         bufferBlock.Post(i)
      Next i

      ' Receive the messages back from the block.
      For i As Integer = 0 To 2
         Console.WriteLine(bufferBlock.Receive())
      Next i

'       Output:
'         0
'         1
'         2
'       

      ' Post more messages to the block.
      For i As Integer = 0 To 2
         bufferBlock.Post(i)
      Next i

      ' Receive the messages back from the block.
      Dim value As Integer
      Do While bufferBlock.TryReceive(value)
         Console.WriteLine(value)
      Loop

'       Output:
'         0
'         1
'         2
'       

      ' Write to and read from the message block concurrently.
      Dim post01 = Task.Run(Sub()
          bufferBlock.Post(0)
          bufferBlock.Post(1)
      End Sub)
      Dim receive = Task.Run(Sub()
            For i As Integer = 0 To 2
               Console.WriteLine(bufferBlock.Receive())
            Next i
      End Sub)
      Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
      Task.WaitAll(post01, receive, post2)

'       Sample output:
'         2
'         0
'         1
'       

      ' Demonstrate asynchronous dataflow operations.
      AsyncSendReceive(bufferBlock).Wait()
   End Sub

End Class

Compiling the Code

Copy the example code and paste it in a Visual Studio project, or paste it in a file that is named DataflowReadWrite.cs (DataflowReadWrite.vb for Visual Basic), and then run the following command in a Visual Studio Command Prompt window.

Visual C#

csc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowReadWrite.cs

Visual Basic

vbc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowReadWrite.vb

Next Steps

This example shows how to read from and write to a message block directly. You can also connect dataflow blocks to form pipelines, which are linear sequences of dataflow blocks, or networks, which are graphs of dataflow blocks. In a pipeline or network, sources asynchronously propagate data to targets as that data becomes available. For an example that creates a basic dataflow pipeline, see Walkthrough: Creating a Dataflow Pipeline. For an example that creates a more complex dataflow network, see Walkthrough: Using Dataflow in a Windows Forms Application.
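
As a rough preview of what linking blocks looks like, the following sketch connects two blocks into a minimal pipeline. The class name PipelineSketch and the two stages (doubling a value, then printing it) are hypothetical and chosen only for illustration; the walkthroughs mentioned above cover real pipelines in more depth.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
   public static void Main()
   {
      // Hypothetical two-stage pipeline: double each value, then print it.
      var doubler = new TransformBlock<int, int>(n => n * 2);
      var printer = new ActionBlock<int>(n => Console.WriteLine(n));

      // PropagateCompletion passes completion of the first block on to the second.
      doubler.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });

      // Write to the head of the pipeline; the link moves data to the next block.
      for (int i = 0; i < 3; i++)
      {
         doubler.Post(i);
      }

      // Signal that no more data is coming, then wait for the last block to finish.
      doubler.Complete();
      printer.Completion.Wait();
   }
}

/* Output:
   0
   2
   4
 */
```

Unlike the earlier examples, this code never calls Receive: once the blocks are linked, the source offers each message to its target as soon as the message is available.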

See Also

Dataflow