异步代理库中的最佳做法

本文档介绍如何有效利用异步代理库。 代理库为粗粒度数据流和管道任务提升了基于角色的编程模型和进程内消息传递。

有关代理库的详细信息,请参阅异步代理库

章节

本文档包含以下各节:

使用代理隔离状态

代理库通过异步消息传递机制连接隔离的组件,从而提供共享状态的替代方法。 异步代理在将其内部状态与其他组件隔离时最有效。 通过隔离状态,多个组件通常不会对共享数据执行操作。 状态隔离可以使应用程序进行缩放,因为它减少了共享内存上的争用。 状态隔离还减少了死锁和争用条件的可能性,因为组件不必同步对共享数据的访问。

通常,通过在代理类的 privateprotected 部分中保存数据成员,并使用消息缓冲区来传达状态更改,来隔离代理中的状态。 下面的示例演示 basic_agent 类,该类派生自 concurrency::agentbasic_agent 类使用两个消息缓冲区与外部组件进行通信。 一个消息缓冲区用于保存传入消息;另一个消息缓冲区用于保存传出消息。

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }
   
   // Retrieves the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;
         
         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

有关如何定义和使用代理的完整示例,请参阅演练:创建基于代理的应用程序演练:创建数据流代理

[返回页首]

使用限制机制限制数据管道中的消息数

许多消息缓冲区类型(如 concurrency::unbounded_buffer)可以保存无限数量的消息。 当消息生成者向数据管道发送消息的速度快于使用者处理这些消息的速度时,应用程序可能会进入内存不足状态。 可以使用限制机制(例如信号灯)来限制数据管道中并发处于活动状态的消息数。

以下基本示例演示如何使用信号灯来限制数据管道中的消息数。 数据管道使用 concurrency::wait 函数来模拟至少需要 100 毫秒的操作。 由于发送方生成消息的速度比使用者处理这些消息的速度快,因此此示例定义了 semaphore 类,以使应用程序能够限制活动消息的数量。

// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            (Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.  
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero.
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter.
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter.
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set.
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer.
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };
  
   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

semaphore 对象将管道限制为最多同时处理两条消息。

此示例中的生成者向使用者发送的消息相对较少。 因此,此示例不演示潜在的内存不足情况。 但是,当数据管道包含相对较多的消息时,此机制很有用。

有关如何创建此示例中使用的信号灯类的更多信息,请参见如何:使用上下文类实现协作信号灯

[返回页首]

不要在数据管道中执行细粒度工作

当数据管道执行的工作相当粗粒度时,代理库最有用。 例如,一个应用程序组件可能从文件或网络连接中读取数据,有时还会将该数据发送到另一个组件。 代理库用于传播消息的协议会导致消息传递机制比并行模式库 (PPL) 提供的任务并行构造具有更多的开销。 因此,请确保数据管道执行的工作足够长,可以抵消此开销。

尽管数据管道在任务是粗粒度时最有效,但数据管道的每个阶段都可以使用 PPL 构造(如任务组和并行算法)来执行更细粒度的工作。 有关在每个处理阶段使用细粒度并行性的粗粒度数据网络的示例,请参阅演练:创建图像处理网络

[返回页首]

不要按值传递大型消息有效负载

在某些情况下,运行时会创建从一个消息缓冲区传递到另一个消息缓冲区的每条消息的副本。 例如,concurrency::overwrite_buffer 类将它收到的每条消息的副本提供给每个目标。 使用消息传递函数(如 concurrency::sendconcurrency::receive)将消息写入消息缓冲区并从消息缓冲区读取消息时,运行时还会创建消息数据的副本。 尽管此机制有助于消除并发写入共享数据的风险,但当消息负载相对较大时,它可能导致内存性能较差。

在传递具有较大负载的消息时,可以使用指针或引用来提高内存性能。 下面的示例将按值传递大型消息与将指针传递到同一消息类型进行比较。 该示例定义了两种代理类型(producerconsumer),它们作用于 message_data 对象。 该示例将生成者向使用者发送多个 message_data 对象所需的时间与生成者代理向使用者发送多个指向 message_data 对象的指针所需的时间进行比较。

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }
       
   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }
       
   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;
      
   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

此示例产生以下示例输出:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

使用指针的版本性能更好,因为它消除了运行时创建从生成者传递到使用者的每个 message_data 对象的完整副本的要求。

[返回页首]

在未定义所有权时在数据网络中使用 shared_ptr

通过传递消息传递管道或网络通过指针发送消息时,通常在网络前端为每条消息分配内存,并在网络末端释放该内存。 尽管此机制通常运行良好,但在某些情况下很难或不可能使用它。 例如,请考虑数据网络包含多个终结点的情况。 在这种情况下,没有明确的位置释放消息的内存。

若要解决此问题,可以使用一种机制(例如 std::shared_ptr),使指针由多个组件拥有。 当拥有资源的最终 shared_ptr 对象被销毁时,该资源也会被释放。

下面的示例演示如何使用 shared_ptr 在多个消息缓冲区之间共享指针值。 该示例将一个 concurrency::overwrite_buffer 对象连接到三个 concurrency::call 对象。 overwrite_buffer 类向其每个目标提供消息。 由于数据网络末端有多个数据所有者,因此此示例使用 shared_ptr 使每个 call 对象能够共享消息的所有权。

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;
      
   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

此示例产生以下示例输出:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

另请参阅

并发运行时最佳做法
异步代理库
演练:创建基于代理的应用程序
演练:创建数据流代理
演练:创建图像处理网络
并行模式库中的最佳做法
并发运行时中的常规最佳做法