연습: 데이터 흐름 에이전트 만들기

이 문서에서는 제어 흐름 대신 데이터 흐름을 기반으로 하는 에이전트 기반 애플리케이션을 만드는 방법을 보여 줍니다.

제어 흐름 은 프로그램에서 작업의 실행 순서를 나타냅니다. 제어 흐름은 조건문, 루프 등의 제어 구조를 사용하여 제어됩니다. 또는 데이터 흐름은 모든 필수 데이터를 사용할 수 있는 경우에만 계산이 수행되는 프로그래밍 모델을 나타냅니다. 데이터 흐름 프로그래밍 모델은 프로그램의 독립적인 구성 요소가 메시지를 전송하여 서로 통신하는 메시지 전달 개념과 관련이 있습니다.

비동기 에이전트는 제어 흐름 및 데이터 흐름 프로그래밍 모델을 모두 지원합니다. 제어 흐름 모델은 많은 경우에 적절하지만, 예를 들어 에이전트가 데이터를 수신하고 해당 데이터의 페이로드를 기반으로 하는 작업을 수행하는 경우 데이터 흐름 모델은 다른 경우에 적합합니다.

필수 조건

이 연습을 시작하기 전에 다음 문서를 읽어보세요.

섹션

이 연습에는 다음과 같은 섹션이 있습니다.

기본 제어 흐름 에이전트 만들기

클래스를 정의하는 다음 예제를 control_flow_agent 고려해 보세요. 이 클래스는 control_flow_agent 세 개의 메시지 버퍼( 입력 버퍼 1개와 출력 버퍼 2개)에서 작동합니다. 이 메서드는 run 루프의 소스 메시지 버퍼에서 읽고 조건문을 사용하여 프로그램 실행 흐름을 지시합니다. 에이전트는 0이 아닌 음수 값에 대해 하나의 카운터를 증가시키고 0이 아닌 양수 값에 대해 다른 카운터를 증분합니다. 에이전트가 0의 sentinel 값을 받으면 카운터 값을 출력 메시지 버퍼로 보냅니다. negativespositives 메서드를 사용하면 애플리케이션이 에이전트에서 음수 및 양수 값의 개수를 읽을 수 있습니다.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

이 예제에서는 에이전트에서 제어 흐름을 기본적으로 사용하지만 제어 흐름 기반 프로그래밍의 직렬 특성을 보여 줍니다. 입력 메시지 버퍼에서 여러 메시지를 사용할 수 있더라도 각 메시지를 순차적으로 처리해야 합니다. 데이터 흐름 모델을 사용하면 조건문의 두 분기가 동시에 평가할 수 있습니다. 또한 데이터 흐름 모델을 사용하면 사용할 수 있게 되면 데이터에 대해 작동하는 더 복잡한 메시징 네트워크를 만들 수 있습니다.

[맨 위로 이동]

기본 데이터 흐름 에이전트 만들기

이 섹션에서는 데이터 흐름 모델을 사용하여 동일한 작업을 수행하도록 클래스를 변환 control_flow_agent 하는 방법을 보여 줍니다.

데이터 흐름 에이전트는 각각 특정 용도로 사용되는 메시지 버퍼 네트워크를 만들어 작동합니다. 특정 메시지 블록은 필터 함수를 사용하여 페이로드를 기반으로 메시지를 수락하거나 거부합니다. 필터 함수는 메시지 블록이 특정 값만 받도록 합니다.

제어 흐름 에이전트를 데이터 흐름 에이전트로 변환하려면

  1. 클래스의 control_flow_agent 본문을 다른 클래스에 복사합니다(예: dataflow_agent.). 또는 클래스의 control_flow_agent 이름을 바꿀 수 있습니다.

  2. 메서드에서 호출 receive 하는 루프의 본문을 제거합니다 run .

void run()
{
   // Counts the number of negative and positive values that
   // the agent receives.
   size_t negative_count = 0;
   size_t positive_count = 0;

   // Write the counts to the message buffers.
   send(_negatives, negative_count);
   send(_positives, positive_count);

   // Set the agent to the completed state.
   done();
}
  1. 메서드에서 run 변수 negative_countpositive_count를 초기화한 후 활성 작업 수를 추적하는 개체를 추가 countdown_event 합니다.
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;

클래스는 이 countdown_event 항목의 뒷부분에 나와 있습니다.

  1. 데이터 흐름 네트워크에 참여할 메시지 버퍼 개체를 만듭니다.
 //
 // Create the members of the dataflow network.
 //

 // Increments the active counter.
 transformer<int, int> increment_active(
    [&active](int value) -> int {
       active.add_count();
       return value;
    });

 // Increments the count of negative values.
 call<int> negatives(
    [&](int value) {
       ++negative_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value < 0;
    });

 // Increments the count of positive values.
 call<int> positives(
    [&](int value) {
       ++positive_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value > 0;
    });

 // Receives only the sentinel value of 0.
 call<int> sentinel(
    [&](int value) {            
       // Decrement the active counter.
       active.signal();
       // Set the sentinel event.
       received_sentinel.set();
    },
    [](int value) -> bool { 
       return value == 0; 
    });

 // Connects the _source message buffer to the rest of the network.
 unbounded_buffer<int> connector;
  1. 메시지 버퍼를 커넥트 네트워크를 형성합니다.
//
// Connect the network.
//

// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);

// Connect the _source buffer to the internal network to 
// begin data flow.
_source.link_target(&increment_active);
  1. countdown event 개체가 event 설정될 때까지 기다립니다. 이러한 이벤트는 에이전트가 sentinel 값을 수신했으며 모든 작업이 완료되었음을 나타냅니다.
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();

다음 다이어그램은 클래스에 대한 전체 데이터 흐름 네트워크를 보여 줍니다 dataflow_agent .

The dataflow network.

다음 표에서는 네트워크의 멤버를 설명합니다.

멤버 설명
increment_active 활성 이벤트 카운터를 증가시키고 입력 값을 네트워크의 나머지 부분에 전달하는 동시성::변환기 개체입니다.
negatives, positives concurrency::call 개체는 숫자 수를 증가시키고 활성 이벤트 카운터를 감소합니다. 개체는 각각 필터를 사용하여 음수 또는 양수를 허용합니다.
sentinel 0의 sentinel 값만 허용하고 활성 이벤트 카운터를 감소시키는 동시성::call 개체입니다.
connector 원본 메시지 버퍼를 내부 네트워크에 연결하는 동시성::unbounded_buffer 개체입니다.

run 이 메서드는 별도의 스레드에서 호출되므로 네트워크가 완전히 연결되기 전에 다른 스레드가 네트워크에 메시지를 보낼 수 있습니다. _source 데이터 멤버는 unbounded_buffer 애플리케이션에서 에이전트로 전송되는 모든 입력을 버퍼링하는 개체입니다. 네트워크에서 모든 입력 메시지를 처리하도록 하기 위해 에이전트는 먼저 네트워크의 내부 노드를 연결한 다음 해당 네트워크의 connector시작을 데이터 멤버에 _source 연결합니다. 이렇게 하면 네트워크가 형성될 때 메시지가 처리되지 않습니다.

이 예제의 네트워크는 제어 흐름이 아닌 데이터 흐름을 기반으로 하므로 네트워크는 각 입력 값 처리를 완료했으며 sentinel 노드가 해당 값을 받았다는 것을 에이전트와 통신해야 합니다. 이 예제에서는 개체를 countdown_event 사용하여 모든 입력 값이 처리되었음을 알리고 동시성::event 개체를 사용하여 sentinel 노드가 해당 값을 수신했음을 나타냅니다. 클래스는 countdown_event 개체를 event 사용하여 카운터 값이 0에 도달하면 신호를 표시합니다. 데이터 흐름 네트워크의 헤드는 값을 받을 때마다 카운터를 증가합니다. 네트워크의 모든 터미널 노드는 입력 값을 처리한 후 카운터를 감소합니다. 에이전트가 데이터 흐름 네트워크를 형성한 후에는 sentinel 노드가 개체를 설정하고 event 개체 countdown_event 가 해당 카운터가 0에 도달했음을 알릴 때까지 기다립니다.

다음 예제에서는 , dataflow_agentcountdown_event 클래스를 보여 control_flow_agent줍니다. 함수는 wmain a control_flow_agent 및 개체를 dataflow_agent 만들고 함수를 send_values 사용하여 일련의 임의 값을 에이전트에 보냅니다.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }
     
   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }
   
   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }
 
private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;
      
      //
      // Create the members of the dataflow network.
      //
     
      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) -> bool { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;
       
      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();
           
      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;
   
   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&cf_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&df_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

이 예제에서는 다음 샘플 출력을 생성합니다.

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

코드 컴파일

예제 코드를 복사하여 Visual Studio 프로젝트에 붙여넣거나 이름이 지정된 dataflow-agent.cpp 파일에 붙여넣은 다음 Visual Studio 명령 프롬프트 창에서 다음 명령을 실행합니다.

cl.exe /EHsc dataflow-agent.cpp

[맨 위로 이동]

메시지 로깅 에이전트 만들기

다음 예제에서는 클래스와 log_agent 비슷한 클래스를 dataflow_agent 보여 줍니다. 클래스는 log_agent 파일 및 콘솔에 로그 메시지를 쓰는 비동기 로깅 에이전트를 구현합니다. 이 log_agent 클래스를 사용하면 애플리케이션에서 메시지를 정보, 경고 또는 오류로 분류할 수 있습니다. 또한 애플리케이션에서 각 로그 범주를 파일, 콘솔 또는 둘 다에 쓸지 여부를 지정할 수 있습니다. 이 예제에서는 모든 로그 메시지를 파일에 쓰고 오류 메시지만 콘솔에 씁니다.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count = 0L)
        : _current(static_cast<long>(count)) 
    {
        // Set the event if the initial count is zero.
        if (_current == 0L)
        {
            _event.set();
        }
    }

    // Decrements the event counter.
    void signal()
    {
        if(InterlockedDecrement(&_current) == 0L)
        {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count()
    {
        if(InterlockedIncrement(&_current) == 1L)
        {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait()
    {
        _event.wait();
    }

private:
    // The current count.
    volatile long _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
    log_info    = 0x1,
    log_warning = 0x2,
    log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
    // Holds a message string and its logging type.
    struct log_message
    {
        wstring message;
        log_message_type type;
    };

public:
    log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
        : _file(file_path)
        , _file_messages(file_messages)
        , _console_messages(console_messages)
        , _active(0)
    {
        if (_file.bad())
        {
            throw invalid_argument("Unable to open log file.");
        }
    }

    // Writes the provided message to the log.
    void log(const wstring& message, log_message_type type)
    {  
        // Increment the active message count.
        _active.add_count();

        // Send the message to the network.
        log_message msg = { message, type };
        send(_log_buffer, msg);
    }

    void close()
    {
        // Signal that the agent is now closed.
        _closed.set();
    }

protected:

    void run()
    {
        //
        // Create the dataflow network.
        //

        // Writes a log message to file.
        call<log_message> writer([this](log_message msg)
        {
            if ((msg.type & _file_messages) != 0)
            {
                // Write the message to the file.
                write_to_stream(msg, _file);
            }
            if ((msg.type & _console_messages) != 0)
            {
                // Write the message to the console.
                write_to_stream(msg, wcout);
            }
            // Decrement the active counter.
            _active.signal();
        });

        // Connect _log_buffer to the internal network to begin data flow.
        _log_buffer.link_target(&writer);

        // Wait for the closed event to be signaled.
        _closed.wait();

        // Wait for all messages to be processed.
        _active.wait();

        // Close the log file and flush the console.
        _file.close();
        wcout.flush();

        // Set the agent to the completed state.
        done();
    }

private:
    // Writes a logging message to the specified output stream.
    void write_to_stream(const log_message& msg, wostream& stream)
    {
        // Write the message to the stream.
        wstringstream ss;

        switch (msg.type)
        {
        case log_info:
            ss << L"info: ";
            break;
        case log_warning:
            ss << L"warning: ";
            break;
        case log_error:
            ss << L"error: ";
        }

        ss << msg.message << endl;
        stream << ss.str();
    }

private:   
    // The file stream to write messages to.
    wofstream _file;

    // The log message types that are written to file.
    log_message_type _file_messages;

    // The log message types that are written to the console.
    log_message_type _console_messages;

    // The head of the network. Propagates logging messages
    // to the rest of the network.
    unbounded_buffer<log_message> _log_buffer;

    // Counts the number of active messages in the network.
    countdown_event _active;

    // Signals that the agent has been closed.
    event _closed;
};

int wmain()
{
    // Union of all log message types.
    log_message_type log_all = log_message_type(log_info | log_warning  | log_error);

    // Create a logging agent that writes all log messages to file and error 
    // messages to the console.
    log_agent logger(L"log.txt", log_all, log_error);

    // Start the agent.
    logger.start();

    // Log a few messages.

    logger.log(L"===Logging started.===", log_info);

    logger.log(L"This is a sample warning message.", log_warning);
    logger.log(L"This is a sample error message.", log_error);

    logger.log(L"===Logging finished.===", log_info);

    // Close the logger and wait for the agent to finish.
    logger.close();
    agent::wait(&logger);
}

이 예제에서는 콘솔에 다음 출력을 씁니다.

error: This is a sample error message.

이 예제에서는 다음 텍스트를 포함하는 log.txt 파일도 생성합니다.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

코드 컴파일

예제 코드를 복사하여 Visual Studio 프로젝트에 붙여넣거나 이름이 지정된 log-filter.cpp 파일에 붙여넣은 다음 Visual Studio 명령 프롬프트 창에서 다음 명령을 실행합니다.

cl.exe /EHsc log-filter.cpp

[맨 위로 이동]

참고 항목

동시성 런타임 연습