BlockingCollection 概述BlockingCollection Overview

BlockingCollection<T> 是一个线程安全集合类,可提供以下功能:BlockingCollection<T> is a thread-safe collection class that provides the following features:

  • 实现制造者-使用者模式。An implementation of the Producer-Consumer pattern.

  • 通过多线程并发添加和获取项。Concurrent adding and taking of items from multiple threads.

  • 可选最大容量。Optional maximum capacity.

  • 集合为空或已满时通过插入和移除操作进行阻塞。Insertion and removal operations that block when collection is empty or full.

  • 插入和移除“尝试”操作不发生阻塞,或在指定时间段内发生阻塞。Insertion and removal "try" operations that do not block or that block up to a specified period of time.

  • 封装实现 IProducerConsumerCollection<T> 的任何集合类型Encapsulates any collection type that implements IProducerConsumerCollection<T>

  • 使用取消标记执行取消操作。Cancellation with cancellation tokens.

  • 支持使用 foreach(在 Visual Basic 中,使用 For Each)的两种枚举:Two kinds of enumeration with foreach (For Each in Visual Basic):

    1. 只读枚举。Read-only enumeration.

    2. 在枚举项时将项移除的枚举。Enumeration that removes items as they are enumerated.

限制和阻塞支持Bounding and Blocking Support

BlockingCollection<T> 支持限制和阻塞。BlockingCollection<T> supports bounding and blocking. 限制意味着可以设置集合的最大容量。Bounding means you can set the maximum capacity of the collection. 限制在某些情况中很重要,因为它使你能够控制内存中的集合的最大大小,并可阻止制造线程移动到离使用线程前方太远的位置。Bounding is important in certain scenarios because it enables you to control the maximum size of the collection in memory, and it prevents the producing threads from moving too far ahead of the consuming threads.

多个线程或任务可同时向集合添加项,如果集合达到其指定最大容量,则制造线程将发生阻塞,直到移除集合中的某个项。Multiple threads or tasks can add items to the collection concurrently, and if the collection reaches its specified maximum capacity, the producing threads will block until an item is removed. 多个使用者可以同时移除项,如果集合变空,则使用线程将发生阻塞,直到制造者添加某个项。Multiple consumers can remove items concurrently, and if the collection becomes empty, the consuming threads will block until a producer adds an item. 制造线程可调用 CompleteAdding 来指示不再添加项。A producing thread can call CompleteAdding to indicate that no more items will be added. 使用者将监视 IsCompleted 属性以了解集合何时为空且不再添加项。Consumers monitor the IsCompleted property to know when the collection is empty and no more items will be added. 下面的示例展示了容量上限为 100 的简单 BlockingCollection。The following example shows a simple BlockingCollection with a bounded capacity of 100. 只要满足某些外部条件为 true,制造者任务就会向集合添加项,然后调用 CompleteAddingA producer task adds items to the collection as long as some external condition is true, and then calls CompleteAdding. 使用者任务获取项,直到 IsCompleted 属性为 true。The consumer task takes items until the IsCompleted property is true.

// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
    while (!dataItems.IsCompleted)
    {

        Data data = null;
        // Blocks if dataItems.Count == 0.
        // IOE means that Take() was called on a completed collection.
        // Some other thread can call CompleteAdding after we pass the
        // IsCompleted check but before we call Take.
        // In this example, we can simply catch the exception since the
        // loop will break on the next iteration.
        try
        {
            data = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (data != null)
        {
            Process(data);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// A simple blocking producer with no cancellation.
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data data = GetData();
        // Blocks if numbers.Count == dataItems.BoundedCapacity
        dataItems.Add(data);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if dataItems.Count = dataItems.BoundedCapacity.
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)

有关完整的示例,请参阅 如何:在 BlockingCollection 中逐个添加和取出项For a complete example, see How to: Add and Take Items Individually from a BlockingCollection.

计时阻塞操作Timed Blocking Operations

在针对有限集合的计时阻塞 TryAddTryTake 操作中,此方法将尝试添加或取出某个项。In timed blocking TryAdd and TryTake operations on bounded collections, the method tries to add or take an item. 如果项可用,项会被置于通过引用传入的变量中,然后方法返回 true。If an item is available it is placed into the variable that was passed in by reference, and the method returns true. 如果在指定的超时期限过后未检索到任何项,方法返回 false。If no item is retrieved after a specified time-out period the method returns false. 相应线程可以任意执行一些其他有用的工作,然后再重新尝试访问该集合。The thread is then free to do some other useful work before trying again to access the collection. 有关计时阻塞访问的示例,请参阅如何:在 BlockingCollection 中逐个添加和取出项中的第二个示例。For an example of timed blocking access, see the second example in How to: Add and Take Items Individually from a BlockingCollection.

取消添加和取出操作Cancelling Add and Take Operations

添加和取出操作通常会在一个循环内执行。Add and Take operations are typically performed in a loop. 可以通过以下方法来取消循环:向 TryAddTryTake 方法传入 CancellationToken,然后在每次迭代时检查该标记的 IsCancellationRequested 属性的值。You can cancel a loop by passing in a CancellationToken to the TryAdd or TryTake method, and then checking the value of the token's IsCancellationRequested property on each iteration. 如果值为 true,由你自行决定是否通过清理所有资源并退出循环来响应取消请求。If the value is true, then it is up to you to respond the cancellation request by cleaning up any resources and exiting the loop. 下面的示例演示获取取消标记和使用该标记的代码的 TryAdd 重载:The following example shows an overload of TryAdd that takes a cancellation token, and the code that uses it:

do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);
Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop

有关如何添加取消支持的示例,请参阅如何:在 BlockingCollection 中逐个添加和取出项中的第二个示例。For an example of how to add cancellation support, see the second example in How to: Add and Take Items Individually from a BlockingCollection.

指定集合类型Specifying the Collection Type

创建 BlockingCollection<T> 时,不仅可以指定上限容量,而且可以指定要使用的集合类型。When you create a BlockingCollection<T>, you can specify not only the bounded capacity but also the type of collection to use. 例如,可为先进先出 (FIFO) 行为指定 ConcurrentQueue<T>,也可为后进先出 (LIFO) 行为指定 ConcurrentStack<T>For example, you could specify a ConcurrentQueue<T> for first in-first out (FIFO) behavior, or a ConcurrentStack<T> for last in-first out (LIFO) behavior. 可使用实现 IProducerConsumerCollection<T> 接口的任何集合类。You can use any collection class that implements the IProducerConsumerCollection<T> interface. BlockingCollection<T> 的默认集合类型为 ConcurrentQueue<T>The default collection type for BlockingCollection<T> is ConcurrentQueue<T>. 下面的代码示例演示如何创建字符串的 BlockingCollection<T>,其容量为 1000 并使用 ConcurrentBag<T>The following code example shows how to create a BlockingCollection<T> of strings that has a capacity of 1000 and uses a ConcurrentBag<T>:

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)  
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );  

有关详细信息,请参阅如何:向集合添加限制和阻塞功能For more information, see How to: Add Bounding and Blocking Functionality to a Collection.

IEnumerable 支持IEnumerable Support

BlockingCollection<T> 提供 GetConsumingEnumerable 方法,该方法允许使用者使用 foreach(在 Visual Basic 中为 For Each)删除项直到完成集合,也就是说,集合为空且不再添加项。BlockingCollection<T> provides a GetConsumingEnumerable method that enables consumers to use foreach (For Each in Visual Basic) to remove items until the collection is completed, which means it is empty and no more items will be added. 有关详细信息,请参阅如何:使用 ForEach 移除 BlockingCollection 中的项For more information, see How to: Use ForEach to Remove Items in a BlockingCollection.

将多个 BlockingCollection 作为整体使用Using Many BlockingCollections As One

在使用者需要同时取出多个集合中的项的情况下,可以创建 BlockingCollection<T> 的数组并使用静态方法,如 TakeFromAnyAddToAny 方法,这两个方法可以在该数组的任意集合中添加或取出项。For scenarios in which a consumer needs to take items from multiple collections simultaneously, you can create arrays of BlockingCollection<T> and use the static methods such as TakeFromAny and AddToAny that will add to or take from any of the collections in the array. 如果一个集合发生阻塞,此方法会立即尝试其他集合,直到找到能够执行该操作的集合。If one collection is blocking, the method immediately tries another until it finds one that can perform the operation. 有关详细信息,请参阅如何:在管道中使用阻塞集合的数组For more information, see How to: Use Arrays of Blocking Collections in a Pipeline.

请参阅See also