並列プログラミング

.NET アプリケーションの並列処理についての過去、現在、未来

Stephen Toub

過去の並列処理

スレッドの直接操作は、応答性の高いクライアント アプリケーション、並列アルゴリズム、およびスケーラブルなサーバーを実現するために、以前から開発者が採用している方法です。しかし、この手法は、デッドロック、ライブロック、ロック コンボイ、2 ステップ ダンス、競合、オーバーサブスクリプションといったアプリケーションを望ましくない状態に陥らせる温床でもありました。Microsoft .NET Framework は、その誕生以来、専用の名前空間 (System.Threading) など、同時実行アプリケーションをビルドするための多種多様な下位レベル ツールを提供してきました。
.NET Framework 3.5 のコア アセンブリに含まれるこの名前空間には約 50 種類のツール (Thread、ThreadPool、Timer、Monitor、ManualResetEvent、ReaderWriterLock、Interlocked など) があるため、.NET Framework がスレッドのサポートに乏しいとは言えません。しかし、個人的には、以前のバージョンの .NET Framework について、スケーラブルで高度に並列化されたアプリケーションを開発者が適切にビルドするための基本的なサポートが乏しいと思います。ありがたいことに、この問題が .NET Framework 4 で是正され、引き続き、将来の .NET Framework のバージョンにも多大な取り組みが行われる見込みです。

マネージ言語に含まれる、並列コードを記述するための豊富なサブシステムの価値を疑問に思う方がいるかもしれません。結局のところ、並列処理と同時実行の目的はパフォーマンスの向上であり、パフォーマンスの向上を目指す開発者は、ハードウェアへの高速アクセス、あらゆる機能や状態の完全な制御、キャッシュ ラインの操作、操作のインターロックなどを適切に提供するネイティブ言語を求めます。もし本当にそうであれば、業界の状態に不安を感じます。C#、Visual Basic、F# のようなマネージ言語は、強力で効率の高いコードを迅速に開発するために、安全かつ生産性の高い環境をすべての開発者に (スキルのレベルに関係なく) 提供するために存在します。開発者には、何千ものビルド済みライブラリ クラスと、開発者の期待に応える近代的なサービスをすべて備えた成熟した言語が提供されるため、複雑な計算や浮動小数点数を集中的に使用するワークロードにおいてすばらしいパフォーマンスを発揮できます。つまり、マネージ言語とその関連フレームワークが、高パフォーマンスの同時実行アプリケーションをビルドするための確固としたサポートを提供するため、最新のハードウェアを使用する開発者は大きな成果が得られます。

以前から、パターンを学習することは優れた方法だと考えているため、ここでもパターンを使って説明します。"厄介な並列処理" パターンまたは "快適な並列処理" パターンの場合、最も一般的に必要とされる分岐と結合の構成要素の 1 つは並列ループです。並列ループは、並列処理に含まれるすべての独立したループの反復を処理することが目的です。このような処理を、前述の低レベルのプリミティブを使用して実行する方法を理解しておくと役に立ちます。そのため、C# で実装する単純な並列ループの基本実装について説明します。次のような、典型的な For ループを考えてみます。

for (int i=0; i<N; i++) {
  ... // Process i here
}

スレッドを直接使用して、このループの並列処理を行います (図 1 参照)。

図 1 For ループの並列処理

int lowerBound = 0, upperBound = N;
int numThreads = Environment.ProcessorCount;
int chunkSize = (upperBound - lowerBound) / numThreads;

var threads = new Thread[numThreads];
for (int t = 0; t < threads.Length; t++) {
  int start = (chunkSize * t) + lowerBound;
  int end = t < threads.Length - 1 ? start + chunkSize : upperBound;
  threads[t] = new Thread(delegate() {
    for (int i = start; i < end; i++) {
      ... // Process i here
    }
  });
}

foreach (Thread t in threads) t.Start(); // fork
foreach (Thread t in threads) t.Join();  // join

当然、この並列処理のアプローチには無数の問題があります。ループ専用に新しいスレッドを開始していますが、これは (特に、実行する処理の中でループ本体がささいなものである場合) オーバーヘッドを増加させるだけでなく、他の処理を同時実行しているプロセスでかなりのオーバーサブスクリプションが発生する可能性があります。静的分割を使用して複数のスレッドに作業を分散する場合、反復の範囲内でワークロードを均等に分散しないと、負荷がかなり不均等になる可能性があります (反復の回数を使用するスレッドに均等に分散していないと、最後のスレッドにオーバーフローの負荷がかかることは言うまでもありません)。もっと最悪なことは、そもそも開発者がこのコードを記述しなければならないことにあります。開発者が並列化を試みるすべてのアルゴリズムには同様のコード (ひいき目に見てももろいところがあるコード) が必要になります。

開発者が、このような並列ループを、並列プログラムに存在する多数のパターンの 1 つにすぎないと考えていると、先ほどのコードで示した問題がさらに大きくなります。開発者がこのような低レベルのコーディングを行って、このような並列パターンをすべて記述しなければならないことは、優れたプログラミング モデルになるわけでもなく、大規模並列ハードウェアを使用できる世界中の多数の開発者に成功をもたらすわけでもありません。

現在の並列処理

.NET Framework 4 の説明に移りましょう。.NET Framework 4 では、開発者がアプリケーションで並列処理を表現し、その並列処理を効率的に実行する作業を大幅に容易にする多数の機能を備えるように補強されています。これは先ほどの並列ループとは比べものになりませんが、説明は並列ループから始めます。

System.Threading 名前空間は、.NET Framework 4 で拡張され、新しく System.Threading.Tasks というサブ名前空間が導入されました。この名前空間には、新しく Parallel という型が含まれます。この型は、並列ループと構造化された分岐と結合のパターンを実装するための多数の静的メソッドを公開します。この使用方法の例として、先ほどの For ループを使用します。

for (int i=0; i<N; i++) {
  ... // Process i here
}

Parallel クラスを使用すると、次のように簡単に並列化できます。

Parallel.For(0, N, i => {
  ... // Process i here
});

ここでも開発者は、引き続きループの各反復処理が実際に独立するようにしなければなりませんが、それ以外はこの Parallel.For という構成要素がこのループの並列処理のすべての側面に対応します。この構成要素は、コンピューター処理に関与する、基になるすべてのスレッド間で入力範囲の動的なパーティション分割を処理する一方で、このパーティション分割のオーバーヘッドを最小限に抑え、静的パーティション分割の実装によって生じるオーバーヘッドに近づけます。また、コンピューター処理に関与するスレッドの数を動的にスケール変換して、特定のワークロードに見合う最適なスレッド数を見つけます (これは、おおかたの予想とは裏腹に、必ずしもハードウェア スレッドの数と同じになるわけではありません)。さらに、先ほどの単純な実装に含まれていない、例外処理機能なども提供します。最も重要な点は、開発者がスレッドの低レベルの OS 抽象化で並列処理を考える必要がなく、ワークロードのパーティション分割、マルチコアへのオフロード、および効率の良い結果の結合に、コードを駆使したソリューションを必要としなくなることです。代わりに、開発者は、重要な作業 (収益を上げるビジネス ロジック) に専念できます。

また、Parallel.For には、ループの操作をより細かく制御することを求める開発者向けの機能があります。For メソッドには複数のオプションが用意され、開発者は、ループの実行基盤となるスケジューラ、使用する並列処理の最大度数、およびループを早期終了するために外部のエンティティが使用するキャンセル トークンを制御できます。

var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.For(0, N, options, i=> {
  ... // Process i here
});

このようなカスタマイズ機能は、.NET Framework 内での並列化の取り組みに関する 1 つの目標を示しています。つまり、開発者が複雑なプログラミングを行うことなく、並列処理を大幅に容易に実現できると同時に、熟練の開発者が処理と実行を微調整できるツールを提供します。このようにさまざまな追加機能がサポートされます。Parallel.For の他のオーバーロードにより、開発者はループを早期終了することができます。

Parallel.For(0, N, (i,loop) => {
  ... // Process i here
  if (SomeCondition()) loop.Break();
});

また、他のオーバーロードでは、基盤として同じスレッドで実行されることになる反復処理全体に状態を遷移させ、削減などのアルゴリズムをはるかに効率的に実装できるようにします。

static int SumComputations(int [] inputs, Func<int,int> computeFunc) {
  int total = 0;
  Parallel.For(0, inputs.Length, () => 0, (i,loop,partial)=> {
    return partial + computeFunc(inputs[i]);
  }, 
  partial => Interlocked.Add(ref total, partial));
}

Parallel クラスは、整数の範囲だけでなく、任意の IEnumerable<T> ソース (.NET Framework における列挙可能なシーケンスの表現) もサポートします。つまり、コードでは、列挙子で MoveNext を連続的に呼び出して、次の Current 値を取得できます。列挙可能な任意のシーケンスを使用できることで、メモリ内表現に関係なく、任意のデータ セットの並列処理が可能になります。つまり、データ ソースを必要に応じて具体化して、MoveNext 呼び出しとしてページインし、ソース データのまだ具体化していないセクションにアクセスできます。

IEnumerable<string> lines = File.ReadLines("data.txt");
Parallel.ForEach(lines, line => {
  ... // Process line here
});

Parallel.For と同様、Parallel.ForEach でもさまざまなカスタマイズ機能がサポートされ、Parallel.For よりも細かい制御が可能です。たとえば、ForEach を使用して、入力データ セットのパーティション分割方法をカスタマイズできます。これは、パーティション分割に重点を置いた一連の抽象クラス (並列処理の構成要素が固定数または可変数のパーティションを要求できるようにします) を通じて実行され、パーティション分割機能が、入力データ セット全体にこのようなパーティションの抽象化を分配し、必要に応じて、データをそれらのパーティションに静的または動的に割り当てることができます。

Graph<T> graph = ...;
Partitioner<T> data = new GraphPartitioner<T>(graph);
Parallel.ForEach(data, vertex => {
  ... // Process vertex here
});

Parallel クラスの Parallel.For と Parallel.ForEach は Invoke メソッドによって補完されます。この Invoke メソッドは、基盤システムが対応できる、可能な限り多くの並列化を行って呼び出される任意の数の操作を受け取ります。この従来からの分岐と結合の構成要素により、よく使用されるクイック ソートの例など、再帰的な分割統治アルゴリズムの並列化が容易になります。

static void QuickSort<T>(T [] data, int lower, int upper) {
  if (upper – lower < THRESHOLD) {
    Array.Sort(data, index:lower, length:upper-lower);
  }
  else {
    int pivotPos = Partition(data, lower, upper);
    Parallel.Invoke(
      () => QuickSort(data, lower, pivotPos),
      () => QuickSort(data, pivotPos, upper));
  }
}

これは大きな第一歩ですが、Parallel クラスは使用可能な機能のほんの一部でしかありません。.NET Framework 4 で取り入れられた重要な並列化の取り組みの 1 つが、並列 LINQ の導入です。これは、親愛を込めて PLINQ (ピーリンクと発音) と呼ばれます。LINQ (統合言語クエリ) は、.NET Framework 3.5 で導入されました。LINQ には 2 つの説明があります。1 つは、データ セットを操作する際にメソッドとして公開される一連の演算子というもの、もう 1 つは、言語でこのようなクエリを直接表現するための、C# と Visual Basic のコンテキスト キーワードというものです。LINQ に含まれる多くの演算子は、データベース コミュニティで長年知られているのと同等の操作に基づいており、Select、SelectMany、Where、Join、GroupBy など約 50 種類あります。.NET Framework の標準クエリ演算子 API は、これらのメソッドのパターンを定義しますが、これらの操作の対象となる正確なデータ セットは定義せず、これらの操作の実装方法も正確には定義していません。さまざまな "LINQ プロバイダー" は、このパターンをさまざまなデータ ソースと対象の環境 (メモリ内コレクション、SQL データベース、オブジェクト/リレーショナル マッピング システム、HPC Server の計算クラスター、一時的なデータ ソースとストリーミング データ ソースなど) に実装します。最もよく使用されるプロバイダーの 1 つは、LINQ to Objects と呼ばれ、IEnumerable<T> の上位に実装される LINQ 演算子の完全なスイートを提供します。これにより、C# および Visual Basic でクエリを実装できます。たとえば、次のスニペットでは、ファイルのすべてのデータを 1 行ずつ読み取り、"secret" という単語が含まれている行のみを抽出し、その行を暗号化します。最終結果は、列挙可能なバイト配列になります。

IEnumerable<byte[]> encryptedLines = 
  from line in File.ReadLines("data.txt")
  where line.Contains("secret")
  select DataEncryptor.Encrypt(line);

計算を集中的に実行するクエリや、待ち時間の長い I/O 操作が多く関係するクエリについても、PLINQ はエンド ツー エンドの並列アルゴリズムを使用する完全な LINQ 演算子のセットを実装し、自動並列処理機能を提供します。したがって、前述のクエリは、".AsParallel()" をデータ ソースに追加するだけで並列処理されます。

IEnumerable<byte[]> encryptedLines = 
  from line in File.ReadLines("data.txt").AsParallel()
  where line.Contains("secret")
  select DataEncryptor.Encrypt(line);

Parallel クラスと同様、このモデルは計算を並列実行することによる影響を開発者が評価します。ただし、それ以外はシステムが実際の並列処理、パーティション分割、スレッドの絞り込みなどに関する低レベルの詳細を処理します。また、Parallel と同様に、これらの PLINQ クエリもさまざまな方法でカスタマイズできます。開発者は、パーティション分割の実現方法、実際に使用する並列処理の度数、同期と待ち時間のトレードオフなどを制御できます。

IEnumerable<byte[]> encryptedLines = 
  from line in new OneAtATimePartitioner<string>(
    File.ReadLines("data.txt"))
    .AsParallel()
    .AsOrdered()
    .WithCancellation(someExternalToken)
    .WithDegreeOfParallelism(4)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
  where line.Contains("secret")
  select DataEncryptor.Encrypt(line);

こうしたループとクエリの強力かつ高レベルなプログラミング モデルは、同程度に強力でも低レベルなタスク ベースの API のセットの上位に構築され、System.Threading.Tasks 名前空間の Task 型および Task<TResult> 型を中心に機能します。実際に、並列ループとクエリのエンジンはタスク ジェネレーターで、基になるタスク インフラストラクチャを使用して、表現される並列化を、基盤システムで利用可能なリソースにマップします。その中核にある Task は作業単位を表します。タスクは、一般に、非同期処理の単位であり、生成されてから、後でさまざまな方法で結合できる作業項目です。Task は、Wait、WaitAll、および WaitAny の各メソッドを提供し、対象のタスクが完了するまで、またはこれらのメソッドのオーバーロードに指定された追加の制約 (たとえば、タイムアウトやキャンセル トークン) が満たされるまで、処理の継続を同期方式でブロックできます。Task は、IsCompleted プロパティを通じて作業完了のポーリングをサポートし、より一般的には、Status プロパティを通じてライフサイクル処理での変化のポーリングをサポートします。最も重要な点は、ContinueWith、ContinueWhenAll、および ContinueWhenAny の各メソッドを提供することです。これらのメソッドにより、前の特定のタスク セットの完了時のみにスケジュールが設定されるタスクの作成が可能になります。この連続性のサポートにより、無数のシナリオを簡単に実装できるため、計算処理どうしの依存関係を可能にし、システムはそれらの依存関係に基づいて作業のスケジュールを設定できるようになります。

Task t1 = Task.Factory.StartNew(() => BuildProject(1));
Task t2 = Task.Factory.StartNew(() => BuildProject(2));
Task t3 = Task.Factory.StartNew(() => BuildProject(3));
Task t4 = Task.Factory.ContinueWhenAll(
  new [] { t1, t2 }, _ => BuildProject(4));
Task t5 = Task.Factory.ContinueWhenAll(
  new [] { t2, t3 }, _ => BuildProject(5));
Task t6 = Task.Factory.ContinueWhenAll(
  new [] { t4, t5 }, _ => BuildProject(6));
t6.ContinueWith(_ => Console.WriteLine("Solution build completed."));

Task から派生する Task<TResult> クラスにより、完了した操作から結果を渡すことができるため、.NET Framework の中核となる "将来の" 実装を提供します。

int SumTree<T>(Node<T> root, Func<T,int> computeFunc) {
  if (root == null) return 0;
  Task<int> left  = Task.Factory.StartNew(() => SumTree(root.Left));
  Task<int> right = Task.Factory.StartNew(() => SumTree(root.Right));
  return computeFunc(root.Data) + left.Result + right.Result;
}

これらのすべてのモデル (ループ、クエリ、タスクなど) の下、.NET Framework はワーク スティーリング技法を使用して、特殊なワークロードをより効率的に処理し、既定で、山登り法ヒューリスティックを使用して、時間の経過と共に使用するスレッド数を変化させ、最適な処理レベルを見つけます。また、ヒューリスティックはこれらのコンポーネントの一部に組み込まれ、実行しようとしている並列処理が結果的にシーケンシャル処理の時間よりも長くなるとシステムが確信した場合は、シーケンシャル処理に自動的に戻します。ただし、前述の既定値と同様、これらのヒューリスティックもオーバーライドされる場合があります。

Task<TResult> では、計算中心の処理のみに使用する必要はありません。Task<TResult> は、任意の非同期処理を実現するためにも使用できます。.NET Framework の System.IO.Stream クラスについて考えてみます。このクラスには、ストリームからデータを抽出する Read メソッドがあります。

NetworkStream source = ...;
byte [] buffer = new byte[0x1000];
int numBytesRead = source.Read(buffer, 0, buffer.Length);

この Read 操作は同期方式でブロックするため、Read 呼び出しを行うスレッドは、I/O ベースの Read 操作が完了するまで他の作業に使用できません。スケーラビリティを向上するため、Stream クラスには、BeginRead と EndRead という 2 つのメソッドの形式で、Read メソッドに対応する非同期メソッドがあります。これらのメソッドは、その誕生以来、.NET Framework で使用できるパターンに従っています。このパターンを、APM (非同期プログラミング モデル) と呼びます。前述のコード サンプルの非同期バージョンを次に示します。

NetworkStream source = …;
byte [] buffer = new byte[0x1000];
source.BeginRead(buffer, 0, buffer.Length, delegate(IAsyncResult iar) {
  int numBytesRead = source.EndRead(iar);
}, null);

ただし、この方式ではコンポーザビリティに乏しくなります。TaskCompletionSource<TResult> 型は、そのような非同期の読み取り操作をタスクとして公開できるようにすることで、この問題を解決します。

public static Task<int> ReadAsync(
  this Stream source, byte [] buffer, int offset, int count) 
{
  var tcs = new TaskCompletionSource<int>();
  source.BeginRead(buffer, 0, buffer.Length, iar => {
    try { tcs.SetResult(source.EndRead(iar)); }
    catch(Exception exc) { tcs.SetException(exc); }
  }, null);
  return tcs.Task;
}

これにより、複数の非同期処理を計算中心の例の場合と同じように構成できます。次の例では、すべての Streams ソースから同時に読み取りを行い、すべての処理が完了した場合のみコンソールに書き込みます。

NetworkStream [] sources = ...;
byte [] buffers = ...;
Task.Factory.ContinueWhenAll(
  (from i in Enumerable.Range(0, sources.Length)
   select sources[i].ReadAsync(buffers[i], 0, buffers[i].Length))
  .ToArray(), 
  _ => Console.WriteLine("All reads completed"));

並列同時実行処理を開始するメカニズム以外に、.NET Framework 4 では、タスクとスレッド間の作業をさらに連携させるためのプリミティブも提供します。これには、スレッドセーフでスケーラブルな一連のコレクション型があり、開発者が共有コレクションへのアクセスを手動で同期する必要性を大幅に減らします。ConcurrentQueue<T> は、スレッドセーフでロックを必要としない先入れ先出し方式のコレクションを提供し、任意の数のプロデューサーやコンシューマーから同時に使用できるようにします。また、同時実行の列挙子にスナップショットのセマンティクスをサポートするため、コードは、他のスレッドがそのインスタンスにアクセスしているタイミングでキューの状態を調べることができます。ConcurrentStack<T> も同じように機能しますが、後入れ先出しのセマンティクスを提供します。ConcurrentDictionary<T> は、ロックを必要としない技法、および粒度が細かいロック技法を使用して、スレッドセーフなディクショナリを提供し、任意の数のリーダー、ライター、および列挙子もサポートします。また、GetOrAdd や AddOrUpdate など、複数手順からなる操作の複数のアトミック実装もサポートします。もう 1 つの ConcurrentBag<T> 型は、ワーク スティーリングのキューを使用する、順序なしのコレクションを提供します。

.NET Framework のコレクション型はとどまることろを知りません。Lazy<T> は、スレッドの安全性を確保するための構成可能なアプローチを使用して、変数の遅延初期化を提供します。ThreadLocal<T> は、スレッド単位、インスタンス単位のデータを提供し、これらのデータに最初にアクセスするときまで初期化を遅らせます。Barrier 型では、段階的な操作が可能になるため、アルゴリズムを通じて複数のタスクまたは複数のスレッドの処理を厳密に進めることができます。例を挙げればきりがありませんが、すべては 1 つの基本原則に起因します。つまり、開発者がアルゴリズムの並列化についての低レベルで基本的な側面に重点を置く必要がなくなり、.NET Framework がメカニズムと処理効率の向上に対処できるようにすることです。

未来の並列処理

ディケンズの物語の未来とは異なり、.NET Framework の並列処理と同時実行の未来は、興奮に満ち、期待の持てるもので、.NET Framework 4 に含まれる基盤上に構築されます。.NET Framework の今後のバージョンでは、既存のプログラミング モデルのパフォーマンスを向上することにとどまらず、既存の高レベル モデルのセットを拡張して、並列ワークロードのより多くのパターンに対処することに力を注いでいます。そのような拡張の 1 つが、データフローに基づいて並列システムを実装し、エージェント ベースのモデルを使用してアプリケーションを設計するための、新しいライブラリです。新しい System.Threading.Tasks.Dataflow ライブラリには、多数の "データフロー ブロック" が用意されており、データのバッファー、プロセッサ、および伝達子として機能します。データはこのようなブロックに書き込まれ、ソース ブロックのセマンティクスに基づいて処理されたのち、リンク ターゲットに自動転送されます。データフロー ライブラリは、内部でタスクを準備してデータを処理および伝達するブロックを使用して、タスク上にも構築されます。

パターンの観点からは、このライブラリは、プロデューサーとコンシューマーのチェーンを形成するデータフロー ネットワークの処理に特に適しています。アプリケーションを経由して行き来するデータのストリームを使用して、データを圧縮してから暗号化し、ファイルに書き込む必要性について考えてみます。これは、次のようにデータフロー ブロックの小さなネットワークを構成することで実現できます。

static byte [] Compress(byte [] data) { ... }
static byte [] Encrypt(byte [] data) { ... }
...
var compressor = new TransformBlock<byte[],byte[]>(Compress);
var encryptor = new TransformBlock<byte[],byte[]>(Encrypt);
var saver = new ActionBlock<byte[]>(AppendToFile);
compressor.LinkTo(encryptor);
encryptor.LinkTo(saver);
...
// As data arrives
compressor.Post(byteArray);

ただし、データフロー ライブラリ以外に .NET Framework の並列処理と同時実行にもたらされる最も重要な機能は、タスクを作成して非同期に待機させるための、C# と Visual Basic のすばらしい言語サポートです。これらの言語は、ステート マシン ベースのリライト機能で補強され、言語のシーケンシャル制御フローのすべての構成要素を使用できるようにすると同時に、タスクが完了するまで非同期に待機できるようにします (Visual Studio 2010 の F# では、関連する非同期形式が、その非同期ワークフロー機能の一部としてサポートされます。このワークフロー機能は、タスクとも統合されます)。次のメソッドは、1 つの Stream から別の Stream にデータを同期方式でコピーし、コピーしたバイト数を返します。

static long CopyStreamToStream(Stream src, Stream dst) {
  long numCopied = 0;
  byte [] buffer = new byte[0x1000];
  int numRead;
  while((numRead = src.Read(buffer,0,buffer.Length)) > 0) {
    dst.Write(buffer, 0, numRead);
    numCopied += numRead;
  }
  return numCopied;
}

前述の Stream の BeginRead メソッドや EndRead メソッドなどのサポートを使用して、この関数 (条件とループを含む) を実装すると、エラーが発生しやすくデバッグが非常に難しいコールバックやロジックとなり、悪夢のような結果になります。そこで、先ほど説明した ReadAsync メソッドを使用するアプローチを検討します。このメソッドは Task<int> を返します。なお、対応する WriteAsync メソッドは Task を返します。新しい C# の機能を使用して、前述のメソッドを次のように書き直すことができます。

static async Task<long> CopyStreamToStreamAsync(Stream src, Stream dst) {
  long numCopied = 0;
  byte [] buffer = new byte[0x1000];
  int numRead;
  while((numRead = await src.ReadAsync(buffer,0,buffer.Length)) > 0) {
    await dst.WriteAsync(buffer, 0, numRead);
    numCopied += numRead;
  }
  return numCopied;
}

ちょっとした変更をいくつか加えて、同期メソッドを非同期メソッドに変更しています。この関数は "async" と注釈が付けられ、コンパイラに対して、関数の書き直しを行う必要があることを通知します。それによって、Task または Task<TResult> で "待機" 操作が求められたとき、関数の残りの実行が、事実上継続として、そのタスクにフックされます。タスクの完了時点まで、このメソッド呼び出しがスレッドを占領することはありません。Read メソッド呼び出しは ReadAsync 呼び出しに変換されているので、待機のコンテキスト キーワードを使用して生成ポイントを指定し、残りは続行に変更します。これは WriteAsync と同じです。この非同期メソッドの最終完了時に返される long 型の値は Task<long> に変換されます。これは、前述のようなメカニズムと TaskCompletionSource<TResult> を使用して、CopyStreamToStreamAsync の最初の呼び出し元に返されます。ここで、CopyStreamToStreamAsync からの戻り値を Task として使用して、待機するか、続行をフックするか、他のタスクを経由して構成または待機することも可能です。ContinueWhenAll や WaitAll のような関数を使用して、複数の非同期処理を開始し、後で互いに結合して、高レベルの同時実行を実現し、アプリケーションの全体的なスループットを向上できます。

非同期に関するこの言語サポートは、I/O を頻繁に実行する操作だけでなく、CPU を集中利用する操作のパフォーマンスも向上し、特に、開発者が (UI スレッドと関連付けられていないアプリケーションを無応答状態にしない) 応答性の高いクライアント アプリケーションをビルドできるようにする一方で、引き続き大量の並列処理にもメリットが得られます。長い間、開発者にとって、UI スレッドから離れて処理を実行してから、UI スレッドに戻って UI 要素を更新し、ユーザーと対話するのは煩わしい作業でした。.NET Framework の重要なコンポーネントとの非同期対話の言語サポートは、待機操作の完了時に、既定で、操作を最初のコンテキストに戻します (たとえば、待機処理を UI スレッドから発行する場合、フックされた続行機能は UI スレッドに戻って引き続き実行されます)。つまり、タスクを開始して、計算を集中的に行う作業をバックグラウンドで実行でき、開発者は単純に待機して、結果を取得し、UI 要素に格納できます。

async void button1_Click(object sender, EventArgs e) {
  string filePath = txtFilePath.Text;
  txtOutput.Text = await Task.Factory.StartNew(() => {
    return ProcessFile(filePath);
  });
}

そのバックグラウンド タスク自体は、PLINQ クエリを使用するなどして、バックグラウンドの計算を並列化するために、複数のタスクに分離します。

async void button1_Click(object sender, EventArgs e) {
  string filePath = txtFilePath.Text;
  txtOutput.Text = await Task.Factory.StartNew(() => {
    return File.ReadLines(filePath).AsParallel()
      .SelectMany(line => ParseWords(line))
      .Distinct()
      .Count()
      .ToString();
  });
}

言語サポートは、データフロー ライブラリと組み合わせて使用して、非同期のプロデューサーとコンシューマーのシナリオを簡単かつ自然に表現できます。一連の調整済みプロデューサーを実装する場合を考えます。各プロデューサーはいくつかデータを生成して、多数のコンシューマーに送信します。これは BlockingCollection<T> のような型を使用して同期方式で実行できます (図 2 参照)。この型は、.NET Framework 4 の一環として導入されています。

図 2 BlockingCollection の使用

static BlockingCollection<Datum> s_data = 
  new BlockingCollection<Datum>(boundedCapacity:100);
...
static void Producer() {
  for(int i=0; i<N; i++) {
    Datum d = GenerateData();
    s_data.Add(d);
  }
  s_data.CompleteAdding();
}

static void Consumer() {
  foreach(Datum d in s_data.GetConsumingEnumerable()) {
    Process(d);
  }
}
...
var workers = new Task[3];
workers[0] = Task.Factory.StartNew(Producer);
workers[1] = Task.Factory.StartNew(Consumer);
workers[2] = Task.Factory.StartNew(Consumer);
Task.WaitAll(workers);

これは、プロデューサーとコンシューマーの両方がスレッドをブロックするというアプリケーションの目標を満たしている限り、優れたパターンです。これを受け入れられない場合は、BufferBlock<T> というもう 1 つのデータフロー ブロックと、ブロックから非同期に送受信する機能を使用して、非同期コードを記述できます (図 3 参照)。

図 3 BufferBlock の使用

static BufferBlock<Datum> s_data = new BufferBlock<Datum>(
  new DataflowBlockOptions { BoundedCapacity=100 });
...
static async Task ProducerAsync() {
  for(int i=0; i<N; i++) {
    Datum d = GenerateData();
    await s_data.SendAsync(d);
  }
  s_data.Complete();
}

static async Task ConsumerAsync() {
  Datum d;
  while(await s_data.OutputAvailableAsync()) {
    while(s_data.TryReceive(out d)) {
      Process(d);
    }
  }
}
...
var workers = new Task[3];
workers[0] = ProducerAsync();
workers[1] = ConsumerAsync();
workers[2] = ConsumerAsync();
await Task.WhenAll(workers);

ここで、SendAsync メソッドと OutputAvailableAsync メソッドの両方からタスクを返し、コンパイラが続行をフックして、プロセス全体を非同期に実行できます。

並列処理の新しいあり方

並列プログラミングは、長い間、上級開発者 (コードをマルチコアに拡張する方法にかなり詳しい有能な開発者) の領域でした。これらの開発者は、何年もトレーニングを請けたり、仕事上の経験を積んだ結果、上級開発者になりました。こういった上級開発者は非常に高く評価されますが、その数は多くはありません。現在のマルチコアとメニーコアのすばらしい新世界では、並列処理を上級開発者に委ねるだけでは不十分です。アプリケーションやソフトウェアを市販のソフトウェア パッケージとしてデザインしていても、社内で使用することだけを目的としていても、またはより重要な作業を完了できるようにするためのツールすぎないとしても、並列処理は、すべての開発者が少なくとも検討しなければならないものであり、マネージ言語を使用する何百という多くの開発者が、(コンポーネント自体で並列処理がカプセル化されるコンポーネントを通じてであっても) 活用できなければならないものです。.NET Framework 4 で公開される並列プログラミング モデルや、.NET Framework の今後のバージョンで導入される並列プログラミング モデルは、このすばらしい未来を可能にするために必要なものです。

Stephen Toub は、マイクロソフトの並列コンピューティング プラットフォーム チームの主席アーキテクトです。

この記事のレビューに協力してくれた技術スタッフの Joe HoagDanny Shih に心より感謝いたします。