BlockingCollection — Przegląd

BlockingCollection<T> to klasa kolekcji bezpieczna wątkowo, która udostępnia następujące funkcje:

  • Implementacja wzorca producent-konsument.

  • Równoczesne dodawanie i pobieranie elementów z wielu wątków.

  • Opcjonalna maksymalna pojemność.

  • Operacje wstawiania i usuwania, które blokują, gdy kolekcja jest pusta lub pełna.

  • Operacje wstawiania i usuwania "try", które nie blokują lub blokują maksymalnie określony okres czasu.

  • Hermetyzuje dowolny typ kolekcji, który implementuje IProducerConsumerCollection<T>

  • Anulowanie z tokenami anulowania.

  • Dwa rodzaje wyliczenia z foreach (For Each w Visual Basic):

    1. Wyliczenie tylko do odczytu.

    2. Wyliczenie, które usuwa elementy podczas wyliczania.

Obsługa ograniczenia i blokowania

BlockingCollection<T> obsługuje ograniczenia i blokowanie. Ograniczenie oznacza, że można ustawić maksymalną pojemność kolekcji. Ograniczenie jest ważne w niektórych scenariuszach, ponieważ umożliwia kontrolowanie maksymalnego rozmiaru kolekcji w pamięci i uniemożliwia tworzenie wątków zbyt daleko przed wątkami zużywającym.

Wiele wątków lub zadań może jednocześnie dodawać elementy do kolekcji, a jeśli kolekcja osiągnie określoną maksymalną pojemność, wątki tworzące będą blokowane do momentu usunięcia elementu. Wielu użytkowników może jednocześnie usuwać elementy, a jeśli kolekcja stanie się pusta, wątki zużywające będą blokowane, dopóki producent nie doda elementu. Wątek tworzący może wywołać CompleteAdding polecenie , aby wskazać, że nie zostaną dodane żadne elementy. Konsumenci monitorują IsCompleted właściwość, aby wiedzieć, kiedy kolekcja jest pusta i nie zostaną dodane żadne elementy. W poniższym przykładzie pokazano prostą wartość BlockingCollection z ograniczoną pojemnością 100. Zadanie producenta dodaje elementy do kolekcji, o ile jakiś warunek zewnętrzny jest spełniony, a następnie wywołuje metodę CompleteAdding. Zadanie odbiorcy przyjmuje elementy do momentu, gdy IsCompleted właściwość ma wartość true.

// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
    while (!dataItems.IsCompleted)
    {

        Data data = null;
        // Blocks if dataItems.Count == 0.
        // IOE means that Take() was called on a completed collection.
        // Some other thread can call CompleteAdding after we pass the
        // IsCompleted check but before we call Take.
        // In this example, we can simply catch the exception since the
        // loop will break on the next iteration.
        try
        {
            data = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (data != null)
        {
            Process(data);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// A simple blocking producer with no cancellation.
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data data = GetData();
        // Blocks if numbers.Count == dataItems.BoundedCapacity
        dataItems.Add(data);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if dataItems.Count = dataItems.BoundedCapacity.
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)

Pełny przykład można znaleźć w temacie How to: Add and Take Items Individually from a BlockingCollection (Jak dodawać i przyjmować elementy indywidualnie z kolekcji BlockingCollection).

Operacje blokowania czasowego

W przypadku czasowego blokowania TryAdd i TryTake operacji w powiązanych kolekcjach metoda próbuje dodać lub podjąć element. Jeśli element jest dostępny, jest umieszczany w zmiennej, która została przekazana przez odwołanie, a metoda zwraca wartość true. Jeśli żaden element nie zostanie pobrany po upływie określonego limitu czasu, metoda zwraca wartość false. Następnie wątek jest bezpłatny, aby wykonać inną przydatną pracę przed ponowną próbą uzyskania dostępu do kolekcji. Aby zapoznać się z przykładem czasu blokowania dostępu, zobacz drugi przykład w temacie How to: Add and Take Items Individually from a BlockingCollection (Jak dodawać i przyjmować elementy indywidualnie z kolekcji BlockingCollection).

Anulowanie operacji dodawania i wykonywania

Operacje dodawania i wykonywania są zwykle wykonywane w pętli. Pętlę można anulować, przekazując CancellationToken element do TryAdd metody lub TryTake , a następnie sprawdzając wartość właściwości tokenu IsCancellationRequested w każdej iteracji. Jeśli wartość ma wartość true, oznacza to, że odpowiadasz na żądanie anulowania, czyszcząc wszystkie zasoby i zamykając pętlę. W poniższym przykładzie pokazano przeciążenie TryAdd , które przyjmuje token anulowania i kod, który go używa:

do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);
Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop

Aby zapoznać się z przykładem dodawania obsługi anulowania, zobacz drugi przykład w temacie How to: Add and Take Items Individually from a BlockingCollection (Jak dodawać i przyjmować elementy indywidualnie z kolekcji BlockingCollection).

Określanie typu kolekcji

Podczas tworzenia obiektu BlockingCollection<T>można określić nie tylko powiązaną pojemność, ale także typ kolekcji do użycia. Można na przykład określić ConcurrentQueue<T> zachowanie pierwszego wyjścia (FIFO, first in-first out) lub ConcurrentStack<T> zachowanie ostatniego wyjścia (LIFO). Możesz użyć dowolnej klasy kolekcji, która implementuje IProducerConsumerCollection<T> interfejs. Domyślnym typem kolekcji dla elementu BlockingCollection<T> jest ConcurrentQueue<T>. Poniższy przykład kodu pokazuje, jak utworzyć BlockingCollection<T> ciągi o pojemności 1000 i używa elementu ConcurrentBag<T>:

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)  
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );  

Aby uzyskać więcej informacji, zobacz How to: Add Bounding and Blocking Functionality to a Collection (Instrukcje: dodawanie funkcji ograniczenia i blokowania do kolekcji).

Obsługa IEnumerable

BlockingCollection<T> Udostępnia metodę umożliwiającą GetConsumingEnumerable użytkownikom używanie foreach (For Each w Visual Basic) usuwania elementów do momentu ukończenia kolekcji, co oznacza, że jest ona pusta i nie zostaną dodane żadne elementy. Aby uzyskać więcej informacji, zobacz How to: Use ForEach to Remove Items in a BlockingCollection (Instrukcje: usuwanie elementów forEach w kolekcji BlockingCollection).

Używanie wielu kolekcji BlockingCollections jako jednego

W przypadku scenariuszy, w których użytkownik musi pobrać elementy z wielu kolekcji jednocześnie, można utworzyć tablice BlockingCollection<T> i użyć metod statycznych, takich jak TakeFromAny i AddToAny które zostaną dodane do dowolnej kolekcji w tablicy lub do niego pobrać. Jeśli jedna kolekcja blokuje, metoda natychmiast próbuje innego, dopóki nie znajdzie tej, która może wykonać operację. Aby uzyskać więcej informacji, zobacz How to: Use Arrays of Blocking Collections in a Pipeline (Instrukcje: używanie tablic kolekcji blokujących w potoku).

Zobacz też