방법: 다양한 공급자/소비자 패턴 구현

이 항목에서는 애플리케이션에서 생산자-소비자 패턴을 구현하는 방법을 설명합니다. 이 패턴에서 생산자는 메시지 블록에 메시지를 보내고 소비자는 해당 블록에서 메시지를 읽습니다.

이 항목에서는 두 가지 시나리오를 보여 줍니다. 첫 번째 시나리오에서 소비자는 생산자가 보내는 각 메시지를 받아야 합니다. 두 번째 시나리오에서 소비자는 주기적으로 데이터를 폴링하므로 각 메시지를 받을 필요가 없습니다.

이 항목의 두 예제 모두 에이전트, 메시지 블록 및 메시지 전달 함수를 사용하여 생산자에서 소비자로 메시지를 전송합니다. 생산자 에이전트는 동시성::send 함수를 사용하여 동시성::ITarget 개체에 메시지를 씁니다. 소비자 에이전트는 동시성::receive 함수를 사용하여 동시성::ISource 개체에서 메시지를 읽습니다. 두 에이전트 모두 처리 종료를 조정하기 위해 sentinel 값을 보유합니다.

비동기 에이전트에 대한 자세한 내용은 비동기 에이전트를 참조 하세요. 메시지 블록 및 메시지 전달 함수에 대한 자세한 내용은 비동기 메시지 블록메시지 전달 함수를 참조하세요.

예: 소비자 에이전트에 일련의 숫자 보내기

이 예제에서 생산자 에이전트는 소비자 에이전트에 일련의 숫자를 보냅니다. 소비자는 이러한 각 숫자를 수신하고 평균을 계산합니다. 애플리케이션은 콘솔에 평균을 씁니다.

이 예제에서는 동시성::unbounded_buffer 개체를 사용하여 생산자가 메시지를 큐에 대기할 수 있도록 합니다. 이 클래스는 unbounded_buffer 생산자와 ISource 소비자가 공유 버퍼 간에 메시지를 보내고 받을 수 있도록 구현 ITarget 합니다. 및 receive 함수는 send 생산자에서 소비자로 데이터를 전파하는 작업을 조정합니다.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }
      
      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

이 예제의 결과는 다음과 같습니다.

The average is 50.

예: 소비자 에이전트에 일련의 주식 시세 보내기

이 예제에서 생산자 에이전트는 일련의 주식 견적을 소비자 에이전트에 보냅니다. 소비자 에이전트는 주기적으로 현재 견적을 읽고 콘솔에 출력합니다.

이 예제는 생산자가 소비자와 하나의 메시지를 공유할 수 있도록 동시성::overwrite_buffer 개체를 사용한다는 점을 제외하고 이전 예제와 유사합니다. 이전 예제 overwrite_buffer 와 같이 클래스는 ITarget 생산자와 ISource 소비자가 공유 메시지 버퍼에 대해 작동할 수 있도록 구현합니다.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

이 예제에서는 다음 샘플 출력을 생성합니다.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

개체와 unbounded_buffer 달리 함수는 receive 개체에서 overwrite_buffer 메시지를 제거하지 않습니다. 생산자가 해당 메시지를 덮어쓰기 전에 소비자가 메시지 버퍼에서 두 번 이상 읽는 경우 수신기는 매번 동일한 메시지를 받습니다.

코드 컴파일

예제 코드를 복사하여 Visual Studio 프로젝트에 붙여넣거나 이름이 지정된 producer-consumer.cpp 파일에 붙여넣은 다음 Visual Studio 명령 프롬프트 창에서 다음 명령을 실행합니다.

cl.exe /EHsc producer-consumer.cpp

참고 항목

비동기 에이전트 라이브러리
비동기 에이전트
비동기 메시지 블록
메시지 전달 함수