연습: 사용자 지정 메시지 블록 만들기Walkthrough: Creating a Custom Message Block

이 문서에서는 들어오는 메시지를 우선 순위별로 정렬 하는 사용자 지정 메시지 블록 형식을 만드는 방법을 설명 합니다.This document describes how to create a custom message block type that orders incoming messages by priority.

기본 제공 메시지 블록 형식은 다양 한 기능을 제공 하지만 사용자 고유의 메시지 블록 형식을 만들어 응용 프로그램의 요구 사항에 맞게 사용자 지정할 수 있습니다.Although the built-in message block types provide a wide-range of functionality, you can create your own message block type and customize it to meet the requirements of your application. 비동기 에이전트 라이브러리에서 제공 하는 기본 제공 메시지 블록 형식에 대 한 설명은 비동기 메시지 블록을 참조 하세요.For a description of the built-in message block types that are provided by the Asynchronous Agents Library, see Asynchronous Message Blocks.

사전 요구 사항Prerequisites

이 연습을 시작 하기 전에 다음 문서를 읽어 보세요.Read the following documents before you start this walkthrough:

섹션이Sections

이 연습에는 다음과 같은 섹션이 있습니다.This walkthrough contains the following sections:

사용자 지정 메시지 블록 디자인Designing a Custom Message Block

메시지 블록은 메시지를 보내고 받는 동작에 참여 합니다.Message blocks participate in the act of sending and receiving messages. 메시지를 보내는 메시지 블록을 소스 블록이라고 합니다.A message block that sends messages is known as a source block. 메시지를 수신 하는 메시지 블록을 대상 블록이라고 합니다.A message block that receives messages is known as a target block. 메시지를 보내고 받는 메시지 블록을 전파자 블록이라고 합니다.A message block that both sends and receives messages is known as a propagator block. 에이전트 라이브러리는 추상 클래스 concurrency:: ISource 를 사용 하 여 소스 블록을 나타내고 추상 클래스 Concurrency:: ITarget 를 사용 하 여 대상 블록을 나타냅니다.The Agents Library uses the abstract class concurrency::ISource to represent source blocks and the abstract class concurrency::ITarget to represent target blocks. 소스로 작동 하는 메시지 블록 형식은에서 파생 되며 ISource 대상으로 작동 하는 메시지 블록 형식은에서 파생 ITarget 됩니다.Message block types that act as sources derive from ISource; message block types that act as targets derive from ITarget.

및에서 직접 메시지 블록 형식을 파생할 수는 있지만 ISource , ITarget 에이전트 라이브러리는 모든 메시지 블록 형식에 공통적인 기능을 대부분 수행 하는 세 가지 기본 클래스를 정의 합니다. 예를 들어 오류를 처리 하 고 메시지 블록을 동시에 안전 하 게 연결할 수 있습니다.Although you can derive your message block type directly from ISource and ITarget, the Agents Library defines three base classes that perform much of the functionality that is common to all message block types, for example, handling errors and connecting message blocks together in a concurrency-safe manner. Concurrency:: source_block 클래스는에서 파생 되 ISource 고 메시지를 다른 블록으로 보냅니다.The concurrency::source_block class derives from ISource and sends messages to other blocks. Concurrency:: target_block 클래스는에서 파생 되 ITarget 고 다른 블록에서 메시지를 받습니다.The concurrency::target_block class derives from ITarget and receives messages from other blocks. Concurrency::p ropagator_block 클래스는 및에서 ISource 파생 ITarget 되 고 메시지를 다른 블록으로 보내고 다른 블록에서 메시지를 받습니다.The concurrency::propagator_block class derives from ISource and ITarget and sends messages to other blocks and it receives messages from other blocks. 메시지 블록의 동작에 집중할 수 있도록 이러한 세 가지 기본 클래스를 사용 하 여 인프라 세부 정보를 처리 하는 것이 좋습니다.We recommend that you use these three base classes to handle infrastructure details so that you can focus on the behavior of your message block.

source_block, target_block 및 클래스는 propagator_block 소스 블록과 대상 블록 간의 연결 또는 링크를 관리 하는 형식에서 매개 변수화 된 템플릿 및 메시지 처리 방법을 관리 하는 형식입니다.The source_block, target_block, and propagator_block classes are templates that are parameterized on a type that manages the connections, or links, between source and target blocks and on a type that manages how messages are processed. 에이전트 라이브러리는 링크 관리, concurrency:: single_link_registryconcurrency:: multi_link_registry를 수행 하는 두 가지 형식을 정의 합니다.The Agents Library defines two types that perform link management, concurrency::single_link_registry and concurrency::multi_link_registry. single_link_registry클래스를 사용 하면 메시지 블록이 하나의 소스 나 하나의 대상에 연결 될 수 있습니다.The single_link_registry class enables a message block to be linked to one source or to one target. multi_link_registry클래스를 사용 하면 메시지 블록이 여러 원본 또는 여러 대상에 연결 될 수 있습니다.The multi_link_registry class enables a message block to be linked to multiple sources or multiple targets. 에이전트 라이브러리는 메시지 관리, 동시성:: ordered_message_processor를 수행 하는 하나의 클래스를 정의 합니다.The Agents Library defines one class that performs message management, concurrency::ordered_message_processor. ordered_message_processor클래스를 사용 하면 메시지 블록에서 메시지를 받은 순서 대로 메시지를 처리할 수 있습니다.The ordered_message_processor class enables message blocks to process messages in the order in which it receives them.

메시지 블록의 원본 및 대상과 관련 된 방법을 더 잘 이해 하려면 다음 예제를 참조 하세요.To better understand how message blocks relate to their sources and targets, consider the following example. 이 예제에서는 concurrency:: 변환기 클래스의 선언을 보여 줍니다.This example shows the declaration of the concurrency::transformer class.

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

transformer클래스는에서 파생 propagator_block 되므로 소스 블록과 대상 블록 역할을 모두 수행 합니다.The transformer class derives from propagator_block, and therefore acts as both a source block and as a target block. 형식의 메시지 _Input 를 수락 하 고 형식의 메시지를 보냅니다 _Output .It accepts messages of type _Input and sends messages of type _Output. transformer클래스는 single_link_registry 모든 대상 블록에 대 한 링크 관리자로 지정 하 고 multi_link_registry 모든 소스 블록에 대 한 링크 관리자로 지정 합니다.The transformer class specifies single_link_registry as the link manager for any target blocks and multi_link_registry as the link manager for any source blocks. 따라서 개체에는 transformer 최대 하나의 대상과 개수에 제한 없이 원본을 사용할 수 있습니다.Therefore, a transformer object can have up to one target and an unlimited number of sources.

에서 파생 되는 클래스는 source_block propagate_to_any_targets, accept_message, reserve_message, consume_message, release_messageresume_propagation의 6 가지 메서드를 구현 해야 합니다.A class that derives from source_block must implement six methods: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message, and resume_propagation. 에서 파생 되는 클래스는 target_block propagate_message 메서드를 구현 해야 하며 필요에 따라 send_message 메서드를 구현할 수 있습니다.A class that derives from target_block must implement the propagate_message method and can optionally implement the send_message method. 에서 파생 propagator_block 하는 것은 및에서 파생 하는 것과 기능적으로 동일 source_block target_block 합니다.Deriving from propagator_block is functionally equivalent to deriving from both source_block and target_block.

메서드는 들어오는 모든 메시지 propagate_to_any_targets 를 비동기적으로 처리 하 고 나가는 메시지를 전파 하기 위해 런타임에서 호출 됩니다.The propagate_to_any_targets method is called by the runtime to asynchronously or synchronously process any incoming messages and propagate out any outgoing messages. accept_message메서드는 메시지를 수락 하기 위해 대상 블록에 의해 호출 됩니다.The accept_message method is called by target blocks to accept messages. 와 같은 많은 메시지 블록 형식은 unbounded_buffer 메시지를 수신 하는 첫 번째 대상에만 메시지를 보냅니다.Many message block types, such as unbounded_buffer, send messages only to the first target that would receive it. 따라서 메시지의 소유권을 대상으로 전송 합니다.Therefore, it transfers ownership of the message to the target. Concurrency:: overwrite_buffer와 같은 다른 메시지 블록 형식은 각 대상 블록에 메시지를 제공 합니다.Other message block types, such as concurrency::overwrite_buffer, offer messages to each of its target blocks. 따라서는 overwrite_buffer 각 대상에 대해 메시지의 복사본을 만듭니다.Therefore, overwrite_buffer creates a copy of the message for each of its targets.

reserve_message,, consume_message 및 메서드는 메시지 release_message resume_propagation 블록이 메시지 예약에 참여할 수 있도록 합니다.The reserve_message, consume_message, release_message, and resume_propagation methods enable message blocks to participate in message reservation. 대상 블록은 reserve_message 메시지를 제공 하 고 나중에 사용할 수 있도록 메시지를 예약 해야 하는 경우 메서드를 호출 합니다.Target blocks call the reserve_message method when they are offered a message and have to reserve the message for later use. 대상 블록이 메시지를 예약 하 고 나면 메서드를 호출 하 여 consume_message 해당 메시지를 사용 하거나 메서드를 호출 하 여 예약을 취소할 수 있습니다 release_message .After a target block reserves a message, it can call the consume_message method to consume that message or the release_message method to cancel the reservation. accept_message메서드와 마찬가지로의 구현은 consume_message 메시지 소유권을 양도 하거나 메시지 복사본을 반환할 수 있습니다.As with the accept_message method, the implementation of consume_message can either transfer ownership of the message or return a copy of the message. 대상 블록이 예약 된 메시지를 사용 하거나 해제 한 후에는 런타임에서 메서드를 호출 합니다 resume_propagation .After a target block either consumes or releases a reserved message, the runtime calls the resume_propagation method. 일반적으로이 메서드는 큐의 다음 메시지부터 시작 하 여 메시지 전파를 계속 합니다.Typically, this method continues message propagation, starting with the next message in the queue.

런타임은 메서드를 호출 propagate_message 하 여 다른 블록에서 현재로 메시지를 비동기적으로 전송 합니다.The runtime calls the propagate_message method to asynchronously transfer a message from another block to the current one. send_message메서드는 propagate_message 비동기 대신 동기적으로 대상 블록에 메시지를 보내는 점을 제외 하 고는와 유사 합니다.The send_message method resembles propagate_message, except that it synchronously, instead of asynchronously, sends the message to the target blocks. 의 기본 구현은 send_message 들어오는 모든 메시지를 거부 합니다.The default implementation of send_message rejects all incoming messages. 메시지에서 대상 블록과 연결 된 선택적 필터 함수를 전달 하지 않는 경우 런타임은 이러한 메서드 중 하나를 호출 하지 않습니다.The runtime does not call either of these methods if the message does not pass the optional filter function that is associated with the target block. 메시지 필터에 대 한 자세한 내용은 비동기 메시지 블록을 참조 하세요.For more information about message filters, see Asynchronous Message Blocks.

[맨 위로이동][Top]

Priority_buffer 클래스 정의Defining the priority_buffer Class

priority_buffer클래스는 들어오는 메시지를 우선 순위에 따라 먼저 정렬 한 다음 메시지를 받은 순서 대로 정렬 하는 사용자 지정 메시지 블록 형식입니다.The priority_buffer class is a custom message block type that orders incoming messages first by priority, and then by the order in which messages are received. priority_buffer클래스는 메시지 큐를 포함 하기 때문에 concurrency:: unbounded_buffer 클래스와 비슷하며, 소스 및 대상 메시지 블록 역할을 모두 수행 하 고 여러 원본과 대상을 모두 포함할 수 있기 때문입니다.The priority_buffer class resembles the concurrency::unbounded_buffer class because it holds a queue of messages, and also because it acts as both a source and a target message block and can have both multiple sources and multiple targets. 그러나는 unbounded_buffer 메시지 전파가 원본에서 메시지를 수신 하는 순서 대로만 메시지를 전파 합니다.However, unbounded_buffer bases message propagation only on the order in which it receives messages from its sources.

priority_buffer클래스는 및 요소를 포함 하는 std::tuple 형식의 메시지를 받습니다 PriorityType Type .The priority_buffer class receives messages of type std::tuple that contain PriorityType and Type elements. PriorityType각 메시지의 우선 순위를 보유 하는 형식을 참조 합니다. Type메시지의 데이터 부분을 참조 합니다.PriorityType refers to the type that holds the priority of each message; Type refers to the data portion of the message. priority_buffer클래스는 형식의 메시지를 보냅니다 Type .The priority_buffer class sends messages of type Type. priority_buffer또한 클래스는 들어오는 메시지에 대 한 std::p riority_queue 개체와 보내는 메시지에 대 한 std::queue 개체 라는 두 개의 메시지 큐를 관리 합니다.The priority_buffer class also manages two message queues: a std::priority_queue object for incoming messages and a std::queue object for outgoing messages. 메시지를 우선 순위별로 정렬 하는 것은 priority_buffer 개체가 여러 메시지를 동시에 받거나 소비자가 메시지를 읽기 전에 여러 메시지를 수신 하는 경우에 유용 합니다.Ordering messages by priority is useful when a priority_buffer object receives multiple messages simultaneously or when it receives multiple messages before any messages are read by consumers.

에서 파생 되는 클래스에서 구현 해야 하는 일곱 가지 메서드 외에 propagator_block priority_buffer 도 클래스는 및 메서드를 재정의 합니다 link_target_notification send_message .In addition to the seven methods that a class that derives from propagator_block must implement, the priority_buffer class also overrides the link_target_notification and send_message methods. priority_buffer또한 클래스는 두 개의 public 도우미 메서드인 enqueue 및를 정의 하 dequeue 고 개인 도우미 메서드인를 정의 propagate_priority_order 합니다.The priority_buffer class also defines two public helper methods, enqueue and dequeue, and a private helper method, propagate_priority_order.

다음 절차에서는 클래스를 구현 하는 방법을 설명 합니다 priority_buffer .The following procedure describes how to implement the priority_buffer class.

Priority_buffer 클래스를 정의 하려면To define the priority_buffer class

  1. C + + 헤더 파일을 만들고 이름을로 priority_buffer.h 합니다.Create a C++ header file and name it priority_buffer.h. 또는 프로젝트의 일부인 기존 헤더 파일을 사용할 수 있습니다.Alternatively, you can use an existing header file that is part of your project.

  2. 에서 priority_buffer.h 다음 코드를 추가 합니다.In priority_buffer.h, add the following code.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. std네임 스페이스에서 concurrency::message 개체에 대해 작동 하는 std:: lessstd:: 큰지 특수화를 정의 합니다.In the std namespace, define specializations of std::less and std::greater that act on concurrency::message objects.

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    priority_buffer클래스는 개체 message 에 개체를 저장 priority_queue 합니다.The priority_buffer class stores message objects in a priority_queue object. 이러한 형식 특수화를 사용 하면 우선 순위 큐에서 우선 순위에 따라 메시지를 정렬할 수 있습니다.These type specializations enable the priority queue to sort messages according to their priority. 우선 순위는 개체의 첫 번째 요소입니다 tuple .The priority is the first element of the tuple object.

  4. concurrencyex네임 스페이스에서 클래스를 선언 합니다 priority_buffer .In the concurrencyex namespace, declare the priority_buffer class.

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    priority_buffer 클래스는 propagator_block에서 파생됩니다.The priority_buffer class derives from propagator_block. 따라서 메시지를 보내고 받을 수 있습니다.Therefore, it can both send and receive messages. 클래스에는 priority_buffer 형식의 메시지를 수신 하는 대상이 여러 개 있을 수 있습니다 Type .The priority_buffer class can have multiple targets that receive messages of type Type. 또한 형식의 메시지를 보내는 여러 소스가 있을 수 있습니다 tuple<PriorityType, Type> .It can also have multiple sources that send messages of type tuple<PriorityType, Type>.

  5. private 클래스의 섹션에서 priority_buffer 다음 멤버 변수를 추가 합니다.In the private section of the priority_buffer class, add the following member variables.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    priority_queue개체는 들어오는 메시지를 보유 하며 queue 개체는 나가는 메시지를 보유 합니다.The priority_queue object holds incoming messages; the queue object holds outgoing messages. priority_buffer개체는 여러 메시지를 동시에 받을 수 있습니다. critical_section 개체는 입력 메시지의 큐에 대 한 액세스를 동기화 합니다.A priority_buffer object can receive multiple messages simultaneously; the critical_section object synchronizes access to the queue of input messages.

  6. 섹션에서 private 복사 생성자와 대입 연산자를 정의 합니다.In the private section, define the copy constructor and the assignment operator. 이렇게 하면 priority_queue 개체를 할당할 수 없습니다.This prevents priority_queue objects from being assignable.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. 섹션에서 public 많은 메시지 블록 형식에 공통적인 생성자를 정의 합니다.In the public section, define the constructors that are common to many message block types. 소멸자도 정의 합니다.Also define the destructor.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. 섹션에서 public 및 메서드를 정의 합니다 enqueue dequeue .In the public section, define the methods enqueue and dequeue. 이러한 도우미 메서드는 개체에서 메시지를 보내고 받을 수 있는 대체 방법을 제공 priority_buffer 합니다.These helper methods provide an alternative way to send messages to and receive messages from a priority_buffer object.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. 섹션에서 protected 메서드를 정의 합니다 propagate_to_any_targets .In the protected section, define the propagate_to_any_targets method.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    propagate_to_any_targets메서드는 입력 큐의 앞에 있는 메시지를 출력 큐에 전송 하 고 출력 큐의 모든 메시지를 전파 합니다.The propagate_to_any_targets method transfers the message that is at the front of the input queue to the output queue and propagates out all messages in the output queue.

  10. 섹션에서 protected 메서드를 정의 합니다 accept_message .In the protected section, define the accept_message method.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    대상 블록이 메서드를 호출 하는 경우 accept_message priority_buffer 클래스는 메시지의 소유권을 해당 메시지를 수락 하는 첫 번째 대상 블록으로 전송 합니다.When a target block calls the accept_message method, the priority_buffer class transfers ownership of the message to the first target block that accepts it. 이는의 동작과 유사 unbounded_buffer 합니다.(This resembles the behavior of unbounded_buffer.)

  11. 섹션에서 protected 메서드를 정의 합니다 reserve_message .In the protected section, define the reserve_message method.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    priority_buffer클래스를 사용 하면 제공 된 메시지 식별자가 큐 앞에 있는 메시지의 식별자와 일치 하는 경우 대상 블록이 메시지를 예약할 수 있습니다.The priority_buffer class permits a target block to reserve a message when the provided message identifier matches the identifier of the message that is at the front of the queue. 즉, priority_buffer 개체에서 추가 메시지를 아직 받지 않고 현재 메시지를 아직 전파 하지 않은 경우 대상에서 메시지를 예약할 수 있습니다.In other words, a target can reserve the message if the priority_buffer object has not yet received an additional message and has not yet propagated out the current one.

  12. 섹션에서 protected 메서드를 정의 합니다 consume_message .In the protected section, define the consume_message method.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    대상 블록은 consume_message 예약 된 메시지의 소유권을 전송 하기 위해를 호출 합니다.A target block calls consume_message to transfer ownership of the message that it reserved.

  13. 섹션에서 protected 메서드를 정의 합니다 release_message .In the protected section, define the release_message method.

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    대상 블록은 release_message 를 호출 하 여 메시지에 대 한 예약을 취소 합니다.A target block calls release_message to cancel its reservation to a message.

  14. 섹션에서 protected 메서드를 정의 합니다 resume_propagation .In the protected section, define the resume_propagation method.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    resume_propagation대상 블록이 예약 된 메시지를 사용 하거나 해제 한 후 런타임에서를 호출 합니다.The runtime calls resume_propagation after a target block either consumes or releases a reserved message. 이 메서드는 출력 큐에 있는 모든 메시지를 전파 합니다.This method propagates out any messages that are in the output queue.

  15. 섹션에서 protected 메서드를 정의 합니다 link_target_notification .In the protected section, define the link_target_notification method.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    _M_pReservedFor멤버 변수는 기본 클래스인로 정의 됩니다 source_block .The _M_pReservedFor member variable is defined by the base class, source_block. 이 멤버 변수는 출력 큐의 앞에 있는 메시지에 대 한 예약을 보유 하는 대상 블록 (있는 경우)을 가리킵니다.This member variable points to the target block, if any, that is holding a reservation to the message that is at the front of the output queue. link_target_notification새 대상이 개체에 연결 되 면 런타임에서를 호출 합니다 priority_buffer .The runtime calls link_target_notification when a new target is linked to the priority_buffer object. 이 메서드는 예약을 보유 하 고 있는 대상이 없으면 출력 큐에 있는 모든 메시지를 전파 합니다.This method propagates out any messages that are in the output queue if no target is holding a reservation.

  16. 섹션에서 private 메서드를 정의 합니다 propagate_priority_order .In the private section, define the propagate_priority_order method.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    이 메서드는 출력 큐의 모든 메시지를 전파 합니다.This method propagates out all messages from the output queue. 대상 블록 중 하나가 메시지를 수락 하기 전까지 큐의 모든 메시지는 모든 대상 블록에 제공 됩니다.Every message in the queue is offered to every target block until one of the target blocks accepts the message. priority_buffer클래스는 보내는 메시지의 순서를 유지 합니다.The priority_buffer class preserves the order of the outgoing messages. 따라서이 메서드는 대상 블록에 다른 메시지를 제공 하기 전에 대상 블록에서 출력 큐의 첫 번째 메시지를 수락 해야 합니다.Therefore, the first message in the output queue must be accepted by a target block before this method offers any other message to the target blocks.

  17. 섹션에서 protected 메서드를 정의 합니다 propagate_message .In the protected section, define the propagate_message method.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    propagate_message메서드를 사용 하면 priority_buffer 클래스가 메시지 받는 사람 또는 대상으로 동작할 수 있습니다.The propagate_message method enables the priority_buffer class to act as a message receiver, or target. 이 메서드는 제공 된 소스 블록이 제공 하는 메시지를 받고 해당 메시지를 우선 순위 큐에 삽입 합니다.This method receives the message that is offered by the provided source block and inserts that message into the priority queue. propagate_message그런 다음 메서드는 모든 출력 메시지를 대상 블록으로 비동기적으로 보냅니다.The propagate_message method then asynchronously sends all output messages to the target blocks.

    Concurrency:: asend 함수를 호출할 때 또는 메시지 블록이 다른 메시지 블록에 연결 된 경우 런타임은이 메서드를 호출 합니다.The runtime calls this method when you call the concurrency::asend function or when the message block is connected to other message blocks.

  18. 섹션에서 protected 메서드를 정의 합니다 send_message .In the protected section, define the send_message method.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    메서드는와 send_message 유사 propagate_message 합니다.The send_message method resembles propagate_message. 그러나 출력 메시지를 비동기적이 아닌 동기적으로 보냅니다.However it sends the output messages synchronously instead of asynchronously.

    Concurrency:: send 함수를 호출 하는 경우와 같이 동기 전송 작업 중에 런타임은이 메서드를 호출 합니다.The runtime calls this method during a synchronous send operation, such as when you call the concurrency::send function.

클래스에는 priority_buffer 많은 메시지 블록 형식에서 일반적인 생성자 오버 로드가 포함 되어 있습니다.The priority_buffer class contains constructor overloads that are typical in many message block types. 일부 생성자 오버 로드는 concurrency:: Scheduler 또는 Concurrency:: ScheduleGroup 개체를 사용 합니다 .이 개체를 사용 하면 특정 작업 스케줄러에서 메시지 블록을 관리할 수 있습니다.Some constructor overloads take concurrency::Scheduler or concurrency::ScheduleGroup objects, which enable the message block to be managed by a specific task scheduler. 다른 생성자 오버 로드는 필터 함수를 사용 합니다.Other constructor overloads take a filter function. 필터 함수를 사용 하면 메시지 블록에서 페이로드를 기준으로 메시지를 수락 하거나 거부할 수 있습니다.Filter functions enable message blocks to accept or reject a message on the basis of its payload. 메시지 필터에 대 한 자세한 내용은 비동기 메시지 블록을 참조 하세요.For more information about message filters, see Asynchronous Message Blocks. 작업 스케줄러에 대 한 자세한 내용은 작업 스케줄러를 참조 하세요.For more information about task schedulers, see Task Scheduler.

이 클래스는 메시지를 priority_buffer 우선 순위별로 정렬 한 다음 메시지를 받은 순서 대로 정렬 하므로이 클래스는 concurrency:: asend 함수를 호출 하거나 메시지 블록이 다른 메시지 블록에 연결 된 경우와 같이 메시지를 비동기적으로 받을 때 가장 유용 합니다.Because the priority_buffer class orders messages by priority and then by the order in which messages are received, this class is most useful when it receives messages asynchronously, for example, when you call the concurrency::asend function or when the message block is connected to other message blocks.

[맨 위로이동][Top]

전체 예제The Complete Example

다음 예제에서는 클래스의 전체 정의를 보여 줍니다 priority_buffer .The following example shows the complete definition of the priority_buffer class.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

다음 예제에서는 asend 개체에 대해 많은 및 concurrency:: receive 작업을 동시에 수행 합니다 priority_buffer .The following example concurrently performs a number of asend and concurrency::receive operations on a priority_buffer object.

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

이 예제에서는 다음 샘플 출력을 생성 합니다.This example produces the following sample output.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

priority_buffer클래스는 우선 순위에 따라 메시지를 먼저 정렬 한 다음 메시지를 받은 순서 대로 정렬 합니다.The priority_buffer class orders messages first by priority and then by the order in which it receives messages. 이 예제에서 우선 순위가 더 높은 메시지는 큐 앞에 삽입 됩니다.In this example, messages with greater numerical priority are inserted towards the front of the queue.

[맨 위로이동][Top]

코드 컴파일Compiling the Code

예제 코드를 복사 하 여 Visual Studio 프로젝트에 붙여넣거나, 클래스의 정의를 라는 파일 priority_buffer priority_buffer.h 의 및 테스트 프로그램에 붙여넣은 priority_buffer.cpp 후 Visual studio 명령 프롬프트 창에서 다음 명령을 실행 합니다.Copy the example code and paste it in a Visual Studio project, or paste the definition of the priority_buffer class in a file that is named priority_buffer.h and the test program in a file that is named priority_buffer.cpp and then run the following command in a Visual Studio Command Prompt window.

cl.exe/EHsc priority_buffercl.exe /EHsc priority_buffer.cpp

참고 항목See also

동시성 런타임 연습Concurrency Runtime Walkthroughs
비동기 메시지 블록Asynchronous Message Blocks
메시지 전달 함수Message Passing Functions