同時実行の問題

データ並列パターンと PLINQ

Igor Ostrovsky

マルチコア プロセッサは、主流のデスクトップ コンピューターではすっかり一般的になりましたが、マルチコア プロセッサの真価を完全に引き出すアプリケーションを作成することはいまだに困難です。しかし、マルチコアによる並列処理は確かに実現可能であり、よく利用されているアプリケーションの多くが、マルチコア コンピューターではパフォーマンスを向上できるように変更されています。.NET Framework のバージョン 4 では、マルチコア向けのアプリケーション開発を支援するツールとして、コーディネーションと同期の新しいプリミティブとデータ構造のセット、Task Parallel Library、並列 LINQ (PLINQ) など、プログラマーが利用できるツールがいくつか提供されます。この記事では、この中から最後の PLINQ について説明します。

PLINQ は興味深いツールで、問題がデータ並列パターンの 1 つに適合すれば、マルチコア コンピューターではスケール変換されるコードをより簡単に作成できるようにします。PLINQ は LINQ プロバイダーであるため、PLINQ を使用してプログラミングを行う場合は、おなじみの LINQ モデルを使用します。PLINQ は LINQ to Objects に非常によく似ていますが、クエリを評価する作業のスケジュール設定に、複数のスレッドを使用する点が異なります。クエリを LINQ to Objects ではなく PLINQ にバインドするには、データ ソースの後に AsParallel 呼び出しを追加するだけです (以下のコードを参照)。この手順によって、データ ソースが ParallelQuery ラッパーを使用してラップされ、クエリ内の残りの拡張メソッドが LINQ to Objects ではなく PLINQ にバインドされます。

IEnumerable<int> src = ...
var query = 
      src.AsParallel()
      .Where(x => x % 2 == 0)
      .Select(x => Foo(x));

foreach(var x in query)
{
      Bar(x); }

これと同じコードを C# クエリ構文を使用すると次のようになります。

IEnumerable<int> src = ...
var query = 
      from x in src.AsParallel()
      where x % 2 == 0
      select Foo(x);

foreach(var x in query)
{
      Bar(x); }

ただし、AsParallel を LINQ to Objects のクエリに追加することで、必ずしもプログラムの実行速度が向上するとは限りません。PLINQ は、適切なアルゴリズムを使用してデータを分割し、クエリの各部分をそれぞれ並列に実行して、結果をマージします。この方法によってマルチコア コンピューターでのパフォーマンス向上が実現できるかどうかは、いくつかの要因に左右されます。

PLINQ からメリットを得るには、クエリ全体の作業が、スレッド プールへの作業のスケジュール設定にかかるオーバーヘッドを吸収できるぐらい大きく、1 要素あたりの作業が、その要素の処理にかかるわずかなオーバーヘッドを吸収できるぐらい意味のあるものである必要があります。また、コストの高い計算を複数のワーカー スレッドで複数の入力要素に対して評価するように、クエリの中で最もコストの高い部分を分解できた場合に、PLINQ は最も威力を発揮します。

ここからは、PLINQ を使用して効率的に並列化できる、各種のデータ並列パターンを紹介します。

プロジェクション

"プロジェクション"、"マッピング"、"Select 演算子" という用語はどれも、よく使用される同じ操作で自然なデータ並列操作を指しています。プロジェクションは、1 つの引数を使用して答えを計算するプロジェクション関数が使用される操作で、一連の入力に対してこの関数を評価する必要があります。

プロジェクションは、プロジェクション関数を複数の入力要素に対して同時に評価できるため、自然にデータ並列になります。少なくとも多少の計算コストがかかる関数であった場合、PLINQ を使用することで、コンピューターに搭載されている複数のコアに関数の計算処理を分散できるため、計算速度を向上できます。

たとえば、次のクエリでは、PLINQ が ExpensiveFunc への呼び出しを (少なくともマルチコア コンピューターでは) 並列に評価します。

int[] src = Enumerable.Range(0, 100).ToArray();
var query = src.AsParallel()
             .Select(x => ExpensiveFunc(x));

foreach(var x in query)
{
      Console.WriteLine(x);
}

このコード ブロックは、ExpensiveFunc(0)、ExpensiveFunc(1) ~ ExpensiveFunc(99) をコンソールに出力します。ただし、値は必ずしも予想どおりの順序で出力されません。既定では、PLINQ は順番を無視してシーケンスを扱うため、値が出力される順序は決まっていません。

src 配列を順序付けて処理するように PLINQ に指示するには、次のように AsOrdered 演算子を使用します。

int[] src = Enumerable.Range(0, 100).ToArray();
var query = src.AsParallel().AsOrdered()
             .Select(x => ExpensiveFunc(x));

foreach(var x in query)
{
      Console.WriteLine(x);

これで結果が、予想どおりに ExpensiveFunc(0) ~ ExpensiveFunc(99) の順番で画面に出力されます。PLINQ では、順序を維持するために入力要素ごとに追加のオーバーヘッドが発生しますが、これは通常は大きなコストにはなりません。

これまで見てきた例では、PLINQ クエリは常に for ループで使用されています。このようなシナリオでは、PLINQ は、結果をバックグラウンドで計算する非同期ワーカーを用意し、for ループは次の結果が準備できていない場合常に待機します。ただし、PLINK クエリを使用する方法は、これだけではありません。ToArray、ToList、ToDictionary などの演算子によって、PLINK クエリを実行することもできます。

int[] src = Enumerable.Range(0, 100).ToArray();
var query = src.AsParallel().AsOrdered()
             .Select(x => ExpensiveFunc(x));

int[] results = query.ToArray(); // The query runs here

この場合も、PLINQ は ExpensiveFunc を並列に呼び出して、クエリの実行速度を向上しています。ただし、今回は、実行は同期と取って行われます。クエリ全体は、コード サンプルで指定されている 1 行で完了します。

結果を配列に変換するのではなく、結果の合計を計算したり、結果の最大値、最小値、平均値、またはユーザー定義集計を使用したりすることもできます。

int[] src = Enumerable.Range(0, 100).ToArray();
var query = src.AsParallel()
             .Select(x => ExpensiveFunc(x));

int resultSum = query.Sum(); // The query runs here

また別の可能性としては、生成された要素ごとに任意のアクションを実行することもできます。

int[] src = Enumerable.Range(0, 100).ToArray();
var query = src.AsParallel()
             .Select(x => ExpensiveFunc(x));

int resultSum = query.ForAll(
      x => Console.WriteLine(x)
);

ForAll を使用するこの例と、for ループを使用するこのセクションの最初の例との間には、重要な違いがあります。ForAll の例では、アクションは PLINQ ワーカー スレッドで実行されます。for ループの例では、明らかに、PLINQ クエリを作成するスレッドで、ループ本体が 実行されます。

また、並列のプロジェクション クエリを作成する場合、あと 1 つ注意が必要な問題に遭遇する可能性があります。PLINQ は、入力シーケンスを複数のシーケンスに分割し、各シーケンスを並列に実行することで、並列実行を実現します。シーケンスを分割する作業は "パーティション分割" と呼ばれ、どのパーティション分割アルゴリズムを選択するかが、クエリのパフォーマンスに大きく影響する可能性があります。

通常、PLINQ によって入力シーケンスをパーティション分割する適切なアルゴリズムが選択されますが、入力が配列 (または IList を実装するその他の型) である場合は、PLINQ による選択を上書きした方がよい場合があります。このような場合の既定の PLINQ の動作では、配列は、コンピューターに搭載されているコア数と同じ数のセクションに、静的にパーティション分割されます。しかし、プロジェクション要素のコストが要素ごとに異なる場合に、負荷の高いすべての要素が同一のパーティションに入る可能性があります。

PLINQ が配列 (またはその他の IList 型) の負荷分散パーティション分割アルゴリズムを使用するようにするには、Partitioner.Create メソッドを使用して、loadBalancing 引数の値に true を渡します。

int[] src = Enumerable.Range(0, 100).ToArray();
var query = Partitioner.Create(src, true).AsParallel()
             .Select(x => ExpensiveFunc(x));

foreach(var x in query)
{
      Console.WriteLine(x);
}

フィルター

プロジェクション パターンを少し変えたのがフィルターです。フィルターでは、各入力を元に出力を計算するプロジェクション関数が使用されるのではなく、特定の要素を出力に含めるかどうかを決定するフィルター関数が使用されます。

並列処理による高速化を最も実現できるのは、フィルター関数を評価するための計算コストが大きい場合です。状況によっては、コストの低い関数を使用したフィルターでも、特にフィルター関数がほとんどの入力を拒否する場合は、処理速度を大幅に向上できます。次の例では、PLINQ が [0 ~ 99] の範囲内で、ExpensiveFilter が true を返す番号を出力します。

int[] src = Enumerable.Range(0, 100).ToArray();
var query = src.AsParallel()
             .Where(x => ExpensiveFilter(x));

foreach(var x in query)
{
      Console.WriteLine(x);
}

最初のプロジェクションの例と同様に、このパターンの結果では順序が考慮されません。順序を付けて結果を返すための方法は同じで、AsParallel の後に AsOrdered を追加するだけです。実は、プロジェクションについて前に説明した補足事項は、すべてフィルターにも当てはまります。つまり、foreach、ToArray、ToList、ToDictionary、集計、または ForAll を使用して、クエリを使用できます。また、入力が配列に保持されていて、静的パーティション分割では負荷が均等に分散されない可能性がある場合は、既定のパーティション分割方式を上書きした方がよい場合があります (これらのオプションは、これ以降のパターンでも、基本的に同じように使用できます)。

独立したアクション

プロジェクション パターンとフィルター パターンでは、計算コストのかかる部分は、入力シーケンスを出力シーケンスに変換しています。より簡単なパターンは、シーケンス要素ごとに実行される必要がある、コストの高いアクションです。このアクションは値を返す必要はありません。次のように、計算コストの高い、スレッドセーフの副作用を実行するだけです。

int[] src = Enumerable.Range(0, 100).ToArray();
src.AsParallel()
.ForAll(
     x => { ExpensiveAction(x); }
);

同時実行で処理するため、PLINQ はワーカー スレッで ExpensiveAction を実行します。つまり、ExpensiveAction は計算コストが高く、さらに重要なことにスレッドセーフです。ExpensiveAction は異なる複数のスレッドで呼び出されるため、これらの呼び出しには順序がありません。

結局のところ、このパターンはあまりに単純なので PLINQ は必要ではなく、単純に Task Parallel Library の Parallel.ForEach を使用できます (.NET Framework 4 の場合)。ただし、PLINQ の ForAll は、その他の PLINQ 演算子と組み合わせて使用すると便利なことが多くあります。

int[] src = Enumerable.Range(0, 100).ToArray();
src.AsParallel()
.Where(x => x%2 == 0)
.ForAll(
     x => { ExpensiveAction(x); }
);

シーケンスの圧縮

シーケンスの圧縮は、プロジェクションに似たパターンですが、入力シーケンスが 1 つではなく 2 つである点が異なります。1 つの入力要素を 1 つの出力要素に変換するコストの高い関数を使用するのではなく、一方のシーケンスからの 1 つの入力と、もう一方のシーケンスからの 1 つの入力を単一の出力要素に変換するコストの高い関数を使用します。

このパターンは、.NET 4.0 で導入された Zip LINQ-to-Objects 演算子を使用することで、サポートされます。Zip LINQ-to-Objects 演算子は PLINQ でも使用できます。最大限のパフォーマンスが得られるのは、入力シーケンスが配列または IList コレクションの場合です。

int[] arr1 = ..., arr2 = ...;
int[] results =
      arr1.AsParallel().AsOrdered()
      .Zip(
           arr2.AsParallel().AsOrdered(),
           (arr1Elem, arr2Elem) => ExpensiveFunc(arr1Elem, arr2Elem))
      .ToArray();

実際、入力シーケンスが配列に保持されている場合は、守備よく Zip 演算子をプロジェクションとして表現し直すことができます。

int[] arr1 = ..., arr2 = ...;
int length = Math.Min(arr1.Length, arr2.Length);
int[] results = 
      ParallelEnumerable.Range(0, length).AsOrdered()
      .Select(index => ExpensiveFunc(arr1[index], arr2[index]))
      .ToArray();

どのパターンの実装を選択する場合でも、この種類のワークロードは PLINQ を使用して順当に高速化できます。

リダクション

リダクションは、集計や圧縮とも呼ばれ、シーケンスの要素を単一の結果になるまで組み合わせる操作です。よく使用されるリダクションには合計、平均、最小値、および最大値があり、これらのリダクションは非常に頻繁に使用されるため、PLINQ では演算子 (Sum、Average、Min、および Max) として直接サポートされています。ただし、これらの演算子が要素ごとに実行する作業はほとんどないため、PLINQ では、通常、これらの演算子はプロジェクションやフィルターなどコストの高い計算も含まれるクエリで使用されます。このルールの例外として 1 つ考えられるのは、コストの高い比較関数を使用する最小値または最大値の操作です。ただし、リダクション関数がコストの高い操作の場合、リダクションはそれ自体で並列ワークロードになり得ます。

集計にはいくつかのオーバーロードがありますが、この記事ではスペースに制約があるため、これらのオーバーロードについては説明しません (PLINQ のリダクションの詳細については、blogs.msdn.com/pfxteam/archive/2008/01/22/7211660.aspx (英語) およびblogs.msdn.com/pfxteam/archive/2008/06/05/8576194.aspx (英語) を参照してください)。集計のオーバーロードで最も一般的なものには、次のシグネチャがあります。

public static TResult Aggregate<TSource, TAccumulate, TResult>(
    this ParallelQuery<TSource> source,
    Func<TAccumulate> seedFactory,
    Func<TAccumulate, TSource, TAccumulate> updateAccumulatorFunc,
    Func<TAccumulate, TAccumulate, TAccumulate> combineAccumulatorsFunc,
    Func<TAccumulate, TResult> resultSelector)

以下は、これを使用して並列の Average 演算子を実装する方法です。

public static double Average(this IEnumerable<int> source)
{
      return source.Aggregate(
             () => new double[2],
             (acc, elem) => {
                   acc[0] += elem; acc[1]++; return acc;
             },
             (acc1, acc2) => {
                   acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1;
             },
             acc => acc[0] / acc[1]);
}

各 PLINQ ワーカーは、seedFactory を使用してそれぞれのアキュムレータを初期化し、2 つの double 値から成る独自の配列を取得します。次に、ワーカーは入力シーケンスの部分を処理し、updateAccumulatorFunc を使用して、各要素でアキュムレータを更新します。その後、異なるワーカーどうしがそれぞれのアキュムレータを combineAccumulatorsFunc を使用して組み合わせ、最終的に、1 つのアキュムレータが resultSelector を使用することで戻り値に変換されます。

並列の Average 演算子の例は、Aggregate 演算子のセマンティクスの説明に便利ですが、その要素ごとの作業 (2 回の加算) は並列化を行うにはおそらくあまりにコストが低すぎるでしょう。ただし、実際には、よりコストの高いリダクション関数を使用する状況が発生することが多くあります。

並べ替え

LINQ は OrderBy 演算子による並べ替えをサポートします。PLINQ は、当然、並列アルゴリズムを使用して、並べ替えを実装します。通常、この並べ替えのアルゴリズムは、LINQ-to-Objects の並べ替えに対して、ほどほどの高速化しか実現されないように思えます (おそらく、4 コアのコンピューターで 2 ~ 3 倍)。ただし、LINQ-to-Objects の並べ替えモデルでは、かなり負荷の高いインターフェイスが OrderBy に使用されるということを忘れないでください。キー セレクターは必須で、式ツリーではなくデリゲートとして渡されるため、キー セレクターが ID 関数 x => x であったとしても、PLINQ はキー セレクターを無視しません。したがって、キーと値が同じ場合でも、PLINQ はキーと値のペアを処理します。また、LINQ の関数的な性質の結果、PLINQ はシーケンスが配列に保持されている場合でも、シーケンスをインプレースで並べ替えることはできません。並べ替えをした場合、元の順序が崩れるためです。

この点を考慮して、LINQ-to-Objects OrderBy 演算子を使用する場合は、PLINQ を使用してクエリを高速化できます。ただし、整数配列のみを並べ替える必要がある場合は、Array.Sort などのインプレースの並べ替えの方が、PLINQ の OrderBy よりも速くなるでしょう。インプレースの並べ替えを高速化する必要がある場合は、Task Parallel Library に加えて、独自の並列の並べ替えアルゴリズムを実装する必要がある可能性があります。

一対多の変換

プロジェクションでは、各入力要素が正確に 1 個の出力要素に変換されます。フィルターを使用すると、すべての入力要素を 0 個または 1 個の出力要素に変換できます。しかし、各入力から任意の数の出力を生成したい場合はどうなるでしょう。PLINQ ではこのような場合、SelectMany 演算子を使用して対応します。次に例を示します。

IEnumerable<int> inputSeq = ...
int[] results = 
      inputSeq.AsParallel()
      .SelectMany(input => ComputeResults(input))
      .ToArray();

このコードは、入力シーケンス内のすべての要素に対して ComputeResults を呼び出しています。各 ComputeResults は、0 個、1 個、または複数の結果を保持した IEnumerable 型 (配列など) を返します。クエリの出力には、ComputeResults 呼び出しによって返されたすべての結果が含まれます。

このパターンは、この記事の他のパターンに比べるとやや直感的でないため、使い方を具体例で見てみましょう。一対多のパターンは、おなじみの N クイーン問題 (どのクイーンも別のクイーンを攻撃しないようなクイーンのチェス盤での配置をすべて見つける) のような問題を解く検索アルゴリズムを実装できます。この入力シーケンスは、いくつかのクイーンが既に配置されているチェス盤のシーケンスになります。SelectMany 演算子を使ったクエリを使用して、入力の最初の状態から開始し、割り出せるすべての N クイーンの答えを見つけます。

IEnumerable<ChessboardState> initStates = GenerateInitialStates();
ChessboardState[] solutions = 
      initStates.AsParallel()
      .SelectMany(board => board.FindAllSolutions())
      .ToArray();

より複雑なクエリ

今回説明した PLINQ パターンは、どれも短いクエリ スニペットで、主に 1 つか 2 つの演算子を使用しています。もちろん、1 クエリで複数のパターンを組み合わせて使用できます。次のクエリでは、フィルター、プロジェクション、および独立したアクションを組み合わせています。

int[] src = Enumerable.Range(0, 100).ToArray();
src.AsParallel()
            .Where(x => ExpensiveFilter(x))
            .Select(x => ExpensiveFunc(x));
            .ForAll(x => { ExpensiveAction(x); });

フィルター、プロジェクション、および副作用アクションがほぼ同じコストであろうと、そのうちの 1 つが実行時間の大部分を使用しようと、PLINQ はこのクエリを効果的に並列化します。また、LINQ to Objects と同様に、PLINQ は各演算子の処理後に結果を具体化しません。したがって、PLINQ では、シーケンス全体に Where を実行し、フィルターされたシーケンスを格納して、Select、そして最終的に ForAll を実行するような処理を行いません。操作はできる限り組み合わされ、簡素化された複数のクエリにより、ワーカーが入力要素を次々に "渡す" ことで、クエリ全体が処理されます。

並列パターンを互いに組み合わせるだけでなく、並列パターンを任意の LINQ 演算子と組み合わせることもできます。PLINQ は LINQ to Objects と機能的に同等になるように配慮されているため、すべての LINQ 演算子を使用できます。PLINQ は、ほぼすべての LINQ to Objects クエリ を実行しますが、必ずしも LINQ to Objects クエリを高速化しません。一部の演算子とクエリ形式は、並列化されるとしても、あまり有効ではありません。理想的には、クエリの中で最も計算コストの高い部分に、この記事で説明した並列パターンの 1 種が使用されていると有効です。

1 つ注意が必要なのは、複雑なクエリでは、並列実行に必要でもコストが高くなる可能性があるアルゴリズムを使用せずに、クエリの各部分を順次実行する方を PLINQ が選択する場合があることです。これは、特になんらかの形で実行時間の大部分を使用するコストの高いデリゲートがクエリに含まれている場合は、望ましい処理ではありません。次のコード例では、PLINQ は ExpensiveFunc() デリゲートを順次実行します。

int[] src = Enumerable.Range(0, 100).ToArray();
int[] res = src.AsParallel()
             .Select(x => ExpensiveFunc(x))
             .TakeWhile(x => x % 2 == 0)
             .ToArray();

この問題の解決方法は 2 とおりあります。1 つ目は、コストがかかる可能性があるアルゴリズムを使用しなければならないとしても、クエリを並列で実行するように PLINQ にヒントを示します。

int[] src = Enumerable.Range(0, 100).ToArray();
int[] res = src.AsParallel()
             .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
             .Select(x => ExpensiveFunc(x))
             .TakeWhile(x => x % 2 == 0)
             .ToArray();

2 つ目は、クエリを分解して、クエリの中でコストの高い部分のみを PLINQ により実行し、残りの部分は LINQ to Objects を使用して実行します。PLINQ クエリに AsSequential 演算子を使用して、LINQ to Objects にバインドする後続の演算子を取得できます。

int[] src = Enumerable.Range(0, 100).ToArray();
int[] res = src.AsParallel()
             .Select(x => ExpensiveFunc(x))
             .AsSequential()
             .TakeWhile(x => x % 2 == 0)
             .ToArray();

PLINQ を最大限活用する

マルチコア アプリケーションを作成するのは困難ですが、必ずしもそうとは限りません。PLINQ は、必要なときに、データ並列の計算を高速化するためのツールとして、利用できるようにしておくと便利です。ここで紹介したパターンを覚え、開発するプログラムでこれらのパターンを活用してください。        

Igor Ostrovsky は、マイクロソフトの並列コンピューティング プラットフォーム チームのソフトウェア開発エンジニアであり、PLINQ 開発の第一人者です。

この記事のレビューに協力してくれた技術スタッフの Michael Blome に心より感謝いたします。

ご質問やご意見は、mmsync@microsoft.com (英語のみ) までお送りください。