Walkthrough: Creating a Custom Dataflow Block Type

Although the TPL Dataflow Library provides several dataflow block types that enable a variety of functionality, you can also create custom block types. This document describes how to create a dataflow block type that implements custom behavior.

Read Dataflow before you read this document.

The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it using the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.

Defining the Sliding Window Dataflow Block

Consider a dataflow application that requires input values to be buffered and then output in a sliding window manner. For example, for the input values {0, 1, 2, 3, 4, 5} and a window size of three, a sliding window dataflow block produces the output arrays {0, 1, 2}, {1, 2, 3}, {2, 3, 4}, and {3, 4, 5}. The following sections describe two ways to create a dataflow block type that implements this custom behavior. The first technique uses the Encapsulate method to combine the functionality of an ISourceBlock<TOutput> object and an ITargetBlock<TInput> object into one propagator block. The second technique defines a class that derives from IPropagatorBlock<TInput,TOutput> and combines existing functionality to perform custom behavior.
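To make the windowing rule concrete before any dataflow blocks are involved, here is a minimal sketch that applies the same queue logic to an ordinary sequence. The class name SlidingWindowSketch and the method Windows are hypothetical names chosen for illustration; they are not part of the TPL Dataflow Library or of this walkthrough's sample.

```csharp
using System;
using System.Collections.Generic;

public static class SlidingWindowSketch
{
    // Returns every full window of windowSize consecutive items, in input order.
    // (Hypothetical helper for illustration only.)
    public static List<T[]> Windows<T>(IEnumerable<T> values, int windowSize)
    {
        if (windowSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(windowSize));

        var queue = new Queue<T>();
        var windows = new List<T[]>();
        foreach (T value in values)
        {
            // Add the newest item and drop the oldest once the queue is too long.
            queue.Enqueue(value);
            if (queue.Count > windowSize)
                queue.Dequeue();

            // Emit a snapshot whenever the queue holds exactly one window.
            if (queue.Count == windowSize)
                windows.Add(queue.ToArray());
        }
        return windows;
    }

    public static void Main()
    {
        foreach (int[] window in Windows(new[] { 0, 1, 2, 3, 4, 5 }, 3))
            Console.WriteLine("{" + string.Join(", ", window) + "}");
    }
}
```

Unlike the dataflow blocks in the following sections, this sketch emits nothing when fewer than windowSize items arrive; the blocks below post one partial window at completion in that case.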

Using the Encapsulate Method to Define the Sliding Window Dataflow Block

The following example uses the Encapsulate method to create a propagator block from a target and a source. A propagator block enables a source block and a target block to act together as a single receiver and sender of data.

This technique is useful when you require custom dataflow functionality but do not require a type that provides additional methods, properties, or fields.

// Creates an IPropagatorBlock<T, T[]> object that propagates data in a
// sliding window fashion.
public static IPropagatorBlock<T, T[]> CreateSlidingWindow<T>(int windowSize)
{
   // Create a queue to hold messages.
   var queue = new Queue<T>();

   // The source part of the propagator holds arrays of size windowSize
   // and propagates data out to any connected targets.
   var source = new BufferBlock<T[]>();

   // The target part receives data and adds them to the queue.
   var target = new ActionBlock<T>(item =>
   {
      // Add the item to the queue.
      queue.Enqueue(item);
      // Remove the oldest item when the queue size exceeds the window size.
      if (queue.Count > windowSize)
         queue.Dequeue();
      // Post the data in the queue to the source block when the queue size
      // equals the window size.
      if (queue.Count == windowSize)
         source.Post(queue.ToArray());
   });

   // When the target is set to the completed state, propagate out any
   // remaining data and set the source to the completed state.
   target.Completion.ContinueWith(delegate
   {
      if (queue.Count > 0 && queue.Count < windowSize)
         source.Post(queue.ToArray());
      source.Complete();
   });

   // Return an IPropagatorBlock<T, T[]> object that encapsulates the
   // target and source blocks.
   return DataflowBlock.Encapsulate(target, source);
}

' Creates an IPropagatorBlock<T, T[]> object that propagates data in a
' sliding window fashion.
Public Shared Function CreateSlidingWindow(Of T)(ByVal windowSize As Integer) As IPropagatorBlock(Of T, T())
    ' Create a queue to hold messages.
    Dim queue = New Queue(Of T)()

    ' The source part of the propagator holds arrays of size windowSize
    ' and propagates data out to any connected targets.
    Dim source = New BufferBlock(Of T())()

    ' The target part receives data and adds them to the queue.
    Dim target = New ActionBlock(Of T)(Sub(item)
                                           ' Add the item to the queue.
                                           queue.Enqueue(item)
                                           ' Remove the oldest item when the queue size exceeds the window size.
                                           If queue.Count > windowSize Then
                                               queue.Dequeue()
                                           End If
                                           ' Post the data in the queue to the source block when the queue size
                                           ' equals the window size.
                                           If queue.Count = windowSize Then
                                               source.Post(queue.ToArray())
                                           End If
                                       End Sub)

    ' When the target is set to the completed state, propagate out any
    ' remaining data and set the source to the completed state.
    target.Completion.ContinueWith(Sub()
                                       If queue.Count > 0 AndAlso queue.Count < windowSize Then
                                           source.Post(queue.ToArray())
                                       End If
                                       source.Complete()
                                   End Sub)

    ' Return an IPropagatorBlock<T, T[]> object that encapsulates the
    ' target and source blocks.
    Return DataflowBlock.Encapsulate(target, source)
End Function
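
As a usage sketch for the encapsulated block, the following self-contained program repeats a compact copy of CreateSlidingWindow and gathers its output into a list. The class EncapsulateUsageSketch and the helper Collect are hypothetical names introduced here. The sketch also assumes that linking with DataflowLinkOptions.PropagateCompletion is acceptable in place of the ContinueWith call that the complete example uses to finish the downstream block.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EncapsulateUsageSketch
{
    // Compact copy of the walkthrough's CreateSlidingWindow, repeated so that
    // this sketch compiles on its own.
    public static IPropagatorBlock<T, T[]> CreateSlidingWindow<T>(int windowSize)
    {
        var queue = new Queue<T>();
        var source = new BufferBlock<T[]>();
        var target = new ActionBlock<T>(item =>
        {
            queue.Enqueue(item);
            if (queue.Count > windowSize) queue.Dequeue();
            if (queue.Count == windowSize) source.Post(queue.ToArray());
        });
        target.Completion.ContinueWith(_ =>
        {
            // Flush a partial window if fewer than windowSize items arrived.
            if (queue.Count > 0 && queue.Count < windowSize) source.Post(queue.ToArray());
            source.Complete();
        });
        return DataflowBlock.Encapsulate(target, source);
    }

    // Hypothetical helper: posts the values, completes the block, and returns
    // every window the block produced.
    public static List<T[]> Collect<T>(IPropagatorBlock<T, T[]> block, IEnumerable<T> values)
    {
        var results = new List<T[]>();
        // ActionBlock processes one message at a time by default, so the
        // list is never mutated concurrently.
        var collector = new ActionBlock<T[]>(window => results.Add(window));

        // PropagateCompletion completes the collector once the block has
        // completed and its buffered windows have been delivered.
        block.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (T value in values)
            block.Post(value);
        block.Complete();

        collector.Completion.Wait();
        return results;
    }

    public static void Main()
    {
        var windows = Collect(CreateSlidingWindow<int>(3), new[] { 0, 1, 2, 3, 4, 5 });
        foreach (int[] window in windows)
            Console.WriteLine("{" + string.Join(",", window) + "}");
    }
}
```

Because Encapsulate reports the completion of the source block, waiting on the downstream collector is enough to know that every window, including a trailing partial one, has been delivered.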

Deriving from IPropagatorBlock to Define the Sliding Window Dataflow Block

The following example shows the SlidingWindowBlock class. This class derives from IPropagatorBlock<TInput,TOutput> so that it can act as both a source and a target of data. As in the previous example, the SlidingWindowBlock class is built on existing dataflow block types. However, the SlidingWindowBlock class also implements the methods that are required by the ISourceBlock<TOutput>, ITargetBlock<TInput>, and IDataflowBlock interfaces. These methods all forward work to the members of the predefined dataflow block types. For example, a Post call reaches the block through OfferMessage, which forwards the work to the m_target data member, itself an ITargetBlock<TInput> object.

This technique is useful when you require custom dataflow functionality and also require a type that provides additional methods, properties, or fields. For example, the SlidingWindowBlock class also derives from IReceivableSourceBlock<TOutput> so that it can provide the TryReceive and TryReceiveAll methods. The SlidingWindowBlock class also demonstrates extensibility by providing the WindowSize property, which retrieves the number of elements in the sliding window.

// Propagates data in a sliding window fashion.
public class SlidingWindowBlock<T> : IPropagatorBlock<T, T[]>,
                                     IReceivableSourceBlock<T[]>
{
   // The size of the window.
   private readonly int m_windowSize;
   // The target part of the block.
   private readonly ITargetBlock<T> m_target;
   // The source part of the block.
   private readonly IReceivableSourceBlock<T[]> m_source;

   // Constructs a SlidingWindowBlock object.
   public SlidingWindowBlock(int windowSize)
   {
      // Create a queue to hold messages.
      var queue = new Queue<T>();

      // The source part of the propagator holds arrays of size windowSize
      // and propagates data out to any connected targets.
      var source = new BufferBlock<T[]>();

      // The target part receives data and adds them to the queue.
      var target = new ActionBlock<T>(item =>
      {
         // Add the item to the queue.
         queue.Enqueue(item);
         // Remove the oldest item when the queue size exceeds the window size.
         if (queue.Count > windowSize)
            queue.Dequeue();
         // Post the data in the queue to the source block when the queue size
         // equals the window size.
         if (queue.Count == windowSize)
            source.Post(queue.ToArray());
      });

      // When the target is set to the completed state, propagate out any
      // remaining data and set the source to the completed state.
      target.Completion.ContinueWith(delegate
      {
         if (queue.Count > 0 && queue.Count < windowSize)
            source.Post(queue.ToArray());
         source.Complete();
      });

      m_windowSize = windowSize;
      m_target = target;
      m_source = source;
   }

   // Retrieves the size of the window.
   public int WindowSize { get { return m_windowSize; } }

   #region IReceivableSourceBlock<TOutput> members

   // Attempts to synchronously receive an item from the source.
   public bool TryReceive(Predicate<T[]> filter, out T[] item)
   {
      return m_source.TryReceive(filter, out item);
   }

   // Attempts to remove all available elements from the source into a new
   // array that is returned.
   public bool TryReceiveAll(out IList<T[]> items)
   {
      return m_source.TryReceiveAll(out items);
   }

   #endregion

   #region ISourceBlock<TOutput> members

   // Links this dataflow block to the provided target.
   public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)
   {
      return m_source.LinkTo(target, linkOptions);
   }

   // Called by a target to reserve a message previously offered by a source
   // but not yet consumed by this target.
   bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
      ITargetBlock<T[]> target)
   {
      return m_source.ReserveMessage(messageHeader, target);
   }

   // Called by a target to consume a previously offered message from a source.
   T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
      ITargetBlock<T[]> target, out bool messageConsumed)
   {
      return m_source.ConsumeMessage(messageHeader,
         target, out messageConsumed);
   }

   // Called by a target to release a previously reserved message from a source.
   void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
      ITargetBlock<T[]> target)
   {
      m_source.ReleaseReservation(messageHeader, target);
   }

   #endregion

   #region ITargetBlock<TInput> members

   // Asynchronously passes a message to the target block, giving the target the
   // opportunity to consume the message.
   DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
      T messageValue, ISourceBlock<T> source, bool consumeToAccept)
   {
      return m_target.OfferMessage(messageHeader,
         messageValue, source, consumeToAccept);
   }

   #endregion

   #region IDataflowBlock members

   // Gets a Task that represents the completion of this dataflow block.
   public Task Completion { get { return m_source.Completion; } }

   // Signals to this target block that it should not accept any more messages,
   // nor consume postponed messages.
   public void Complete()
   {
      m_target.Complete();
   }

   // Causes this dataflow block to complete in a faulted state.
   public void Fault(Exception error)
   {
      m_target.Fault(error);
   }

   #endregion
}

    ' Propagates data in a sliding window fashion.
    Public Class SlidingWindowBlock(Of T)
        Implements IPropagatorBlock(Of T, T()), IReceivableSourceBlock(Of T())
        ' The size of the window.
        Private ReadOnly m_windowSize As Integer
        ' The target part of the block.
        Private ReadOnly m_target As ITargetBlock(Of T)
        ' The source part of the block.
        Private ReadOnly m_source As IReceivableSourceBlock(Of T())

        ' Constructs a SlidingWindowBlock object.
        Public Sub New(ByVal windowSize As Integer)
            ' Create a queue to hold messages.
            Dim queue = New Queue(Of T)()

            ' The source part of the propagator holds arrays of size windowSize
            ' and propagates data out to any connected targets.
            Dim source = New BufferBlock(Of T())()

            ' The target part receives data and adds them to the queue.
            Dim target = New ActionBlock(Of T)(Sub(item)
                                                   ' Add the item to the queue.
                                                   queue.Enqueue(item)
                                                   ' Remove the oldest item when the queue size exceeds the window size.
                                                   If queue.Count > windowSize Then
                                                       queue.Dequeue()
                                                   End If
                                                   ' Post the data in the queue to the source block when the queue size
                                                   ' equals the window size.
                                                   If queue.Count = windowSize Then
                                                       source.Post(queue.ToArray())
                                                   End If
                                               End Sub)

            ' When the target is set to the completed state, propagate out any
            ' remaining data and set the source to the completed state.
            target.Completion.ContinueWith(Sub()
                                               If queue.Count > 0 AndAlso queue.Count < windowSize Then
                                                   source.Post(queue.ToArray())
                                               End If
                                               source.Complete()
                                           End Sub)

            m_windowSize = windowSize
            m_target = target
            m_source = source
        End Sub

        ' Retrieves the size of the window.
        Public ReadOnly Property WindowSize() As Integer
            Get
                Return m_windowSize
            End Get
        End Property

        '#Region "IReceivableSourceBlock<TOutput> members"

        ' Attempts to synchronously receive an item from the source.
        Public Function TryReceive(ByVal filter As Predicate(Of T()), <System.Runtime.InteropServices.Out()> ByRef item() As T) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceive
            Return m_source.TryReceive(filter, item)
        End Function

        ' Attempts to remove all available elements from the source into a new 
        ' array that is returned.
        Public Function TryReceiveAll(<System.Runtime.InteropServices.Out()> ByRef items As IList(Of T())) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceiveAll
            Return m_source.TryReceiveAll(items)
        End Function

        '#End Region

#Region "ISourceBlock<TOutput> members"

        ' Links this dataflow block to the provided target.
        Public Function LinkTo(ByVal target As ITargetBlock(Of T()), ByVal linkOptions As DataflowLinkOptions) As IDisposable Implements ISourceBlock(Of T()).LinkTo
            Return m_source.LinkTo(target, linkOptions)
        End Function

        ' Called by a target to reserve a message previously offered by a source 
        ' but not yet consumed by this target.
        Private Function ReserveMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) As Boolean Implements ISourceBlock(Of T()).ReserveMessage
            Return m_source.ReserveMessage(messageHeader, target)
        End Function

        ' Called by a target to consume a previously offered message from a source.
        Private Function ConsumeMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T()), ByRef messageConsumed As Boolean) As T() Implements ISourceBlock(Of T()).ConsumeMessage
            Return m_source.ConsumeMessage(messageHeader, target, messageConsumed)
        End Function

        ' Called by a target to release a previously reserved message from a source.
        Private Sub ReleaseReservation(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) Implements ISourceBlock(Of T()).ReleaseReservation
            m_source.ReleaseReservation(messageHeader, target)
        End Sub

#End Region

#Region "ITargetBlock<TInput> members"

        ' Asynchronously passes a message to the target block, giving the target the 
        ' opportunity to consume the message.
        Private Function OfferMessage(ByVal messageHeader As DataflowMessageHeader, ByVal messageValue As T, ByVal source As ISourceBlock(Of T), ByVal consumeToAccept As Boolean) As DataflowMessageStatus Implements ITargetBlock(Of T).OfferMessage
            Return m_target.OfferMessage(messageHeader, messageValue, source, consumeToAccept)
        End Function

#End Region

#Region "IDataflowBlock members"

        ' Gets a Task that represents the completion of this dataflow block.
        Public ReadOnly Property Completion() As Task Implements IDataflowBlock.Completion
            Get
                Return m_source.Completion
            End Get
        End Property

        ' Signals to this target block that it should not accept any more messages, 
        ' nor consume postponed messages. 
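        Public Sub Complete() Implements IDataflowBlock.Complete
            m_target.Complete()
        End Sub

        ' Causes this dataflow block to complete in a faulted state.
        Public Sub Fault(ByVal [error] As Exception) Implements IDataflowBlock.Fault
            m_target.Fault([error])
        End Sub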

#End Region
    End Class
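
SlidingWindowBlock forwards TryReceive and TryReceiveAll to its internal BufferBlock, so the behavior of those two members can be sketched directly against a BufferBlock<int[]>. The class name ReceivableSourceSketch and the method Drain are hypothetical. The sketch assumes no targets are linked to the block, so posted windows stay buffered until they are received.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class ReceivableSourceSketch
{
    // Buffers three windows, takes the first with TryReceive, and drains the
    // remainder with TryReceiveAll. (Hypothetical helper for illustration.)
    public static (bool GotFirst, int[] First, int RestCount) Drain()
    {
        var buffer = new BufferBlock<int[]>();
        buffer.Post(new[] { 0, 1, 2 });
        buffer.Post(new[] { 1, 2, 3 });
        buffer.Post(new[] { 2, 3, 4 });

        // View the block through the interface that SlidingWindowBlock exposes.
        IReceivableSourceBlock<int[]> receivable = buffer;

        // A null filter accepts whichever item is next in the buffer.
        bool gotFirst = receivable.TryReceive(null, out int[] first);

        // TryReceiveAll returns false (and a null list) when nothing is buffered.
        bool gotRest = receivable.TryReceiveAll(out IList<int[]> rest);
        int restCount = gotRest ? rest.Count : 0;

        return (gotFirst, first, restCount);
    }

    public static void Main()
    {
        var result = Drain();
        Console.WriteLine("First received: {0}", result.GotFirst);
        Console.WriteLine("Remaining windows drained: {0}", result.RestCount);
    }
}
```

Both calls are synchronous and non-blocking, which is why they suit polling scenarios; code that should wait for data would typically link a target or use ReceiveAsync instead.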

The Complete Example

The following example shows the complete code for this walkthrough. It also demonstrates how to use both sliding window blocks in a method that writes to the block, reads from it, and prints the results to the console.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a custom dataflow block type.
class Program
{
   // Creates an IPropagatorBlock<T, T[]> object that propagates data in a
   // sliding window fashion.
   public static IPropagatorBlock<T, T[]> CreateSlidingWindow<T>(int windowSize)
   {
      // Create a queue to hold messages.
      var queue = new Queue<T>();

      // The source part of the propagator holds arrays of size windowSize
      // and propagates data out to any connected targets.
      var source = new BufferBlock<T[]>();

      // The target part receives data and adds them to the queue.
      var target = new ActionBlock<T>(item =>
      {
         // Add the item to the queue.
         queue.Enqueue(item);
         // Remove the oldest item when the queue size exceeds the window size.
         if (queue.Count > windowSize)
            queue.Dequeue();
         // Post the data in the queue to the source block when the queue size
         // equals the window size.
         if (queue.Count == windowSize)
            source.Post(queue.ToArray());
      });

      // When the target is set to the completed state, propagate out any
      // remaining data and set the source to the completed state.
      target.Completion.ContinueWith(delegate
      {
         if (queue.Count > 0 && queue.Count < windowSize)
            source.Post(queue.ToArray());
         source.Complete();
      });

      // Return an IPropagatorBlock<T, T[]> object that encapsulates the
      // target and source blocks.
      return DataflowBlock.Encapsulate(target, source);
   }

   // Propagates data in a sliding window fashion.
   public class SlidingWindowBlock<T> : IPropagatorBlock<T, T[]>,
                                        IReceivableSourceBlock<T[]>
   {
      // The size of the window.
      private readonly int m_windowSize;
      // The target part of the block.
      private readonly ITargetBlock<T> m_target;
      // The source part of the block.
      private readonly IReceivableSourceBlock<T[]> m_source;

      // Constructs a SlidingWindowBlock object.
      public SlidingWindowBlock(int windowSize)
      {
         // Create a queue to hold messages.
         var queue = new Queue<T>();

         // The source part of the propagator holds arrays of size windowSize
         // and propagates data out to any connected targets.
         var source = new BufferBlock<T[]>();

         // The target part receives data and adds them to the queue.
         var target = new ActionBlock<T>(item =>
         {
            // Add the item to the queue.
            queue.Enqueue(item);
            // Remove the oldest item when the queue size exceeds the window size.
            if (queue.Count > windowSize)
               queue.Dequeue();
            // Post the data in the queue to the source block when the queue size
            // equals the window size.
            if (queue.Count == windowSize)
               source.Post(queue.ToArray());
         });

         // When the target is set to the completed state, propagate out any
         // remaining data and set the source to the completed state.
         target.Completion.ContinueWith(delegate
         {
            if (queue.Count > 0 && queue.Count < windowSize)
               source.Post(queue.ToArray());
            source.Complete();
         });

         m_windowSize = windowSize;
         m_target = target;
         m_source = source;
      }

      // Retrieves the size of the window.
      public int WindowSize { get { return m_windowSize; } }

      #region IReceivableSourceBlock<TOutput> members

      // Attempts to synchronously receive an item from the source.
      public bool TryReceive(Predicate<T[]> filter, out T[] item)
      {
         return m_source.TryReceive(filter, out item);
      }

      // Attempts to remove all available elements from the source into a new
      // array that is returned.
      public bool TryReceiveAll(out IList<T[]> items)
      {
         return m_source.TryReceiveAll(out items);
      }

      #endregion

      #region ISourceBlock<TOutput> members

      // Links this dataflow block to the provided target.
      public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)
      {
         return m_source.LinkTo(target, linkOptions);
      }

      // Called by a target to reserve a message previously offered by a source
      // but not yet consumed by this target.
      bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
         ITargetBlock<T[]> target)
      {
         return m_source.ReserveMessage(messageHeader, target);
      }

      // Called by a target to consume a previously offered message from a source.
      T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
         ITargetBlock<T[]> target, out bool messageConsumed)
      {
         return m_source.ConsumeMessage(messageHeader,
            target, out messageConsumed);
      }

      // Called by a target to release a previously reserved message from a source.
      void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
         ITargetBlock<T[]> target)
      {
         m_source.ReleaseReservation(messageHeader, target);
      }

      #endregion

      #region ITargetBlock<TInput> members

      // Asynchronously passes a message to the target block, giving the target the
      // opportunity to consume the message.
      DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
         T messageValue, ISourceBlock<T> source, bool consumeToAccept)
      {
         return m_target.OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);
      }

      #endregion

      #region IDataflowBlock members

      // Gets a Task that represents the completion of this dataflow block.
      public Task Completion { get { return m_source.Completion; } }

      // Signals to this target block that it should not accept any more messages,
      // nor consume postponed messages.
      public void Complete()
      {
         m_target.Complete();
      }

      // Causes this dataflow block to complete in a faulted state.
      public void Fault(Exception error)
      {
         m_target.Fault(error);
      }

      #endregion
   }

   // Demonstrates usage of the sliding window block by sending the provided
   // values to the provided propagator block and printing the output of
   // that block to the console.
   static void DemonstrateSlidingWindow<T>(IPropagatorBlock<T, T[]> slidingWindow,
      IEnumerable<T> values)
   {
      // Create an action block that prints arrays of data to the console.
      string windowComma = string.Empty;
      var printWindow = new ActionBlock<T[]>(window =>
      {
         Console.Write(windowComma);
         Console.Write("{");

         string comma = string.Empty;
         foreach (T item in window)
         {
            Console.Write(comma);
            Console.Write(item);
            comma = ",";
         }
         Console.Write("}");

         windowComma = ", ";
      });

      // Link the printer block to the sliding window block.
      slidingWindow.LinkTo(printWindow);

      // Set the printer block to the completed state when the sliding window
      // block completes.
      slidingWindow.Completion.ContinueWith(delegate { printWindow.Complete(); });

      // Print an additional newline to the console when the printer block completes.
      var completion = printWindow.Completion.ContinueWith(delegate { Console.WriteLine(); });

      // Post the provided values to the sliding window block and then
      // signal that no more values will arrive.
      foreach (T value in values)
      {
         slidingWindow.Post(value);
      }
      slidingWindow.Complete();

      // Wait for the printer to complete and perform its final action.
      completion.Wait();
   }

   static void Main(string[] args)
   {
      Console.Write("Using the DataflowBlockExtensions.Encapsulate method ");
      Console.WriteLine("(T=int, windowSize=3):");
      DemonstrateSlidingWindow(CreateSlidingWindow<int>(3), Enumerable.Range(0, 10));

      Console.WriteLine();

      var slidingWindow = new SlidingWindowBlock<char>(4);

      Console.Write("Using SlidingWindowBlock<T> ");
      Console.WriteLine("(T=char, windowSize={0}):", slidingWindow.WindowSize);
      DemonstrateSlidingWindow(slidingWindow, from n in Enumerable.Range(65, 10)
                                              select (char)n);
   }
}

/* Output:
Using the DataflowBlockExtensions.Encapsulate method (T=int, windowSize=3):
{0,1,2}, {1,2,3}, {2,3,4}, {3,4,5}, {4,5,6}, {5,6,7}, {6,7,8}, {7,8,9}

Using SlidingWindowBlock<T> (T=char, windowSize=4):
{A,B,C,D}, {B,C,D,E}, {C,D,E,F}, {D,E,F,G}, {E,F,G,H}, {F,G,H,I}, {G,H,I,J}
 */

Imports System
Imports System.Collections.Generic
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a custom dataflow block type.
Friend Class Program
    ' Creates an IPropagatorBlock<T, T[]> object that propagates data in a
    ' sliding window fashion.
    Public Shared Function CreateSlidingWindow(Of T)(ByVal windowSize As Integer) As IPropagatorBlock(Of T, T())
        ' Create a queue to hold messages.
        Dim queue = New Queue(Of T)()

        ' The source part of the propagator holds arrays of size windowSize
        ' and propagates data out to any connected targets.
        Dim source = New BufferBlock(Of T())()

        ' The target part receives data and adds them to the queue.
        Dim target = New ActionBlock(Of T)(Sub(item)
                                               ' Add the item to the queue.
                                               queue.Enqueue(item)
                                               ' Remove the oldest item when the queue size exceeds the window size.
                                               If queue.Count > windowSize Then
                                                   queue.Dequeue()
                                               End If
                                               ' Post the data in the queue to the source block when the queue size
                                               ' equals the window size.
                                               If queue.Count = windowSize Then
                                                   source.Post(queue.ToArray())
                                               End If
                                           End Sub)

        ' When the target is set to the completed state, propagate out any
        ' remaining data and set the source to the completed state.
        target.Completion.ContinueWith(Sub()
                                           If queue.Count > 0 AndAlso queue.Count < windowSize Then
                                               source.Post(queue.ToArray())
                                           End If
                                           source.Complete()
                                       End Sub)

        ' Return an IPropagatorBlock<T, T[]> object that encapsulates the
        ' target and source blocks.
        Return DataflowBlock.Encapsulate(target, source)
    End Function

    ' Propagates data in a sliding window fashion.
    Public Class SlidingWindowBlock(Of T)
        Implements IPropagatorBlock(Of T, T()), IReceivableSourceBlock(Of T())
        ' The size of the window.
        Private ReadOnly m_windowSize As Integer
        ' The target part of the block.
        Private ReadOnly m_target As ITargetBlock(Of T)
        ' The source part of the block.
        Private ReadOnly m_source As IReceivableSourceBlock(Of T())

        ' Constructs a SlidingWindowBlock object.
        Public Sub New(ByVal windowSize As Integer)
            ' Create a queue to hold messages.
            Dim queue = New Queue(Of T)()

            ' The source part of the propagator holds arrays of size windowSize
            ' and propagates data out to any connected targets.
            Dim source = New BufferBlock(Of T())()

            ' The target part receives data and adds them to the queue.
            Dim target = New ActionBlock(Of T)(Sub(item)
                                                   ' Add the item to the queue.
                                                   queue.Enqueue(item)
                                                   ' Remove the oldest item when the queue size exceeds the window size.
                                                   If queue.Count > windowSize Then
                                                       queue.Dequeue()
                                                   End If
                                                   ' Post the data in the queue to the source block when the queue size
                                                   ' equals the window size.
                                                   If queue.Count = windowSize Then
                                                       source.Post(queue.ToArray())
                                                   End If
                                               End Sub)

            ' When the target is set to the completed state, propagate out any
            ' remaining data and set the source to the completed state.
            target.Completion.ContinueWith(Sub()
                                               If queue.Count > 0 AndAlso queue.Count < windowSize Then
                                                   source.Post(queue.ToArray())
                                               End If
                                               source.Complete()
                                           End Sub)

            m_windowSize = windowSize
            m_target = target
            m_source = source
        End Sub

        ' Retrieves the size of the window.
        Public ReadOnly Property WindowSize() As Integer
            Get
                Return m_windowSize
            End Get
        End Property

        '#Region "IReceivableSourceBlock<TOutput> members"

        ' Attempts to synchronously receive an item from the source.
        Public Function TryReceive(ByVal filter As Predicate(Of T()), <System.Runtime.InteropServices.Out()> ByRef item() As T) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceive
            Return m_source.TryReceive(filter, item)
        End Function

        ' Attempts to remove all available elements from the source into a new 
        ' array that is returned.
        Public Function TryReceiveAll(<System.Runtime.InteropServices.Out()> ByRef items As IList(Of T())) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceiveAll
            Return m_source.TryReceiveAll(items)
        End Function

        '#End Region

#Region "ISourceBlock<TOutput> members"

        ' Links this dataflow block to the provided target.
        Public Function LinkTo(ByVal target As ITargetBlock(Of T()), ByVal linkOptions As DataflowLinkOptions) As IDisposable Implements ISourceBlock(Of T()).LinkTo
            Return m_source.LinkTo(target, linkOptions)
        End Function

        ' Called by a target to reserve a message previously offered by a source 
        ' but not yet consumed by this target.
        Private Function ReserveMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) As Boolean Implements ISourceBlock(Of T()).ReserveMessage
            Return m_source.ReserveMessage(messageHeader, target)
        End Function

        ' Called by a target to consume a previously offered message from a source.
        Private Function ConsumeMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T()), ByRef messageConsumed As Boolean) As T() Implements ISourceBlock(Of T()).ConsumeMessage
            Return m_source.ConsumeMessage(messageHeader, target, messageConsumed)
        End Function

        ' Called by a target to release a previously reserved message from a source.
        Private Sub ReleaseReservation(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) Implements ISourceBlock(Of T()).ReleaseReservation
            m_source.ReleaseReservation(messageHeader, target)
        End Sub

#End Region

#Region "ITargetBlock<TInput> members"

        ' Asynchronously passes a message to the target block, giving the target the 
        ' opportunity to consume the message.
        Private Function OfferMessage(ByVal messageHeader As DataflowMessageHeader, ByVal messageValue As T, ByVal source As ISourceBlock(Of T), ByVal consumeToAccept As Boolean) As DataflowMessageStatus Implements ITargetBlock(Of T).OfferMessage
            Return m_target.OfferMessage(messageHeader, messageValue, source, consumeToAccept)
        End Function

#End Region

#Region "IDataflowBlock members"

        ' Gets a Task that represents the completion of this dataflow block.
        Public ReadOnly Property Completion() As Task Implements IDataflowBlock.Completion
            Get
                Return m_source.Completion
            End Get
        End Property

        ' Signals to this target block that it should not accept any more messages, 
        ' nor consume postponed messages. 
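        Public Sub Complete() Implements IDataflowBlock.Complete
            m_target.Complete()
        End Sub

        ' Causes this dataflow block to complete in a faulted state.
        Public Sub Fault(ByVal [error] As Exception) Implements IDataflowBlock.Fault
            m_target.Fault([error])
        End Sub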

#End Region
    End Class

    ' Demonstrates usage of the sliding window block by sending the provided
    ' values to the provided propagator block and printing the output of 
    ' that block to the console.
    Private Shared Sub DemonstrateSlidingWindow(Of T)(ByVal slidingWindow As IPropagatorBlock(Of T, T()), ByVal values As IEnumerable(Of T))
        ' Create an action block that prints arrays of data to the console.
        Dim windowComma As String = String.Empty
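        Dim printWindow = New ActionBlock(Of T())(Sub(window)
                                                      Console.Write(windowComma)
                                                      Console.Write("{")
                                                      Dim comma As String = String.Empty
                                                      For Each item As T In window
                                                          Console.Write(comma)
                                                          Console.Write(item)
                                                          comma = ","
                                                      Next item
                                                      Console.Write("}")
                                                      windowComma = ", "
                                                  End Sub)

        ' Link the printer block to the sliding window block.
        slidingWindow.LinkTo(printWindow)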

        ' Set the printer block to the completed state when the sliding window
        ' block completes.
        slidingWindow.Completion.ContinueWith(Sub() printWindow.Complete())

        ' Print an additional newline to the console when the printer block completes.
        Dim completion = printWindow.Completion.ContinueWith(Sub() Console.WriteLine())

        ' Post the provided values to the sliding window block and then wait
        ' for the sliding window block to complete.
        For Each value As T In values
            slidingWindow.Post(value)
        Next value
        slidingWindow.Complete()

        ' Wait for the printer to complete and perform its final action.
        completion.Wait()
    End Sub

    Shared Sub Main(ByVal args() As String)

        Console.Write("Using the DataflowBlockExtensions.Encapsulate method ")
        Console.WriteLine("(T=int, windowSize=3):")
        DemonstrateSlidingWindow(CreateSlidingWindow(Of Integer)(3), Enumerable.Range(0, 10))

        Console.WriteLine()

        Dim slidingWindow = New SlidingWindowBlock(Of Char)(4)

        Console.Write("Using SlidingWindowBlock<T> ")
        Console.WriteLine("(T=char, windowSize={0}):", slidingWindow.WindowSize)
        DemonstrateSlidingWindow(slidingWindow, _
            From n In Enumerable.Range(65, 10) _
            Select ChrW(n))
    End Sub
End Class

' Output:
'Using the DataflowBlockExtensions.Encapsulate method (T=int, windowSize=3):
'{0,1,2}, {1,2,3}, {2,3,4}, {3,4,5}, {4,5,6}, {5,6,7}, {6,7,8}, {7,8,9}
'
'Using SlidingWindowBlock<T> (T=char, windowSize=4):
'{A,B,C,D}, {B,C,D,E}, {C,D,E,F}, {D,E,F,G}, {E,F,G,H}, {F,G,H,I}, {G,H,I,J}

