Procedimiento para usar ForEach para quitar elementos de BlockingCollectionHow to: Use ForEach to Remove Items in a BlockingCollection

Además de obtener los elementos de BlockingCollection<T> mediante los métodos Take y TryTake, también puede usar foreach (For Each en Visual Basic) para quitar elementos hasta que se complete la adición y se vacíe la colección.In addition to taking items from a BlockingCollection<T> by using the Take and TryTake method, you can also use a foreach (For Each in Visual Basic) to remove items until adding is completed and the collection is empty. Esto se denomina una enumeración de mutación o enumeración de consumo porque, a diferencia de un bucle foreach (For Each) típico, este enumerador modifica la colección de origen mediante la eliminación de elementos.This is called a mutating enumeration or consuming enumeration because, unlike a typical foreach (For Each) loop, this enumerator modifies the source collection by removing items.

EjemploExample

En el ejemplo siguiente se muestra cómo se quitan todos los elementos de BlockingCollection<T> mediante un bucle foreach (For Each).The following example shows how to remove all the items in a BlockingCollection<T> by using a foreach (For Each) loop.

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Example
{
   // Limit the collection size to 2000 items at any given time.
   // Set itemsToProduce to > 500 to hit the limit.
   const int upperLimit = 1000;

   // Adjust this number to see how it impacts the producing-consuming pattern.
   const int itemsToProduce = 100;

   static BlockingCollection<long> collection = new BlockingCollection<long>(upperLimit);

   // Variables for diagnostic output only.
   static Stopwatch sw = new Stopwatch();
   static int totalAdditions = 0;

   // Counter for synchronizing producers.
   static int producersStillRunning = 2;

   static void Main()
   {
       // Start the stopwatch.
       sw.Start();

       // Queue the Producer threads. Store in an array
       // for use with ContinueWhenAll
       Task[] producers = new Task[2];
       producers[0] = Task.Run(() => RunProducer("A", 0));
       producers[1] = Task.Run(() => RunProducer("B", itemsToProduce));

       // Create a cleanup task that will call CompleteAdding after
       // all producers are done adding items.
       Task cleanup = Task.Factory.ContinueWhenAll(producers, (p) => collection.CompleteAdding());

       // Queue the Consumer thread. Put this call
       // before Parallel.Invoke to begin consuming as soon as
       // the producers add items.
       Task.Run(() => RunConsumer());

       // Keep the console window open while the
       // consumer thread completes its output.
       Console.ReadKey(true);
   }

   static void RunProducer(string ID, int start)
   {

       int additions = 0;
       for (int i = start; i < start + itemsToProduce; i++)
       {
           // The data that is added to the collection.
           long ticks = sw.ElapsedTicks;

           // Display additions and subtractions.
           Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i);

           if(!collection.IsAddingCompleted)
               collection.Add(ticks);

           // Counter for demonstration purposes only.
           additions++;

           // Uncomment this line to
           // slow down the producer threads     ing.
           Thread.SpinWait(100000);
       }

       Interlocked.Add(ref totalAdditions, additions);
       Console.WriteLine("{0} is done adding: {1} items", ID, additions);
   }

   static void RunConsumer()
   {
       // GetConsumingEnumerable returns the enumerator for the
       // underlying collection.
       int subtractions = 0;
       foreach (var item in collection.GetConsumingEnumerable())
       {
           Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
                   item.ToString("D18"), subtractions++, collection.Count);
       }

       Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
                           totalAdditions, subtractions, collection.Count);
       sw.Stop();

       Console.WriteLine("Press any key to exit");
   }
}
Option Strict On
Option Explicit On
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Collections.Concurrent


Module EnumerateBC

    Class Program
        ' Limit the collection size to 2000 items
        ' at any given time. Set itemsToProduce to >500
        ' to hit the limit.
        Const upperLimit As Integer = 1000

        ' Adjust this number to see how it impacts
        ' the producing-consuming pattern.
        Const itemsToProduce As Integer = 100

        Shared collection As BlockingCollection(Of Long) = New BlockingCollection(Of Long)(upperLimit)

        ' Variables for diagnostic output only.
        Shared sw As New Stopwatch()
        Shared totalAdditions As Integer = 0

        ' Counter for synchronizing producers.
        Shared producersStillRunning As Integer = 2

        Shared Sub Main()

            ' Start the stopwatch.
            sw.Start()
            ' Queue the Producer threads. 

            Dim task1 = Task.Factory.StartNew(Sub() RunProducer("A", 0))
            Dim task2 = Task.Factory.StartNew(Sub() RunProducer("B", itemsToProduce))

            ' Store in an array for use with ContinueWhenAll
            Dim producers() As Task = {task1, task2}

            ' Create a cleanup task that will call CompleteAdding after
            ' all producers are done adding items.
            Dim cleanup As Task = Task.Factory.ContinueWhenAll(producers, Sub(p) collection.CompleteAdding())

            ' Queue the Consumer thread. Put this call
            ' before Parallel.Invoke to begin consuming as soon as
            ' the producers add items.
            Task.Factory.StartNew(Sub() RunConsumer())

            ' Keep the console window open while the
            ' consumer thread completes its output.
            Console.ReadKey()

        End Sub

        Shared Sub RunProducer(ByVal ID As String, ByVal start As Integer)
            Dim additions As Integer = 0

            For i As Integer = start To start + itemsToProduce - 1

                ' The data that is added to the collection.
                Dim ticks As Long = sw.ElapsedTicks

                'Display additions and subtractions.
                Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i)

                ' Don't try to add item after CompleteAdding
                ' has been called.
                If collection.IsAddingCompleted = False Then
                    collection.Add(ticks)
                End If

                ' Counter for demonstration purposes only.
                additions = additions + 1

                ' Uncomment this line to 
                ' slow down the producer threads without sleeping.
                Thread.SpinWait(100000)

            Next
            Interlocked.Add(totalAdditions, additions)
            Console.WriteLine("{0} is done adding: {1} items", ID, additions)

        End Sub

        Shared Sub RunConsumer()
            ' GetConsumingEnumerable returns the enumerator for the 
            ' underlying collection.
            Dim subtractions As Integer = 0

            For Each item In collection.GetConsumingEnumerable

                subtractions = subtractions + 1
                Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
                                  item.ToString("D18"), subtractions, collection.Count)
            Next

            Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
                                    totalAdditions, subtractions, collection.Count())
            sw.Stop()

            Console.WriteLine("Press any key to exit.")
        End Sub

    End Class
End Module

Este ejemplo usa un bucle de foreach con el método BlockingCollection<T>.GetConsumingEnumerable en el subproceso de consumo, lo que provoca que se quite cada elemento de la colección según se enumera.This example uses a foreach loop with the BlockingCollection<T>.GetConsumingEnumerable method in the consuming thread, which causes each item to be removed from the collection as it is enumerated. System.Collections.Concurrent.BlockingCollection<T> limita el número máximo de elementos que se encuentran en la colección en cualquier momento.System.Collections.Concurrent.BlockingCollection<T> limits the maximum number of items that are in the collection at any time. Al enumerar la colección de esta forma, bloquea el subproceso consumidor si no hay elementos disponibles o si la colección está vacía.Enumerating the collection in this way blocks the consumer thread if no items are available or if the collection is empty. En este ejemplo, el bloqueo no constituye un problema porque el subproceso de productor agrega elementos más rápido de lo que se pueden consumir.In this example blocking is not a concern because the producer thread adds items faster than they can be consumed.

No hay ninguna garantía de que los elementos se enumeren en el mismo orden en que los agregan los subprocesos de productor.There is no guarantee that the items are enumerated in the same order in which they are added by the producer threads.

Para enumerar la colección sin modificación alguna, simplemente use foreach (For Each) sin el método GetConsumingEnumerable.To enumerate the collection without modifying it, just use foreach (For Each) without the GetConsumingEnumerable method. Pero es importante comprender que este tipo de enumeración representa una instantánea de la colección en un momento concreto.However, it is important to understand that this kind of enumeration represents a snapshot of the collection at a precise point in time. Si otros subprocesos agregan o quitan elementos mientras se ejecuta el bucle, este no podría representar el estado real de la colección.If other threads are adding or removing items concurrently while you are executing the loop, then the loop might not represent the actual state of the collection.

Vea tambiénSee also