Exemplarische Vorgehensweise: Erstellen eines benutzerdefinierten Nachrichtenblocks

In diesem Dokument wird beschrieben, wie ein benutzerdefinierter Nachrichtenblocktyp erstellt wird, um eingehende Nachrichten nach Priorität zu sortieren.

Obwohl die integrierten Nachrichtenblocktypen eine breite Palette von Funktionen bereitstellen, können Sie auch eigene Nachrichtenblocktypen erstellen und anpassen, um die Anforderungen Ihrer Anwendung zu erfüllen. Eine Beschreibung der integrierten Nachrichtenblocktypen, die von der Bibliothek für asynchrone Agents bereitgestellt werden, finden Sie unter Asynchrone Nachrichtenblöcke.

Voraussetzungen

Lesen Sie die folgenden Dokumente, bevor Sie mit dieser exemplarischen Vorgehensweise beginnen:

Abschnitte

Diese exemplarische Vorgehensweise enthält folgende Abschnitte:

Entwerfen eines benutzerdefinierten Nachrichtenblocks

Nachrichtenblöcke sind am Senden und Empfangen von Nachrichten beteiligt. Ein Nachrichtenblock, der Nachrichten sendet, wird als Quellblock bezeichnet. Ein Nachrichtenblock, der Nachrichten empfängt, wird als Zielblock bezeichnet. Ein Nachrichtenblock, der Nachrichten sendet und empfängt, wird als Propagatorblock bezeichnet. Die Agents Library verwendet die abstrakte Klasse concurrency::ISource , um Quellblöcke und die abstrakte Klasse concurrency::ITarget zur Darstellung von Zielblöcken zu darstellen. Nachrichtenblocktypen, die als Quelle dienen, werden von der ISource-Klasse abgeleitet, während Nachrichtenblocktypen, die als Ziel dienen, von der ITarget-Klasse abgeleitet werden.

Der Nachrichtenblocktyp kann prinzipiell unmittelbar von ISource und ITarget abgeleitet werden. Die Agents Library definiert jedoch drei Basisklassen, deren Funktionalität weitestgehend der aller Nachrichtenblocktypen entspricht. Beispiel: parallelitätssicheres Behandeln von Fehlern und parallelitätssicheres Verbinden von Nachrichtenblöcken. Die concurrency::source_block-KlasseISource wird von und sendet Nachrichten an andere Blöcke. Die concurrency::target_block-Klasse wird ITarget von anderen Blöcken und empfängt Nachrichten von ihnen. Die concurrency::p ropagator_block-KlasseISourceITarget wird von und ableiten und sendet Nachrichten an andere Blöcke und empfängt Nachrichten von anderen Blöcken. Es wird empfohlen, Infrastrukturdetails mit diesen drei Basisklassen zu behandeln, sodass Sie sich auf das Verhalten des Nachrichtenblocks konzentrieren können.

Die Klassen source_block, target_block und propagator_block sind Vorlagen, die auf der Grundlage eines Typs parametrisiert werden, der die Verbindungen oder Links zwischen Quell- und Zielblöcken verwaltet, sowie auf Grundlage eines Typs, der die Verarbeitung von Nachrichten verwaltet. Die Agents Library definiert zwei Typen, die die Linkverwaltung ausführen: concurrency::single_link_registry und concurrency::multi_link_registry. Die single_link_registry-Klasse ermöglicht das Verknüpfen eines Nachrichtenblocks mit einer Quelle oder einem Ziel. Die multi_link_registry-Klasse ermöglicht das Verknüpfen eines Nachrichtenblocks mit mehreren Quellen oder mehreren Zielen. Die Agents Library definiert eine Klasse, die die Nachrichtenverwaltung ausführt, concurrency::ordered_message_processor. Die ordered_message_processor-Klasse ermöglicht Nachrichtenblöcken die Verarbeitung von Nachrichten in der Reihenfolge ihres Empfangs.

Im folgenden Beispiel wird die Beziehung zwischen Nachrichtenblöcken sowie Quellen und Zielen veranschaulicht. Dieses Beispiel zeigt die Deklaration der concurrency::transformer-Klasse .

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

Die transformer-Klasse wird von propagator_block abgeleitet und fungiert daher als Quell- sowie als Zielblock. Sie akzeptiert Nachrichten vom Typ _Input und sendet Nachrichten vom Typ _Output. Die transformer-Klasse gibt single_link_registry als Link-Manager für alle Zielblöcke und multi_link_registry als Link-Manager für alle Quellblöcke an. Aus diesem Grund kann ein transformer-Objekt ein Ziel sowie eine unbegrenzte Anzahl von Quellen haben.

Eine von ableitende source_block Klasse muss sechs Methoden implementieren: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message und resume_propagation. Eine Klasse, die von ableitung, target_block muss die propagate_message implementieren und kann optional die send_message implementieren. Ableitungen von propagator_block sowie von source_block und target_block sind funktional äquivalent.

Die propagate_to_any_targets-Methode wird von der Laufzeit aufgerufen, um alle eingehenden Nachrichten synchron oder asynchron zu verarbeiten und alle ausgehenden Nachrichten weiterzugeben. Die accept_message-Methode wird von Zielblöcken aufgerufen, um Nachrichten zu akzeptieren. Viele Nachrichtenblocktypen wie unbounded_buffer senden Nachrichten nur an das erste Ziel, das diese empfangen würde. Daher wird der Besitz der Nachricht auf das Ziel übertragen. Andere Nachrichtenblocktypen, z. B. concurrency::overwrite_buffer, bieten Nachrichten für jeden seiner Zielblöcke an. overwrite_buffer erstellt daher eine Kopie der Nachricht für alle diesbezüglichen Ziele.

Mit den Methoden reserve_message, consume_message, release_message und resume_propagation können Nachrichtenblöcke an der Reservierung von Nachrichten teilnehmen. Zielblöcke rufen die reserve_message-Methode auf, wenn eine Nachricht für sie bereitgestellt wird, die zur späteren Verwendung reserviert werden muss. Nach dem Reservieren einer Nachricht durch den Zielblock kann dieser die consume_message-Methode aufrufen, um die Nachricht zu verarbeiten, oder die release_message-Methode, um die Reservierung abzubrechen. Analog zur accept_message-Methode kann die Implementierung von consume_message den Besitz der Nachricht übertragen oder eine Kopie der Nachricht zurückgeben. Nachdem eine reservierte Nachricht von einem Zielblock verarbeitet oder freigegeben wurde, wird die resume_propagation-Methode von der Laufzeit aufgerufen. Diese Methode setzt die Nachrichtenweitergabe i. d. R. mit der nächsten Nachricht in der Warteschlange fort.

Die propagate_message-Methode wird von der Laufzeit aufgerufen, um eine Nachricht asynchron von einem anderen Block zum aktuellen zu übertragen. Die send_message-Methode ähnelt der propagate_message-Methode, sendet die Nachrichten im Unterschied zu dieser jedoch synchron an die Zielblöcke. Die Standardimplementierung von send_message weist alle eingehenden Nachrichten zurück. Die Laufzeit ruft keine der Methoden auf, wenn von der Nachricht nicht die optionale Filterfunktion übergeben wird, die dem Zielblock zugeordnet ist. Weitere Informationen zu Nachrichtenfiltern finden Sie unter Asynchrone Nachrichtenblöcke.

[Oben]

Definieren der priority_buffer-Klasse

Die priority_buffer-Klasse ist ein benutzerdefinierter Nachrichtenblocktyp, der eingehende Meldungen zunächst nach der Priorität und anschließend nach der Reihenfolge ihres Empfangs sortiert. priority_buffer Die -Klasse ähnelt der concurrency::unbounded_buffer-Klasse, da sie eine Nachrichtenwarteschlange enthält und auch als Quell- und Zielnachrichtenblock fungiert und sowohl mehrere Quellen als auch mehrere Ziele haben kann. unbounded_buffer legt als Kriterium für die Weitergabe von Nachrichten jedoch nur die Reihenfolge ihres Empfangs aus den Quellen zugrunde.

Die priority_buffer -Klasse empfängt Nachrichten vom Typ std::tuple, die - und - PriorityType Elemente Type enthalten. PriorityType verweist auf den Typ, der die Priorität einer Nachricht angibt; Type verweist auf den Datenteil der Nachricht. Die priority_buffer-Klasse sendet Nachrichten vom Typ Type. Die priority_buffer -Klasse verwaltet auch zwei Nachrichtenwarteschlangen: ein std::p riority_queue-Objekt für eingehende Nachrichten und ein std::queue-Objekt für ausgehende Nachrichten. Das Sortieren von Nachrichten nach der Priorität ist hilfreich, wenn ein priority_buffer-Objekt mehrere Nachrichten gleichzeitig oder bevor diese von Consumern gelesen werden empfängt.

Zusätzlich zu den sieben Methoden, die von einer Klasse implementiert werden müssen, die von propagator_block abgeleitet wird, überschreibt die priority_buffer-Klasse die noch die link_target_notification-Methode und die send_message-Methode. Die priority_buffer-Klasse definiert außerdem zwei öffentliche Hilfsmethoden (, enqueue und dequeue) sowie eine private Hilfsmethode ( propagate_priority_order).

Im folgenden Verfahren wird beschrieben, wie die priority_buffer-Klasse implementiert wird.

So definieren Sie die priority_buffer-Klasse

  1. Erstellen Sie eine C++-Headerdatei, und nennen Sie sie priority_buffer.h. Sie können auch eine bestehende Headerdatei verwenden, die Teil Ihres Projekts ist.

  2. Fügen priority_buffer.hSie in den folgenden Code hinzu.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. Definieren Sie std im -Namespace Spezialisierungen von std::less und std::greater , die für concurrency::message-Objekte fungieren .

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    Die priority_buffer-Klasse speichert message-Objekte in einem priority_queue-Objekt. Mit diesen Typspezialisierungen können die Nachrichten von der Prioritätswarteschlange anhand ihrer Priorität sortiert werden. Die Priorität ist das erste Element des tuple-Objekts.

  4. Deklarieren Sie die concurrencyex-Klasse im priority_buffer-Namespace.

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    Die priority_buffer-Klasse wird von propagator_block abgeleitet. Sie kann daher Meldungen senden und empfangen. Die priority_buffer-Klasse mehrere Ziele aufweisen, die Nachrichten vom Typ Type empfangen. Sie kann außerdem mehrere Quellen aufweisen, die Nachrichten vom Typ tuple<PriorityType, Type> senden.

  5. Fügen Sie im private-Abschnitt der priority_buffer-Klasse die folgenden Membervariablen hinzu.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    Das priority_queue-Objekt enthält eingehende Nachrichten, das queue-Objekt enthält ausgehende Nachrichten. Ein priority_buffer-Objekt kann mehrere Nachrichten gleichzeitig empfangen. Das critical_section-Objekt synchronisiert den Zugriff auf die Warteschlange für eingehende Nachrichten.

  6. Definieren Sie den Kopierkonstruktor und den Zuweisungsoperator im Abschnitt private. Dadurch wird verhindert, dass priority_queue-Objekte zugewiesen werden können.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. Definieren Sie im public-Abschnitt die Konstruktoren, die in zahlreichen Nachrichtenblocktypen verwendet werden. Definieren Sie auch den Destruktor.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. Definieren Sie im public-Abschnitt die enqueue-Methode und die dequeue-Methode. Diese Hilfsmethoden bieten eine alternative Möglichkeit, Nachrichten an ein priority_buffer-Objekt zu senden und von diesem zu empfangen.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. Definieren Sie die protected-Methode im propagate_to_any_targets-Abschnitt.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    Die propagate_to_any_targets-Methode überträgt die Nachricht, die sich in der Eingabewarteschlange an erster Stelle befindet, an die Ausgabewarteschlange, und gibt alle Nachrichten an die Ausgabewarteschlange weiter.

  10. Definieren Sie die protected-Methode im accept_message-Abschnitt.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    Wenn die accept_message-Methode von einem Zielblock aufgerufen wird, wird der Besitz der Nachricht von der priority_buffer-Klasse auf den ersten Zielblock übertragen, der diesen akzeptiert. (Dieses Verhalten ist vergleichbar mit unbounded_buffer).

  11. Definieren Sie die protected-Methode im reserve_message-Abschnitt.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    Die priority_buffer-Klasse ermöglicht einem Zielblock, eine Nachricht zu reservieren, wenn der Bezeichner der bereitgestellten Nachricht mit dem Bezeichner der Nachricht übereinstimmt, die an erster Position in der Warteschlange steht. Anders ausgedrückt: Ein priority_buffer Ziel kann die Nachricht reservieren, wenn das Objekt noch keine zusätzliche Nachricht empfangen und die aktuelle noch nicht weiter gegeben hat.

  12. Definieren Sie die protected-Methode im consume_message-Abschnitt.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    Die consume_message-Methode wird von einem Zielblock aufgerufen, um den Besitz der reservierten Methode zu übertragen.

  13. Definieren Sie die protected-Methode im release_message-Abschnitt.

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    Die release_message-Methode wird von einem Zielblock aufgerufen, um die Reservierung einer Nachricht abzubrechen.

  14. Definieren Sie die protected-Methode im resume_propagation-Abschnitt.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    Nachdem eine reservierte Nachricht von einem Zielblock verarbeitet oder freigegeben wurde, wird die resume_propagation-Methode von der Laufzeit aufgerufen. Diese Methode gibt alle Nachrichten weiter, die sich in der Ausgabewarteschlange befinden.

  15. Definieren Sie die protected-Methode im link_target_notification-Abschnitt.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    Die Membervariable _M_pReservedFor wird von der Basisklasse source_block definiert. Diese Membervariable zeigt ggf. auf den Zielblock mit der Reservierung für die Nachricht, die sich an erster Stelle in der Warteschlange befindet. link_target_notification wird von der Laufzeit aufgerufen, wenn ein neues Ziel mit dem priority_buffer-Objekt verknüpft wird. Diese Methode gibt alle Nachrichten in der Ausgabewarteschlange weiter, wenn kein Ziel eine Reservierung aufweist.

  16. Definieren Sie die private-Methode im propagate_priority_order-Abschnitt.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    Diese Methode gibt alle Nachrichten von der Ausgabewarteschlange weiter. Jede Nachricht in der Warteschlange wird für alle Zielblöcke bereitgestellt, bis einer der Zielblöcke die Meldung akzeptiert. Die priority_buffer-Klasse behält die Reihenfolge der ausgehenden Nachrichten bei. Daher muss die erste Nachricht in der Ausgabewarteschlange von einem Zielblock akzeptiert werden, bevor eine andere Meldung von dieser Methode für die Zielblöcke bereitgestellt wird.

  17. Definieren Sie die protected-Methode im propagate_message-Abschnitt.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    Die propagate_message-Methode ermöglicht der priority_buffer-Klasse als Nachrichtenempfänger oder -ziel fungieren. Diese Methode empfängt die vom angegebenen Quellblock bereitgestellte Nachricht und fügt sie in die Prioritätswarteschlange ein. Anschließend werden alle Ausgabenachrichten von der propagate_message-Methode asynchron an die Zielblöcke gesendet.

    Die Runtime ruft diese Methode auf, wenn Sie die concurrency::asend-Funktion aufrufen oder wenn der Nachrichtenblock mit anderen Nachrichtenblöcken verbunden ist.

  18. Definieren Sie die protected-Methode im send_message-Abschnitt.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    Die send_message-Methode ähnelt der propagate_message-Methode. Im Unterschied zu dieser sendet sie die Ausgabemeldungen jedoch synchron.

    Die Laufzeit ruft diese Methode während eines synchronen Sendevorgang auf, z. B. wenn Sie die concurrency::send-Funktion aufrufen.

Die priority_buffer-Klasse enthält Konstruktorüberladungen, die in vielen Nachrichtenblocktypen verwendet werden. Einige Konstruktorüberladungen verwenden concurrency::Scheduler- oder concurrency::ScheduleGroup-Objekte , die es ermöglichen, dass der Meldungsblock von einem bestimmten Aufgabenplaner verwaltet wird. Andere Konstruktorüberladungen übernehmen eine Filterfunktion. Filterfunktionen ermöglichen Nachrichtenblöcken das Annehmen oder Ablehnen von Nachrichten anhand der Nutzlast. Weitere Informationen zu Nachrichtenfiltern finden Sie unter Asynchrone Nachrichtenblöcke. Weitere Informationen zu Aufgabenplanern finden Sie unter Taskplaner.

priority_buffer Da die -Klasse Nachrichten nach Priorität und dann nach der Reihenfolge, in der Nachrichten empfangen werden, geordnet, ist diese Klasse am nützlichsten, wenn nachrichten asynchron empfangen werden, z. B. wenn Sie die concurrency::asend-Funktion aufrufen oder wenn der Nachrichtenblock mit anderen Nachrichtenblöcken verbunden ist.

[Oben]

Vollständiges Beispiel

Im folgenden Beispiel wird die vollständige Definition der priority_buffer-Klasse veranschaulicht.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

Im folgenden Beispiel werden gleichzeitig eine Reihe von - und asendconcurrency::receive-Vorgängen für ein -Objekt priority_buffer ausgeführt.

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

Dieses Beispiel erzeugt die folgende Beispielausgabe.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

Die priority_buffer-Klasse ordnet die Nachrichten zunächst nach der Priorität und anschließend nach der Reihenfolge ihres Empfangs. In diesem Beispiel werden Nachrichten mit höherer numerischer Priorität am Anfang der Warteschlange eingefügt.

[Oben]

Kompilieren des Codes

Kopieren Sie den Beispielcode, und fügen Sie ihn in ein Visual Studio-Projekt ein, priority_buffer oder fügen Sie die Definition der -Klasse in priority_buffer.h eine Datei mit dem Namen und das Testprogramm in priority_buffer.cpp eine Datei mit dem Namen ein, und führen Sie dann den folgenden Befehl in einem Visual Studio-Eingabeaufforderungsfenster aus.

cl.exe /EHsc priority_buffer.cpp

Siehe auch

Concurrency Runtime Exemplarische Vorgehensweisen
Asynchrone Nachrichtenblöcke
Nachrichtenüber übergebende Funktionen