Share via


方法: Context クラスを使用して協調セマフォを実装する

このトピックでは、concurrency::Context クラスを使用して協調セマフォ クラスを実装する方法について説明します。

解説

Context クラスを使用すると、現在の実行コンテキストをブロックまたは生成できます。 現在のコンテキストをブロックまたは生成する機能は、リソースを使用できないことが原因で現在のコンテキストを続行できない場合に有用です。 "セマフォ" は、リソースが使用できるようになるまで現在の実行コンテキストが待機する必要がある状況を示す 1 つの例です。 セマフォは、クリティカル セクション オブジェクトと同様に、1 つのコンテキストのコードがリソースに対して排他的にアクセスすることを可能にする同期オブジェクトです。 ただし、クリティカル セクション オブジェクトとは異なり、セマフォは、複数のコンテキストが並列的にリソースにアクセスできるようにします。 コンテキストの数が最大数に達してセマフォがロックされた場合、追加のコンテキストは、別のコンテキストがロックを解放するのを待機する必要があります。

semaphore クラスを実装するには

  1. semaphore という名前のクラスを宣言します。 このクラスに public セクションと private セクションを追加します。
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
private:
};
  1. semaphore クラスの private セクションで、セマフォのカウントを保持する std::atomic 変数と、セマフォを取得するために待機する必要があるコンテキストを保持する concurrency::concurrent_queue オブジェクトを宣言します。
// The semaphore count.
atomic<long long> _semaphore_count;

// A concurrency-safe queue of contexts that must wait to 
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
  1. public クラスの semaphore セクションで、コンストラクターを実装します。 このコンストラクターは、ロックを同時に保持できるコンテキストの最大数を指定する long long 値を受け取ります。
explicit semaphore(long long capacity)
   : _semaphore_count(capacity)
{
}
  1. public クラスの semaphore セクションで、acquire メソッドを実装します。 このメソッドは、分割不可能な操作として、セマフォのカウントをデクリメントします。 セマフォのカウントが負になった場合は、現在のコンテキストを待機キューの末尾に追加し、concurrency::Context::Block メソッドを呼び出して現在のコンテキストをブロックします。
// Acquires access to the semaphore.
void acquire()
{
   // The capacity of the semaphore is exceeded when the semaphore count 
   // falls below zero. When this happens, add the current context to the 
   // back of the wait queue and block the current context.
   if (--_semaphore_count < 0)
   {
      _waiting_contexts.push(Context::CurrentContext());
      Context::Block();
   }
}
  1. public クラスの semaphore セクションで、release メソッドを実装します。 このメソッドは、分割不可能な操作として、セマフォのカウントをインクリメントします。 インクリメント操作の前にセマフォのカウントが負になる場合は、ロックを待機しているコンテキストが 1 つ以上存在することを示します。 この場合は、待機キューの先頭にあるコンテキストのブロックを解除します。
// Releases access to the semaphore.
void release()
{
   // If the semaphore count is negative, unblock the first waiting context.
   if (++_semaphore_count <= 0)
   {
      // A call to acquire might have decremented the counter, but has not
      // yet finished adding the context to the queue. 
      // Create a spin loop that waits for the context to become available.
      Context* waiting = NULL;
      while (!_waiting_contexts.try_pop(waiting))
      {
         Context::Yield();
      }

      // Unblock the context.
      waiting->Unblock();
   }
}

この例の semaphore クラスは協調的に動作します。それは、Context::Block メソッドと Context::Yield メソッドによって実行が生成され、ランタイムが他のタスクを実行できるらめです。

acquire メソッドはカウンターをデクリメントしますが、別のコンテキストが release メソッドを呼び出す前にコンテキストを待機キューに追加できない可能性があります。 つまり、release メソッドは、concurrency::Context::Yield メソッドを呼び出すスピン ループを使用して、acquire メソッドがコンテキストの追加処理を完了するのを待ちます。

release メソッドは、Context::Unblock メソッドが acquire メソッドを呼び出す前に、Context::Block メソッドを呼び出すことができます。 ランタイムではこれらのメソッドが任意の順序で呼び出されることが考慮されているため、この競合状態に対する対策は必要ありません。 release メソッドが Context::Unblock を呼び出す前に同じコンテキストに対して acquire メソッドが Context::Block を呼び出した場合、このコンテキストは非ブロック状態のままになります。 ランタイムでは、Context::Block の各呼び出しが対応する Context::Unblock の呼び出しと一致することのみが求められます。

次の例は、完全な semaphore クラスを示しています。 wmain 関数に、このクラスの基本的な使用法が示されています。 wmain 関数では、concurrency::parallel_for アルゴリズムを使用して、セマフォへのアクセスを必要とするいくつかのタスクを作成しています。 3 つのスレッドがいつでもロックを保持できるため、いくつかのタスクは、別のタスクが完了してロックを解除するのを待機する必要があります。

// cooperative-semaphore.cpp
// compile with: /EHsc
#include <atomic>
#include <concrt.h>
#include <ppl.h>
#include <concurrent_queue.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            Context::Yield();
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

int wmain()
{
   // Create a semaphore that allows at most three threads to 
   // hold the lock.
   semaphore s(3);

   parallel_for(0, 10, [&](int i) {
      // Acquire the lock.
      s.acquire();

      // Print a message to the console.
      wstringstream ss;
      ss << L"In loop iteration " << i << L"..." << endl;
      wcout << ss.str();

      // Simulate work by waiting for two seconds.
      wait(2000);

      // Release the lock.
      s.release();
   });
}

この例では、次のサンプル出力が生成されます。

In loop iteration 5...
In loop iteration 0...
In loop iteration 6...
In loop iteration 1...
In loop iteration 2...
In loop iteration 7...
In loop iteration 3...
In loop iteration 8...
In loop iteration 9...
In loop iteration 4...

concurrent_queue クラスの詳細については、「並列コンテナーと並列オブジェクト」を参照してください。 parallel_for アルゴリズムの詳細については、「並列アルゴリズム」を参照してください。

コードのコンパイル

コード例をコピーし、Visual Studio プロジェクトに貼り付けるか、cooperative-semaphore.cpp という名前のファイルに貼り付けてから、Visual Studio のコマンド プロンプト ウィンドウで次のコマンドを実行します。

cl.exe /EHsc cooperative-semaphore.cpp

信頼性の高いプログラミング

Resource Acquisition Is Initialization (RAII) パターンを使用して、semaphore オブジェクトへのアクセスを特定のスコープに制限できます。 RAII パターンでは、データ構造はスタック上に割り当てられます。 データ構造は、作成されたときにリソースを初期化または取得し、破棄されたときにそのリソースを破棄または解放します。 RAII パターンでは、外側のスコープが終了する前に、常にデストラクターが呼び出されます。 したがって、例外がスローされた場合や、関数に複数の return ステートメントが含まれている場合でも、リソースは適切に管理されます。

次の例では、scoped_lock クラスの public セクションに定義されている、semaphore という名前のクラスを定義しています。 この scoped_lock クラスは、concurrency::critical_section::scoped_lock クラスおよび concurrency::reader_writer_lock::scoped_lock クラスに似ています。 semaphore::scoped_lock クラスのコンストラクターは特定の semaphore オブジェクトへのアクセスを取得し、デストラクターはこのオブジェクトへのアクセスを解放します。

// An exception-safe RAII wrapper for the semaphore class.
class scoped_lock
{
public:
   // Acquires access to the semaphore.
   scoped_lock(semaphore& s)
      : _s(s)
   {
      _s.acquire();
   }
   // Releases access to the semaphore.
   ~scoped_lock()
   {
      _s.release();
   }

private:
   semaphore& _s;
};

次の例では、RAII を使用して関数から制御が返される前にセマフォが確実に解放されるようにするために、parallel_for アルゴリズムに渡される処理関数の本体に変更を加えています。 これにより、処理関数は例外セーフとなります。

parallel_for(0, 10, [&](int i) {
   // Create an exception-safe scoped_lock object that holds the lock 
   // for the duration of the current scope.
   semaphore::scoped_lock auto_lock(s);

   // Print a message to the console.
   wstringstream ss;
   ss << L"In loop iteration " << i << L"..." << endl;
   wcout << ss.str();

   // Simulate work by waiting for two seconds.
   wait(2000);
});

関連項目

コンテキスト
並列コンテナーと並列オブジェクト