Como usar a classe de contexto para implementar um semáforo cooperativo

Este tópico mostra como usar a classe concurrency::Context para implementar uma classe de sinal cooperativo.

Comentários

A classe Context permite que você bloqueie ou produza o contexto de execução atual. O bloqueio ou suspensão do contexto atual é útil quando o contexto atual não pode continuar em razão de um recurso não estar disponível. Um sinal é um exemplo de uma situação em que o contexto de execução atual deve aguardar a disponibilidade de um recurso. Um sinal, como um objeto de seção crítica, é um objeto de sincronização que permite que o código em um contexto tenha acesso exclusivo a um recurso. No entanto, ao contrário de um objeto de seção crítica, um sinal permite que mais de um contexto acesse o recurso simultaneamente. Se o número máximo de contextos mantiver um bloqueio de sinal, cada contexto adicional deve aguardar outro contexto para liberar o bloqueio.

Para implementar a classe semaphore

  1. Declare uma classe chamada semaphore. Adicione as seções public e private a essa classe.
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
private:
};
  1. Na seção private da classe semaphore, declare uma variável std::atomic que mantém a contagem de sinais e um objeto concurrency::concurrent_queue que mantém os contextos que devem aguardar para adquirir o sinal.
// The semaphore count.
atomic<long long> _semaphore_count;

// A concurrency-safe queue of contexts that must wait to 
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
  1. Na seção public da classe semaphore, implemente o construtor. O construtor obtém um valor long long que especifica o número máximo de contextos que podem manter o bloqueio de maneira simultânea.
explicit semaphore(long long capacity)
   : _semaphore_count(capacity)
{
}
  1. Na seção public da classe semaphore, implemente o método acquire. Esse método diminui a contagem de sinais como uma operação atômica. Se a contagem de sinais for negativa, adicione o contexto atual ao final da fila de espera e chame o método concurrency::Context::Block para bloquear o contexto atual.
// Acquires access to the semaphore.
void acquire()
{
   // The capacity of the semaphore is exceeded when the semaphore count 
   // falls below zero. When this happens, add the current context to the 
   // back of the wait queue and block the current context.
   if (--_semaphore_count < 0)
   {
      _waiting_contexts.push(Context::CurrentContext());
      Context::Block();
   }
}
  1. Na seção public da classe semaphore, implemente o método release. Esse método incrementa a contagem de sinais como uma operação atômica. Se a contagem de sinais for negativa antes da operação de incremento, há pelo menos um contexto que está aguardando o bloqueio. Nesse caso, desbloqueie o contexto que está na frente da fila de espera.
// Releases access to the semaphore.
void release()
{
   // If the semaphore count is negative, unblock the first waiting context.
   if (++_semaphore_count <= 0)
   {
      // A call to acquire might have decremented the counter, but has not
      // yet finished adding the context to the queue. 
      // Create a spin loop that waits for the context to become available.
      Context* waiting = NULL;
      while (!_waiting_contexts.try_pop(waiting))
      {
         Context::Yield();
      }

      // Unblock the context.
      waiting->Unblock();
   }
}

Exemplo

A classe semaphore neste exemplo se comporta de forma cooperativa porque os métodos os métodos Context::Block e Context::Yield suspendem a execução para que o runtime possa executar outras tarefas.

O método acquire diminui o contador, mas pode não terminar de adicionar o contexto à fila de espera antes que outro contexto chame o método release. Para levar em conta isso, o método release usa um loop de rotação que chama o método concurrency::Context::Yield para aguardar que o método acquire conclua a adição do contexto.

O método release pode chamar o método Context::Unblock antes que o método acquire chame o método Context::Block. Você não precisa se proteger contra essa condição de corrida porque o runtime permite que esses métodos sejam chamados em qualquer ordem. Se o método release chamar Context::Unblock antes que o método acquire chame Context::Block para o mesmo contexto, esse contexto permanecerá desbloqueado. O runtime necessita que somente cada chamada para Context::Block seja correspondida com uma chamada correspondente para Context::Unblock.

O exemplo a seguir mostra toda a classe semaphore. A função wmain mostra o uso básico dessa classe. A função wmain usa o algoritmo concurrency::p arallel_for para criar várias tarefas que exigem acesso ao sinal. Como três threads podem manter o bloqueio a qualquer momento, algumas tarefas devem aguardar a conclusão de outra tarefa e liberar o bloqueio.

// cooperative-semaphore.cpp
// compile with: /EHsc
#include <atomic>
#include <concrt.h>
#include <ppl.h>
#include <concurrent_queue.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            Context::Yield();
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

int wmain()
{
   // Create a semaphore that allows at most three threads to 
   // hold the lock.
   semaphore s(3);

   parallel_for(0, 10, [&](int i) {
      // Acquire the lock.
      s.acquire();

      // Print a message to the console.
      wstringstream ss;
      ss << L"In loop iteration " << i << L"..." << endl;
      wcout << ss.str();

      // Simulate work by waiting for two seconds.
      wait(2000);

      // Release the lock.
      s.release();
   });
}

Este exemplo gera a saída de amostra a seguir.

In loop iteration 5...
In loop iteration 0...
In loop iteration 6...
In loop iteration 1...
In loop iteration 2...
In loop iteration 7...
In loop iteration 3...
In loop iteration 8...
In loop iteration 9...
In loop iteration 4...

Para mais informações sobre a classe concurrent_queue, confira Contêineres e objetos paralelos. Para saber mais sobre o algoritmo parallel_for, confira Algoritmos paralelos.

Compilando o código

Copie o código de exemplo e cole-o em um projeto do Visual Studio, ou cole-o em um arquivo chamado cooperative-semaphore.cpp e execute o comando a seguir em uma janela do Prompt de comando do Visual Studio.

cl.exe /EHsc cooperative-semaphore.cpp

Programação robusta

Você pode usar o padrão RAII (Resource Acquisition Is Initialization) para limitar o acesso a um objeto semaphore a um determinado escopo. No padrão RAII, uma estrutura de dados é alocada na pilha. Essa estrutura de dados inicializa ou adquire um recurso quando ele é criado e destrói ou libera esse recurso quando a estrutura de dados é destruída. O padrão RAII garante que o destruidor seja chamado antes que o escopo delimitador seja encerrado. Portanto, o recurso é gerenciado corretamente quando uma exceção é gerada ou quando uma função contém várias instruções return.

O exemplo a seguir define uma classe chamada scoped_lock, que é definida na seção public da classe semaphore. A classe scoped_lock se assemelha às classes simultaneidade::critical_section::scoped_lock e simultaneidade::reader_writer_lock::scoped_lock. O construtor da classe semaphore::scoped_lock adquire acesso ao objeto semaphore fornecido e o destruidor libera o acesso a esse objeto.

// An exception-safe RAII wrapper for the semaphore class.
class scoped_lock
{
public:
   // Acquires access to the semaphore.
   scoped_lock(semaphore& s)
      : _s(s)
   {
      _s.acquire();
   }
   // Releases access to the semaphore.
   ~scoped_lock()
   {
      _s.release();
   }

private:
   semaphore& _s;
};

O exemplo a seguir modifica o corpo da função de trabalho que é passada para o algoritmo parallel_for para que ele use RAII para garantir que o semáforo seja liberado antes que a função retorne. Essa técnica garante que a função de trabalho seja à prova de exceções.

parallel_for(0, 10, [&](int i) {
   // Create an exception-safe scoped_lock object that holds the lock 
   // for the duration of the current scope.
   semaphore::scoped_lock auto_lock(s);

   // Print a message to the console.
   wstringstream ss;
   ss << L"In loop iteration " << i << L"..." << endl;
   wcout << ss.str();

   // Simulate work by waiting for two seconds.
   wait(2000);
});

Confira também

Contextos
Contêineres e objetos em paralelo