BlockingCollection の概要

BlockingCollection<T> は、次の機能を提供するスレッド セーフなコレクション クラスです。

  • Producer-Consumer パターンの実装。

  • 複数のスレッドから同時に項目を追加し、取得する。

  • 最大容量 (オプション)。

  • コレクションが空またはいっぱいになったときにブロックする挿入操作と削除操作。

  • ブロックせずに実行されるか、指定された時間が経過するまでブロックする、挿入および削除の "試行" 操作。

  • IProducerConsumerCollection<T> を実装する任意のコレクション型のカプセル化。

  • キャンセル トークンを使用したキャンセル。

  • foreach を使用した 2 種類の列挙 (Visual Basic の場合は For Each)。

    1. 読み取り専用の列挙。

    2. 列挙された項目を削除する列挙。

境界とブロッキングのサポート

BlockingCollection<T> はバインドとブロックに対応しています。 境界は、コレクションの最大容量を設定できることを意味します。 境界を使用すると、メモリ内のコレクションの最大サイズを制御し、producer スレッドが consumer スレッドよりも先に進行しすぎるのを防ぐことができます。これは、特定のシナリオで重要になります。

コレクションには、複数のスレッドやタスクが同時に項目を追加できます。コレクションが指定された最大容量に達すると、producer スレッドは項目が削除されるまでブロックします。 複数の cosumer が同時に項目を削除できます。コレクションが空になった場合、consumer スレッドは、producer が項目を追加するまでブロックします。 producer スレッドでは、それ以上項目が追加されないことを示すために、CompleteAdding を呼び出すことができます。 consumer では、IsCompleted プロパティを監視して、コレクションが空になったときや、それ以上の項目は追加されないことになったときを把握できます。 次の例は、容量の上限が 100 に設定された単純な BlockingCollection を示しています。 いくつかの外部条件が true である間、producer タスクはコレクションに項目を追加し、CompleteAdding を呼び出します。 consumer タスクは、IsCompleted プロパティが true になるまで項目を取得します。

// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
    while (!dataItems.IsCompleted)
    {

        Data data = null;
        // Blocks if dataItems.Count == 0.
        // IOE means that Take() was called on a completed collection.
        // Some other thread can call CompleteAdding after we pass the
        // IsCompleted check but before we call Take.
        // In this example, we can simply catch the exception since the
        // loop will break on the next iteration.
        try
        {
            data = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (data != null)
        {
            Process(data);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// A simple blocking producer with no cancellation.
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data data = GetData();
        // Blocks if numbers.Count == dataItems.BoundedCapacity
        dataItems.Add(data);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if dataItems.Count = dataItems.BoundedCapacity.
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)

コード例全体については、「方法: BlockingCollection の項目を個別に追加および取得する」を参照してください。

時間制限付きのブロッキング操作

境界のあるコレクションにおける時間制限付きの TryAdd 操作および TryTake 操作では、メソッドは項目を追加または取得しようと試みます。 項目を使用できる場合、その項目は参照渡しで渡された変数に格納され、メソッドは true を返します。 指定されたタイムアウト期間を経過しても項目が取得されない場合、メソッドは false を返します。 その後、再びコレクションへのアクセスを試みる前に、他の必要な処理をスレッドで自由に実行できます。 時間制限付きアクセス ブロックの例については、「方法: BlockingCollection の項目を個別に追加および取得する」の 2 つ目の例を参照してください。

追加操作と取得操作の取り消し

追加操作と取得操作は、通常、ループ内で実行されます。 TryAdd メソッドまたは TryTake メソッドに CancellationToken を渡し、各イテレーションでトークンの IsCancellationRequested プロパティの値を確認するようにすると、ループを取り消すことができます。 値が true である場合は、リソースをクリーンアップし、ループを終了してキャンセル要求に応答するかどうかを決定できます。 次の例は、キャンセル トークンを受け取る TryAdd のオーバーロードと、それを使用するコードを示しています。

do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);
Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop

キャンセル サポートを追加する方法の例については、「方法: BlockingCollection の項目を個別に追加および取得する」の 2 つ目の例を参照してください。

コレクションの型の指定

BlockingCollection<T> の作成時には、容量の上限だけでなく、使用するコレクションの型も指定できます。 たとえば、先入れ先出し法 (FIFO) に ConcurrentQueue<T> を指定したり、後入れ先出し法 (LIFO) に ConcurrentStack<T> を指定したりできます。 IProducerConsumerCollection<T> インターフェイスを実装する任意のコレクション クラスを使用できます。 BlockingCollection<T> の既定のコレクション型は ConcurrentQueue<T> です。 次のコード例は、1000 という容量を持ち、ConcurrentBag<T> を使用する文字列の BlockingCollection<T> を作成する方法を示しています。

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)  
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );  

詳細については、「方法:境界ブロッキング機能をコレクションに追加する」を参照してください。

IEnumerable のサポート

BlockingCollection<T>GetConsumingEnumerable メソッドを提供します。これにより、コンシューマーは foreach (Visual Basic では For Each) を使用し、コレクションが完了するまで (コレクションが空になり、それ以上項目が追加されなくなるまで)、項目を削除できます。 詳細については、「方法:ForEach を使用して BlockingCollection 内の項目を削除する」を参照してください。

多数の BlockingCollection を 1 つとして使用する

consumer が複数のコレクションから同時に項目を取得する必要のあるシナリオでは、BlockingCollection<T> の配列を作成し、TakeFromAnyAddToAny などの静的メソッドを使用できます。これらのメソッドでは、配列内の任意のコレクションを対象に追加または取得の操作を実行できます。 いずれかのコレクションがブロックしている場合、メソッドはすぐに別のコレクションを試します。これは、操作を実行できるコレクションが見つかるまで続行されます。 詳細については、「方法:パイプラインでブロッキング コレクションの配列を使用する」を参照してください。

関連項目