How to: Use Arrays of Blocking Collections in a Pipeline

The following example shows how to use arrays of System.Collections.Concurrent.BlockingCollection<T> objects with static methods such as TryAddToAny and TryTakeFromAny to implement fast and flexible data transfer between components.

Example

The following example demonstrates a basic pipeline implementation in which each object is concurrently taking data from the input collection, transforming it, and passing it to the output collection.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;


class PipeLineDemo
{
   public static void Main()
   {
      CancellationTokenSource cts = new CancellationTokenSource();

      // Start up a UI thread for cancellation.
      Task.Run(() =>
          {
              if(Console.ReadKey(true).KeyChar == 'c')
                  cts.Cancel();
          });

      //Generate some source data.
      BlockingCollection<int>[] sourceArrays = new BlockingCollection<int>[5];
      for(int i = 0; i < sourceArrays.Length; i++)
          sourceArrays[i] = new BlockingCollection<int>(500);
      Parallel.For(0, sourceArrays.Length * 500, (j) =>
                          {
                              int k = BlockingCollection<int>.TryAddToAny(sourceArrays, j);
                              if(k >=0)
                                  Console.WriteLine("added {0} to source data", j);
                          });

      foreach (var arr in sourceArrays)
          arr.CompleteAdding();

      // First filter accepts the ints, keeps back a small percentage
      // as a processing fee, and converts the results to decimals.
      var filter1 = new PipelineFilter<int, decimal>
      (
          sourceArrays,
          (n) => Convert.ToDecimal(n * 0.97),
          cts.Token,
          "filter1"
       );

      // Second filter accepts the decimals and converts them to
      // System.Strings.
      var filter2 = new PipelineFilter<decimal, string>
      (
          filter1.m_output,
          (s) => String.Format("{0}", s),
          cts.Token,
          "filter2"
       );

      // Third filter uses the constructor with an Action<T>
      // that renders its output to the screen,
      // not a blocking collection.
      var filter3 = new PipelineFilter<string, string>
      (
          filter2.m_output,
          (s) => Console.WriteLine("The final result is {0}", s),
          cts.Token,
          "filter3"
       );

       // Start up the pipeline!
      try
      {
          Parallel.Invoke(
                       () => filter1.Run(),
                       () => filter2.Run(),
                       () => filter3.Run()
                   );
      }
      catch (AggregateException ae) {
          foreach(var ex in ae.InnerExceptions)
              Console.WriteLine(ex.Message + ex.StackTrace);
      }
      finally {
         cts.Dispose();
      }
      // You will need to press twice if you ran to the end:
      // once for the cancellation thread, and once for this thread.
      Console.WriteLine("Press any key.");
      Console.ReadKey(true);
  }

   class PipelineFilter<TInput, TOutput>
   {
      Func<TInput, TOutput> m_processor = null;
      public BlockingCollection<TInput>[] m_input;
      public BlockingCollection<TOutput>[] m_output = null;
      Action<TInput> m_outputProcessor = null;
      CancellationToken m_token;
      public string Name { get; private set; }

      public PipelineFilter(
          BlockingCollection<TInput>[] input,
          Func<TInput, TOutput> processor,
          CancellationToken token,
          string name)
      {
          m_input = input;
          m_output = new BlockingCollection<TOutput>[5];
          for (int i = 0; i < m_output.Length; i++)
              m_output[i] = new BlockingCollection<TOutput>(500);

          m_processor = processor;
          m_token = token;
          Name = name;
      }

      // Use this constructor for the final endpoint, which does
      // something like write to file or screen, instead of
      // pushing to another pipeline filter.
      public PipelineFilter(
          BlockingCollection<TInput>[] input,
          Action<TInput> renderer,
          CancellationToken token,
          string name)
      {
          m_input = input;
          m_outputProcessor = renderer;
          m_token = token;
          Name = name;
      }

      public void Run()
      {
          Console.WriteLine("{0} is running", this.Name);
          while (!m_input.All(bc => bc.IsCompleted) && !m_token.IsCancellationRequested)
          {
              TInput receivedItem;
              int i = BlockingCollection<TInput>.TryTakeFromAny(
                  m_input, out receivedItem, 50, m_token);
              if ( i >= 0)
              {
                  if (m_output != null) // we pass data to another blocking collection
                  {
                      TOutput outputItem = m_processor(receivedItem);
                      BlockingCollection<TOutput>.AddToAny(m_output, outputItem);
                      Console.WriteLine("{0} sent {1} to next", this.Name, outputItem);
                  }
                  else // we're an endpoint
                  {
                      m_outputProcessor(receivedItem);
                  }
              }
              else
                  Console.WriteLine("Unable to retrieve data from previous filter");
          }
          if (m_output != null)
          {
              foreach (var bc in m_output) bc.CompleteAdding();
          }
      }
   }
}
Imports System
Imports System.Collections
Imports System.Collections.Concurrent
Imports System.Collections.Generic
Imports System.Linq
Imports System.Text
Imports System.Threading
Imports System.Threading.Tasks

Namespace BlockingCollectionPipeline
    Class PipeLineDemo
        Public Shared Sub Main()
            Dim cts As New CancellationTokenSource()

            ' Start up a UI thread for cancellation.
            Task.Factory.StartNew(Sub()

                                      If (Console.ReadKey().KeyChar = "c"c) Then
                                          cts.Cancel()
                                      End If
                                  End Sub)
            'Generate some source data.
            Dim sourceArrays() As BlockingCollection(Of Integer)
            ReDim sourceArrays(5)
            For i As Integer = 0 To sourceArrays.Length - 1
                sourceArrays(i) = New BlockingCollection(Of Integer)(500)
            Next

            Parallel.For(0, sourceArrays.Length * 500, Sub(j)

                                                           Dim k = BlockingCollection(Of Integer).TryAddToAny(sourceArrays, j)
                                                           If (k >= 0) Then
                                                               Console.WriteLine("added {0} to source data", j)
                                                           End If
                                                       End Sub)

            For Each arr In sourceArrays
                arr.CompleteAdding()
            Next

            ' First filter accepts the ints, keeps back a small percentage
            ' as a processing fee, and converts the results to decimals.
            Dim filter1 = New PipelineFilter(Of Integer, Decimal)(
            sourceArrays,
            Function(n)
                Return Convert.ToDecimal(n * 0.97)
            End Function,
                        cts.Token,
                        "filter1"
             )
            ' Second filter accepts the decimals and converts them to 
            ' System.Strings.
            Dim filter2 = New PipelineFilter(Of Decimal, String)(
            filter1.m_output,
            Function(s) (String.Format("{0}", s)),
            cts.Token,
            "filter2"
             )
            ' Third filter uses the constructor with an Action<T>
            ' that renders its output to the screen, 
            ' not a blocking collection.
            Dim filter3 = New PipelineFilter(Of String, String)(
            filter2.m_output,
            Sub(s) Console.WriteLine("The final result is {0}", s),
            cts.Token,
            "filter3"
             )
            ' Start up the pipeline!
            Try

                Parallel.Invoke(
                             Sub() filter1.Run(),
                             Sub() filter2.Run(),
                             Sub() filter3.Run()
                         )

            Catch ae As AggregateException
                For Each ex In ae.InnerExceptions
                    Console.WriteLine(ex.Message + ex.StackTrace)
                Next
            Finally
                cts.Dispose()
            End Try
          
            ' You will need to press twice if you ran to the end:
            ' once for the cancellation thread, and once for this thread.
            Console.WriteLine("Press any key.")
            Console.ReadKey()
        End Sub
    End Class
        class PipelineFilter(Of TInput, TOutput)

        Private m_processor As Func(Of TInput, TOutput) = Nothing
        Public m_input() As BlockingCollection(Of TInput) = Nothing
        Public m_output() As BlockingCollection(Of TOutput) = Nothing
        Private m_outputProcessor As Action(Of TInput) = Nothing
        Private m_token As CancellationToken
        Public Name As String
        Public Sub New(ByVal input() As BlockingCollection(Of TInput),
                ByVal processor As Func(Of TInput, TOutput),
                ByVal token As CancellationToken,
                ByVal _name As String)

            m_input = input
            '  m_output = New BlockingCollection(Of TOutput)()
            ReDim m_output(5)
            For i As Integer = 0 To m_output.Length - 1
                m_output(i) = New BlockingCollection(Of TOutput)(500)
                m_processor = processor
                m_token = token
                name = _name
            Next
        End Sub

        ' Use this constructor for the final endpoint, which does
        ' something like write to file or screen, instead of 
        ' pushing to another pipeline filter.
        Public Sub New(ByVal input() As BlockingCollection(Of TInput),
             ByVal renderer As Action(Of TInput),
           ByVal token As CancellationToken,
            ByVal _name As String)

            m_input = input
            m_outputProcessor = renderer
            m_token = token
            name = _name
        End Sub
        Public Sub Run()

            Console.WriteLine("{0} is running", Me.Name)
            While ((m_input.All(Function(bc) bc.IsCompleted) = False) And m_token.IsCancellationRequested = False)

                Dim receivedItem As TInput
                Dim i As Integer = BlockingCollection(Of TInput).TryTakeFromAny(
                        m_input, receivedItem, 50, m_token)
                If (i >= 0) Then

                    If (Not m_output Is Nothing) Then ' we pass data to another blocking collection

                        Dim outputItem As TOutput = m_processor(receivedItem)
                        BlockingCollection(Of TOutput).AddToAny(m_output, outputItem)
                        Console.WriteLine("{0} sent{1} to next", Me.Name, outputItem)

                    Else ' we're an endpoint

                        m_outputProcessor(receivedItem)
                    End If

                    else
                    Console.WriteLine("Unable to retrieve data from previous filter")
                End If
                        End While
            If (Not m_output Is Nothing) Then

                For Each bc In m_output
                    bc.CompleteAdding()
                Next

            End If
        End Sub
    End Class
End Namespace

See Also

System.Collections.Concurrent
Thread-Safe Collections