Procédure : utiliser des tableaux de collections de blocage dans un pipeline

L’exemple suivant montre comment utiliser des tableaux d’objets System.Collections.Concurrent.BlockingCollection<T> avec des méthodes statiques telles que TryAddToAny et TryTakeFromAny pour implémenter le transfert de données rapide et flexible entre des composants.

Exemple

L’exemple suivant montre une implémentation de pipeline de base dans laquelle chaque objet prend simultanément des données dans la collection d’entrée, les transforme et les passe à la collection de sortie.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class PipeLineDemo
{
   public static void Main()
   {
      CancellationTokenSource cts = new CancellationTokenSource();

      // Start up a UI thread for cancellation.
      Task.Run(() =>
          {
              if(Console.ReadKey(true).KeyChar == 'c')
                  cts.Cancel();
          });

      //Generate some source data.
      BlockingCollection<int>[] sourceArrays = new BlockingCollection<int>[5];
      for(int i = 0; i < sourceArrays.Length; i++)
          sourceArrays[i] = new BlockingCollection<int>(500);
      Parallel.For(0, sourceArrays.Length * 500, (j) =>
                          {
                              int k = BlockingCollection<int>.TryAddToAny(sourceArrays, j);
                              if(k >=0)
                                  Console.WriteLine("added {0} to source data", j);
                          });

      foreach (var arr in sourceArrays)
          arr.CompleteAdding();

      // First filter accepts the ints, keeps back a small percentage
      // as a processing fee, and converts the results to decimals.
      var filter1 = new PipelineFilter<int, decimal>
      (
          sourceArrays,
          (n) => Convert.ToDecimal(n * 0.97),
          cts.Token,
          "filter1"
       );

      // Second filter accepts the decimals and converts them to
      // System.Strings.
      var filter2 = new PipelineFilter<decimal, string>
      (
          filter1.m_output,
          (s) => String.Format("{0}", s),
          cts.Token,
          "filter2"
       );

      // Third filter uses the constructor with an Action<T>
      // that renders its output to the screen,
      // not a blocking collection.
      var filter3 = new PipelineFilter<string, string>
      (
          filter2.m_output,
          (s) => Console.WriteLine("The final result is {0}", s),
          cts.Token,
          "filter3"
       );

       // Start up the pipeline!
      try
      {
          Parallel.Invoke(
                       () => filter1.Run(),
                       () => filter2.Run(),
                       () => filter3.Run()
                   );
      }
      catch (AggregateException ae) {
          foreach(var ex in ae.InnerExceptions)
              Console.WriteLine(ex.Message + ex.StackTrace);
      }
      finally {
         cts.Dispose();
      }
      // You will need to press twice if you ran to the end:
      // once for the cancellation thread, and once for this thread.
      Console.WriteLine("Press any key.");
      Console.ReadKey(true);
  }

   class PipelineFilter<TInput, TOutput>
   {
      Func<TInput, TOutput> m_processor = null;
      public BlockingCollection<TInput>[] m_input;
      public BlockingCollection<TOutput>[] m_output = null;
      Action<TInput> m_outputProcessor = null;
      CancellationToken m_token;
      public string Name { get; private set; }

      public PipelineFilter(
          BlockingCollection<TInput>[] input,
          Func<TInput, TOutput> processor,
          CancellationToken token,
          string name)
      {
          m_input = input;
          m_output = new BlockingCollection<TOutput>[5];
          for (int i = 0; i < m_output.Length; i++)
              m_output[i] = new BlockingCollection<TOutput>(500);

          m_processor = processor;
          m_token = token;
          Name = name;
      }

      // Use this constructor for the final endpoint, which does
      // something like write to file or screen, instead of
      // pushing to another pipeline filter.
      public PipelineFilter(
          BlockingCollection<TInput>[] input,
          Action<TInput> renderer,
          CancellationToken token,
          string name)
      {
          m_input = input;
          m_outputProcessor = renderer;
          m_token = token;
          Name = name;
      }

      public void Run()
      {
          Console.WriteLine("{0} is running", this.Name);
          while (!m_input.All(bc => bc.IsCompleted) && !m_token.IsCancellationRequested)
          {
              TInput receivedItem;
              int i = BlockingCollection<TInput>.TryTakeFromAny(
                  m_input, out receivedItem, 50, m_token);
              if ( i >= 0)
              {
                  if (m_output != null) // we pass data to another blocking collection
                  {
                      TOutput outputItem = m_processor(receivedItem);
                      BlockingCollection<TOutput>.AddToAny(m_output, outputItem);
                      Console.WriteLine("{0} sent {1} to next", this.Name, outputItem);
                  }
                  else // we're an endpoint
                  {
                      m_outputProcessor(receivedItem);
                  }
              }
              else
                {
                    Console.WriteLine("Unable to retrieve data from previous filter");
                }
            }
          if (m_output != null)
          {
              foreach (var bc in m_output) bc.CompleteAdding();
          }
      }
   }
}
Imports System.Collections
Imports System.Collections.Concurrent
Imports System.Collections.Generic
Imports System.Linq
Imports System.Text
Imports System.Threading
Imports System.Threading.Tasks

Namespace BlockingCollectionPipeline
    Class PipeLineDemo
        Public Shared Sub Main()
            Dim cts As New CancellationTokenSource()

            ' Start up a UI thread for cancellation.
            Task.Factory.StartNew(Sub()

                                      If (Console.ReadKey().KeyChar = "c"c) Then
                                          cts.Cancel()
                                      End If
                                  End Sub)
            'Generate some source data.
            Dim sourceArrays() As BlockingCollection(Of Integer)
            ReDim sourceArrays(5)
            For i As Integer = 0 To sourceArrays.Length - 1
                sourceArrays(i) = New BlockingCollection(Of Integer)(500)
            Next

            Parallel.For(0, sourceArrays.Length * 500, Sub(j)

                                                           Dim k = BlockingCollection(Of Integer).TryAddToAny(sourceArrays, j)
                                                           If (k >= 0) Then
                                                               Console.WriteLine("added {0} to source data", j)
                                                           End If
                                                       End Sub)

            For Each arr In sourceArrays
                arr.CompleteAdding()
            Next

            ' First filter accepts the ints, keeps back a small percentage
            ' as a processing fee, and converts the results to decimals.
            Dim filter1 = New PipelineFilter(Of Integer, Decimal)(
            sourceArrays,
            Function(n)
                Return Convert.ToDecimal(n * 0.97)
            End Function,
                        cts.Token,
                        "filter1"
             )
            ' Second filter accepts the decimals and converts them to 
            ' System.Strings.
            Dim filter2 = New PipelineFilter(Of Decimal, String)(
            filter1.m_output,
            Function(s) (String.Format("{0}", s)),
            cts.Token,
            "filter2"
             )
            ' Third filter uses the constructor with an Action<T>
            ' that renders its output to the screen, 
            ' not a blocking collection.
            Dim filter3 = New PipelineFilter(Of String, String)(
            filter2.m_output,
            Sub(s) Console.WriteLine("The final result is {0}", s),
            cts.Token,
            "filter3"
             )
            ' Start up the pipeline!
            Try

                Parallel.Invoke(
                             Sub() filter1.Run(),
                             Sub() filter2.Run(),
                             Sub() filter3.Run()
                         )

            Catch ae As AggregateException
                For Each ex In ae.InnerExceptions
                    Console.WriteLine(ex.Message + ex.StackTrace)
                Next
            Finally
                cts.Dispose()
            End Try

            ' You will need to press twice if you ran to the end:
            ' once for the cancellation thread, and once for this thread.
            Console.WriteLine("Press any key.")
            Console.ReadKey()
        End Sub
    End Class
    class PipelineFilter(Of TInput, TOutput)

        Private m_processor As Func(Of TInput, TOutput) = Nothing
        Public m_input() As BlockingCollection(Of TInput) = Nothing
        Public m_output() As BlockingCollection(Of TOutput) = Nothing
        Private m_outputProcessor As Action(Of TInput) = Nothing
        Private m_token As CancellationToken
        Public Name As String
        Public Sub New(ByVal input() As BlockingCollection(Of TInput),
                ByVal processor As Func(Of TInput, TOutput),
                ByVal token As CancellationToken,
                ByVal _name As String)

            m_input = input
            '  m_output = New BlockingCollection(Of TOutput)()
            ReDim m_output(5)
            For i As Integer = 0 To m_output.Length - 1
                m_output(i) = New BlockingCollection(Of TOutput)(500)
                m_processor = processor
                m_token = token
                name = _name
            Next
        End Sub

        ' Use this constructor for the final endpoint, which does
        ' something like write to file or screen, instead of 
        ' pushing to another pipeline filter.
        Public Sub New(ByVal input() As BlockingCollection(Of TInput),
             ByVal renderer As Action(Of TInput),
           ByVal token As CancellationToken,
            ByVal _name As String)

            m_input = input
            m_outputProcessor = renderer
            m_token = token
            name = _name
        End Sub
        Public Sub Run()

            Console.WriteLine("{0} is running", Me.Name)
            While ((m_input.All(Function(bc) bc.IsCompleted) = False) And m_token.IsCancellationRequested = False)

                Dim receivedItem As TInput
                Dim i As Integer = BlockingCollection(Of TInput).TryTakeFromAny(
                        m_input, receivedItem, 50, m_token)
                If (i >= 0) Then

                    If (Not m_output Is Nothing) Then ' we pass data to another blocking collection

                        Dim outputItem As TOutput = m_processor(receivedItem)
                        BlockingCollection(Of TOutput).AddToAny(m_output, outputItem)
                        Console.WriteLine("{0} sent{1} to next", Me.Name, outputItem)

                    Else ' we're an endpoint

                        m_outputProcessor(receivedItem)
                    End If

                else
                    Console.WriteLine("Unable to retrieve data from previous filter")
                End If
            End While
            If (Not m_output Is Nothing) Then

                For Each bc In m_output
                    bc.CompleteAdding()
                Next

            End If
        End Sub
    End Class
End Namespace

Voir aussi