Share via


逐步解說:建立自訂訊息區

本檔說明如何建立自訂消息區塊類型,以依優先順序排序傳入訊息。

雖然內建的訊息區塊類型提供廣泛的功能,但您可以建立自己的訊息區塊類型並加以自訂,以符合應用程式的需求。 如需非同步代理程式程式庫所提供之內建消息塊類型的描述,請參閱 非同步消息塊

必要條件

開始本逐步解說之前,請先閱讀下列檔:

區段

本逐步解說包含下列各節:

設計自訂消息塊

消息塊會參與傳送和接收訊息的動作。 傳送訊息的訊息區塊稱為 來源區塊 。 接收訊息的訊息區塊稱為 目標區塊 。 傳送和接收訊息的訊息區塊稱為 傳播器區塊 。 Agents Library 會使用抽象類別 concurrency::ISource 來代表來源區塊,而抽象類別 並行::ITarget 則代表目標區塊。 做為來源的訊息區塊類型衍生自 ISource ;做為目標的訊息區塊類型衍生自 ITarget

雖然您可以直接從 ISourceITarget 衍生訊息區塊類型,但代理程式程式庫會定義三個基類,這些基類會執行所有訊息區塊類型通用的大部分功能,例如,以並行安全的方式處理錯誤,並將訊息區塊連接在一起。 concurrency ::source_block 類別衍生自 ISource ,並將訊息傳送至其他區塊。 concurrency ::target_block 類別衍生自 ITarget ,並從其他區塊接收訊息。 concurrency::p ropagator_block 類別衍生自 ISourceITarget ,並將訊息傳送至其他區塊,並接收來自其他區塊的訊息。 我們建議您使用這三個基類來處理基礎結構詳細資料,以便專注于訊息區塊的行為。

source_blocktarget_blockpropagator_block 類別是範本,這些範本會參數化于管理來源和目標區塊之間的連線或連結的類型上,以及管理訊息處理方式的類型。 代理程式程式庫會定義兩種執行連結管理的類型: 並行::single_link_registry 並行::multi_link_registry 。 類別 single_link_registry 可讓訊息區塊連結至一個來源或一個目標。 類別 multi_link_registry 可讓訊息區塊連結至多個來源或多個目標。 Agents Library 會定義一個類別來執行訊息管理、 並行::ordered_message_processor 。 類別 ordered_message_processor 可讓訊息區塊按照接收訊息的順序來處理訊息。

若要進一步瞭解訊息區塊與其來源和目標的關係,請考慮下列範例。 此範例顯示並行::transformer 類別的 宣告。

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

類別 transformer 衍生自 propagator_block ,因此可作為來源區塊和目標區塊。 它會接受 類型 _Input 為 的訊息,並傳送 類型 _Output 為 的訊息。 類別 transformersingle_link_registry 指定為任何目標區塊的連結管理員,以及 multi_link_registry 作為任何來源區塊的連結管理員。 因此, transformer 物件最多可以有一個目標和無限數目的來源。

衍生自 source_block 的類別必須實作六種方法: propagate_to_any_targets accept_message 、reserve_message、 consume_message release_message resume_propagation 。 衍生自 target_block 類別必須實作 propagate_message 方法,而且可以選擇性地實 作 send_message 方法。 衍生自 propagator_block 的功能相當於從 和 target_block 衍生。 source_block

運行 propagate_to_any_targets 時間會呼叫 方法,以非同步或同步方式處理任何傳入訊息,並傳播任何傳出訊息。 目標 accept_message 區塊會呼叫 方法以接受訊息。 許多訊息區塊類型,例如 unbounded_buffer ,只會將訊息傳送至接收它的第一個目標。 因此,它會將訊息的擁有權轉移至目標。 其他訊息區塊類型,例如 並行::overwrite_buffer ,會提供訊息給其每個目標區塊。 因此, overwrite_buffer 為每個目標建立訊息的複本。

reserve_messageconsume_messagerelease_messageresume_propagation 方法可讓訊息區塊參與訊息保留。 目標區塊會在提供訊息時呼叫 reserve_message 方法,而且必須保留訊息以供稍後使用。 在目標區塊保留訊息之後,它可以呼叫 consume_message 方法來取用該訊息或 release_message 取消保留的方法。 如同 accept_message 方法,的 consume_message 實作可以傳輸訊息的擁有權,或傳回訊息的複本。 在目標區塊取用或釋放保留訊息之後,執行時間會呼叫 resume_propagation 方法。 一般而言,此方法會繼續傳播訊息,從佇列中的下一個訊息開始。

執行時間會呼叫 propagate_message 方法,以非同步方式將訊息從另一個區區塊轉送至目前的區塊。 方法 send_message 類似 propagate_message ,不同之處在于它會以同步方式,而不是以非同步方式將訊息傳送至目標區塊。 的預設實作 send_message 會拒絕所有傳入訊息。 如果訊息未傳遞與目標區塊相關聯的選擇性篩選函式,則執行時間不會呼叫上述任一方法。 如需訊息篩選的詳細資訊,請參閱 非同步消息塊

[靠上]

定義 priority_buffer 類別

類別 priority_buffer 是自訂訊息區塊類型,會先依優先順序排序傳入訊息,然後依接收訊息的順序排序。 類別 priority_buffer 類似于 concurrency::unbounded_buffer 類別,因為它保存訊息佇列,也因為它同時作為來源和目標消息塊,而且可以同時擁有多個來源和多個目標。 不過, unbounded_buffer 只會根據從其來源接收訊息的順序,以訊息傳播為基礎。

類別 priority_buffer 會接收 std:: tuple 類型的訊息,其中包含 PriorityTypeType 元素。 PriorityType 是指保留每個訊息優先順序的類型; Type 是指訊息的資料部分。 類別 priority_buffer 會傳送 類型 Type 為 的訊息。 類別 priority_buffer 也會管理兩個 訊息佇列:傳入訊息的 std::p riority_queue 物件,以及傳出訊息的 std:: queue 物件。 當物件同時接收多個訊息,或在取用者讀取任何訊息之前收到多個訊息時,依優先順序排序訊息會很有用 priority_buffer

除了衍生自 propagator_block 的類別必須實作的七種方法之外,類別 priority_buffer 也會覆寫 link_target_notificationsend_message 方法。 類別 priority_buffer 也會定義兩個公用協助程式方法, enqueue 以及 dequeue 和 私用協助程式方法 propagate_priority_order

下列程式描述如何實作 priority_buffer 類別。

定義priority_buffer類別

  1. 建立 C++ 標頭檔並將其命名為 priority_buffer.h 。 或者,您可以使用屬於專案一部分的現有標頭檔。

  2. 在 中 priority_buffer.h ,新增下列程式碼。

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. 在 命名空間中 std ,定義 std::less std::greater 在 並行:: message 物件上作用的特製化。

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    類別 priority_buffer 會將 message 物件儲存在 物件中 priority_queue 。 這些類型特製化可讓優先順序佇列根據其優先順序來排序訊息。 優先順序是 物件的第一個專案 tuple

  4. 在 命名空間中 concurrencyex ,宣告 類別 priority_buffer

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    priority_buffer 類別衍生自 propagator_block。 因此,它可以同時傳送和接收訊息。 類別 priority_buffer 可以有多個目標可接收 類型 Type 為 的訊息。 它也可以有多個來源可傳送 類型 tuple<PriorityType, Type> 為 的訊息。

  5. 在 類別的 priority_bufferprivate 段中,新增下列成員變數。

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    物件 priority_queue 會保存傳入訊息; queue 物件會保存傳出訊息。 priority_buffer物件可以同時接收多個訊息; critical_section 物件會同步處理輸入訊息佇列的存取。

  6. 在 區 private 段中,定義複製建構函式和指派運算子。 這可防止 priority_queue 物件被指派。

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. 在 區 public 段中,定義許多訊息區塊類型通用的建構函式。 同時定義解構函式。

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. 在 區 public 段中,定義 方法和 enqueuedequeue 。 這些協助程式方法提供將訊息傳送至物件及接收訊息的 priority_buffer 替代方式。

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. 在 區 protected 段中,定義 propagate_to_any_targets 方法。

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    方法會將 propagate_to_any_targets 位於輸入佇列前端的訊息傳送至輸出佇列,並傳播輸出佇列中的所有訊息。

  10. 在 區 protected 段中,定義 accept_message 方法。

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    當目標區塊呼叫 accept_message 方法時, priority_buffer 類別會將訊息的擁有權傳送至接受訊息的第一個目標區塊。 (這類似于 的行為 unbounded_buffer

  11. 在 區 protected 段中,定義 reserve_message 方法。

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    類別 priority_buffer 允許目標區塊在提供的訊息識別碼符合佇列前端的訊息識別碼時保留訊息。 換句話說,如果 priority_buffer 物件尚未收到其他訊息,且尚未傳播出目前訊息,目標可以保留訊息。

  12. 在 區 protected 段中,定義 consume_message 方法。

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    目標區塊會呼叫 consume_message ,以傳輸其保留之訊息的擁有權。

  13. 在 區 protected 段中,定義 release_message 方法。

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    目標區塊會呼叫 release_message ,以取消對訊息的保留。

  14. 在 區 protected 段中,定義 resume_propagation 方法。

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    執行時間會在目標區塊取用或釋放保留訊息之後呼叫 resume_propagation 。 這個方法會傳播輸出佇列中的任何訊息。

  15. 在 區 protected 段中,定義 link_target_notification 方法。

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    成員 _M_pReservedFor 變數是由基類 source_block 所定義。 這個成員變數指向目標區塊,如果有的話,該區塊會保留輸出佇列前端的訊息。 當新的目標連結至 priority_buffer 物件時,執行時間會呼叫 link_target_notification 。 如果沒有任何目標保留,這個方法會傳播輸出佇列中的任何訊息。

  16. 在 區 private 段中,定義 propagate_priority_order 方法。

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    此方法會從輸出佇列傳播所有訊息。 佇列中的每個訊息都會提供給每個目標區塊,直到其中一個目標區塊接受訊息為止。 類別 priority_buffer 會保留傳出訊息的順序。 因此,在此方法將任何其他訊息提供給目標區塊之前,輸出佇列中的第一個訊息必須由目標區塊接受。

  17. 在 區 protected 段中,定義 propagate_message 方法。

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    propagate_message方法可讓 priority_buffer 類別做為訊息接收者或目標。 此方法會接收所提供來源區塊所提供的訊息,並將該訊息插入優先順序佇列中。 方法 propagate_message 接著會以非同步方式將所有輸出訊息傳送至目標區塊。

    當您呼叫 concurrency::asend 函式或訊息區塊連接到其他消息區塊時,執行時間會呼叫 這個方法。

  18. 在 區 protected 段中,定義 send_message 方法。

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    方法 send_message 類似 propagate_message 。 不過,它會以同步方式傳送輸出訊息,而不是以非同步方式傳送。

    執行時間會在同步傳送作業期間呼叫這個方法,例如當您呼叫 concurrency::send 函式時。

類別 priority_buffer 包含許多訊息區塊類型中典型的建構函式多載。 某些建構函式多載會採用 並行::排程器 並行::ScheduleGroup 物件,這可讓訊息區塊由特定工作排程器管理。 其他建構函式多載會採用篩選函式。 篩選函式可讓訊息區塊根據其承載接受或拒絕訊息。 如需訊息篩選的詳細資訊,請參閱 非同步消息塊 。 如需工作排程器的詳細資訊,請參閱 工作排程器

priority_buffer因為類別會依優先順序排序訊息,然後依接收訊息的順序排序,因此當這個類別以非同步方式接收訊息時最有用,例如,當您呼叫 並行函式::asend 函式或訊息區塊連線到其他消息區塊時。

[靠上]

完整範例

下列範例顯示 類別的完整定義 priority_buffer

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

下列範例會同時對 物件執行一些 asend 並行::receive 作業 priority_buffer

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

此範例會產生下列範例輸出。

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

類別 priority_buffer 會先依優先順序排序訊息,然後依接收訊息的順序排序訊息。 在此範例中,具有較大數值優先順序的訊息會插入佇列前端。

[靠上]

編譯程式碼

複製範例程式碼,並將其貼到 Visual Studio 專案中,或將 類別的定義 priority_buffer 貼到名為 的檔案中,並將測試程式貼到名為 priority_buffer.hpriority_buffer.cpp 的檔案中,然後在 Visual Studio 命令提示字元視窗中執行下列命令。

cl.exe /EHsc priority_buffer.cpp

另請參閱

並行執行階段逐步解說
非同步訊息區
訊息傳遞函式