Práticas recomendadas na Biblioteca de Agentes Assíncrona

Este documento descreve como fazer uso efetivo da Biblioteca de Agentes Assíncronos. A Biblioteca de Agentes Assíncronos promovem um modelo de programação baseado em ator e passagem de mensagens em processo para tarefas de pipeline e fluxo de dados de alta granularidade.

Para mais informações sobre a Biblioteca de Agentes, confira a Biblioteca de Agentes Assíncronos.

Seções

Este documento contém as seguintes seções:

Usar agentes para isolar o estado

A Biblioteca de Agentes fornece alternativas ao estado compartilhado, permitindo que você conecte componentes isolados por meio de um mecanismo assíncrono de passagem de mensagens. Os agentes assíncronos são mais eficazes quando isolam seu estado interno de outros componentes. Ao isolar o estado, normalmente, vários componentes não agem em dados compartilhados. O isolamento de estado pode permitir que seu aplicativo seja dimensionado porque reduz a contenção na memória compartilhada. O isolamento de estado também reduz a chance de deadlock e condições de corrida porque os componentes não precisam sincronizar o acesso aos dados compartilhados.

Normalmente, você isola o estado em um agente mantendo os membros de dados nas seções private ou protected da classe de agente e usando buffers de mensagem para comunicar as alterações de estado. O exemplo a seguir mostra a classe basic_agent, que deriva de concurrency::agent. A classe basic_agent usa dois buffers de mensagem para se comunicar com componentes externos. Um buffer de mensagem contém mensagens de entrada; o outro buffer de mensagem contém mensagens de saída.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }
   
   // Retrieves the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;
         
         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Para obter exemplos completos sobre como definir e usar agentes, consulte Passo a passo: criando um aplicativo baseado em agente e Passo a passo: criando um agente de fluxo de dados.

[Parte superior]

Usar um mecanismo de limitação para limitar o número de mensagens em um pipeline de dados

Muitos tipos de buffer de mensagem, como concurrency::unbounded_buffer, podem conter um número ilimitado de mensagens. Quando um produtor de mensagens envia mensagens para um pipeline de dados mais rápido do que o consumidor pode processar essas mensagens, o aplicativo pode inserir um estado de baixa memória ou memória insuficiente. Você pode usar um mecanismo de limitação, por exemplo, um semáforo, para limitar o número de mensagens que estejam simultaneamente ativas em um pipeline de dados.

O exemplo básico a seguir demonstra como usar um semáforo para limitar o número de mensagens em um pipeline de dados. O pipeline de dados usa a função concurrency::wait para simular uma operação que leva pelo menos 100 milissegundos. Como o remetente produz mensagens mais rápido do que o consumidor pode processá-las, este exemplo define a classe semaphore para permitir que o aplicativo limite o número de mensagens ativas.

// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            (Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.  
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero.
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter.
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter.
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set.
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer.
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };
  
   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

O objeto semaphore limita o pipeline para processar no máximo duas mensagens ao mesmo tempo.

O produtor neste exemplo envia relativamente poucas mensagens ao consumidor. Portanto, este exemplo não demonstra uma possível condição de baixa memória ou de memória insuficiente. No entanto, esse mecanismo é útil quando um pipeline de dados contém um número relativamente alto de mensagens.

Para obter mais informações sobre como criar a classe de semáforo usada neste exemplo, consulte Como usar a classe de contexto para implementar um semáforo cooperativo.

[Parte superior]

Não executar trabalho refinado em um pipeline de dados

A Biblioteca de Agentes é mais útil quando o trabalho executado por um pipeline de dados é bastante granular. Por exemplo, um componente de aplicativo pode ler dados de um arquivo ou uma conexão de rede e ocasionalmente enviar esses dados para outro componente. O protocolo que a Biblioteca de Agentes usa para propagar mensagens faz com que o mecanismo de passagem de mensagens tenha mais sobrecarga do que as construções paralelas da tarefa fornecidas pela Biblioteca de Padrões Paralelos (PPL). Portanto, certifique-se de que o trabalho executado por um pipeline de dados seja longo o suficiente para compensar essa sobrecarga.

Embora um pipeline de dados seja mais eficaz quando suas tarefas são mais granulares, cada estágio do pipeline de dados pode usar constructos PPL, como grupos de tarefas e algoritmos paralelos, para executar um trabalho mais refinado. Para obter um exemplo de uma rede de dados granular que usa paralelismo refinado em cada estágio de processamento, consulte Passo a passo: Criando uma rede de processamento de imagem.

[Parte superior]

Não passar cargas de mensagens grandes por valor

Em alguns casos, o runtime cria uma cópia de cada mensagem que passa de um buffer de mensagem para outro buffer de mensagem. Por exemplo, a classe concurrency::overwrite_buffer oferece uma cópia de cada mensagem que recebe para cada um de seus destinos. O runtime também cria uma cópia dos dados da mensagem quando você usa funções de passagem de mensagem, como concurrency::send e concurrency::receive para gravar e ler mensagens de um buffer de mensagem. Embora esse mecanismo ajude a eliminar o risco de gravação simultânea em dados compartilhados, ele pode levar a um baixo desempenho da memória quando o conteúdo da mensagem é relativamente grande.

Você pode usar ponteiros ou referências para melhorar o desempenho da memória ao passar mensagens que tenha um conteúdo grande. O exemplo a seguir compara a passagem de mensagens grandes por valor para passar ponteiros para o mesmo tipo de mensagem. O exemplo define dois tipos de agente, producer e consumer, que atuam em objetos message_data. O exemplo compara o tempo necessário para que o produtor envie vários objetos message_data ao consumidor com o tempo necessário para que o agente produtor envie vários ponteiros para objetos message_data ao consumidor.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }
       
   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }
       
   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;
      
   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

Este exemplo gera a seguinte saída de amostra:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

A versão que usa ponteiros tem melhor desempenho porque elimina o requisito para que o runtime crie uma cópia completa de cada objeto message_data que ele passa do produtor para o consumidor.

[Parte superior]

Usar shared_ptr em uma rede de dados quando a propriedade estiver indefinida

Quando você envia mensagens por ponteiro por meio de um pipeline ou rede de passagem de mensagens, normalmente aloca a memória para cada mensagem na frente da rede e libera essa memória no final da rede. Embora esse mecanismo funcione frequentemente bem, há casos em que é difícil ou não é possível usá-lo. Por exemplo, considere o caso em que a rede de dados contém vários nós finais. Nesse caso, não há um local claro para liberar a memória das mensagens.

Para resolver esse problema, você pode usar um mecanismo, por exemplo, std::shared_ptr, que permite que um ponteiro seja de propriedade de vários componentes. Quando o objeto final shared_ptr que possui um recurso é destruído, o recurso também é liberado.

O exemplo a seguir demonstra como usar shared_ptr para compartilhar valores de ponteiro entre vários buffers de mensagem. O exemplo conecta um objeto concurrency::overwrite_buffer a três objetos concurrency::call. A classe overwrite_buffer oferece mensagens para cada um de seus destinos. Como há vários proprietários dos dados no final da rede de dados, este exemplo usa shared_ptr para permitir que cada objeto call compartilhe a propriedade das mensagens.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;
      
   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

Este exemplo gera a seguinte saída de amostra:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

Confira também

Práticas recomendadas do runtime de simultaneidade
Biblioteca de agentes assíncronos
Instruções passo a passo: criando um aplicativo com base no agente
Instruções passo a passo: criando um agente de fluxo de dados
Instruções passo a passo: criando uma rede de processamento de imagem
Práticas recomendadas na biblioteca de padrões paralelos
Práticas recomendadas gerais no runtime de simultaneidade