Übersicht über BlockingCollections

BlockingCollection<T> ist eine threadsichere Sammlungsklasse, die die folgenden Features bietet:

  • Implementierung des Producer-Consumer-Musters.

  • Gleichzeitiges Hinzufügen und Entfernen von Elementen aus mehreren Threads.

  • Optionale maximale Kapazität.

  • Einfüge- und Löschvorgänge, die blockiert werden, wenn die Sammlung leer oder voll ist.

  • Einfüge- und Löschvorgänge zum „Ausprobieren“, die Abläufe nicht oder nur für eine festgelegte Zeitspanne blockieren.

  • Kapselung aller Sammlungstypen, die IProducerConsumerCollection<T> implementieren

  • Abbruch mit Abbruchtoken.

  • Zwei Arten von Enumeration mit foreach (For Each in Visual Basic):

    1. Schreibgeschützte Enumeration.

    2. Enumeration, in deren Verlauf Elemente entfernt werden.

Unterstützung von Begrenzen und Blockieren

BlockingCollection<T> unterstützt das Begrenzen und Blockieren. Begrenzen bedeutet, dass Sie die maximale Kapazität der Sammlung festlegen können. Das Begrenzen ist in bestimmten Szenarien wichtig, da es Ihnen ermöglicht, die maximale Größe der Sammlung im Arbeitsspeicher zu steuern, und es verhindert, dass die erzeugenden Threads den verbrauchenden Threads zu weit vorauseilen.

Mehrere Threads oder Tasks können gleichzeitig der Sammlung Elemente hinzufügen, und wenn die Sammlung die angegebene maximale Kapazität erreicht, werden die erzeugenden Threads blockiert, bis ein Element entfernt wird. Mehrere Consumer können gleichzeitig Elemente entfernen, und wenn die Sammlung leer ist, werden die verbrauchenden Threads blockiert, bis ein Producer ein Element hinzufügt. Ein Producerthread kann CompleteAdding aufrufen, um anzuzeigen, dass keine Elemente mehr hinzugefügt werden. Consumer überwachen die IsCompleted-Eigenschaft, um zu erkennen, wann die Sammlung leer ist und keine Elemente mehr hinzugefügt werden. Das folgende Beispiel zeigt eine einfache BlockingCollection mit einer begrenzten Kapazität von 100. Ein Producertask fügt der Sammlung Elemente hinzu, solange eine externe Bedingung wahr ist, und ruft dann CompleteAdding auf. Der Consumertask entnimmt Elemente, bis die IsCompleted-Eigenschaft wahr ist.

// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
    while (!dataItems.IsCompleted)
    {

        Data data = null;
        // Blocks if dataItems.Count == 0.
        // IOE means that Take() was called on a completed collection.
        // Some other thread can call CompleteAdding after we pass the
        // IsCompleted check but before we call Take.
        // In this example, we can simply catch the exception since the
        // loop will break on the next iteration.
        try
        {
            data = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (data != null)
        {
            Process(data);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// A simple blocking producer with no cancellation.
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data data = GetData();
        // Blocks if numbers.Count == dataItems.BoundedCapacity
        dataItems.Add(data);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if dataItems.Count = dataItems.BoundedCapacity.
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)

Ein vollständiges Beispiel finden Sie unter Vorgehensweise: Hinzufügen und Entfernen von einzelnen Elementen zu bzw. aus einer BlockingCollection.

Zeitgesteuerte Blockierungsvorgänge

In zeitgesteuerten TryAdd- und TryTake-Blockierungsvorgängen, die für begrenzte Sammlungen durchgeführt werden, versucht die Methode, ein Element hinzuzufügen oder zu entnehmen. Wenn ein Element verfügbar ist, wird es in die Variable eingesetzt, die durch Verweis übergeben wurde, und die Methode gibt WAHR zurück. Wenn nach einer angegebenen Timeoutfrist kein Element abgerufen wird, gibt die Methode FALSCH zurück. Der Thread ist dann vor dem erneuten Versuch, auf die Sammlung zuzugreifen, frei für andere nützliche Aufgaben. Ein Beispiel für ein zeitgesteuertes Blockieren des Zugriffs finden Sie im zweiten Beispiel unter Vorgehensweise: Hinzufügen und Entfernen von einzelnen Elementen zu bzw. aus einer BlockingCollection.

Abbrechen von Hinzufüge- und Entnahmevorgängen

Hinzufüge- und Entnahmevorgänge werden in der Regel in einer Schleife ausgeführt. Sie können eine Schleife durch die Übergabe eines CancellationToken an die TryAdd- oder TryTake-Methode abbrechen und dann den Wert der IsCancellationRequested-Eigenschaft des Tokens in jeder Iteration überprüfen. Wenn der Wert TRUE ist, liegt es an Ihnen, auf die Abbruchanforderung durch Bereinigen aller Ressourcen und Beenden der Schleife zu reagieren. Das folgende Beispiel zeigt eine Überladung von TryAdd, die ein Abbruchtoken entgegennimmt, und den Code, der es verwendet:

do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);
Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop

Ein Beispiel für das Hinzufügen von Unterstützung für den Abbruch finden Sie im zweiten Beispiel unter Vorgehensweise: Hinzufügen und Entfernen von einzelnen Elementen zu bzw. aus einer BlockingCollection.

Angeben des Sammlungstyps

Beim Erstellen von BlockingCollection<T> können Sie nicht nur die begrenzte Kapazität angeben, sondern auch den Typ der zu verwendenden Sammlung. Sie könnten z.B. ConcurrentQueue<T> für das First-in-First-out-Verhalten (FIFO) oder ConcurrentStack<T> für das Last-in-First-out-Verhalten (LIFO) angeben. Sie können jede Sammlungsklasse verwenden, die die IProducerConsumerCollection<T>-Schnittstelle implementiert. Der standardmäßige Sammlungstyp für BlockingCollection<T> ist ConcurrentQueue<T>. Das folgende Codebeispiel veranschaulicht das Erstellen einer BlockingCollection<T> von Zeichenfolgen, die eine Kapazität von 1000 hat und ConcurrentBag<T> verwendet:

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)  
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );  

Weitere Informationen finden Sie unter Vorgehensweise: Hinzufügen von Begrenzungs- und Blockierungsfunktionen zu einer Collection.

IEnumerable-Unterstützung

BlockingCollection<T> bietet eine GetConsumingEnumerable-Methode, mit der Consumer foreach (For Each in Visual Basic) verwenden können, um Elemente zu entfernen, bis die Sammlung abgeschlossen ist, d.h. bis sie leer ist und keine Elemente mehr hinzugefügt werden. Weitere Informationen finden Sie unter Vorgehensweise: Verwenden von ForEach zum Entfernen von Elementen aus einer BlockingCollection-Klasse.

Verwenden vieler BlockingCollections als eine einzige

Für Szenarien, in denen ein Consumer mehreren Sammlungen gleichzeitig Elemente entnehmen muss, können Sie Arrays mit BlockingCollection<T> erstellen und statische Methoden wie TakeFromAny und AddToAny verwenden, die Elemente beliebigen Sammlungen im Array hinzufügen bzw. ihnen entnehmen. Wenn eine Sammlung blockiert wird, versucht die Methode sofort, eine andere zu verwenden, bis sie eine Sammlung findet, die den Vorgang ausführen kann. Weitere Informationen finden Sie unter Vorgehensweise: Verwenden von Arrays mit BlockingCollections in einer Pipeline.

Siehe auch