Walkthrough: Creating a Custom Dataflow Block Type

Although the TPL Dataflow Library provides several dataflow block types that enable a variety of functionality, you can also create custom block types. This document describes how to create a dataflow block type that implements custom behavior.

Prerequisites

Read Dataflow before you read this document.

Note

The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it using the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.

Defining the Sliding Window Dataflow Block

Consider a dataflow application that requires input values to be buffered and then output in a sliding window manner. For example, for the input values {0, 1, 2, 3, 4, 5} and a window size of three, a sliding window dataflow block produces the output arrays {0, 1, 2}, {1, 2, 3}, {2, 3, 4}, and {3, 4, 5}. The following sections describe two ways to create a dataflow block type that implements this custom behavior. The first technique uses the Encapsulate method to combine the functionality of an ISourceBlock<TOutput> object and an ITargetBlock<TInput> object into one propagator block. The second technique defines a class that implements IPropagatorBlock<TInput,TOutput> and combines existing functionality to perform custom behavior.
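
Before any dataflow blocks are involved, the windowing rule itself can be sketched on a plain sequence. The following is a minimal illustration, not part of the TPL Dataflow Library: the SlidingWindowSketch class and its Windows helper are hypothetical names introduced here, and the sketch only emits full windows.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SlidingWindowSketch
{
    // Hypothetical helper (not a library API): yields each full window of
    // windowSize consecutive items from the input sequence.
    public static IEnumerable<T[]> Windows<T>(IEnumerable<T> values, int windowSize)
    {
        var queue = new Queue<T>();
        foreach (T value in values)
        {
            queue.Enqueue(value);
            // Drop the oldest item once the queue grows past the window size.
            if (queue.Count > windowSize)
                queue.Dequeue();
            // Emit a snapshot whenever the queue holds exactly one full window.
            if (queue.Count == windowSize)
                yield return queue.ToArray();
        }
    }

    public static void Main()
    {
        // Input values 0..5 with a window size of three.
        foreach (int[] window in Windows(Enumerable.Range(0, 6), 3))
            Console.WriteLine("{" + string.Join(", ", window) + "}");
        // Prints {0, 1, 2}, {1, 2, 3}, {2, 3, 4}, and {3, 4, 5}, one per line.
    }
}
```

The dataflow blocks in the sections that follow apply the same enqueue, dequeue, and snapshot steps, but they run them inside an ActionBlock<T> and hand each snapshot to a BufferBlock<T[]>.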

Using the Encapsulate Method to Define the Sliding Window Dataflow Block

The following example uses the Encapsulate method to create a propagator block from a target and a source. A propagator block acts as both a receiver and a sender of data: the target block receives the input, and the source block sends the output.

This technique is useful when you require custom dataflow functionality, but you do not require a type that provides additional methods, properties, or fields.

// Creates an IPropagatorBlock<T, T[]> object that propagates data in a
// sliding window fashion.
public static IPropagatorBlock<T, T[]> CreateSlidingWindow<T>(int windowSize)
{
   // Create a queue to hold messages.
   var queue = new Queue<T>();

   // The source part of the propagator holds arrays of size windowSize
   // and propagates data out to any connected targets.
   var source = new BufferBlock<T[]>();

   // The target part receives data and adds them to the queue.
   var target = new ActionBlock<T>(item =>
   {
      // Add the item to the queue.
      queue.Enqueue(item);
      // Remove the oldest item when the queue size exceeds the window size.
      if (queue.Count > windowSize) 
         queue.Dequeue();
      // Post the data in the queue to the source block when the queue size
      // equals the window size.
      if (queue.Count == windowSize) 
         source.Post(queue.ToArray());
   });

   // When the target is set to the completed state, propagate out any
   // remaining data and set the source to the completed state.
   target.Completion.ContinueWith(delegate
   {
      if (queue.Count > 0 && queue.Count < windowSize) 
         source.Post(queue.ToArray());
      source.Complete();
   });

   // Return an IPropagatorBlock<T, T[]> object that encapsulates the
   // target and source blocks.
   return DataflowBlock.Encapsulate(target, source);
}
' Creates an IPropagatorBlock<T, T[]> object that propagates data in a
' sliding window fashion.
Public Shared Function CreateSlidingWindow(Of T)(ByVal windowSize As Integer) As IPropagatorBlock(Of T, T())
    ' Create a queue to hold messages.
    Dim queue = New Queue(Of T)()

    ' The source part of the propagator holds arrays of size windowSize
    ' and propagates data out to any connected targets.
    Dim source = New BufferBlock(Of T())()

    ' The target part receives data and adds them to the queue.
    Dim target = New ActionBlock(Of T)(Sub(item)
        ' Add the item to the queue.
        queue.Enqueue(item)
        ' Remove the oldest item when the queue size exceeds the window size.
        If queue.Count > windowSize Then
            queue.Dequeue()
        End If
        ' Post the data in the queue to the source block when the queue size
        ' equals the window size.
        If queue.Count = windowSize Then
            source.Post(queue.ToArray())
        End If
    End Sub)

    ' When the target is set to the completed state, propagate out any
    ' remaining data and set the source to the completed state.
    target.Completion.ContinueWith(Sub()
        If queue.Count > 0 AndAlso queue.Count < windowSize Then
            source.Post(queue.ToArray())
        End If
        source.Complete()
    End Sub)

    ' Return an IPropagatorBlock<T, T[]> object that encapsulates the
    ' target and source blocks.
    Return DataflowBlock.Encapsulate(target, source)
End Function
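
One detail of the completion continuation is easy to miss: it posts the queue contents only when fewer than windowSize items ever arrived, so a short input still produces one partial array. The sketch below illustrates that behavior under stated assumptions. The PartialWindowSketch class and the CollectAsync helper are hypothetical names introduced here, and CreateSlidingWindow is a condensed copy of the method shown above, repeated only so that the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PartialWindowSketch
{
    // Condensed copy of CreateSlidingWindow from the example above.
    public static IPropagatorBlock<T, T[]> CreateSlidingWindow<T>(int windowSize)
    {
        var queue = new Queue<T>();
        var source = new BufferBlock<T[]>();
        var target = new ActionBlock<T>(item =>
        {
            queue.Enqueue(item);
            if (queue.Count > windowSize) queue.Dequeue();
            if (queue.Count == windowSize) source.Post(queue.ToArray());
        });
        target.Completion.ContinueWith(delegate
        {
            if (queue.Count > 0 && queue.Count < windowSize)
                source.Post(queue.ToArray());
            source.Complete();
        });
        return DataflowBlock.Encapsulate(target, source);
    }

    // Hypothetical helper: posts the values, completes the block, and
    // collects every array that the block produces.
    public static async Task<List<T[]>> CollectAsync<T>(int windowSize, IEnumerable<T> values)
    {
        IPropagatorBlock<T, T[]> block = CreateSlidingWindow<T>(windowSize);
        foreach (T value in values)
            block.Post(value);
        block.Complete();

        var results = new List<T[]>();
        // OutputAvailableAsync returns false once the block is completed and empty.
        while (await block.OutputAvailableAsync())
            results.Add(await block.ReceiveAsync());
        return results;
    }

    public static async Task Main()
    {
        // Two items with a window size of three: one partial array, {0,1}.
        foreach (int[] window in await CollectAsync(3, new[] { 0, 1 }))
            Console.WriteLine("{" + string.Join(",", window) + "}");

        // Four items: the full windows {0,1,2} and {1,2,3}, and no extra
        // array on completion because the queue already holds a full window.
        foreach (int[] window in await CollectAsync(3, new[] { 0, 1, 2, 3 }))
            Console.WriteLine("{" + string.Join(",", window) + "}");
    }
}
```

If an application should never see arrays shorter than the window size, the continuation could skip the partial post and call only source.Complete(). That would be a change to the walkthrough's behavior, not something the original code does.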

Deriving from IPropagatorBlock to Define the Sliding Window Dataflow Block

The following example shows the SlidingWindowBlock class. This class implements IPropagatorBlock<TInput,TOutput> so that it can act as both a source and a target of data. As in the previous example, the SlidingWindowBlock class is built on existing dataflow block types. However, the SlidingWindowBlock class also implements the methods that are required by the ISourceBlock<TOutput>, ITargetBlock<TInput>, and IDataflowBlock interfaces. These methods all forward work to the predefined dataflow block type members. For example, the OfferMessage method forwards work to the m_target data member, which is also an ITargetBlock<TInput> object.

This technique is useful when you require custom dataflow functionality, and also require a type that provides additional methods, properties, or fields. For example, the SlidingWindowBlock class also implements IReceivableSourceBlock<TOutput> so that it can provide the TryReceive and TryReceiveAll methods. The SlidingWindowBlock class also demonstrates extensibility by providing the WindowSize property, which retrieves the number of elements in the sliding window.

// Propagates data in a sliding window fashion.
public class SlidingWindowBlock<T> : IPropagatorBlock<T, T[]>, 
                                     IReceivableSourceBlock<T[]>
{
   // The size of the window.
   private readonly int m_windowSize;
   // The target part of the block.
   private readonly ITargetBlock<T> m_target;
   // The source part of the block.
   private readonly IReceivableSourceBlock<T[]> m_source;

   // Constructs a SlidingWindowBlock object.
   public SlidingWindowBlock(int windowSize)
   {
      // Create a queue to hold messages.
      var queue = new Queue<T>();

      // The source part of the propagator holds arrays of size windowSize
      // and propagates data out to any connected targets.
      var source = new BufferBlock<T[]>();

      // The target part receives data and adds them to the queue.
      var target = new ActionBlock<T>(item =>
      {
         // Add the item to the queue.
         queue.Enqueue(item);
         // Remove the oldest item when the queue size exceeds the window size.
         if (queue.Count > windowSize)
            queue.Dequeue();
         // Post the data in the queue to the source block when the queue size
         // equals the window size.
         if (queue.Count == windowSize)
            source.Post(queue.ToArray());
      });

      // When the target is set to the completed state, propagate out any
      // remaining data and set the source to the completed state.
      target.Completion.ContinueWith(delegate
      {
         if (queue.Count > 0 && queue.Count < windowSize)
            source.Post(queue.ToArray());
         source.Complete();
      });

      m_windowSize = windowSize;
      m_target = target;
      m_source = source;
   }

   // Retrieves the size of the window.
   public int WindowSize { get { return m_windowSize; } }

   #region IReceivableSourceBlock<TOutput> members

   // Attempts to synchronously receive an item from the source.
   public bool TryReceive(Predicate<T[]> filter, out T[] item)
   {
      return m_source.TryReceive(filter, out item);
   }

   // Attempts to remove all available elements from the source into a new 
   // array that is returned.
   public bool TryReceiveAll(out IList<T[]> items)
   {
      return m_source.TryReceiveAll(out items);
   }

   #endregion

   #region ISourceBlock<TOutput> members

   // Links this dataflow block to the provided target.
   public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)
   {
      return m_source.LinkTo(target, linkOptions);
   }

   // Called by a target to reserve a message previously offered by a source 
   // but not yet consumed by this target.
   bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader, 
      ITargetBlock<T[]> target)
   {
      return m_source.ReserveMessage(messageHeader, target);
   }

   // Called by a target to consume a previously offered message from a source.
   T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader, 
      ITargetBlock<T[]> target, out bool messageConsumed)
   {
      return m_source.ConsumeMessage(messageHeader, 
         target, out messageConsumed);
   }

   // Called by a target to release a previously reserved message from a source.
   void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader, 
      ITargetBlock<T[]> target)
   {
      m_source.ReleaseReservation(messageHeader, target);
   }

   #endregion

   #region ITargetBlock<TInput> members

   // Asynchronously passes a message to the target block, giving the target the 
   // opportunity to consume the message.
   DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, 
      T messageValue, ISourceBlock<T> source, bool consumeToAccept)
   {
      return m_target.OfferMessage(messageHeader, 
         messageValue, source, consumeToAccept);
   }

   #endregion

   #region IDataflowBlock members

   // Gets a Task that represents the completion of this dataflow block.
   public Task Completion { get { return m_source.Completion; } }

   // Signals to this target block that it should not accept any more messages, 
   // nor consume postponed messages. 
   public void Complete()
   {
      m_target.Complete();
   }

   public void Fault(Exception error)
   {
      m_target.Fault(error);
   }

   #endregion
}
    ' Propagates data in a sliding window fashion.
    Public Class SlidingWindowBlock(Of T)
        Implements IPropagatorBlock(Of T, T()), IReceivableSourceBlock(Of T())
        ' The size of the window.
        Private ReadOnly m_windowSize As Integer
        ' The target part of the block.
        Private ReadOnly m_target As ITargetBlock(Of T)
        ' The source part of the block.
        Private ReadOnly m_source As IReceivableSourceBlock(Of T())

        ' Constructs a SlidingWindowBlock object.
        Public Sub New(ByVal windowSize As Integer)
            ' Create a queue to hold messages.
            Dim queue = New Queue(Of T)()

            ' The source part of the propagator holds arrays of size windowSize
            ' and propagates data out to any connected targets.
            Dim source = New BufferBlock(Of T())()

            ' The target part receives data and adds them to the queue.
            Dim target = New ActionBlock(Of T)(Sub(item)
                ' Add the item to the queue.
                queue.Enqueue(item)
                ' Remove the oldest item when the queue size exceeds the window size.
                If queue.Count > windowSize Then
                    queue.Dequeue()
                End If
                ' Post the data in the queue to the source block when the queue size
                ' equals the window size.
                If queue.Count = windowSize Then
                    source.Post(queue.ToArray())
                End If
            End Sub)

            ' When the target is set to the completed state, propagate out any
            ' remaining data and set the source to the completed state.
            target.Completion.ContinueWith(Sub()
                If queue.Count > 0 AndAlso queue.Count < windowSize Then
                    source.Post(queue.ToArray())
                End If
                source.Complete()
            End Sub)

            m_windowSize = windowSize
            m_target = target
            m_source = source
        End Sub

        ' Retrieves the size of the window.
        Public ReadOnly Property WindowSize() As Integer
            Get
                Return m_windowSize
            End Get
        End Property

        '#Region "IReceivableSourceBlock<TOutput> members"

        ' Attempts to synchronously receive an item from the source.
        Public Function TryReceive(ByVal filter As Predicate(Of T()), <System.Runtime.InteropServices.Out()> ByRef item() As T) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceive
            Return m_source.TryReceive(filter, item)
        End Function

        ' Attempts to remove all available elements from the source into a new 
        ' array that is returned.
        Public Function TryReceiveAll(<System.Runtime.InteropServices.Out()> ByRef items As IList(Of T())) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceiveAll
            Return m_source.TryReceiveAll(items)
        End Function

        '#End Region

#Region "ISourceBlock<TOutput> members"

        ' Links this dataflow block to the provided target.
        Public Function LinkTo(ByVal target As ITargetBlock(Of T()), ByVal linkOptions As DataflowLinkOptions) As IDisposable Implements ISourceBlock(Of T()).LinkTo
            Return m_source.LinkTo(target, linkOptions)
        End Function

        ' Called by a target to reserve a message previously offered by a source 
        ' but not yet consumed by this target.
        Private Function ReserveMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) As Boolean Implements ISourceBlock(Of T()).ReserveMessage
            Return m_source.ReserveMessage(messageHeader, target)
        End Function

        ' Called by a target to consume a previously offered message from a source.
        Private Function ConsumeMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T()), ByRef messageConsumed As Boolean) As T() Implements ISourceBlock(Of T()).ConsumeMessage
            Return m_source.ConsumeMessage(messageHeader, target, messageConsumed)
        End Function

        ' Called by a target to release a previously reserved message from a source.
        Private Sub ReleaseReservation(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) Implements ISourceBlock(Of T()).ReleaseReservation
            m_source.ReleaseReservation(messageHeader, target)
        End Sub

#End Region

#Region "ITargetBlock<TInput> members"

        ' Asynchronously passes a message to the target block, giving the target the 
        ' opportunity to consume the message.
        Private Function OfferMessage(ByVal messageHeader As DataflowMessageHeader, ByVal messageValue As T, ByVal source As ISourceBlock(Of T), ByVal consumeToAccept As Boolean) As DataflowMessageStatus Implements ITargetBlock(Of T).OfferMessage
            Return m_target.OfferMessage(messageHeader, messageValue, source, consumeToAccept)
        End Function

#End Region

#Region "IDataflowBlock members"

        ' Gets a Task that represents the completion of this dataflow block.
        Public ReadOnly Property Completion() As Task Implements IDataflowBlock.Completion
            Get
                Return m_source.Completion
            End Get
        End Property

        ' Signals to this target block that it should not accept any more messages, 
        ' nor consume postponed messages. 
        Public Sub Complete() Implements IDataflowBlock.Complete
            m_target.Complete()
        End Sub

        Public Sub Fault(ByVal [error] As Exception) Implements IDataflowBlock.Fault
            m_target.Fault([error])
        End Sub

#End Region
    End Class
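
The TryReceive and TryReceiveAll members of SlidingWindowBlock forward to the internal m_source member, which is a BufferBlock<T[]>. To show what those calls do without repeating the whole class, the following sketch calls them directly on a BufferBlock<int[]> that stands in for m_source. The ReceiveSketch class and its Demo method are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class ReceiveSketch
{
    // Returns the first received window and the number of windows that
    // TryReceiveAll drained afterwards.
    public static (int[] First, int Remaining) Demo()
    {
        // Stand-in for the m_source member of SlidingWindowBlock<int>.
        var source = new BufferBlock<int[]>();
        source.Post(new[] { 0, 1, 2 });
        source.Post(new[] { 1, 2, 3 });
        source.Post(new[] { 2, 3, 4 });

        // TryReceive returns immediately; a null filter accepts any item.
        int[] first = null;
        if (source.TryReceive(null, out int[] item))
            first = item;

        // TryReceiveAll removes everything that is currently buffered.
        int remaining = 0;
        if (source.TryReceiveAll(out IList<int[]> rest))
            remaining = rest.Count;

        return (first, remaining);
    }

    public static void Main()
    {
        var (first, remaining) = Demo();
        Console.WriteLine(string.Join(",", first));   // 0,1,2
        Console.WriteLine(remaining);                 // 2
    }
}
```

Because SlidingWindowBlock exposes the same two methods publicly, a caller that holds a SlidingWindowBlock<int> can poll it in the same way instead of linking it to a target block.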

The Complete Example

The following example shows the complete code for this walkthrough. It also demonstrates how to use both sliding window blocks in a method that writes to the block, reads from it, and prints the results to the console.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a custom dataflow block type.
class Program
{
   // Creates an IPropagatorBlock<T, T[]> object that propagates data in a
   // sliding window fashion.
   public static IPropagatorBlock<T, T[]> CreateSlidingWindow<T>(int windowSize)
   {
      // Create a queue to hold messages.
      var queue = new Queue<T>();

      // The source part of the propagator holds arrays of size windowSize
      // and propagates data out to any connected targets.
      var source = new BufferBlock<T[]>();

      // The target part receives data and adds them to the queue.
      var target = new ActionBlock<T>(item =>
      {
         // Add the item to the queue.
         queue.Enqueue(item);
         // Remove the oldest item when the queue size exceeds the window size.
         if (queue.Count > windowSize) 
            queue.Dequeue();
         // Post the data in the queue to the source block when the queue size
         // equals the window size.
         if (queue.Count == windowSize) 
            source.Post(queue.ToArray());
      });

      // When the target is set to the completed state, propagate out any
      // remaining data and set the source to the completed state.
      target.Completion.ContinueWith(delegate
      {
         if (queue.Count > 0 && queue.Count < windowSize) 
            source.Post(queue.ToArray());
         source.Complete();
      });

      // Return an IPropagatorBlock<T, T[]> object that encapsulates the
      // target and source blocks.
      return DataflowBlock.Encapsulate(target, source);
   }

   // Propagates data in a sliding window fashion.
   public class SlidingWindowBlock<T> : IPropagatorBlock<T, T[]>, 
                                        IReceivableSourceBlock<T[]>
   {
      // The size of the window.
      private readonly int m_windowSize;
      // The target part of the block.
      private readonly ITargetBlock<T> m_target;
      // The source part of the block.
      private readonly IReceivableSourceBlock<T[]> m_source;

      // Constructs a SlidingWindowBlock object.
      public SlidingWindowBlock(int windowSize)
      {
         // Create a queue to hold messages.
         var queue = new Queue<T>();

         // The source part of the propagator holds arrays of size windowSize
         // and propagates data out to any connected targets.
         var source = new BufferBlock<T[]>();

         // The target part receives data and adds them to the queue.
         var target = new ActionBlock<T>(item =>
         {
            // Add the item to the queue.
            queue.Enqueue(item);
            // Remove the oldest item when the queue size exceeds the window size.
            if (queue.Count > windowSize)
               queue.Dequeue();
            // Post the data in the queue to the source block when the queue size
            // equals the window size.
            if (queue.Count == windowSize)
               source.Post(queue.ToArray());
         });

         // When the target is set to the completed state, propagate out any
         // remaining data and set the source to the completed state.
         target.Completion.ContinueWith(delegate
         {
            if (queue.Count > 0 && queue.Count < windowSize)
               source.Post(queue.ToArray());
            source.Complete();
         });

         m_windowSize = windowSize;
         m_target = target;
         m_source = source;
      }

      // Retrieves the size of the window.
      public int WindowSize { get { return m_windowSize; } }

      #region IReceivableSourceBlock<TOutput> members

      // Attempts to synchronously receive an item from the source.
      public bool TryReceive(Predicate<T[]> filter, out T[] item)
      {
         return m_source.TryReceive(filter, out item);
      }

      // Attempts to remove all available elements from the source into a new 
      // array that is returned.
      public bool TryReceiveAll(out IList<T[]> items)
      {
         return m_source.TryReceiveAll(out items);
      }

      #endregion

      #region ISourceBlock<TOutput> members

      // Links this dataflow block to the provided target.
      public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)
      {
         return m_source.LinkTo(target, linkOptions);
      }

      // Called by a target to reserve a message previously offered by a source 
      // but not yet consumed by this target.
      bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader, 
         ITargetBlock<T[]> target)
      {
         return m_source.ReserveMessage(messageHeader, target);
      }

      // Called by a target to consume a previously offered message from a source.
      T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader, 
         ITargetBlock<T[]> target, out bool messageConsumed)
      {
         return m_source.ConsumeMessage(messageHeader, 
            target, out messageConsumed);
      }

      // Called by a target to release a previously reserved message from a source.
      void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader, 
         ITargetBlock<T[]> target)
      {
         m_source.ReleaseReservation(messageHeader, target);
      }

      #endregion

      #region ITargetBlock<TInput> members

      // Asynchronously passes a message to the target block, giving the target the 
      // opportunity to consume the message.
      DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, 
         T messageValue, ISourceBlock<T> source, bool consumeToAccept)
      {
         return m_target.OfferMessage(messageHeader, 
            messageValue, source, consumeToAccept);
      }

      #endregion

      #region IDataflowBlock members

      // Gets a Task that represents the completion of this dataflow block.
      public Task Completion { get { return m_source.Completion; } }

      // Signals to this target block that it should not accept any more messages, 
      // nor consume postponed messages. 
      public void Complete()
      {
         m_target.Complete();
      }

      public void Fault(Exception error)
      {
         m_target.Fault(error);
      }

      #endregion
   }

   // Demonstrates usage of the sliding window block by sending the provided
   // values to the provided propagator block and printing the output of 
   // that block to the console.
   static void DemonstrateSlidingWindow<T>(IPropagatorBlock<T, T[]> slidingWindow,
      IEnumerable<T> values)
   {
      // Create an action block that prints arrays of data to the console.
      string windowComma = string.Empty;
      var printWindow = new ActionBlock<T[]>(window =>
      {
         Console.Write(windowComma);
         Console.Write("{");

         string comma = string.Empty;
         foreach (T item in window)
         {
            Console.Write(comma);
            Console.Write(item);
            comma = ",";
         }
         Console.Write("}");

         windowComma = ", ";
      });

      // Link the printer block to the sliding window block.
      slidingWindow.LinkTo(printWindow);

      // Set the printer block to the completed state when the sliding window
      // block completes.
      slidingWindow.Completion.ContinueWith(delegate { printWindow.Complete(); });

      // Print an additional newline to the console when the printer block completes.
      var completion = printWindow.Completion.ContinueWith(delegate { Console.WriteLine(); });

      // Post the provided values to the sliding window block and then wait
      // for the sliding window block to complete.
      foreach (T value in values)
      {
         slidingWindow.Post(value);
      }
      slidingWindow.Complete();

      // Wait for the printer to complete and perform its final action.
      completion.Wait();
   }

   static void Main(string[] args)
   {

      Console.Write("Using the DataflowBlock.Encapsulate method ");
      Console.WriteLine("(T=int, windowSize=3):");
      DemonstrateSlidingWindow(CreateSlidingWindow<int>(3), Enumerable.Range(0, 10));

      Console.WriteLine();

      var slidingWindow = new SlidingWindowBlock<char>(4);

      Console.Write("Using SlidingWindowBlock<T> ");
      Console.WriteLine("(T=char, windowSize={0}):", slidingWindow.WindowSize);
      DemonstrateSlidingWindow(slidingWindow, from n in Enumerable.Range(65, 10)
                                              select (char)n);
   }
}

/* Output:
Using the DataflowBlock.Encapsulate method (T=int, windowSize=3):
{0,1,2}, {1,2,3}, {2,3,4}, {3,4,5}, {4,5,6}, {5,6,7}, {6,7,8}, {7,8,9}

Using SlidingWindowBlock<T> (T=char, windowSize=4):
{A,B,C,D}, {B,C,D,E}, {C,D,E,F}, {D,E,F,G}, {E,F,G,H}, {F,G,H,I}, {G,H,I,J}
 */
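
The DemonstrateSlidingWindow method above wires completion by hand with Completion.ContinueWith and then blocks on Wait. A possible alternative, sketched here as an assumption and not as the walkthrough's method, is to link with the PropagateCompletion option and await the consumer. The PropagateCompletionSketch class and RunAsync helper are hypothetical names, and BatchBlock<int> is used only as a stand-in IPropagatorBlock<int, int[]> so that the sketch compiles on its own. Either sliding window block from this walkthrough could be passed instead.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PropagateCompletionSketch
{
    // Hypothetical helper: posts the values to the propagator and returns the
    // text of every array it produces, in order.
    public static async Task<List<string>> RunAsync<T>(
        IPropagatorBlock<T, T[]> propagator, IEnumerable<T> values)
    {
        var lines = new List<string>();
        var collect = new ActionBlock<T[]>(
            window => lines.Add("{" + string.Join(",", window) + "}"));

        // PropagateCompletion completes the collector when the propagator
        // completes, so no manual Completion.ContinueWith call is needed.
        propagator.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (T value in values)
            propagator.Post(value);
        propagator.Complete();

        // Await the collector instead of blocking with Wait().
        await collect.Completion;
        return lines;
    }

    public static async Task Main()
    {
        // BatchBlock<int> groups items into non-overlapping batches; it is a
        // stand-in here, not a sliding window.
        List<string> lines = await RunAsync(new BatchBlock<int>(3), Enumerable.Range(0, 7));
        Console.WriteLine(string.Join(", ", lines));
        // Prints: {0,1,2}, {3,4,5}, {6}
    }
}
```

With PropagateCompletion, a fault in the propagator also reaches the linked block, whereas the ContinueWith call in the complete example always calls Complete on the printer.
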
Imports System
Imports System.Collections.Generic
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a custom dataflow block type.
Friend Class Program
    ' Creates an IPropagatorBlock<T, T[]> object that propagates data in a
    ' sliding window fashion.
    Public Shared Function CreateSlidingWindow(Of T)(ByVal windowSize As Integer) As IPropagatorBlock(Of T, T())
        ' Create a queue to hold messages.
        Dim queue = New Queue(Of T)()

        ' The source part of the propagator holds arrays of size windowSize
        ' and propagates data out to any connected targets.
        Dim source = New BufferBlock(Of T())()

        ' The target part receives data and adds them to the queue.
        Dim target = New ActionBlock(Of T)(Sub(item)
            ' Add the item to the queue.
            queue.Enqueue(item)
            ' Remove the oldest item when the queue size exceeds the window size.
            If queue.Count > windowSize Then
                queue.Dequeue()
            End If
            ' Post the data in the queue to the source block when the queue size
            ' equals the window size.
            If queue.Count = windowSize Then
                source.Post(queue.ToArray())
            End If
        End Sub)

        ' When the target is set to the completed state, propagate out any
        ' remaining data and set the source to the completed state.
        target.Completion.ContinueWith(Sub()
            If queue.Count > 0 AndAlso queue.Count < windowSize Then
                source.Post(queue.ToArray())
            End If
            source.Complete()
        End Sub)

        ' Return an IPropagatorBlock<T, T[]> object that encapsulates the
        ' target and source blocks.
        Return DataflowBlock.Encapsulate(target, source)
    End Function

    ' Propagates data in a sliding window fashion.
    Public Class SlidingWindowBlock(Of T)
        Implements IPropagatorBlock(Of T, T()), IReceivableSourceBlock(Of T())
        ' The size of the window.
        Private ReadOnly m_windowSize As Integer
        ' The target part of the block.
        Private ReadOnly m_target As ITargetBlock(Of T)
        ' The source part of the block.
        Private ReadOnly m_source As IReceivableSourceBlock(Of T())

        ' Constructs a SlidingWindowBlock object.
        Public Sub New(ByVal windowSize As Integer)
            ' Create a queue to hold messages.
            Dim queue = New Queue(Of T)()

            ' The source part of the propagator holds arrays of size windowSize
            ' and propagates data out to any connected targets.
            Dim source = New BufferBlock(Of T())()

            ' The target part receives data and adds them to the queue.
            Dim target = New ActionBlock(Of T)(Sub(item)
                ' Add the item to the queue.
                ' Remove the oldest item when the queue size exceeds the window size.
                ' Post the data in the queue to the source block when the queue size
                ' equals the window size.
                queue.Enqueue(item)
                If queue.Count > windowSize Then
                    queue.Dequeue()
                End If
                If queue.Count = windowSize Then
                    source.Post(queue.ToArray())
                End If
            End Sub)

            ' When the target is set to the completed state, propagate out any
            ' remaining data and set the source to the completed state.
            target.Completion.ContinueWith(Sub()
                If queue.Count > 0 AndAlso queue.Count < windowSize Then
                    source.Post(queue.ToArray())
                End If
                source.Complete()
            End Sub)

            m_windowSize = windowSize
            m_target = target
            m_source = source
        End Sub

        ' Retrieves the size of the window.
        Public ReadOnly Property WindowSize() As Integer
            Get
                Return m_windowSize
            End Get
        End Property

#Region "IReceivableSourceBlock<TOutput> members"

        ' Attempts to synchronously receive an item from the source.
        Public Function TryReceive(ByVal filter As Predicate(Of T()), <System.Runtime.InteropServices.Out()> ByRef item() As T) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceive
            Return m_source.TryReceive(filter, item)
        End Function

        ' Attempts to remove all available elements from the source into a new 
        ' array that is returned.
        Public Function TryReceiveAll(<System.Runtime.InteropServices.Out()> ByRef items As IList(Of T())) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceiveAll
            Return m_source.TryReceiveAll(items)
        End Function

#End Region

#Region "ISourceBlock<TOutput> members"

        ' Links this dataflow block to the provided target.
        Public Function LinkTo(ByVal target As ITargetBlock(Of T()), ByVal linkOptions As DataflowLinkOptions) As IDisposable Implements ISourceBlock(Of T()).LinkTo
            Return m_source.LinkTo(target, linkOptions)
        End Function

        ' Called by a target to reserve a message previously offered by a source 
        ' but not yet consumed by this target.
        Private Function ReserveMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) As Boolean Implements ISourceBlock(Of T()).ReserveMessage
            Return m_source.ReserveMessage(messageHeader, target)
        End Function

        ' Called by a target to consume a previously offered message from a source.
        Private Function ConsumeMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T()), ByRef messageConsumed As Boolean) As T() Implements ISourceBlock(Of T()).ConsumeMessage
            Return m_source.ConsumeMessage(messageHeader, target, messageConsumed)
        End Function

        ' Called by a target to release a previously reserved message from a source.
        Private Sub ReleaseReservation(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) Implements ISourceBlock(Of T()).ReleaseReservation
            m_source.ReleaseReservation(messageHeader, target)
        End Sub

#End Region

#Region "ITargetBlock<TInput> members"

        ' Asynchronously passes a message to the target block, giving the target the 
        ' opportunity to consume the message.
        Private Function OfferMessage(ByVal messageHeader As DataflowMessageHeader, ByVal messageValue As T, ByVal source As ISourceBlock(Of T), ByVal consumeToAccept As Boolean) As DataflowMessageStatus Implements ITargetBlock(Of T).OfferMessage
            Return m_target.OfferMessage(messageHeader, messageValue, source, consumeToAccept)
        End Function

#End Region

#Region "IDataflowBlock members"

        ' Gets a Task that represents the completion of this dataflow block.
        Public ReadOnly Property Completion() As Task Implements IDataflowBlock.Completion
            Get
                Return m_source.Completion
            End Get
        End Property

        ' Signals to this target block that it should not accept any more messages, 
        ' nor consume postponed messages. 
        Public Sub Complete() Implements IDataflowBlock.Complete
            m_target.Complete()
        End Sub

        ' Faults the target part of this block with the specified exception.
        Public Sub Fault(ByVal [error] As Exception) Implements IDataflowBlock.Fault
            m_target.Fault([error])
        End Sub

#End Region
    End Class

    ' Demonstrates usage of the sliding window block by sending the provided
    ' values to the provided propagator block and printing the output of 
    ' that block to the console.
    Private Shared Sub DemonstrateSlidingWindow(Of T)(ByVal slidingWindow As IPropagatorBlock(Of T, T()), ByVal values As IEnumerable(Of T))
        ' Create an action block that prints arrays of data to the console.
        Dim windowComma As String = String.Empty
        Dim printWindow = New ActionBlock(Of T())(Sub(window)
            Console.Write(windowComma)
            Console.Write("{")
            Dim comma As String = String.Empty
            For Each item As T In window
                Console.Write(comma)
                Console.Write(item)
                comma = ","
            Next item
            Console.Write("}")
            windowComma = ", "
        End Sub)

        ' Link the printer block to the sliding window block.
        slidingWindow.LinkTo(printWindow)

        ' Set the printer block to the completed state when the sliding window
        ' block completes.
        slidingWindow.Completion.ContinueWith(Sub() printWindow.Complete())

        ' Print an additional newline to the console when the printer block completes.
        Dim completion = printWindow.Completion.ContinueWith(Sub() Console.WriteLine())

        ' Post the provided values to the sliding window block and then signal
        ' that no more values will be posted.
        For Each value As T In values
            slidingWindow.Post(value)
        Next value
        slidingWindow.Complete()

        ' Wait for the printer to complete and perform its final action.
        completion.Wait()
    End Sub

    Shared Sub Main(ByVal args() As String)

        Console.Write("Using the DataflowBlock.Encapsulate method ")
        Console.WriteLine("(T=int, windowSize=3):")
        DemonstrateSlidingWindow(CreateSlidingWindow(Of Integer)(3), Enumerable.Range(0, 10))

        Console.WriteLine()

        Dim slidingWindow = New SlidingWindowBlock(Of Char)(4)

        Console.Write("Using SlidingWindowBlock<T> ")
        Console.WriteLine("(T=char, windowSize={0}):", slidingWindow.WindowSize)
        DemonstrateSlidingWindow(slidingWindow, _
            From n In Enumerable.Range(65, 10) _
            Select ChrW(n))
    End Sub
End Class

' Output:
'Using the DataflowBlock.Encapsulate method (T=int, windowSize=3):
'{0,1,2}, {1,2,3}, {2,3,4}, {3,4,5}, {4,5,6}, {5,6,7}, {6,7,8}, {7,8,9}
'
'Using SlidingWindowBlock<T> (T=char, windowSize=4):
'{A,B,C,D}, {B,C,D,E}, {C,D,E,F}, {D,E,F,G}, {E,F,G,H}, {F,G,H,I}, {G,H,I,J}
' 
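
The queue logic shared by both implementations can also be tried in isolation. The following is a minimal sketch and not part of the original sample: the SlidingWindowSketch module and its ComputeWindows and PrintWindows helpers are hypothetical names introduced here, and the helper works on an in-memory sequence instead of dataflow blocks. It also shows what the completion continuation does when fewer values than the window size arrive: the partial window is emitted once.

```vb
Imports System
Imports System.Collections.Generic
Imports System.Linq

Module SlidingWindowSketch
    ' Hypothetical helper (an assumption for illustration): applies the same
    ' queue logic as the target part of the block to an in-memory sequence.
    Function ComputeWindows(Of T)(ByVal values As IEnumerable(Of T), ByVal windowSize As Integer) As List(Of T())
        Dim queue As New Queue(Of T)()
        Dim result As New List(Of T())()

        For Each item As T In values
            ' Add the item, drop the oldest item when the queue grows past the
            ' window size, and emit a window when the queue is exactly full.
            queue.Enqueue(item)
            If queue.Count > windowSize Then
                queue.Dequeue()
            End If
            If queue.Count = windowSize Then
                result.Add(queue.ToArray())
            End If
        Next item

        ' Mirrors the completion continuation: if the input ended before a
        ' full window was collected, emit the remaining items once.
        If queue.Count > 0 AndAlso queue.Count < windowSize Then
            result.Add(queue.ToArray())
        End If

        Return result
    End Function

    ' Prints each window in the same {a,b,c} style as the walkthrough output.
    Sub PrintWindows(Of T)(ByVal windowList As List(Of T()))
        Dim parts = windowList.Select(
            Function(w) "{" & String.Join(",", w.Select(Function(x) x.ToString())) & "}")
        Console.WriteLine(String.Join(", ", parts))
    End Sub

    Sub Main()
        PrintWindows(ComputeWindows(Enumerable.Range(0, 6), 3))
        PrintWindows(ComputeWindows(Enumerable.Range(0, 2), 3))
    End Sub
End Module

' Output:
'{0,1,2}, {1,2,3}, {2,3,4}, {3,4,5}
'{0,1}
```

The second call posts only two values to a window of size three, so no full window ever forms. The final check then emits {0,1}, which is the same effect as the ContinueWith continuation in the dataflow versions.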

Compiling the Code

Copy the example code and paste it into a Visual Studio project, or paste it into a file named SlidingWindowBlock.cs (SlidingWindowBlock.vb for Visual Basic), and then run the following command in a Visual Studio Command Prompt window.

Visual C#

csc.exe /r:System.Threading.Tasks.Dataflow.dll SlidingWindowBlock.cs

Visual Basic

vbc.exe /r:System.Threading.Tasks.Dataflow.dll SlidingWindowBlock.vb

See Also

Dataflow