ピアの楽しみ

WCF を使用したピアツーピアの作業処理アプリケーション

Matt Neely

コードは MSDN コード ギャラリーからダウンロードできます。
オンラインでのコードの参照

この記事では、次の内容について説明します。

  • 基本的なピアツーピア通信
  • Worker ピアの作成
  • 負荷分散と作業の共有
  • 作業項目のバックアップと復元
この記事では、次のテクノロジを使用しています。
WCF

目次

基本的なピア通信
Worker ピア
Worker が作業できるようにする
User ピア
Spectator
負荷分散と作業の共有
WorkItem バックアップ
ピアの楽しみ

私が所属しているマイクロソフトのグループでは、1 週間だけ通常の業務を離れて特定のトピックを研究することができます。最近、私が選択したトピックは、ピアツーピア (P2P) ネットワークです。この研究をきっかけに、この新しいネットワーク パラダイムへの興味は永続的なものになりました。

P2P に興味を抱いた理由の 1 つは、我々のチームが所有する内部的なドキュメント ビルド システムでした。チームの複数の作者により、解析、変換、パッチ、およびコンパイルを必要とする多数のヘルプ ドキュメントが記述されます。これらのドキュメントは、毎日さまざまな出力形式および多彩な言語にコンパイルされます。このタスクを処理するために、我々のチームではビルド ソフトウェアを実行するサーバー ファームを持っています。

現在使用しているメカニズムは、標準的なクライアント/サーバー アーキテクチャです。ビルド ジョブは中央サーバーのキューに配置され、ビルド サーバーの処理能力の状態に応じて少しずつ処理されます。中央サーバーでのジョブのバックログや、ビルド サーバーでの不適切な動作が発生することがあります。チーム メンバの多くは、ビルド システムのステータスに注意を払っていますが、システムは不透明で監視が困難です。

この記事は、このビルド システムの P2P バージョンを予測した結果です。Windows Communication Foundation (WCF) PeerChannel に基づいた、一般的な作業手順の新しい設計およびピア対応アプリケーションについて説明します。プロセスの中で必要となる設計上の意思決定について解説しながら、このシステムを構築する手順をご紹介します。

基本的なピア通信

このプロジェクトでは、主要な作業処理のピアを定義することから開始します。汎用名として Worker という名前を付けます。この計画では、物理的な 1 台のコンピュータごとに 1 つの Worker を保持します (ただし、これには技術的な制限はありません)。

各 Worker は、作業を処理するために他の Worker とやり取りし、連携する必要があります。そのためには、他の Worker に関する知識が必要です。各 Worker がそのような情報を入手する方法について考えましょう。

1 つのアプローチは、各 Worker がポーリングを行って他の Worker の状態を確認することです。これには 2 つのフェーズが必要なので、そのメカニズムは複雑です。Worker による検出と、Worker による照会です。さらに、各 Worker が他のすべての Worker に対して ping を実行する場合、多数の不要なメッセージがネットワークを流れることになります。WCF PeerChannel が本質的にマルチキャストであるという事実を考慮すると、Worker に対して ping を実行するとすべての Worker にメッセージが送信されます。n 個の Worker ピアに対して、n を 3 乗した数のメッセージが送信されるのです。では、別のアプローチを検討してみましょう。ブロードキャスト モデルです。

ブロードキャスト モデルでは、Worker は定期的にステータスを送信します。これをハートビート メッセージと呼びましょう。他の Worker は、この情報を何に使うかを決定できます。無視することもできます。私は、このアプローチが気に入りました。Worker メッシュ トポロジが変化したときに、ハートビート メッセージの有無に基づいてオンラインおよびオフラインになるノードを推測できるからです。Worker ピア検出は、アクティブではなくパッシブです。このモデルでは、ネットワーク トラフィックを調整してシステムを保持するのに必要な量まで最小化することができます。このモデルでは、n 個の Worker ピアで送信されるメッセージ数は n の 2 乗です。この数は、最初のアプローチに勝っています。

WCF でサービスを作成するには、通常、サービス コントラクトを定義することから開始します。これは、サービスがクライアントに公開する機能を定義するインターフェイスです。P2P シナリオでは、各ピアにはサーバーとクライアントの両方の役割があり、ここで作成されるサービス コントラクトは、ピアが相互にやり取りするときの方法を定義するものです。Worker ステータスをブロードキャストする場合、サービス コントラクトを次のように定義できます。

[ServiceContract]
public interface IWorkerPeer {
  [OperationContract(IsOneWay=true)]
  void Heartbeat(WorkerStatus status);
}

私の IWorkerPeer サービス コントラクトには、Heartbeat という単一方向の操作が 1 つ含まれます。Worker がこのメソッドを呼び出すと、私のメッシュに参加している他のすべての Worker ピアに対して、設定済みの WorkerStatus オブジェクト インスタンスが送信されます。

次に、WorkerStatus クラスの単純な定義を示します。

[DataContract]
public class WorkerStatus {
  [DataMember]
  public Guid WorkerId;

  [DataMember]
  public WorkerState State;
}

これには、Worker の一意の識別子、および単純な状態の列挙値が含まれます。より実用的なシステムでは、追加の記述子、パフォーマンス カウンタ、Worker 統計 (たとえば、失敗した作業項目の割合) などを含むデータの詳細なセットになるでしょう。この定義どおりで、少なくとも、各 Worker はメッシュ内の他のすべての Worker のステータスを認識できます。

私が定義した状態は、Unknown、Ready、Busy、および Offline です。Unknown は、Worker の状態が不明であることを示します (おそらく、特定の期間にわたってその Worker からのハートビートが送られてこないため)。Ready 状態は、Worker が適切に動作しており、何かの作業を行う準備が整っていることを示します。Busy 状態は、Worker は適切に動作しているが、何か作業を行っていてビジーであることを示します。Offline 状態は、Worker を作業に利用できないことを示します。これは、潜在的に、Worker の最後のハートビートから、または、おそらく管理者が保守やデバッグのためにオフラインにした Worker から送信される状態です。

Worker ピア

機能的な Worker ピア アプリケーションを作成するには、固有のサービスを公開して使用する必要があります。私は、単純なコンソール アプリケーションを作成し、Worker という名前のクラスを提供しました (図 1 を参照)。この Worker クラスで、公開したサービスの設定と破棄を行うために Start メソッドと Stop メソッドを追加し (メッセージ受信用)、クライアント チャネルも追加しました (メッセージ送信用)。また、System.Threading.Timer インスタンスを使用して、SendHeartbeat メソッドを定期的に呼び出すようにしました。SendHeartbeat は、WorkerStatus オブジェクトをインスタンス化し、適切なデータを設定してから、すべてのピアに送信します。IWorker.Heartbeat メソッドが呼び出されると、サービスとして、単純にメッセージ コンテンツを標準の出力に出力します。

図 1 Woker クラス

namespace WorkerPeerApplication {
  [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]

  public class Worker : IWorker {
    private ServiceHost _host;
    private ChannelFactory<IWorker> _factory;
    private IWorker _workerClientChannel;

    private Guid _id;
    private volatile WorkerState _state;

    private Timer _heartbeat;

    public Worker() {
      _id = Guid.NewGuid();
      _state = WorkerState.Offline;
    }

    public void Start()  {
      _host = new ServiceHost(this);
      _host.Open();

      _factory = new ChannelFactory<IWorker>("IWorkerClient");
      _workerClientChannel = _factory.CreateChannel();

      _state = WorkerState.Ready;
      _heartbeat = new Timer(
        this.SendHeartbeat,
        null,
        new TimeSpan(0),
        //Time should really be read in from config.
        new TimeSpan(0, 0, 1)); 
    }

    public void Stop() {
      _state = WorkerState.Offline;
      _heartbeat.Dispose();
      this.SendHeartbeat(null);
      _factory.Close();
      _host.Close();
    }

    private void SendHeartbeat(object context) {
      WorkerStatus ws = new WorkerStatus();
      ws.WorkerId = _id;
      ws.State = _state;

      _workerClientChannel.Heartbeat(ws);
    }

    void IWorker.Heartbeat(WorkerStatus status) {
      Console.WriteLine(
        "{0}: {1}", 
        status.WorkerId, 
        status.State);
    }
  }
}

完成するためには、Main メソッドおよび適切な WCF 構成が必要です。これをアプリケーション構成ファイルに追加します。Main メソッドは、コード ダウンロードで参照できます。また、使用した構成は図 2 のとおりです。

図 2 Worker の構成

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>

    <services>
      <service name="WorkerPeerApplication.Worker">
        <host>
          <baseAddresses>
            <add baseAddress = "net.p2p://MyPeerApplication" />
          </baseAddresses>
        </host>

        <endpoint           
          address =""
          binding="netPeerTcpBinding"
          bindingConfiguration="unsecureBinding"
          contract="PeerWcfComm.IWorker"/>

      </service>
    </services>

    <bindings>
      <netPeerTcpBinding>
        <binding name="unsecureBinding">
          <security mode="None"/>
          <resolver mode="Pnrp"/>
        </binding>
      </netPeerTcpBinding>
    </bindings>

    <client>
      <endpoint
        name="IWorkerClient"
        address="net.p2p://MyPeerApplication"
        binding="netPeerTcpBinding"
        bindingConfiguration="unsecureBinding"
        contract="PeerWcfComm.IWorker" />
    </client>

  </system.serviceModel>
</configuration>

おわかりのとおり、ピアが公開するサービスと、ピアのクライアント側の構成の両方を定義しました。サービス構成は、サービスのタイプを指定し、その基本のアドレスおよびエンドポイントを指定します。私は、安全でないバインドと、ピアの名前を解決するためのピア名解決プロトコル (PNRP) を使用しています。クライアント構成は、ほとんどがサービスと同じです。唯一明らかな違いは、ChannelFactory コンストラクタが目的の構成に簡単にアクセスできるように、クライアント エンドポイントに名前を付けたことです。

Worker ピアのシェルができたので、これを試用してみましょう。実験目的で、このプログラムの 2 つのインスタンスを同時に実行します。最初に、各ピアから固有のハートビート情報が出力されることに気が付くでしょう。その数秒後に、ピアは相互に検索を開始し、各 Worker が他の Worker からステータスを受信します。

自分自身のメッセージが送り返されるというのは、率直なところ、面倒で紛らわしい作業です。さいわいにも、「PeerChannel team blog (PeerChannel チーム ブログ)」で必要なコードが提供されています。RemoteOnlyPropagationFilter というクラスが、非常に便利なクラスであることがおわかりでしょう。これを拝借すると、次の 2 行のコードを Worker.Start メソッドに追加することができ、その結果、送信したメッセージが自分に送り返されないように指定できます。

PeerNode pn = ((IClientChannel)_clientChannel).GetProperty<PeerNode>();
pn.MessagePropagationFilter = new RemoteOnlyMessagePropagationFilter();

これらの 2 行のコードは、ハートビート タイマ開始の前に追加しました。

Worker が作業できるようにする

これで、Worker ピア間でのやり取りを行うだけのアプリケーションは用意できました。ただし、このままでは Worker は実際の作業を行いません。次のメソッドを追加して IWorker インターフェイスを増強することにより、これを改善しましょう。

[OperationContract(IsOneWay = true)]
void DoWork(Guid workerId, WorkItem work);

このメソッドにより、外部エンティティは、作業の項目を実行するように Worker ピアに指示できます。WCF メッセージングはマルチキャストなので、作業を実行するための Worker の ID を指定するパラメータを追加しました。WorkItem クラスについては後で説明します。ここでは、図 3 に示すように、インターフェイスを変更するために Worker クラスの更新が必要です。ある作業を実行するように Worker が指示された場合、この要求は内部キューに追加され、最終的には受信された順序どおりに実行されます。実際に作業を実行するために、MonitorWork という名前のメソッドを Worker クラスに追加しました。Start メソッドは、単純に ThreadPool.QueueUserWorkItem を呼び出し、MonitorWork method を差します。

図 3 改訂された Worker

public class Worker : IWorker {
  // Omitted for clarity ...
  private Queue<WorkItem> _workQueue;

  public Worker() {
    // Omitted for clarity ...
    _workQueue = new Queue<WorkItem>();
  }

  public void Start() {
    // Omitted for clarity ...
    ThreadPool.QueueUserWorkItem(MonitorWork);
  }

  private void MonitorWork(object context) {
    if (_workQueue.Count > 0) {
      lock (_workQueue)
        _currentWorkItem = _workQueue.Dequeue();

      DoWorkInternal(_currentWorkItem);
      _currentWorkItem = null;
    }

    if (_state != WorkerState.Offline)
      ThreadPool.QueueUserWorkItem(MonitorWork);
  }

  private void DoWorkInternal(WorkItem wrk) { ...}

  void IWorker.DoWork(Guid workerId, WorkItem work) {
    if (workerId == _id)
      lock (_workQueue)
        _workQueue.Enqueue(work);
  }
}

大まかに言うと、MonitorWork は何かを実行し、その後で別の ThreadPool.QueueUserWorkItem 呼び出しを経由して自身の再スケジュールを行います。これは、Worker の状態が Offline になる (Worker オブジェクトで Stop が呼び出されたとき) まで続行されます。実行する内容は、作業のキューに何か入っていないかをチェックすることです。キューに作業項目が含まれている場合、MonitorWork は次の WorkItem をキューから削除して処理を開始します。私が Worker に指定する動作は作業を順次処理することです。ただし、項目を並列に処理するように変更するのは簡単です。

図 3 のコードからおわかりのとおり、私は作業が何であるかをまだ記述していません。定義されていない DoWorkInternal メソッドに渡しただけです。では、最初に WorkItem クラスの定義を見てみましょう。この記事でビルドするシステムはタスク依存ではないので、作業項目を次のように単純に定義しようと思います。

[DataContract]
public class WorkItem {
  [DataMember]
  public Guid WorkItemId { get; set; }

  [DataMember]
  public int PretendToWorkSeconds { get; set; }
}

ここで、私のカスタム クラスが WCF で適切にシリアル化されるように、DataContract 属性または MessageContract 属性を使用する必要があります。WorkItem クラスには、識別子フィールドの WorkItemId と、実際の作業を行うように見せかける秒数を指定するフィールド (PretendToWorkSeconds という名前) があります。

WorkItem が定義済みとなったので、これを処理できます。次に示すのは、DoWorkInternal メソッドの実装です。

private void DoWorkInternal(WorkItem wrk) {
  this._state = WorkerState.Busy;
  Thread.Sleep(new TimeSpan(0, 0, wrk.PretendToWorkSeconds));
  this._state = WorkerState.Ready;
}

これは、通常は意味のある作業 (たとえば、ブラジルのポルトガル語で SQL Server 2008 セットアップ ドキュメントをビルドする) を行う場所です。しかし、この記事では、指定された時間を休憩に充てましょう。

最後に、Worker のハートビートの送信前に設定される 2 つの新しいデータ メンバで WorkStatus クラスを更新します。これらは、現在実行中の作業項目の識別子と、Worker ピアで保留中の作業項目の数です。

[DataContract]
public class WorkerStatus {
  // Omitted for clarity...

  [DataMember]
  public Guid CurrentWorkItem;

  [DataMember]
  public int PendingWorkItemCount;
}

Worker ピアにデータが設定されたので、現実的にデータ損失のシナリオを意識する必要が出てきました。P2P のような分散型ネットワーク パラダイムのトレードオフの 1 つは、データの中央リポジトリがないことです。各ピアは、システムのデータをまとめて保存する責任を負っています。Worker ピアがダウンしたらどうなるでしょうか。私のシステムで言えば、キューに配置された WorkItem はどうなるでしょうか。

では、ピアがシャットダウンするシナリオを 2 つのカテゴリに分類しましょう。想定された場合と、予期しない場合です。想定されたシャットダウンの場合、キューに配置された WorkItem に対応する必要があります。これらの項目を処理するためのパラダイムは、アプリケーションおよびデータの性質によって異なります。データが重要ではない場合は、キューに配置された WorkItem をすべて廃棄するだけです。アプリケーションが一時停止するように設計されている (Windows サービスのように自動的に開始される) 場合、おそらく、項目をディスクにシリアル化して起動時に読み取るのが適切なソリューションでしょう。ただし、シャットダウン前に残りの WorkItem を別の Worker に転送するという方法もあります。予期しないシャットダウンについては、この記事のデータ バックアップについての記述の中で説明します。

User ピア

これで、定義済み作業項目を受け入れて処理することのできるピア対応アプリケーションができました。しかし、これらの作業項目を作成して Worker ピアに送信するのはだれでしょうか。P2P システムの強力な機能の 1 つは、だれでも同じ通信インフラストラクチャに接続できることです。参加に必要なことは (安全でないメッシュの場合)、メッシュ アドレスとサービス コントラクトに関する知識だけです。User ピアという新しいピア型を追加することにより、このことを説明します。

User ピア用のクラスは、前述の Worker ピアに似ています。User ピアも IWorker インターフェイスを実装しますが、DoWork メソッドの実装を提供する必要はありません (ServiceContract インターフェイスは、ID の宣言ではなく通信コントラクトと考えてください。User は Worker ではありませんが、IWorker ピア メッシュに参加できるので、そのインターフェイスを実装します)。

User ピアでは、Worker を追跡し、新しい作業項目を作成して Worker に返します。図 4 には、User アプリケーションのスクリーンショットが示されています。このアプリケーションは、Worker ステータス メッセージから受信した情報を一覧表示します。ただし、ここで注目したい機能は、WorkItem の作成および Worker への割り当てです。

fig01.gif

図 4 User ピア アプリケーション

WorkItem をどのように Worker に割り当てるかについて、設計上の意思決定を行う必要があります。1 つの方法は、PeerChannel のマルチキャストの性質に依存して、新しい WorkItem が利用可能であることを伝えるメッセージをすべての Worker にブロードキャストする手法です。準備が整っているという応答を返した最初の Worker は、新しい WorkItem を取得する権利を得ます。

私にとって、この手法は魅力的ではありません。いくつか理由があります。最初に、通信プロトコルの負荷が非常に大きくなる点です。User は、"だれが Work Item を受け取るのか" をたずねるメッセージを送信します。従順な各 Worker から、"受け取ります" というメッセージが返されるでしょう。User は、(Work Item を受け取る) 勝者の Worker に対して "作業してください" というメッセージを送る必要があります。また、(複数の対話型ピアが持つ非同期という性質のために) すべての潜在的な競合状態に注意を払う必要があります。

2 番目に、Worker にとって何が "準備完了で処理可能" なのかを定義する必要があります。すべての Worker がビジーである場合、すべての Worker から応答がないのでしょうか。1 つも応答が戻らなかった場合、User はどうするのでしょうか。

この作業通知方法の代わりに、私は、Worker ハートビートから既に提供されている情報の再利用という方法を選択しようと思います。新しく作成された WorkItem をどの Worker が受け取るかについて、この情報から十分な知識を得ることができます。したがって、勝者の Worker を選択するためのアルゴリズムは Worker のコード内に常駐せず、このアルゴリズムは Worker とは別に変更することもできます。わかりやすくするために、WorkerStatus オブジェクトの WorkerState、CurrentWorkItem、および PendingWorkItemCount の各メンバに基づいて WorkItem を送信する Worker を選択します。[Create Work] ボタンの分離コードを次に示します。

private void btnNewWork_Click(object sender, EventArgs e) {
  WorkItem wrkItem = new WorkItem();
  wrkItem.WorkItemId = Guid.NewGuid();
  wrkItem.PretendToWorkSeconds = _rnd.Next(15);

  string nextWorker = GetNextWorker();
  _clientChannel.DoWork(nextWorker, wrkItem);

  lock (_workers)
    _workers[nextWorker].PendingWorkItemCount++;
}

0 ~ 15 の範囲で任意に選択された秒数を使用して、一意に識別された新しい WorkItem が生成されます (_rnd は Random 型のクラスレベル フィールド)。作業を送信する次の Worker を決めて、作業を送信します。さらに、Worker の PendingWorkItemCount のキャッシュ バージョンを手動でインクリメントします。これにより、2 つ以上の Worker が同じステータスを持つという状況を回避できます。その場合、Worker がステータス ハートビートを送信するよりも先に作業が生成される可能性があります。したがって、キャッシュされた Worker のステータスを調整し、多数の作業項目が同じ Worker に偏って送信されないようにします。

作業を受け取る次の Worker を選択するための意思決定は、次に示す GetNextWorker メソッドの目的です。

private WorkerStatus GetNextWorker() {
  List<WorkerStatus> workers = new List<WorkerStatus>();
  lock (_workers)
    workers.AddRange(_workers.Values);

  return workers
    .Where<WorkerStatus>(
      w => w.State != WorkerState.Offline
      && w.State != WorkerState.Unknown)
    .OrderBy<WorkerStatus, int>(w => w.PendingWorkItemCount)
    .FirstOrDefault<WorkerStatus>();
}

Microsoft .NET Framework 3.5 ライブラリの新しい拡張メソッドのおかげで、オフライン状態の Worker または未知の Worker を排除して、保留中の作業項目の数によって簡単に順序付けることができます。単一の (しかし冗長な) ステートメントで両方を行うことができます。最初の項目を列挙で返すこともできます。FirstOrDefault メソッドを使用して、利用可能な Worker がない場合でも呼び出しを正常に行うことができます。

ここで、別の実験に移りましょう。User ピア アプリケーションを起動し、続いて 2 つの別個の Worker アプリケーションを起動します。数秒後に、2 つの Worker が User のアプリケーション一覧に表示されます。ここで、[Create Work] ボタンを数回クリックします。すべてが予測どおりに進行している場合は、User ピアが両方の Worker に作業項目を渡します。適切なラウンドロビン方式で行われます。Worker は、完了するまで作業項目の処理を続行します。

Spectator

多くのアプリケーションの要件として、ログとトレースが必須とされています。P2P ベース システムの柔軟性を際立たせるために、Logger という新しいピア型を作成しましょう。Logger ピアは、システムに適したイベントを受け取ります。現在追跡できるイベントは、IWorker インターフェイスにより公開されるイベント (Worker ステータス情報および特定の Worker に対する WorkItem の割り当て) だけです。この詳細のレベルでは不十分です。ログおよび追跡が可能な他のイベントが多数あります。

IWorker インターフェイスの新しい機能を追跡できますが、追跡のために送信されるメッセージは、Worker アプリケーションと User アプリケーションのどちらにとっても重要ではありません。したがって、ISpectator という新しいサービス コントラクトを作成します (Logger または Reporter の代わりに一般的な "spectator (観客)" という語を使用するのは、2 つのことを強調したいからです。最初に、このピア型は通信を受け取るだけで、何も生成しません。2 番目に、このサービスが受け取った情報でピアが何をするかを私は指定しません。UI. 内に情報を表示するだけかもしれません)。サービス コントラクト定義を次に示します。

[ServiceContract]
public interface ISpectator {
  [OperationContract(IsOneWay=true)]
  void WorkItemStateChanged(WorkItemStatus status);
}

デモとして、WorkItemStateChanged という単一のメソッドを宣言します。すべての機能を持つシステムには、spectator (観客) に通知すべき多数のイベントがあります。WorkItemStateChanged メソッドは、次に定義されている WorkItemStatus クラスを使用します。

[DataContract]
public class WorkItemStatus {
  [DataMember]
  public Guid WorkerId;

  [DataMember]
  public Guid WorkItemId;

  [DataMember]
  public WorkItemState NewState;
}

このクラスにより、送信元は、Worker 送信メッセージ、このメッセージを送信する対象 WorkItem、および WorkItem の新しい状態を指定できます。有効な状態は、Queued、Executing、Finished、および Errored です。お気付きかと思いますが、対応する定義済みの WorkerStateChanged イベントがありません。その理由は、Worker ステータスがハートビート メッセージ経由で送られ、spectator (観客) でこれらをリッスンすることを前提としているからです。WCF により、これは非常に簡単に行えます。私のサービス クラスは、単純に、IWorker インターフェイスと ISpectator インターフェイスを両方とも実装します。

Logger サービス クラスを図 5 に示しています。このクラスと Worker との間に構造的に大きな違いはありません。IWorker.Heartbeat メソッドでは、Logger は Worker ステータス データの典型的なキャッシュを持っています。さらに、これが Worker に関して確認される最初のステータスである場合、または Worker に対してその状態が変更された場合は、メッセージが標準の出力に書き込まれます。同様に、ISpectator.WorkItemStateChanged メソッドは、メッセージを標準の出力に書き込みます。運用システムでは、カスタム レポート用のログ データベースへの書き込みなど、さらに興味深い操作が行われるでしょう。

図 5 Logger

[ServiceBehavior(InstanceContextMode=InstanceContextMode.Single)]
public class Logger : IWorker, ISpectator {
  private ServiceHost _host;
  private Dictionary<string, WorkerStatus> _workers;

  public Logger() {
    _workers = new Dictionary<string, WorkerStatus>();
  }

  public void Start() {
    _host = new ServiceHost(this);
    _host.Open();
  }

  public void Stop() {
    _host.Close();
  }

  void IWorker.Heartbeat(WorkerStatus status) {
    bool changed = false;

    lock (_workers) { 
      if (_workers.ContainsKey(status.WorkerId)) {
        WorkerStatus s = _workers[status.WorkerId];

        if (s.State != status.State)
          changed = true;
      }
      else
        changed = true;

      _workers[status.WorkerId] = status;
    }

    if (changed)
      LogWorkerStatusChange(status);
  }

  void IWorker.DoWork(string workerId, WorkItem work) {
    //Ignored. This can be assumed by the
    //WorkItemState.Queued status event.
  }

  void ISpectator.WorkItemStateChanged(WorkItemStatus status) {
    Console.WriteLine(
      "Worker: {0}| WorkItem: {1}| State: {2}", 
      status.WorkerId, 
      status.WorkItemId, 
      status.NewState);
  }

  private void LogWorkerStatusChange(WorkerStatus status) {
    Console.WriteLine(
      "Worker {0}| State: {1}", 
      status.WorkerId, 
      status.State);
  }
}

Logger の実装における最後の手順は、構成の設定です。図 6 に、この WCF サービスの構成を示します。明確な違いは、クライアント セクションがないこと、およびサービス エンドポイントが追加されていることです。spectator (観客) は、論理的に情報のコンシューマであるため、クライアントのように動作する必要がありません。エンドポイントをサービスに追加することにより、IWorker メッセージ シンクと ISpectator メッセージ シンクに同じサービス オブジェクト インスタンスを使用できます。

図 6 Logger の構成

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>

    <services>
      <service name="LoggerPeerApplication.Logger">
        <host>
          <baseAddresses>
            <add baseAddress = "net.p2p://MyPeerApplication" />
          </baseAddresses>
        </host>

        <endpoint
          address =""
          binding="netPeerTcpBinding"
          bindingConfiguration="unsecureBinding"
          contract="PeerWcfComm.IWorker"/>

        <endpoint
          address =""
          binding="netPeerTcpBinding"
          bindingConfiguration="unsecureBinding"
          contract="PeerWcfComm.ISpectator"/>

      </service>
    </services>

    <bindings>
      <netPeerTcpBinding>
        <binding name="unsecureBinding">
          <security mode="None"/>
          <resolver mode="Pnrp"/>
        </binding>
      </netPeerTcpBinding>
    </bindings>

  </system.serviceModel>
</configuration>

ISpectator 機能を完成させるために、Worker ピアを変更して適切な呼び出しが行われるようにする必要があります。新しいクライアント エンドポイントを app.config に含めます。

<endpoint
  name="ISpectatorClient"
  address="net.p2p://MyPeerApplication"
  binding="netPeerTcpBinding"
  bindingConfiguration="unsecureBinding"
  contract="PeerWcfComm.ISpectator" />

このエンドポイントは、(ChannelFactory コンストラクタが適切な設定を検出できるように) 新しい名前と、新しい ISpectator サービス コントラクトの型を持っています。必要なコードの変更は、新しい ChannelFactory<ISpectator> および新しい ISpectator クライアント チャネルの作成だけです。次に、Worker クラスの DoWorkInternal メソッドを変更します (図 7 に示すように多少見た目がよくありませんが)。基本的に、作業項目の処理 (現在のスレッドをスリープさせる) の前と後で、ISpectator チャネルで WorkItemStateChanged メソッドを呼び出して適切な状態を提供します。

図 7 作業ステータスの追跡

this._state = WorkerState.Busy;
_spectatorClientChannel.WorkItemStateChanged(
  new WorkItemStatus() { 
    WorkerId = _id, 
    WorkItemId = wrk.WorkItemId, 
    NewState = WorkItemState.Executing 
  });

Thread.Sleep(new TimeSpan(0, 0, wrk.PretendToWorkSeconds));

this._state = WorkerState.Ready;
_spectatorClientChannel.WorkItemStateChanged(
  new WorkItemStatus() {
    WorkerId = _id,
    WorkItemId = wrk.WorkItemId,
    NewState = WorkItemState.Finished
  });

負荷分散と作業の共有

では、さらに別の実験に進みましょう。ここでも 2 つの Worker と 1 つの User を作成します。User で Worker 用の多数の作業項目を生成します。次に、2 つの Worker が作業している間に、3 番目の Worker インスタンスを起動します。ここから、私のシステムが好ましくない動作を行うことになります。1 つの Worker がアイドル状態になり、他の Worker はより多くの作業を行わなければなりません。

P2P アプリケーションでは、いつでも新しいピアをオンラインにしてメッシュに参加させることができます。サーバー ファーム シナリオでは、新しいピアが自動的に参加し、即座に全体的な負荷の処理を開始することが求められます。

最初の変更は、Worker の IWorker.Heartbeat メソッドの実装に対して行う必要があります。現在のコードでは、Worker の状態がコンソールに出力されるだけです。これを、他の Worker の状態をキャッシュするコードに置き換える必要があります。User ピアおよび Spectator ピアにおいてコードを置き換えました。たいていは、そのコードを切り取り、Worker クラスに貼り付けて行うことができます。

Worker ピアが他の Worker の状態を追跡できるようになったので、Worker がアイドル状態の Worker と作業項目を共有する (転送する) 状態があるかどうかをチェックするためのコードを追加します。図 8 に示すように、Worker の MonitorWork メソッドを変更します。

図 8 更新された MonitorWork

if (_state != WorkerState.Offline) {
  if (_workQueue.Count > 1) {
    WorkerStatus firstIdleWorker = GetNextIdleWorker();

    if (firstIdleWorker != null) {
      WorkItem workToForward;
      lock (_workQueue)
        workToForward = _workQueue.Dequeue();

       _workerClientChannel.DoWork(
        firstIdleWorker, 
        workToForward);
    }
  }

  ThreadPool.QueueUserWorkItem(MonitorWork);
}

基本的に、作業項目を処理した後、それ自体を再スケジュールする前に、このメソッドはアイドル状態の Worker ノードがあるかどうかをチェックします (複数の項目が作業キューにある場合のみ、この操作を行うように、条件を追加しました。残りの作業項目が 1 つなら、別の Worker に転送する意味がないからです)。アイドル状態の Worker が検出された場合、WorkItem がキューから削除され、そのアイドル状態の Worker に送られます。GetNextIdleWorker メソッドには、選択ロジックが含まれます。

private WorkerStatus GetNextIdleWorker() {
  List<WorkerStatus> workers = new List<WorkerStatus>();
  lock (_workers)
    workers.AddRange(_workers.Values);
  return workers
    .Where<WorkerStatus>(
      w =>
        (w.State == WorkerState.Ready
        && w.PendingWorkItemCount == 0))
    .FirstOrDefault<WorkerStatus>();
}

最初に行うことは、_workers データ キャッシュをロックし、その中の WorkerStatus 値のスナップショットを取得することです。次に、保留状態の作業を持っていない、Ready 状態の Worker を探して、最初に見つかった Worker を返します。

WorkItem バックアップ

WorkItem があるコンピュータに移され、キューに配置された場合、その Worker ピアが予定外にダウンしたらどうなるでしょうか。プロセスがクラッシュすると、エラー発生時にデータを保存できません。トラブル回避策は、近接しているピアに WorkItem のコピーをバックアップしておくことです。完全なバックアップ ソリューションは、バックアップと復元という 2 つの動作で構成されます。これは簡単なタスクではありません。

最も単純なバックアップ ソリューションは、DoWork method の再利用です。最初のパラメータは、その作業を行うことになっている Worker の ID を指定します。ただし、そのメッセージは実際はメッシュ全体に届きます。この事実を活用して、特定の Worker ID に一致しないすべてのピアで WorkItem がバックアップとして保存されるように指定します。そのメッセージの実際の対象 Worker は、最終的にそのバックアップを削除するように他の Worker に指示します。

ここでのトレードオフは、各 Worker が作業項目のバックアップをキャッシュするリソースを使い果たしてしまうことです。作業項目が少なく、Worker が非常に迅速に作業項目を処理できる場合は、この方法は適切なソリューションです。ただし、システム内に多数の作業項目がある場合は、負荷が非常に高くなる可能性があります。

より適切なソリューションは、バックアップの合計数を最小化することですが、いくつか余分のメッシュ メッセージがやり取りされます。

Kevin Hoffman による記事「ピア ネットワークでの状態共有の設計方法」を参照すると、メッセージの送信先までのホップ数を 1 ホップに限定することで、近接するピアに WorkItem をバックアップできることがわかります。バックアップ要求と後に続く通信は、必要なだけやり取りできます。User または Spectator にとってこれらのバックアップ メッセージは意味がないため、私は IWorker サービス コントラクトを使用しないことにします。IWorkBackup という新しいものを作成しました。

[ServiceContract]
public interface IWorkBackup {
  [OperationContract(IsOneWay = true)]
  void BackupItem(WorkItemBackupRequest request);

  [OperationContract(IsOneWay = true)]
  void DeleteBackup(Guid workItemId);

  [OperationContract(IsOneWay = true)]
  void RestoringBackup(Guid workerId, Guid workItemId);
}

最初のメソッドは、WorkItem をバックアップするために Worker により呼び出されます。2 番目のメソッドは、WorkItem が完了したときに呼び出され、他の Worker はそのコピーを削除できます。最後のメソッドは、Worker がバックアップ WorkItem を復元することを決定した場合に呼び出され、復元を実行します。

BackupItem により送信されるメッセージは、WorkItemBackupRequest クラスにより定義されます。

[MessageContract]
public class WorkItemBackupRequest {
  [PeerHopCount]
  public int HopCount = 1;

  [MessageBodyMember]
  public WorkItem Item;

  [MessageBodyMember]
  public Guid OriginalWorkerId;
  public DateTime BackupRequestTimeStamp;
  public DateTime RestoreAt;
}

HopCount メンバは PeerHopCount 属性を持っています。メッシュからのメッセージの移動を指定の数のホップまでに制限する属性です。この属性に値 1 をハードコードし、このメッセージが隣接する (1 ホップ) ピアにのみ送信されるようにします。MessageBodyMember 属性で指定される 2 つのメンバもあります。Item (バックアップする作業項目) と OriginalWorkerId (バックアップを要求する Worker) です。これらは、基になる SOAP メッセージで送信されます。内部の追跡用に 2 つの追加メンバも追加します。

Worker は RequestBackup メッセージを受け取ったときに、その項目をキャッシュする必要があります。これは、新しいプライベート フィールドを追加してバックアップを格納することにより、簡単に行うことができます。

private Dictionary<string, WorkItem> _workItemBackups;

Worker ピアの RequestBackup メソッドの実装を次に示します。

void IWorkBackup.BackupItem(WorkItemBackupRequest request) {
  request.BackupRequestTimeStamp = DateTime.Now;
  request.RestoreAt = DateTime.MaxValue;

  lock (_workItemBackups)
    _workItemBackups.Add(request.Item.WorkItemId, request);
}

このメソッドにより、BackupRequestTimeStamp に現在時刻が設定され、RestoreAt は既定で DateTime.MaxValue になります。これらのフィールドが有用であることは後でわかります。

ローカルの Worker ピアで作業が完了すると、そのピアは、作業項目のバックアップをキャッシュしている可能性のあるすべてのピアにメッセージを送信する必要があります。これは、DeleteBackup メソッドを呼び出して、完了した WorkItem の ID を提供することにより行います。

このメッセージについては、BackupItem メッセージの場合と異なり、メッセージの送信先を隣接するピアに限定していないことにお気付きでしょう。その理由は、ピア メッシュ トポロジが変わることです。バックアップが要求された後、WorkItem が完了するまでの間に、隣接する Worker ピアが変わる可能性があるからです。したがって、汎用メッセージをすべての Worker に送信して、この項目のバックアップがある場合にはそれを削除するように指示します。完全にするために、DeleteBackup を実装する Worker ピアのコードを次に示します。

void IWorkBackup.DeleteBackup(Guid workItemId) {
  lock (_workItemBackups)
    if (_workItemBackups.ContainsKey(workItemId))
      _workItemBackups.Remove(workItemId);
}

作業項目のバックアップと削除は楽ですね。しかし、真に困難な作業と言えるのは、バックアップした作業項目をいつ復元するかを決定することです。作業項目をいつ復元するか、だれが復元するかを決める必要があります。

作業項目をいつ復元するかを決定するには、各 Worker が定期的にバックアップ キャッシュを監視する必要があります。このために、私は、Worker の Start メソッドに設定される別の System.Threading.Timer called _backupMonitor を追加しました。

_backupMonitor = new Timer(
  this.MonitorBackups,
  null,
  new TimeSpan(0),
  new TimeSpan(0, 1, 0));

これは、ハートビートよりもはるかに長い間隔で実行できるので、1 分ごとに起動するようにタイマを設定します。

MonitorBackups メソッドは、バックアップを復元するためのロジックを処理します (図 9 を参照)。このメソッドは、バックアップされた項目を繰り返し処理して、各項目について復元する必要があるかどうかをチェックします。そのために、いくつかの前提を立てる必要があります。最初に、timeOutThreshold というローカル変数を保持します。これは、項目のバックアップが指示された時点から、その項目の処理が終わるまでの時間です。この値を 1 分間にハードコードしました (理想的には構成可能であるべき)。私の作業項目は 15 秒以上かかることはないからです (User アプリケーションでそのように定義したため)。したがって、使用量がピークに達した場合を除き、時間はたっぷりあります (より良いソリューションとして考えられるのは、バックアップ要求の送信元がこのタイムアウト時間を決定し、要求メッセージに指定することです。送信元は作業負荷の情報を持っており、より正確な見積もりを出すことができます)。WorkItemBackupRequest.BackupRequestTimeStamp に timeOutThreshold を加えた値よりも、現在時刻が進んでいる場合は、さらに調査します。元の Worker が非常にビジーな状態である場合に、自動的に、この作業項目が失敗したと見なされることはありません。ただし、その時刻が過ぎたときには、元の Worker のステータスをチェックする作業に進みます。状態が Busy または Ready で、作業のキュー内に項目がある場合、私はそのステータスを単なるビジーであると仮定し、さらにこの作業項目にアクセスします。

図 9 MonitorBackups

private void MonitorBackups(object context) {
  List<WorkItemBackupRequest> backups = 
    new List<WorkItemBackupRequest>();
  TimeSpan timeOutThreshold = 
    new TimeSpan(0, 1, 0); //This should be read in from config.

  lock (_workItemBackups) {
    backups.AddRange(_workItemBackups.Values);

    foreach (WorkItemBackupRequest backup in backups) {
      //If we've passed the RestoreAt time, we need to
      //take on this work item.
      if (backup.RestoreAt < DateTime.Now) {
        lock (_workQueue)
          _workQueue.Enqueue(backup.Item);

        _workItemBackups.Remove(backup.Item.WorkItemId);
      }

      //This work item has not been deleted yet. 
      //This is suspicious so investigate further.
      else if (backup.BackupRequestTimeStamp.Add(timeOutThreshold) >
         DateTime.Now) {

        //Check to see if the original worker is still operating.
        //Regardless, we won't do anything and the backup
        //remains in our cache.
        if (!WorkerIsStillActive(backup.OriginalWorkerId)) {
          //The machine doesn't seem to be actively working.
          //Set a deadline for restoration of the work item.
          backup.RestoreAt = DateTime.Now.Add(timeOutThreshold);

          //Tell the world our intentions to restore this.
          _backupClientChannel.RestoringBackup(
            this._id, backup.Item.WorkItemId);

          Console.WriteLine("Planning to restore '{0}'", 
            backup.Item.WorkItemId);
        }
      }
    }
  }
}

private bool WorkerIsStillActive(Guid workerId) {
  bool active = false;

  lock (_workers) {
    if (_workers.ContainsKey(workerId)) {
      WorkerStatus sts = _workers[workerId];
      if (sts.State == WorkerState.Busy)
        active = true;
      else if (sts.State == WorkerState.Ready && 
        sts.PendingWorkItemCount > 0)
        active = true;
    }
  }

  return active;
}

このポイントに到達したときに、想定したタイムアウトが過ぎていて、なおかつ元の Worker が応答不可能であると判断した場合は、作業項目を復元するプロセスを開始します。これは、他のすべてのピアに私の意図を伝える IWorkBackup.RestoringBackup メソッドを呼び出すことにより行われます。また、WorkItemBackupRequest オブジェクトの RestoreAt フィールドを将来の時刻に設定します。同じ TimeSpan 値を再利用する場合でも、この将来の時刻は論理的に異なります (理想的には一意の構成ポイントを持つべきです)。RestoreAt は、他のすべてのピアが私のメッセージを受信し、必要に応じて応答する時間があると仮定される時刻です。このロジックの詳細は後述します。ここでは、RestoreAt の値として、実際にバックアップ キャッシュから作業のキューに WorkItem を移動する時刻を設定します。次に MonitorBackups タイマが起動するときは、タイマはこの値をチェックし、適切に動作します。

このロジックは、いつ復元が発生するかを判断します。これは、最も簡単に決定できる部分です。最も難しいのは、だれが作業項目を復元するかを決定することです。PeerChannel は最適なメッシュを保持しようとし、トポロジはピアの参加と離脱によって変わる可能性があります。このように、1 つのピアに隣接するノードの数は随時 2 ~ 7 の範囲で変化します。1 ホップのバックアップ要求がすべてに送信された場合、複数の Worker が同じ作業を復元しようと競合します。WorkItem をいつ復元するかを決定するための同じロジックを持っているためです。では、すべてが同時に復元を試行した場合はどうなるでしょうか。復元するためには、ピアは最初に IWorkBackup.RestoringBackup メソッドを呼び出してその意思を伝える必要があります (図 10 を参照)。

図 10 IWorkBackup.RestoringBackup

void IWorkBackup.RestoringBackup(Guid workerId, Guid workItemId) {
  lock (_workItemBackups) {
    //Only proceed if we have this item backed up.
    if (_workItemBackups.ContainsKey(workItemId)) {
      //Get the backup request for this work item
      WorkItemBackupRequest req = _workItemBackups[workItemId];

      //Check to see if we've decided to restore this.
      if (req.RestoreAt != DateTime.MaxValue) {
        //The restorer with the greatest id wins!
        if (this._id.CompareTo(workItemId) < 0)
          _workItemBackups.Remove(workItemId);
      }
      else {
        //We haven't decided to restore it and 
        //someone else wants to. So let them.
        _workItemBackups.Remove(workItemId);
      }
    }
  }
}

この時点で、RestoringBackup メッセージを他の Worker から受け取った Worker の視点から見て 3 つのシナリオがあります。

  1. キャッシュしていない項目が Worker で復元される。
  2. キャッシュしているが復元することを決定していない項目が Worker で復元される。
  3. 復元することを決定した項目が Worker で復元される。

最初のシナリオは、何もアクションを必要としない単純なものです。2 番目のシナリオも、バックアップされた項目をキャッシュから削除するだけなので単純です。その項目を復元する必要があったと、私が判断したわけではなく、だれかが判断して復元されました。

3 番目のシナリオでは、決戦投票が必要です。各ピアは自主的に動作するため、項目を復元する意思を同時に通知する可能性があります。メッセージが相互に送信され、複数の Worker が上記の 3 番目のシナリオであることを認識する可能性があります。

さて、質問です。だれが勝者になるでしょうか。だれが復元し、だれが無視するでしょうか。意思決定は、ピア間に関係のないもの (システムの時刻など) で、なおかつ変わらないものに基づいて行われる必要があります。最も高い値を持つ Worker ID を勝者にしましょう。Guid.CompareTo の公平な呼び出しにより勝者が決定します。こうすることで、勝者は復元し、敗者は無視します。

ピアの楽しみ

認めざるを得ません。. . P2P ネットワークは楽しいものです。楽しいだけでなく、現実に、その分散型の性質をアプリケーションに活用できるシナリオがあるということも事実です。そういったアプリケーションは、チャットやメディアの不正コピーだけではありません。カスタム アプリケーション ファームまたはハイ パフォーマンス コンピューティング (HPC) シナリオであっても、一括処理を必要とするピア対応アプリケーションがそのまま適用できます。大規模なシステムでの作業の連携および処理が比較的簡単です。

WCF の設計によって、サービスを実装し、必要に応じて進化させることが簡単にできるようになります。P2P メッシュの共有データおよび通信を使用することで、さまざまなクライアントが進行中の処理を監視して、そこに参加することができます。この記事では、ピア対応の処理プラットフォームを作成するデモを示しています。このプラットフォームでは、複数のプレイヤー (Worker、User、および Spectator) がすべて共に機能して、共通目的 (作業を完了する) を達成できます。

Matt Neely は、マイクロソフトの SQL Server User Education Tools チームに所属するソフトウェア デザイン エンジニアです。彼は、友人の Caleb について次の記事で書くことを本人に納得させたり、犬 (ゴールデンドゥードル) の毛を自分でカットして見るも無残な姿にすることに、自分の時間の大半を費やしています。連絡先は、mneely@microsoft.com です。