How to: Perform Action When a Dataflow Block Receives Data

Execution dataflow block types call a user-provided delegate when they receive data. The System.Threading.Tasks.Dataflow.ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>, and System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput> classes are execution dataflow block types. You can use the delegate keyword (Sub in Visual Basic), Action<T>, Func<T,TResult>, or a lambda expression when you provide a work function to an execution dataflow block. This document describes how to use Func<T,TResult> and lambda expressions to perform actions in execution dataflow blocks.

Tip

The TPL Dataflow Library (System.Threading.Tasks.Dataflow namespace) is not distributed with the .NET Framework 4.5. To install the System.Threading.Tasks.Dataflow namespace, open your project in Visual Studio 2012, choose Manage NuGet Packages from the Project menu, and search online for the Microsoft.Tpl.Dataflow package.

Example

The following example uses dataflow to read a file from disk and compute the number of bytes in that file that are equal to zero. It uses TransformBlock<TInput,TOutput> to read the file and compute the number of zero bytes, and ActionBlock<TInput> to print the number of zero bytes to the console. The TransformBlock<TInput,TOutput> object specifies a Func<T,TResult> object to perform work when the block receives data. The ActionBlock<TInput> object uses a lambda expression to print to the console the number of zero bytes that are read.

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to provide delegates to execution dataflow blocks.
class DataflowExecutionBlocks
{
   // Computes the number of zero bytes that the provided file
   // contains.
   //
   // path: the path of the file to scan.
   // Returns: the number of bytes in the file whose value is zero.
   static int CountBytes(string path)
   {
      byte[] buffer = new byte[1024];
      int totalZeroBytesRead = 0;
      using (var fileStream = File.OpenRead(path))
      {
         int bytesRead;

         // Read returns zero once the end of the stream is reached.
         while ((bytesRead = fileStream.Read(buffer, 0, buffer.Length)) > 0)
         {
            // Examine only the bytes that this call to Read filled in.
            // The rest of the buffer still holds data from an earlier
            // read and must not be counted a second time.
            totalZeroBytesRead += buffer.Take(bytesRead).Count(b => b == 0);
         }
      }

      return totalZeroBytesRead;
   }

   static void Main(string[] args)
   {
      // Create a temporary file on disk.
      string tempFile = Path.GetTempFileName();

      try
      {
         // Write random data to the temporary file.
         using (var fileStream = File.OpenWrite(tempFile))
         {
            Random rand = new Random();
            byte[] buffer = new byte[1024];
            for (int i = 0; i < 512; i++)
            {
               rand.NextBytes(buffer);
               fileStream.Write(buffer, 0, buffer.Length);
            }
         }

         // Create an ActionBlock<int> object that prints to the console 
         // the number of zero bytes read.
         var printResult = new ActionBlock<int>(zeroBytesRead =>
         {
            Console.WriteLine("{0} contains {1} zero bytes.",
               Path.GetFileName(tempFile), zeroBytesRead);
         });

         // Create a TransformBlock<string, int> object that calls the 
         // CountBytes function and returns its result.
         var countBytes = new TransformBlock<string, int>(
            new Func<string, int>(CountBytes));

         // Link the TransformBlock<string, int> object to the 
         // ActionBlock<int> object.
         countBytes.LinkTo(printResult);

         // Create a continuation task that completes the ActionBlock<int>
         // object when the TransformBlock<string, int> finishes.
         countBytes.Completion.ContinueWith(delegate { printResult.Complete(); });

         // Post the path to the temporary file to the 
         // TransformBlock<string, int> object.
         countBytes.Post(tempFile);

         // Requests completion of the TransformBlock<string, int> object.
         countBytes.Complete();

         // Wait for the ActionBlock<int> object to print the message.
         printResult.Completion.Wait();
      }
      finally
      {
         // Delete the temporary file, even if an earlier step failed.
         File.Delete(tempFile);
      }
   }
}

/* Sample output:
tmp4FBE.tmp contains 2081 zero bytes.
*/
Imports System
Imports System.IO
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to provide delegates to execution dataflow blocks.
Friend Class DataflowExecutionBlocks
   ' Computes the number of zero bytes that the provided file
   ' contains.
   '
   ' path: the path of the file to scan.
   ' Returns: the number of bytes in the file whose value is zero.
   Private Shared Function CountBytes(ByVal path As String) As Integer
      Dim buffer(1023) As Byte
      Dim totalZeroBytesRead As Integer = 0
      Using fileStream = File.OpenRead(path)
         ' Read returns zero once the end of the stream is reached.
         Dim bytesRead As Integer = fileStream.Read(buffer, 0, buffer.Length)
         Do While bytesRead > 0
            ' Examine only the bytes that this call to Read filled in.
            ' The rest of the buffer still holds data from an earlier
            ' read and must not be counted a second time.
            totalZeroBytesRead += buffer.Take(bytesRead).Count(Function(b) b = 0)
            bytesRead = fileStream.Read(buffer, 0, buffer.Length)
         Loop
      End Using

      Return totalZeroBytesRead
   End Function

   Shared Sub Main(ByVal args() As String)
      ' Create a temporary file on disk.
      Dim tempFile As String = Path.GetTempFileName()

      Try
         ' Write random data to the temporary file.
         Using fileStream = File.OpenWrite(tempFile)
            Dim rand As New Random()
            Dim buffer(1023) As Byte
            For i As Integer = 0 To 511
               rand.NextBytes(buffer)
               fileStream.Write(buffer, 0, buffer.Length)
            Next i
         End Using

         ' Create an ActionBlock(Of Integer) object that prints to the console 
         ' the number of zero bytes read.
         Dim printResult = New ActionBlock(Of Integer)(Sub(zeroBytesRead) Console.WriteLine("{0} contains {1} zero bytes.", Path.GetFileName(tempFile), zeroBytesRead))

         ' Create a TransformBlock(Of String, Integer) object that calls the 
         ' CountBytes function and returns its result.
         Dim countBytes = New TransformBlock(Of String, Integer)(New Func(Of String, Integer)(AddressOf DataflowExecutionBlocks.CountBytes))

         ' Link the TransformBlock(Of String, Integer) object to the 
         ' ActionBlock(Of Integer) object.
         countBytes.LinkTo(printResult)

         ' Create a continuation task that completes the ActionBlock(Of Integer)
         ' object when the TransformBlock(Of String, Integer) finishes.
         countBytes.Completion.ContinueWith(Sub() printResult.Complete())

         ' Post the path to the temporary file to the 
         ' TransformBlock(Of String, Integer) object.
         countBytes.Post(tempFile)

         ' Requests completion of the TransformBlock(Of String, Integer) object.
         countBytes.Complete()

         ' Wait for the ActionBlock(Of Integer) object to print the message.
         printResult.Completion.Wait()
      Finally
         ' Delete the temporary file, even if an earlier step failed.
         File.Delete(tempFile)
      End Try
   End Sub
End Class

' Sample output:
'tmp4FBE.tmp contains 2081 zero bytes.
'

Although you can provide a lambda expression to a TransformBlock<TInput,TOutput> object, this example uses Func<T,TResult> to enable other code to use the CountBytes method. The ActionBlock<TInput> object uses a lambda expression because the work to be performed is specific to this task and is not likely to be useful from other code. For more information about how lambda expressions work in the Task Parallel Library, see Lambda Expressions in PLINQ and TPL.

The section Summary of Delegate Types in the Dataflow document summarizes the delegate types that you can provide to ActionBlock<TInput>, TransformBlock<TInput,TOutput>, and TransformManyBlock<TInput,TOutput> objects. The table also specifies whether the delegate type operates synchronously or asynchronously.

Compiling the Code

Copy the example code and paste it in a Visual Studio project, or paste it in a file that is named DataflowExecutionBlocks.cs (DataflowExecutionBlocks.vb for Visual Basic), and then run the following command in a Visual Studio Command Prompt window.

Visual C#

csc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowExecutionBlocks.cs

Visual Basic

vbc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowExecutionBlocks.vb

Robust Programming

This example provides a delegate of type Func<T,TResult> to the TransformBlock<TInput,TOutput> object to perform the task of the dataflow block synchronously. To enable the dataflow block to behave asynchronously, provide a delegate of type Func<T, Task<TResult>> to the dataflow block. When a dataflow block behaves asynchronously, the task of the dataflow block is complete only when the returned Task<TResult> object finishes. The following example modifies the CountBytes method and uses the async and await operators (Async and Await in Visual Basic) to asynchronously compute the total number of bytes that are zero in the provided file. The ReadAsync method performs file read operations asynchronously.

// Asynchronously computes the number of zero bytes that the provided file 
// contains.
//
// path: the path of the file to scan.
// Returns: a task whose result is the number of bytes in the file whose
// value is zero.
static async Task<int> CountBytesAsync(string path)
{
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;

   // The final constructor argument (true) opens the stream for
   // asynchronous I/O.
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
   {
      int bytesRead;

      // Asynchronously read from the file stream. ReadAsync yields zero
      // once the end of the stream is reached.
      while ((bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
      {
         // Examine only the bytes that this read filled in. The rest of
         // the buffer still holds data from an earlier read and must not
         // be counted a second time.
         totalZeroBytesRead += buffer.Take(bytesRead).Count(b => b == 0);
      }
   }

   return totalZeroBytesRead;
}
' Asynchronously computes the number of zero bytes that the provided file 
' contains.
'
' path: the path of the file to scan.
' Returns: a task whose result is the number of bytes in the file whose
' value is zero.
Private Shared Async Function CountBytesAsync(ByVal path As String) As Task(Of Integer)
   Dim buffer(1023) As Byte
   Dim totalZeroBytesRead As Integer = 0

   ' The final constructor argument (True) opens the stream for
   ' asynchronous I/O.
   Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
      ' Asynchronously read from the file stream. ReadAsync yields zero
      ' once the end of the stream is reached.
      Dim bytesRead As Integer = Await fileStream.ReadAsync(buffer, 0, buffer.Length)
      Do While bytesRead > 0
         ' Examine only the bytes that this read filled in. The rest of
         ' the buffer still holds data from an earlier read and must not
         ' be counted a second time.
         totalZeroBytesRead += buffer.Take(bytesRead).Count(Function(b) b = 0)
         bytesRead = Await fileStream.ReadAsync(buffer, 0, buffer.Length)
      Loop
   End Using

   Return totalZeroBytesRead
End Function

You can also use asynchronous lambda expressions to perform an action in an execution dataflow block. The following example modifies the TransformBlock<TInput,TOutput> object that is used in the previous example so that it uses a lambda expression to perform the work asynchronously.

// Create a TransformBlock<string, int> object whose asynchronous lambda
// expression counts the zero bytes in the file at the path it receives
// and returns that count.
var countBytesAsync = new TransformBlock<string, int>(async path =>
{
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;

   // The final constructor argument (true) opens the stream for
   // asynchronous I/O.
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
   {
      int bytesRead;

      // Asynchronously read from the file stream. ReadAsync yields zero
      // once the end of the stream is reached.
      while ((bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
      {
         // Examine only the bytes that this read filled in. The rest of
         // the buffer still holds data from an earlier read and must not
         // be counted a second time.
         totalZeroBytesRead += buffer.Take(bytesRead).Count(b => b == 0);
      }
   }

   return totalZeroBytesRead;
});
' Create a TransformBlock(Of String, Integer) object whose asynchronous
' lambda expression counts the zero bytes in the file at the path it
' receives and returns that count.
Dim countBytesAsync = New TransformBlock(Of String, Integer)(Async Function(path)
    Dim buffer(1023) As Byte
    Dim totalZeroBytesRead As Integer = 0

    ' The final constructor argument (True) opens the stream for
    ' asynchronous I/O.
    Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
        ' Asynchronously read from the file stream. ReadAsync yields zero
        ' once the end of the stream is reached.
        Dim bytesRead As Integer = Await fileStream.ReadAsync(buffer, 0, buffer.Length)
        Do While bytesRead > 0
            ' Examine only the bytes that this read filled in. The rest of
            ' the buffer still holds data from an earlier read and must
            ' not be counted a second time.
            totalZeroBytesRead += buffer.Take(bytesRead).Count(Function(b) b = 0)
            bytesRead = Await fileStream.ReadAsync(buffer, 0, buffer.Length)
        Loop
    End Using
    Return totalZeroBytesRead
End Function)

See Also

Dataflow