How to: Add and Take Items Individually from a BlockingCollection

This example shows how to add items to and remove items from a BlockingCollection<T> in both a blocking and a non-blocking manner. For more information on BlockingCollection<T>, see BlockingCollection Overview.

For an example of how to enumerate a BlockingCollection<T> until it is empty and no more elements will be added, see How to: Use ForEach to Remove Items in a BlockingCollection.

Example

This first example shows how to add and take items so that the operations block if the collection is either temporarily empty (when taking) or at maximum capacity (when adding), or until a specified timeout period has elapsed. Note that blocking on maximum capacity is enabled only when the BlockingCollection<T> is created with a maximum capacity specified in the constructor.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
   static void Main()
   {
      // Increase or decrease this value as desired.
      int itemsToAdd = 500;

      // Preserve all the display output for Adds and Takes
      Console.SetBufferSize(80, (itemsToAdd * 2) + 3);

      // A bounded collection. Increase, decrease, or remove the
      // maximum capacity argument to see how it impacts behavior.
      BlockingCollection<int> numbers = new BlockingCollection<int>(50);


      // A simple blocking consumer with no cancellation.
      Task.Run(() =>
      {
          int i = -1;
          while (!numbers.IsCompleted)
          {
              try
              {
                  i = numbers.Take();
              }
              catch (InvalidOperationException)
              {
                  Console.WriteLine("Adding was completed!");
                  break;
              }
              Console.WriteLine("Take:{0} ", i);

              // Simulate a slow consumer. This causes the
              // collection to fill up fast and thus Adds will block.
              Thread.SpinWait(100000);
          }

          Console.WriteLine("\r\nNo more items to take. Press the Enter key to exit.");
      });

      // A simple blocking producer with no cancellation.
      Task.Run(() =>
      {
          for (int i = 0; i < itemsToAdd; i++) {
              numbers.Add(i);
              Console.WriteLine("Add:{0} Count={1}", i, numbers.Count);
          }

          // See documentation for this method.
          numbers.CompleteAdding();
      });

      // Keep the console display open in debug mode.
      Console.ReadLine();
   }
}
Option Strict On
Option Explicit On

Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module SimpleBlocking

    Class Program
        Shared Sub Main()
            ' Increase or decrease this value as desired.
            Dim itemsToAdd As Integer = 500

            ' Preserve all the display output for Adds and Takes
            Console.SetBufferSize(80, (itemsToAdd * 2) + 3)

            ' A bounded collection. Increase, decrease, or remove the 
            ' maximum capacity argument to see how it impacts behavior.
            Dim numbers = New BlockingCollection(Of Integer)(50)

            ' A simple blocking consumer with no cancellation.
            Task.Factory.StartNew(Sub()
                                      Dim i As Integer = -1
                                      While numbers.IsCompleted = False
                                          Try
                                              i = numbers.Take()
                                          Catch ioe As InvalidOperationException
                                              Console.WriteLine("Adding was completed!")
                                              Exit While
                                          End Try
                                          Console.WriteLine("Take:{0} ", i)
                                          ' Simulate a slow consumer. This causes the
                                          ' collection to fill up fast and thus Adds will block.
                                          Thread.SpinWait(100000)
                                      End While
                                      Console.WriteLine(vbCrLf & "No more items to take. Press the Enter key to exit.")
                                  End Sub)

            ' A simple blocking producer with no cancellation.
            Task.Factory.StartNew(Sub()
                                      For i As Integer = 0 To itemsToAdd - 1
                                          numbers.Add(i)
                                          Console.WriteLine("Add:{0} Count={1}", i, numbers.Count)
                                      Next

                                      'See documentation for this method.
                                      numbers.CompleteAdding()
                                  End Sub)

            'Keep the console window open in debug mode.
            Console.ReadLine()
        End Sub
    End Class

End Module
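
The note about maximum capacity can be checked with a much smaller program. The following is only a minimal sketch, not part of the original sample: the class name BoundedCapacitySketch and the helper CountAccepted are hypothetical names introduced for illustration. It uses the TryAdd overload that takes a TimeSpan timeout, with no consumer running, so a full bounded collection rejects further items once the timeout elapses, while an unbounded collection accepts all of them.

```csharp
using System;
using System.Collections.Concurrent;

static class BoundedCapacitySketch
{
    // Hypothetical helper: returns how many of 'attempts' items are accepted
    // by a collection created with the given capacity (null means unbounded).
    public static int CountAccepted(int? capacity, int attempts)
    {
        BlockingCollection<int> numbers = capacity.HasValue
            ? new BlockingCollection<int>(capacity.Value)
            : new BlockingCollection<int>();

        using (numbers)
        {
            int accepted = 0;
            for (int i = 0; i < attempts; i++)
            {
                // Wait at most 10 ms for free space. No consumer is running,
                // so once a bounded collection is full this returns false.
                if (numbers.TryAdd(i, TimeSpan.FromMilliseconds(10)))
                {
                    accepted++;
                }
            }
            return accepted;
        }
    }

    static void Main()
    {
        Console.WriteLine(CountAccepted(2, 5));    // bounded to 2 items: prints 2
        Console.WriteLine(CountAccepted(null, 5)); // unbounded: prints 5
    }
}
```

Replacing TryAdd with Add in the bounded case would block indefinitely on the third item, because nothing ever takes an item out.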

Example

This second example shows how to add and take items so that the operations do not block. If no item is present, or the maximum capacity of a bounded collection has been reached, or the timeout period has elapsed, the TryAdd or TryTake operation returns false. This allows the thread to do some other useful work for a while and then try again later, either to retrieve a new item or to add the same item that could not be added previously. The program also demonstrates how to implement cancellation when accessing a BlockingCollection<T>.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class ProgramWithCancellation
{

    static int inputs = 2000;

    static void Main()
    {
        // The token source for issuing the cancellation request.
        CancellationTokenSource cts = new CancellationTokenSource();

        // A blocking collection that can hold no more than 100 items at a time.
        BlockingCollection<int> numberCollection = new BlockingCollection<int>(100);

        // Set console buffer to hold our prodigious output.
        Console.SetBufferSize(80, 2000);

        // The simplest UI thread ever invented.
        Task.Run(() =>
        {
            if (Console.ReadKey(true).KeyChar == 'c')
                cts.Cancel();
        });

        // Start one producer and one consumer.
        Task t1 = Task.Run(() => NonBlockingConsumer(numberCollection, cts.Token));
        Task t2 = Task.Run(() => NonBlockingProducer(numberCollection, cts.Token));

        // Wait for the tasks to complete execution
        Task.WaitAll(t1, t2);

        cts.Dispose();
        Console.WriteLine("Press the Enter key to exit.");
        Console.ReadLine();
    }

    static void NonBlockingConsumer(BlockingCollection<int> bc, CancellationToken ct)
    {
        // IsCompleted == (IsAddingCompleted && Count == 0)
        while (!bc.IsCompleted)
        {
            int nextItem = 0;
            try
            {
                if (!bc.TryTake(out nextItem, 0, ct))
                {
                    Console.WriteLine(" Take Blocked");
                }
                else
                    Console.WriteLine(" Take:{0}", nextItem);
            }

            catch (OperationCanceledException)
            {
                Console.WriteLine("Taking canceled.");
                break;
            }

            // Slow down consumer just a little to cause
            // collection to fill up faster, and lead to "AddBlocked"
            Thread.SpinWait(500000);
        }

        Console.WriteLine("\r\nNo more items to take.");
    }

    static void NonBlockingProducer(BlockingCollection<int> bc, CancellationToken ct)
    {
        int itemToAdd = 0;
        bool success = false;

        do
        {
            // Cancellation causes OCE. We know how to handle it.
            try
            {
                // A shorter timeout causes more failures.
                success = bc.TryAdd(itemToAdd, 2, ct);
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Add loop canceled.");
                // Let other threads know we're done in case
                // they aren't monitoring the cancellation token.
                bc.CompleteAdding();
                break;
            }

            if (success)
            {
                Console.WriteLine(" Add:{0}", itemToAdd);
                itemToAdd++;
            }
            else
            {
                Console.Write(" AddBlocked:{0} Count = {1}", itemToAdd.ToString(), bc.Count);
                // Don't increment itemToAdd. Try again on the next iteration.

                //Do something else useful instead.
                UpdateProgress(itemToAdd);
            }

        } while (itemToAdd < inputs);

        // No lock required here because only one producer.
        bc.CompleteAdding();
    }

    static void UpdateProgress(int i)
    {
        double percent = ((double)i / inputs) * 100;
        Console.WriteLine("Percent complete: {0}", percent);
    }
}
Option Strict On
Option Explicit On
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Class NonBlockingAccess
    Shared inputs As Integer = 2000

    Shared Sub Main()
        ' The token source for issuing the cancellation request.
        Dim cts As New CancellationTokenSource()

        ' A blocking collection that can hold no more than 100 items at a time.
        Dim numberCollection As BlockingCollection(Of Integer) = New BlockingCollection(Of Integer)(100)

        ' Set console buffer to hold our prodigious output.
        Console.SetBufferSize(80, 2000)

        ' The simplest UI thread ever invented.
        Task.Run(Sub()
                     If Console.ReadKey.KeyChar() = "c"c Then
                         cts.Cancel()
                     End If
                 End Sub)

        ' Start one producer and one consumer.
        Dim t1 As Task = Task.Run(Sub() NonBlockingConsumer(numberCollection, cts.Token))
        Dim t2 As Task = Task.Run(Sub() NonBlockingProducer(numberCollection, cts.Token))

        ' Wait for the tasks to complete execution
        Task.WaitAll(t1, t2)

        cts.Dispose()
        Console.WriteLine("Press the Enter key to exit.")
        Console.ReadLine()

    End Sub

    Shared Sub NonBlockingConsumer(ByVal bc As BlockingCollection(Of Integer), ByVal ct As CancellationToken)

        ' IsCompleted is equivalent to (IsAddingCompleted And Count = 0)
        While bc.IsCompleted = False
            Dim nextItem As Integer = 0
            Try
                If bc.TryTake(nextItem, 0, ct) = False Then
                    Console.WriteLine("  Take Blocked.")
                Else
                    Console.WriteLine(" Take: {0}", nextItem)
                End If
            Catch ex As OperationCanceledException
                Console.WriteLine("Taking canceled.")
                Exit While
            End Try
            'Slow down consumer just a little to cause
            ' collection to fill up faster, and lead to "AddBlocked"
            Thread.SpinWait(500000)
        End While

        Console.WriteLine(vbCrLf & "No more items to take.")
    End Sub

    Shared Sub NonBlockingProducer(ByVal bc As BlockingCollection(Of Integer), ByVal ct As CancellationToken)
        Dim itemToAdd As Integer = 0
        Dim success As Boolean = False

        Do
            'Cancellation causes OCE. We know how to handle it.
            Try
                success = bc.TryAdd(itemToAdd, 2, ct)
            Catch ex As OperationCanceledException
                Console.WriteLine("Add loop canceled.")

                ' Let other threads know we're done in case
                ' they aren't monitoring the cancellation token.
                bc.CompleteAdding()
                Exit Do
            End Try

            If success = True Then
                Console.WriteLine(" Add:{0}", itemToAdd)
                itemToAdd = itemToAdd + 1
            Else
                Console.Write("  AddBlocked:{0} Count = {1}", itemToAdd.ToString(), bc.Count)

                ' Don't increment itemToAdd. Try again on the next iteration.
                ' Do something else useful instead.
                UpdateProgress(itemToAdd)
            End If
        Loop While itemToAdd < inputs

        ' No lock required here because only one producer.
        bc.CompleteAdding()

    End Sub

    Shared Sub UpdateProgress(ByVal i As Integer)
        Dim percent As Double = (CType(i, Double) / inputs) * 100
        Console.WriteLine("Percent complete: {0}", percent)
    End Sub
End Class
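
The two ideas in this example, retrying the same item after a failed TryAdd and observing cancellation through the token, can also be seen in isolation. The sketch below is an illustration under stated assumptions rather than the sample's own code: NonBlockingSketch, AddWithRetry, and TakeIsCanceled are hypothetical names, and removing one item between attempts merely stands in for the "other useful work" a real producer would do.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

static class NonBlockingSketch
{
    // Hypothetical helper: tries to add 'item' up to 'maxAttempts' times.
    // Returns the number of attempts used, or -1 if every attempt failed.
    public static int AddWithRetry(BlockingCollection<int> bc, int item, int maxAttempts)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            // TryAdd without a timeout returns immediately.
            if (bc.TryAdd(item))
            {
                return attempt;
            }

            // The collection is full. As a stand-in for other work, free one
            // slot here, then retry the same item on the next iteration.
            int removed;
            bc.TryTake(out removed);
        }
        return -1;
    }

    // Hypothetical helper: returns true if TryTake observed the cancellation request.
    public static bool TakeIsCanceled(BlockingCollection<int> bc, CancellationToken ct)
    {
        try
        {
            int item;
            bc.TryTake(out item, 0, ct);
            return false;
        }
        catch (OperationCanceledException)
        {
            return true;
        }
    }

    static void Main()
    {
        using (var bc = new BlockingCollection<int>(1))
        using (var cts = new CancellationTokenSource())
        {
            bc.Add(1);                                        // the collection is now full
            Console.WriteLine(AddWithRetry(bc, 2, 3));        // prints 2
            cts.Cancel();
            Console.WriteLine(TakeIsCanceled(bc, cts.Token)); // prints True
        }
    }
}
```

As in the full example, the item that failed to go in is kept and offered again, and cancellation surfaces as an OperationCanceledException rather than as a false return value.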

See Also

System.Collections.Concurrent
BlockingCollection Overview