PLINQ および TPL 用のカスタム パーティショナー

データ ソース上で操作を並列化する場合の必須の手順の 1 つは、ソースを複数のスレッドによって同時にアクセスできる複数のセクションにパーティション分割することです。 PLINQ およびタスク並列ライブラリ (TPL: Task Parallel Library) には、並列クエリまたは ForEach ループを記述するときに透過的に機能する既定のパーティショナーが用意されています。 より高度なシナリオでは、独自のパーティショナーをプラグインできます。

パーティション分割の種類

データ ソースは、さまざまな方法でパーティション分割できます。 最も効率的な方法は、ソースを複数のサブシーケンスに物理的に分離するのではなく、複数のスレッドが協調して元のソース シーケンスを処理するというものです。 配列や、長さが事前にわかっている IList コレクションなどの他のインデックス付きソースの場合は、"範囲パーティション分割" が最も簡単なパーティション分割です。 各スレッドは、一意の開始インデックスおよび終了インデックスを受け取ります。そのため、他のスレッドで上書きしたり、上書きされたりすることなく、ソースの範囲を処理できます。 範囲パーティション分割に必要なオーバーヘッドは、最初に行われる範囲を作成する作業のみです。その後は、追加の同期は不要です。 したがって、ワークロードが均等に分割されている限り、優れたパフォーマンスを実現できます。 範囲パーティション分割の欠点は、あるスレッドが早く終了した場合、他のスレッドが作業を終了するのを支援できないことです。

リンク リストまたは長さがわからない他のコレクションの場合は、"チャンク パーティション分割" を使用できます。 チャンク パーティション分割では、並列ループまたは並列クエリ内のすべてのスレッドまたはタスクが、1 つのチャンク内のソース要素をいくつか使用し、それらのソース要素を処理し、その後追加の要素を取得します。 パーティショナーは、すべての要素が配布され、重複する要素が存在しないことを保証します。 チャンクは任意のサイズにすることができます。 たとえば、「方法: 動的パーティションを実装する」で示されているパーティショナーは、1 つの要素のみを含むチャンクを作成します。 チャンクが大きすぎない限り、この種類のパーティション分割は、本質的に負荷分散を実行します。これは、スレッドへの要素の割り当てが事前に決定されないからです。 ただし、スレッドが別のチャンクを取得する必要があるたびに、パーティショナーが同期のオーバーヘッドを発生させます。 これらのケースで発生する同期の量は、チャンクのサイズに反比例します。

一般に、範囲パーティション分割の方が高速なのは、デリゲートの実行時間が短時間から中程度までの長さであり、ソースに多数の要素があり、かつ各パーティションの総作業量がほぼ等価である場合のみです。 したがってチャンク パーティション分割の方が、ほとんどのケースで高速です。 要素が少ないか、またはデリゲートの実行時間が長いソースでは、チャンク パーティション分割と範囲パーティション分割のパフォーマンスがほぼ等しくなります。

TPL パーティショナーは、動的な数のパーティションもサポートします。 つまり、たとえば ForEach ループが新しいタスクを作成するときに、パーティションをその場で作成できます。 この機能により、パーティショナーをループ自体と共に拡大縮小できます。 動的パーティショナーも本質的に負荷分散を実行します。 カスタム パーティショナーを作成するときは、ForEach ループから使用できるようにするために、動的パーティション分割をサポートする必要があります。

PLINQ 用の負荷分散パーティショナーの構成

Partitioner.Create メソッドの一部のオーバーロードを使用すると、配列または IList ソース用のパーティショナーを作成し、スレッド間でワークロードの分散を試みるかどうかを指定できます。 負荷分散を実行するようにパーティショナーを構成した場合、チャンク パーティション分割が使用され、要素は要求時に小さいチャンクで各パーティションに渡されます。 この方法は、ループまたはクエリの全体が完了するまで、すべてのパーティションに処理する要素があることを保証するのに役立ちます。 追加のオーバーロードを使用すると、任意の IEnumerable ソースを負荷分散パーティション分割できます。

一般に、負荷分散では、パーティションで比較的頻繁にパーティショナーに要素を要求する必要があります。 これに対し、静的パーティション分割を実行するパーティショナーでは、範囲パーティション分割またはチャンク パーティション分割を使用して、各パーティショナーに要素を一度に割り当てることができます。 この方法では、負荷分散よりもオーバーヘッドが少なくて済みますが、あるスレッドが他のスレッドよりもはるかに多くの作業を行う場合は、実行時間が長くなることがあります。 既定では、IList または配列が渡された場合、PLINQ は常に負荷分散なしの範囲パーティション分割を使用します。 PLINQ で負荷分散を有効にするには、次の例に示すように、Partitioner.Create メソッドを使用します。

// Static partitioning requires indexable source. Load balancing
// can use any IEnumerable.
var nums = Enumerable.Range(0, 100000000).ToArray();

// Create a load-balancing partitioner. Or specify false for static partitioning.
Partitioner<int> customPartitioner = Partitioner.Create(nums, true);

// The partitioner is the query's data source.
var q = from x in customPartitioner.AsParallel()
        select x * Math.PI;

q.ForAll((x) =>
{
    ProcessData(x);
});
' Static number of partitions requires indexable source.
Dim nums = Enumerable.Range(0, 100000000).ToArray()

' Create a load-balancing partitioner. Or specify false For  Shared partitioning.
Dim customPartitioner = Partitioner.Create(nums, True)

' The partitioner is the query's data source.
Dim q = From x In customPartitioner.AsParallel()
        Select x * Math.PI

q.ForAll(Sub(x) ProcessData(x))

特定のシナリオで負荷分散を使用するかどうかを判断する最適な方法は、典型的な負荷およびコンピューター構成の下で操作が完了するまでにどのくらいの時間がかかるかを実験し、計測することです。 たとえば、静的パーティション分割は、少数のコアしか持たないマルチコア コンピューターでは速度が飛躍的に向上することがありますが、比較的多くのコアを持つコンピューターでは速度が低下することがあります。

次の表に、Create メソッドで使用できるオーバーロードを示します。 これらのパーティショナーは、PLINQ または Task での使用に限定されるわけではありません。 これらのパーティショナーは、任意のカスタム parallel コンストラクトでも使用できます。

オーバーロード 負荷分散を使用する
Create<TSource>(IEnumerable<TSource>) 常時
Create<TSource>(TSource[], Boolean) ブール型の引数を true と指定した場合
Create<TSource>(IList<TSource>, Boolean) ブール型の引数を true と指定した場合
Create(Int32, Int32) なし
Create(Int32, Int32, Int32) 木椀を
Create(Int64, Int64) 木椀を
Create(Int64, Int64, Int64) なし

Parallel.ForEach 用の静的範囲パーティショナーの構成

For ループでは、ループの本体がデリゲートとしてメソッドに提供されます。 このデリゲートを呼び出すコストは、仮想メソッドの呼び出しとほぼ同じです。 シナリオによっては、並列ループの本体が小さく、各ループ反復でデリゲートを呼び出すコストが膨大になることがあります。 そのような状況では、いずれかの Create オーバーロードを使用して、ソース要素に対する範囲パーティション分割の IEnumerable<T> を作成できます。 その後、この範囲のコレクションを、本体が通常の for ループで構成される ForEach メソッドに渡すことができます。 この方法の利点は、デリゲートを呼び出すコストが、要素ごとに 1 回ではなく、範囲ごとに 1 回しか発生しないことです。 基本的なパターンを次の例に示します。

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {

        // Source must be array or IList.
        var source = Enumerable.Range(0, 100000).ToArray();

        // Partition the entire source array.
        var rangePartitioner = Partitioner.Create(0, source.Length);

        double[] results = new double[source.Length];

        // Loop over the partitions in parallel.
        Parallel.ForEach(rangePartitioner, (range, loopState) =>
        {
            // Loop over each range element without a delegate invocation.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                results[i] = source[i] * Math.PI;
            }
        });

        Console.WriteLine("Operation complete. Print results? y/n");
        char input = Console.ReadKey().KeyChar;
        if (input == 'y' || input == 'Y')
        {
            foreach(double d in results)
            {
                Console.Write("{0} ", d);
            }
        }
    }
}
Imports System.Threading.Tasks
Imports System.Collections.Concurrent

Module PartitionDemo

    Sub Main()
        ' Source must be array or IList.
        Dim source = Enumerable.Range(0, 100000).ToArray()

        ' Partition the entire source array. 
        ' Let the partitioner size the ranges.
        Dim rangePartitioner = Partitioner.Create(0, source.Length)

        Dim results(source.Length - 1) As Double

        ' Loop over the partitions in parallel. The Sub is invoked
        ' once per partition.
        Parallel.ForEach(rangePartitioner, Sub(range, loopState)

                                               ' Loop over each range element without a delegate invocation.
                                               For i As Integer = range.Item1 To range.Item2 - 1
                                                   results(i) = source(i) * Math.PI
                                               Next
                                           End Sub)
        Console.WriteLine("Operation complete. Print results? y/n")
        Dim input As Char = Console.ReadKey().KeyChar
        If input = "y"c Or input = "Y"c Then
            For Each d As Double In results
                Console.Write("{0} ", d)
            Next
        End If

    End Sub
End Module

ループ内のすべてのスレッドは、指定されたサブ範囲の開始インデックス値と終了インデックス値を含む独自の Tuple<T1,T2> を受け取ります。 内側の for ループでは、fromInclusive 値および toExclusive 値を使用して、配列または IList を直接ループ処理します。

Create オーバーロードのいずれかを使用すると、パーティションのサイズと、パーティションの数を指定できます。 このオーバーロードは、要素ごとの作業がきわめて少なく、要素ごとに 1 回の仮想メソッド呼び出しでもパフォーマンスに大きな影響が及ぶシナリオで使用できます。

カスタム パーティショナー

シナリオによっては、独自のパーティショナーを実装するのが適切か、または必須である場合があります。 たとえば、クラスの内部構造に関する知識に基づいて、カスタム コレクション クラスを既定のパーティショナーよりも効率的にパーティション分割できる場合があります。 または、ソース コレクションの異なる場所にある要素を処理するのにかかる時間についての知識に基づいて、可変サイズの範囲パーティションを作成する必要がある場合があります。

基本的なカスタム パーティショナーを作成するには、System.Collections.Concurrent.Partitioner<TSource> からクラスを派生させ、次の表に示すように仮想メソッドをオーバーライドします。

メソッド 説明
GetPartitions このメソッドは、メイン スレッドによって 1 回呼び出され、IList(IEnumerator(TSource)) を返します。 ループまたはクエリ内の各ワーカー スレッドでは、リスト上で GetEnumerator を呼び出して、個別のパーティションに対する IEnumerator<T> を取得できます。
SupportsDynamicPartitions true を実装した場合は GetDynamicPartitions を返し、それ以外の場合は false を返します。
GetDynamicPartitions SupportsDynamicPartitionstrue の場合、このメソッドを必要に応じて GetPartitions の代わりに呼び出すことができます。

結果が並べ替え可能である必要がある場合、または要素へのインデックス付きアクセスが必要な場合、System.Collections.Concurrent.OrderablePartitioner<TSource> から派生させて、次の表に示すように仮想メソッドをオーバーライドします。

メソッド 説明
GetPartitions このメソッドは、メイン スレッドによって 1 回呼び出され、IList(IEnumerator(TSource)) を返します。 ループまたはクエリ内の各ワーカー スレッドでは、リスト上で GetEnumerator を呼び出して、個別のパーティションに対する IEnumerator<T> を取得できます。
SupportsDynamicPartitions GetDynamicPartitions を実装した場合は true を返し、それ以外の場合は false を返します。
GetDynamicPartitions 通常は GetOrderableDynamicPartitions を単純に呼び出します。
GetOrderableDynamicPartitions SupportsDynamicPartitionstrue の場合、このメソッドを必要に応じて GetPartitions の代わりに呼び出すことができます。

次の表に、3 種類の負荷分散パーティショナーで OrderablePartitioner<TSource> クラスを実装する方法の詳細を示します。

メソッド/プロパティ 負荷分散なしの IList/配列 負荷分散ありの IList/配列 IEnumerable
GetOrderablePartitions 範囲パーティション分割を使用 指定された partitionCount のリストに最適化されたチャンク パーティション分割を使用 静的な数のパーティションを作成することにより、チャンク パーティション分割を使用
OrderablePartitioner<TSource>.GetOrderableDynamicPartitions サポートしていない機能にアクセスしたときの例外をスロー リストおよび動的なパーティションに最適化されたチャンク パーティション分割を使用 動的な数のパーティションを作成することにより、チャンク パーティション分割を使用
KeysOrderedInEachPartition true を返します。 true を返します。 true を返します。
KeysOrderedAcrossPartitions true を返します。 false を返します。 false を返します。
KeysNormalized true を返します。 true を返します。 true を返します。
SupportsDynamicPartitions false を返します。 true を返します。 true を返します。

動的パーティション

パーティショナーを ForEach メソッドで使用する場合、動的な数のパーティションを返すことができる必要があります。 これは、パーティショナーがループの実行中の任意の時点で、新しいパーティションの列挙子をオンデマンドで供給できることを意味します。 基本的に、ループで新しい並列タスクを追加するたびに、そのタスク用の新しいパーティションが要求されます。 データが順序付け可能である必要がある場合は、System.Collections.Concurrent.OrderablePartitioner<TSource> から派生させて、各パーティション内の各項目に一意のインデックスが割り当てられるようにします。

詳細および使用例については、「方法: 動的パーティションを実装する」を参照してください。

パーティショナーのコントラクト

カスタム パーティショナーを実装するときは、次のガイドラインに従って、PLINQ および TPL 内の ForEach との適切な相互作用を保証します。

  • GetPartitionspartitionsCount に 0 以下の引数を指定して呼び出された場合は、ArgumentOutOfRangeException をスローします。 PLINQ および TPL が 0 と等しい partitionCount を渡すことはありませんが、このような場合に備えることをお勧めします。

  • GetPartitions および GetOrderablePartitions では、常に partitionsCount 個のパーティションを返す必要があります。 パーティショナーがデータを使い果たし、要求された数のパーティションを作成できない場合、このメソッドでは残りのパーティションのそれぞれについて、空の列挙子を返す必要があります。 それ以外の場合、PLINQ と TPL はいずれも InvalidOperationException をスローします。

  • GetPartitionsGetOrderablePartitionsGetDynamicPartitionsGetOrderableDynamicPartitions では、null (Visual Basic では Nothing) を返さないようにします。 返した場合、PLINQ または TPL は InvalidOperationException をスローします。

  • パーティションを返すメソッドでは、データ ソースを完全かつ一意に列挙できるパーティションを常に返す必要があります。 パーティショナーの設計により特別に必要な場合を除いて、データ ソース内の項目が重複したり、スキップされたりすることがないようにします。 この規則に従わないと、出力順序が乱れる場合があります。

  • 次のブール型の getter では、常に以下の値を正確に返して、出力順序が乱れないようにする必要があります。

    • KeysOrderedInEachPartition: 各パーティションは、昇順のキー インデックスを持つ要素を返します。

    • KeysOrderedAcrossPartitions: 返されるすべてのパーティションについて、パーティション i のキー インデックスは、パーティション i-1 のキー インデックスよりも大きくなります。

    • KeysNormalized: すべてのキー インデックスは、0 から始まりギャップなしで単調に増加します。

  • どのインデックスも一意である必要があります。 インデックスを重複させることはできません。 この規則に従わないと、出力順序が乱れる場合があります。

  • どのインデックスも負数以外である必要があります。 この規則に従わないと、PLINQ または TPL が例外をスローする場合があります。

関連項目