Share via


연습: 사용자 지정 메시지 블록 만들기

이 문서에서는 들어오는 메시지를 우선 순위별로 정렬하는 사용자 지정 메시지 블록 유형을 만드는 방법을 설명합니다.

기본 제공 메시지 블록 형식은 광범위한 기능을 제공하지만 사용자 고유의 메시지 블록 유형을 만들고 애플리케이션의 요구 사항에 맞게 사용자 지정할 수 있습니다. 비동기 에이전트 라이브러리에서 제공하는 기본 제공 메시지 블록 형식에 대한 설명은 비동기 메시지 블록을 참조 하세요.

필수 조건

이 연습을 시작하기 전에 다음 문서를 읽어보세요.

섹션

이 연습에는 다음과 같은 섹션이 있습니다.

사용자 지정 메시지 블록 디자인

메시지 블록은 메시지를 보내고 받는 동작에 참여합니다. 메시지를 보내는 메시지 블록을 원본 블록이라고 합니다. 메시지를 받는 메시지 블록을 대상 블록이라고 합니다. 메시지를 보내고 받는 메시지 블록을 전파자 블록이라고 합니다. 에이전트 라이브러리는 추상 클래스 동시성::ISource 를 사용하여 원본 블록을 나타내고 추상 클래스 동시성::ITarget 을 사용하여 대상 블록을 나타냅니다. 원본으로 작동하는 메시지 블록 형식은 파생 ISource됩니다. 대상 역할을 하는 메시지 블록 형식은 파생 ITarget됩니다.

메시지 블록 형식을 ISourceITarget직접 파생시킬 수 있지만 에이전트 라이브러리는 모든 메시지 블록 형식에 공통적인 기능(예: 오류 처리 및 동시성 안전 방식으로 메시지 블록 연결)을 수행하는 세 가지 기본 클래스를 정의합니다. 동시성::source_block 클래스는 ISource 파생되어 다른 블록으로 메시지를 보냅니다. 동시성::target_block 클래스는 다른 블록에서 ITarget 파생되고 메시지를 받습니다. concurrency::p ropagator_block 클래스는 파생 ISource 되어 ITarget 다른 블록으로 메시지를 보내고 다른 블록에서 메시지를 받습니다. 메시지 블록의 동작에 집중할 수 있도록 이러한 세 가지 기본 클래스를 사용하여 인프라 세부 정보를 처리하는 것이 좋습니다.

target_block클래스는 source_block원본 블록과 propagator_block 대상 블록 간의 연결 또는 링크를 관리하는 형식과 메시지 처리 방법을 관리하는 형식에서 매개 변수가 있는 템플릿입니다. 에이전트 라이브러리는 링크 관리, 동시성::single_link_registry 및 동시성::multi_link_registry 수행하는 두 가지 형식을 정의합니다. 이 single_link_registry 클래스를 사용하면 메시지 블록을 하나의 원본 또는 하나의 대상에 연결할 수 있습니다. 이 multi_link_registry 클래스를 사용하면 메시지 블록을 여러 원본 또는 여러 대상에 연결할 수 있습니다. 에이전트 라이브러리는 메시지 관리, 동시성::ordered_message_processor 수행하는 하나의 클래스를 정의합니다. 이 ordered_message_processor 클래스를 사용하면 메시지 블록이 메시지를 받는 순서대로 메시지를 처리할 수 있습니다.

메시지 블록이 원본 및 대상과 어떻게 관련되어 있는지 더 잘 이해하려면 다음 예제를 고려하세요. 이 예제에서는 동시성::변환기 클래스의 선언을 보여줍니다.

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

클래스는 transformer 파생 propagator_block되므로 소스 블록 및 대상 블록으로 작동합니다. 형식의 메시지를 수락하고 형식 _Input_Output의 메시지를 보냅니다. 클래스는 transformer 모든 대상 블록에 대한 링크 관리자로 지정하고 multi_link_registry 소스 블록에 대한 링크 관리자로 지정 single_link_registry 합니다. 따라서 개체에는 transformer 최대 하나의 대상과 무제한의 원본이 있을 수 있습니다.

파생 source_block 되는 클래스는 propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message resume_propagation 6개의 메서드를 구현해야 합니다. 파생 target_block 되는 클래스는 propagate_message 메서드를 구현해야 하며 필요에 따라 send_message 메서드를 구현할 수 있습니다. propagator_block 파생은 기능적으로 둘 다 source_blocktarget_block에서 파생되는 것과 동일합니다.

propagate_to_any_targets 메서드는 런타임에서 들어오는 메시지를 비동기적으로 또는 동기적으로 처리하고 나가는 메시지를 전파하기 위해 호출됩니다. 이 accept_message 메서드는 대상 블록에서 메시지를 수락하기 위해 호출됩니다. 많은 메시지 블록 유형(예: unbounded_buffer메시지를 수신할 첫 번째 대상에만 메시지를 보냅니다). 따라서 메시지의 소유권을 대상으로 전송합니다. 동시성::overwrite_buffer 같은 다른 메시지 블록 형식은 각 대상 블록에 메시지를 제공합니다. 따라서 overwrite_buffer 각 대상에 대한 메시지의 복사본을 만듭니다.

reserve_message, consume_message, release_messageresume_propagation 메서드를 사용하면 메시지 블록이 메시지 예약에 참여할 수 있습니다. 대상 블록은 메시지가 제공되면 메서드를 호출 reserve_message 하고 나중에 사용할 수 있도록 메시지를 예약해야 합니다. 대상 블록이 메시지를 예약한 후 해당 메시지를 사용하는 메서드 또는 release_message 예약을 취소하는 메서드를 호출 consume_message 할 수 있습니다. 메서드와 accept_message 마찬가지로 구현은 메시지의 consume_message 소유권을 이전하거나 메시지의 복사본을 반환할 수 있습니다. 대상 블록이 예약된 메시지를 사용하거나 해제한 후 런타임은 메서드를 호출합니다 resume_propagation . 일반적으로 이 메서드는 큐의 다음 메시지부터 시작하여 메시지 전파를 계속합니다.

런타임은 메서드를 propagate_message 호출하여 다른 블록에서 현재 블록으로 메시지를 비동기적으로 전송합니다. 메서드는 send_messagepropagate_message비동기식이 아니라 동기적으로 대상 블록에 메시지를 보내는 것을 제외하고 유사합니다. 기본 구현은 send_message 들어오는 모든 메시지를 거부합니다. 메시지가 대상 블록과 연결된 선택적 필터 함수를 전달하지 않으면 런타임에서 이러한 메서드 중 하나를 호출하지 않습니다. 메시지 필터에 대한 자세한 내용은 비동기 메시지 블록을 참조 하세요.

[맨 위로 이동]

priority_buffer 클래스 정의

클래스는 priority_buffer 들어오는 메시지를 우선 순위에 따라 정렬한 다음 메시지를 받는 순서에 따라 정렬하는 사용자 지정 메시지 블록 형식입니다. 클래스는 priority_buffer 메시지 큐를 보유하고 있으며 원본 및 대상 메시지 블록의 역할을 하며 여러 원본과 여러 대상을 모두 가질 수 있기 때문에 동시성::unbounded_buffer 클래스와 유사합니다. 그러나 unbounded_buffer 원본에서 메시지를 수신하는 순서에 대해서만 메시지 전파를 기반으로 합니다.

클래스는 priority_buffer 포함된 PriorityType 요소와 Type std::tuple 형식의 메시지를 받습니다. PriorityType 는 각 메시지의 우선 순위를 보유하는 형식을 나타냅니다. Type 는 메시지의 데이터 부분을 나타냅니다. 클래스는 priority_buffer 형식 Type의 메시지를 보냅니다. 또한 이 클래스는 priority_buffer 들어오는 메시지에 대한 std::p riority_queue 개체와 나가는 메시지의 std::queue 개체라는 두 개의 메시지 큐 를 관리합니다. 우선 순위별로 메시지 순서 지정은 개체가 priority_buffer 동시에 여러 메시지를 수신하거나 소비자가 메시지를 읽기 전에 여러 메시지를 수신할 때 유용합니다.

파생 propagator_block 되는 클래스가 구현 priority_buffer 해야 하는 7개의 메서드 외에도 클래스는 및 send_message 메서드를 재정의 link_target_notification 합니다. 또한 클래스는 priority_buffer 두 개의 공용 도우미 메서드 enqueuedequeue프라이빗 도우미 메서드 propagate_priority_order를 정의합니다.

다음 절차에서는 클래스를 구현 priority_buffer 하는 방법을 설명합니다.

priority_buffer 클래스를 정의하려면

  1. C++ 헤더 파일을 만들고 이름을 지정합니다 priority_buffer.h. 또는 프로젝트의 일부인 기존 헤더 파일을 사용할 수 있습니다.

  2. 에서 priority_buffer.h다음 코드를 추가합니다.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. 네임스페이 std 스에서 동시성::message 개체에 대해 작동하는 std::lessstd::greater의 특수화를 정의합니다.

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    클래스는 priority_buffer 개체에 개체를 priority_queue 저장 message 합니다. 이러한 형식 특수화를 사용하면 우선 순위 큐가 우선 순위에 따라 메시지를 정렬할 수 있습니다. 우선 순위는 개체의 첫 번째 요소입니다 tuple .

  4. 네임스페이 concurrencyex 스에서 클래스를 선언합니다 priority_buffer .

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    priority_buffer 클래스는 propagator_block에서 파생됩니다. 따라서 메시지를 보내고 받을 수 있습니다. 클래스에는 priority_buffer 형식 Type의 메시지를 받는 여러 대상이 있을 수 있습니다. 형식의 메시지를 보내는 여러 원본을 가질 수도 있습니다 tuple<PriorityType, Type>.

  5. 클래스의 privatepriority_buffer 섹션에서 다음 멤버 변수를 추가합니다.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    개체는 priority_queue 들어오는 메시지를 보유합니다. 개체는 queue 나가는 메시지를 보유합니다. 개체는 priority_buffer 동시에 여러 메시지를 받을 수 있습니다. 개체는 critical_section 입력 메시지 큐에 대한 액세스를 동기화합니다.

  6. private 섹션에서 복사 생성자 및 대입 연산자를 정의합니다. 이렇게 하면 개체를 priority_queue 할당할 수 없습니다.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. public 이 섹션에서는 많은 메시지 블록 형식에 공통적인 생성자를 정의합니다. 또한 소멸자를 정의합니다.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. public 섹션에서 메서드 enqueue 를 정의하고 dequeue. 이러한 도우미 메서드는 개체에서 priority_buffer 메시지를 보내고 받는 다른 방법을 제공합니다.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. protected 섹션에서 메서드를 정의합니다propagate_to_any_targets.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    이 메서드는 propagate_to_any_targets 입력 큐의 맨 앞에 있는 메시지를 출력 큐로 전송하고 출력 큐의 모든 메시지를 전파합니다.

  10. protected 섹션에서 메서드를 정의합니다accept_message.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    대상 블록이 메서드를 accept_message 호출하면 클래스는 priority_buffer 메시지의 소유권을 수락하는 첫 번째 대상 블록으로 전송합니다. (이 동작은 .)의 unbounded_buffer동작과 유사합니다.

  11. protected 섹션에서 메서드를 정의합니다reserve_message.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    클래스는 priority_buffer 제공된 메시지 식별자가 큐 앞에 있는 메시지의 식별자와 일치하는 경우 대상 블록에서 메시지를 예약할 수 있도록 허용합니다. 즉, 개체가 아직 추가 메시지를 받지 못하고 현재 메시지를 전파하지 않은 경우 priority_buffer 대상은 메시지를 예약할 수 있습니다.

  12. protected 섹션에서 메서드를 정의합니다consume_message.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    대상 블록은 예약된 메시지의 소유권을 전송하기 위해 호출 consume_message 합니다.

  13. protected 섹션에서 메서드를 정의합니다release_message.

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    대상 블록은 메시지에 대한 예약을 취소하기 위해 호출 release_message 합니다.

  14. protected 섹션에서 메서드를 정의합니다resume_propagation.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    대상 블록이 예약된 메시지를 사용하거나 해제한 후 런타임이 호출 resume_propagation 됩니다. 이 메서드는 출력 큐에 있는 모든 메시지를 전파합니다.

  15. protected 섹션에서 메서드를 정의합니다link_target_notification.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    _M_pReservedFor 멤버 변수는 기본 클래스source_block에 의해 정의됩니다. 이 멤버 변수는 출력 큐의 맨 앞에 있는 메시지에 대한 예약을 보유하는 대상 블록(있는 경우)을 가리킵니다. 런타임은 새 대상이 개체에 연결 priority_buffer 되면 호출 link_target_notification 됩니다. 이 메서드는 예약을 보유하고 있는 대상이 없는 경우 출력 큐에 있는 모든 메시지를 전파합니다.

  16. private 섹션에서 메서드를 정의합니다propagate_priority_order.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    이 메서드는 출력 큐에서 모든 메시지를 전파합니다. 큐의 모든 메시지는 대상 블록 중 하나가 메시지를 수락할 때까지 모든 대상 블록에 제공됩니다. 이 클래스는 priority_buffer 보내는 메시지의 순서를 유지합니다. 따라서 이 메서드가 대상 블록에 다른 메시지를 제공하려면 먼저 대상 블록에서 출력 큐의 첫 번째 메시지를 수락해야 합니다.

  17. protected 섹션에서 메서드를 정의합니다propagate_message.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    propagate_message 메서드를 사용하면 클래스가 priority_buffer 메시지 수신자 또는 대상으로 작동할 수 있습니다. 이 메서드는 제공된 소스 블록에서 제공하는 메시지를 수신하고 해당 메시지를 우선 순위 큐에 삽입합니다. 그런 다음 메서드는 propagate_message 모든 출력 메시지를 대상 블록에 비동기적으로 보냅니다.

    런타임은 동시성::asend 함수를 호출하거나 메시지 블록이 다른 메시지 블록에 연결된 경우 이 메서드를 호출합니다.

  18. protected 섹션에서 메서드를 정의합니다send_message.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    메서드는 send_message 다음과 유사합니다 propagate_message. 그러나 비동기식이 아닌 동기적으로 출력 메시지를 보냅니다.

    런타임은 동시성::send 함수를 호출하는 경우와 같이 동기 송신 작업 중에 이 메서드를 호출합니다.

클래스에는 priority_buffer 많은 메시지 블록 형식에서 일반적인 생성자 오버로드가 포함됩니다. 일부 생성자 오버로드는 동시성::Scheduler 또는 동시성::ScheduleGroup 개체를 사용합니다. 이를 통해 메시지 블록을 특정 작업 스케줄러에서 관리할 수 있습니다. 다른 생성자 오버로드는 필터 함수를 사용합니다. 필터 함수를 사용하면 메시지 블록이 페이로드를 기반으로 메시지를 수락하거나 거부할 수 있습니다. 메시지 필터에 대한 자세한 내용은 비동기 메시지 블록을 참조 하세요. 작업 스케줄러에 대한 자세한 내용은 작업 스케줄러를 참조 하세요.

클래스는 priority_buffer 메시지를 우선 순위에 따라 정렬한 다음 메시지를 받는 순서에 따라 정렬하기 때문에 이 클래스는 메시지를 비동기적으로 수신할 때 가장 유용합니다(예: 동시성::asend 함수를 호출하거나 메시지 블록이 다른 메시지 블록에 연결된 경우).

[맨 위로 이동]

전체 예제

다음 예제에서는 클래스의 priority_buffer 전체 정의를 보여줍니다.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

다음 예제에서는 개체에 대해 여러 asend가지 동시성::receive 작업을 동시에 수행합니다 priority_buffer .

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

이 예제에서는 다음 샘플 출력을 생성합니다.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

클래스는 priority_buffer 먼저 우선 순위에 따라 메시지를 정렬한 다음 메시지를 받는 순서로 메시지를 정렬합니다. 이 예제에서는 숫자 우선 순위가 더 큰 메시지가 큐의 맨 앞으로 삽입됩니다.

[맨 위로 이동]

코드 컴파일

예제 코드를 복사하여 Visual Studio 프로젝트에 붙여넣거나 이름이 지정된 priority_buffer.h 파일에 클래스 정의 priority_buffer 와 테스트 프로그램을 priority_buffer.cpp 붙여넣은 다음 Visual Studio 명령 프롬프트 창에서 다음 명령을 실행합니다.

cl.exe /EHsc priority_buffer.cpp

참고 항목

동시성 런타임 연습
비동기 메시지 블록
메시지 전달 함수