Tutorial: Crear un bloque de mensajes personalizadoWalkthrough: Creating a Custom Message Block

En este documento se describe cómo crear un tipo de bloque de mensajes personalizado que ordena los mensajes entrantes por prioridad.This document describes how to create a custom message block type that orders incoming messages by priority.

Aunque los tipos integrados de bloques de mensajes proporciona una amplia gama de funcionalidad, puede crear su propio tipo de bloque de mensajes y personalizarlo para satisfacer los requisitos de la aplicación.Although the built-in message block types provide a wide-range of functionality, you can create your own message block type and customize it to meet the requirements of your application. Para obtener una descripción de los tipos de bloques de mensajes integrados que proporciona la biblioteca de agentes asincrónicos, vea bloques de mensajes asincrónicos.For a description of the built-in message block types that are provided by the Asynchronous Agents Library, see Asynchronous Message Blocks.

PrerrequisitosPrerequisites

Lea los documentos siguientes antes de iniciar este tutorial:Read the following documents before you start this walkthrough:

SecciónSections

Este tutorial contiene las siguientes secciones:This walkthrough contains the following sections:

Diseñar un bloque de mensajes personalizadoDesigning a Custom Message Block

Los bloques de mensajes participan en el acto de enviar y recibir mensajes.Message blocks participate in the act of sending and receiving messages. Un bloque de mensajes que envía mensajes se denomina bloque de origen.A message block that sends messages is known as a source block. Un bloque de mensajes que recibe mensajes se conoce como bloque de destino.A message block that receives messages is known as a target block. Un bloque de mensajes que envía y recibe mensajes se conoce como un bloque propagador.A message block that both sends and receives messages is known as a propagator block. La biblioteca de agentes usa la clase abstracta Concurrency:: ISource para representar los bloques de origen y la clase abstracta Concurrency:: ITarget para representar los bloques de destino.The Agents Library uses the abstract class concurrency::ISource to represent source blocks and the abstract class concurrency::ITarget to represent target blocks. Los tipos de bloques de mensajes que actúan como orígenes derivan de ISource; los tipos de bloques de mensajes que actúan como destinos derivan de ITarget.Message block types that act as sources derive from ISource; message block types that act as targets derive from ITarget.

Aunque puede derivar el tipo de bloque de mensajes directamente de ISource y ITarget, la Biblioteca de agentes define tres clases base que realizan gran parte de la funcionalidad común a todos los tipos de bloques de mensajes; por ejemplo, control de errores y conexión de los bloques de mensajes de manera segura para simultaneidad.Although you can derive your message block type directly from ISource and ITarget, the Agents Library defines three base classes that perform much of the functionality that is common to all message block types, for example, handling errors and connecting message blocks together in a concurrency-safe manner. La clase Concurrency:: source_block deriva de ISource y envía mensajes a otros bloques.The concurrency::source_block class derives from ISource and sends messages to other blocks. La clase Concurrency:: target_block deriva de ITarget y recibe mensajes de otros bloques.The concurrency::target_block class derives from ITarget and receives messages from other blocks. La clase Concurrency::p ropagator_block deriva de ISource y ITarget y envía mensajes a otros bloques y recibe mensajes de otros bloques.The concurrency::propagator_block class derives from ISource and ITarget and sends messages to other blocks and it receives messages from other blocks. Se recomienda usar estas tres clases base para controlar los detalles de infraestructura de modo que se pueda centrar en el comportamiento del bloque de mensajes.We recommend that you use these three base classes to handle infrastructure details so that you can focus on the behavior of your message block.

Las clases source_block, target_block y propagator_block son plantillas que se parametrizan en un tipo que administra las conexiones, o vínculos, entre los bloques de origen y de destino y en un tipo que administra cómo se procesan los mensajes.The source_block, target_block, and propagator_block classes are templates that are parameterized on a type that manages the connections, or links, between source and target blocks and on a type that manages how messages are processed. La biblioteca de agentes define dos tipos que realizan la administración de vínculos, Concurrency:: single_link_registry y Concurrency:: multi_link_registry.The Agents Library defines two types that perform link management, concurrency::single_link_registry and concurrency::multi_link_registry. La clase single_link_registry permite vincular un bloque de mensajes a un origen o a un destino.The single_link_registry class enables a message block to be linked to one source or to one target. La clase multi_link_registry permite vincular un bloque de mensajes a varios orígenes o a varios destinos.The multi_link_registry class enables a message block to be linked to multiple sources or multiple targets. La biblioteca de agentes define una clase que realiza la administración de mensajes, Concurrency:: ordered_message_processor.The Agents Library defines one class that performs message management, concurrency::ordered_message_processor. La clase ordered_message_processor permite que los bloques de mensajes procesen los mensajes en el orden en que se reciben.The ordered_message_processor class enables message blocks to process messages in the order in which it receives them.

Para entender mejor cómo se relacionan los bloques de mensajes con sus orígenes y destinos, considere el ejemplo siguiente.To better understand how message blocks relate to their sources and targets, consider the following example. En este ejemplo se muestra la declaración de la clase Concurrency:: Transformer .This example shows the declaration of the concurrency::transformer class.

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

La clase transformer se deriva de propagator_block y, por tanto, actúa como bloque de origen y como bloque de destino.The transformer class derives from propagator_block, and therefore acts as both a source block and as a target block. Acepta mensajes de tipo _Input y envía mensajes de tipo _Output.It accepts messages of type _Input and sends messages of type _Output. La clase transformer especifica single_link_registry como administrador de vínculos para los bloques de destino y multi_link_registry como administrador de vínculos para los bloques de origen.The transformer class specifies single_link_registry as the link manager for any target blocks and multi_link_registry as the link manager for any source blocks. Por tanto, un objeto transformer puede tener hasta un destino y un número ilimitado de orígenes.Therefore, a transformer object can have up to one target and an unlimited number of sources.

Una clase que deriva de source_block debe implementar seis métodos: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_messagey resume_propagation.A class that derives from source_block must implement six methods: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message, and resume_propagation. Una clase que deriva de target_block debe implementar el método propagate_message y, opcionalmente, puede implementar el método send_message .A class that derives from target_block must implement the propagate_message method and can optionally implement the send_message method. Derivar de propagator_block es funcionalmente equivalente a la derivación de source_block y target_block.Deriving from propagator_block is functionally equivalent to deriving from both source_block and target_block.

El runtime llama al método propagate_to_any_targets para procesar de forma sincrónica o asincrónica los mensajes entrantes y propagar los mensajes salientes.The propagate_to_any_targets method is called by the runtime to asynchronously or synchronously process any incoming messages and propagate out any outgoing messages. Los bloques de destino llaman al método accept_message para aceptar mensajes.The accept_message method is called by target blocks to accept messages. Muchos tipos de bloques de mensajes, como unbounded_buffer, envían mensajes solo al primer destino que los recibiría.Many message block types, such as unbounded_buffer, send messages only to the first target that would receive it. Por tanto, transfiere la propiedad del mensaje al destino.Therefore, it transfers ownership of the message to the target. Otros tipos de bloques de mensajes, como Concurrency:: overwrite_buffer, ofrecen mensajes a cada uno de sus bloques de destino.Other message block types, such as concurrency::overwrite_buffer, offer messages to each of its target blocks. Por tanto, overwrite_buffer crea una copia del mensaje para cada uno de sus destinos.Therefore, overwrite_buffer creates a copy of the message for each of its targets.

Los métodos reserve_message, consume_message, release_message y resume_propagation permiten a los bloques de mensajes participar en la reserva de mensajes.The reserve_message, consume_message, release_message, and resume_propagation methods enable message blocks to participate in message reservation. Los bloques de destino llaman al método reserve_message cuando se les ofrece un mensaje y tienen que reservar el mensaje para su uso posterior.Target blocks call the reserve_message method when they are offered a message and have to reserve the message for later use. Después de que un bloque de destino reserva un mensaje, puede llamar al método consume_message para usar ese mensaje o al método release_message para cancelar la reserva.After a target block reserves a message, it can call the consume_message method to consume that message or the release_message method to cancel the reservation. Como sucede con el método accept_message, la implementación de consume_message puede transferir la propiedad del mensaje o devolver una copia del mensaje.As with the accept_message method, the implementation of consume_message can either transfer ownership of the message or return a copy of the message. Después de que un bloque de destino usa o libera un mensaje reservado, el runtime llama al método resume_propagation.After a target block either consumes or releases a reserved message, the runtime calls the resume_propagation method. Normalmente, este método continúa la propagación de mensajes, comenzando por el siguiente mensaje de la cola.Typically, this method continues message propagation, starting with the next message in the queue.

El runtime llama al método propagate_message para transferir de forma asincrónica un mensaje de otro bloque al actual.The runtime calls the propagate_message method to asynchronously transfer a message from another block to the current one. El método send_message es similar a propagate_message, excepto en que envía de forma sincrónica, en lugar de asincrónica, el mensaje a los bloques de destino.The send_message method resembles propagate_message, except that it synchronously, instead of asynchronously, sends the message to the target blocks. La implementación predeterminada de send_message rechaza todos los mensajes entrantes.The default implementation of send_message rejects all incoming messages. El runtime no llama a ninguno de estos métodos si el mensaje no supera la función opcional de filtro asociada al bloque de destino.The runtime does not call either of these methods if the message does not pass the optional filter function that is associated with the target block. Para obtener más información acerca de los filtros de mensajes, consulte bloques de mensajes asincrónicos.For more information about message filters, see Asynchronous Message Blocks.

[Arriba][Top]

Definición de la clase priority_bufferDefining the priority_buffer Class

La clase priority_buffer es un tipo de bloque de mensajes personalizado que ordena los mensajes entrantes primero por prioridad y, a continuación, en el orden en que se reciben los mensajes.The priority_buffer class is a custom message block type that orders incoming messages first by priority, and then by the order in which messages are received. La priority_buffer clase es similar a la clase Concurrency:: unbounded_buffer porque contiene una cola de mensajes, y también porque actúa como un bloque de mensajes de origen y de destino, y puede tener varios orígenes y varios destinos.The priority_buffer class resembles the concurrency::unbounded_buffer class because it holds a queue of messages, and also because it acts as both a source and a target message block and can have both multiple sources and multiple targets. Sin embargo, unbounded_buffer basa la propagación de mensaje solo en el orden en que recibe mensajes de sus orígenes.However, unbounded_buffer bases message propagation only on the order in which it receives messages from its sources.

La priority_buffer clase recibe mensajes de tipo STD::Tuple que contienen PriorityType Type elementos y.The priority_buffer class receives messages of type std::tuple that contain PriorityType and Type elements. PriorityType se refiere al tipo que contiene la prioridad de cada mensaje; Type se refiere a la parte de datos del mensaje.PriorityType refers to the type that holds the priority of each message; Type refers to the data portion of the message. La clase priority_buffer envía mensajes de tipo Type.The priority_buffer class sends messages of type Type. La priority_buffer clase también administra dos colas de mensajes: un objeto std::p riority_queue para los mensajes entrantes y un objeto STD::Queue para los mensajes salientes.The priority_buffer class also manages two message queues: a std::priority_queue object for incoming messages and a std::queue object for outgoing messages. Ordenar los mensajes por prioridad es útil cuando un objeto priority_buffer recibe varios mensajes simultáneamente o cuando recibe varios mensajes antes de que los consumidores lean cualquier mensaje.Ordering messages by priority is useful when a priority_buffer object receives multiple messages simultaneously or when it receives multiple messages before any messages are read by consumers.

Además de los siete métodos que una clase derivada de propagator_block debe implementar, la clase priority_buffer también invalida los métodos link_target_notification y send_message.In addition to the seven methods that a class that derives from propagator_block must implement, the priority_buffer class also overrides the link_target_notification and send_message methods. La clase priority_buffer también define dos métodos del asistente públicos, enqueue y dequeue, y un método del asistente privado, propagate_priority_order.The priority_buffer class also defines two public helper methods, enqueue and dequeue, and a private helper method, propagate_priority_order.

En el procedimiento siguiente se describe cómo implementar la clase priority_buffer.The following procedure describes how to implement the priority_buffer class.

Para definir la clase priority_bufferTo define the priority_buffer class

  1. Cree un archivo de encabezado de C++ y asígnele el nombre priority_buffer.h .Create a C++ header file and name it priority_buffer.h. O bien, puede usar un archivo de encabezado existente que forme parte del proyecto.Alternatively, you can use an existing header file that is part of your project.

  2. En priority_buffer.h , agregue el código siguiente.In priority_buffer.h, add the following code.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. En el std espacio de nombres, defina especializaciones de STD:: less y STD:: Greater que actúen en los objetos Concurrency::Message .In the std namespace, define specializations of std::less and std::greater that act on concurrency::message objects.

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    La clase priority_buffer almacena objetos message en un objeto priority_queue.The priority_buffer class stores message objects in a priority_queue object. Estas especializaciones de tipo permiten a la cola de prioridad ordenar los mensajes según su prioridad.These type specializations enable the priority queue to sort messages according to their priority. La prioridad es el primer elemento del objeto tuple.The priority is the first element of the tuple object.

  4. En el espacio de nombres concurrencyex, declare la clase priority_buffer.In the concurrencyex namespace, declare the priority_buffer class.

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    La clase priority_buffer deriva de propagator_block.The priority_buffer class derives from propagator_block. Por tanto, puede enviar y recibir mensajes.Therefore, it can both send and receive messages. La clase priority_buffer puede tener varios destinos que reciben mensajes de tipo Type.The priority_buffer class can have multiple targets that receive messages of type Type. También puede tener varios orígenes que envían mensajes de tipo tuple<PriorityType, Type>.It can also have multiple sources that send messages of type tuple<PriorityType, Type>.

  5. En la private sección de la priority_buffer clase, agregue las siguientes variables de miembro.In the private section of the priority_buffer class, add the following member variables.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    El objeto priority_queue contiene mensajes entrantes; el objeto queue contiene mensajes salientes.The priority_queue object holds incoming messages; the queue object holds outgoing messages. Un objeto priority_buffer puede recibir varios mensajes simultáneamente; el objeto critical_section sincroniza el acceso a la cola de mensajes entrantes.A priority_buffer object can receive multiple messages simultaneously; the critical_section object synchronizes access to the queue of input messages.

  6. En la private sección, defina el constructor de copias y el operador de asignación.In the private section, define the copy constructor and the assignment operator. Esto impide que los objetos priority_queue sean asignables.This prevents priority_queue objects from being assignable.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. En la public sección, defina los constructores que son comunes a muchos tipos de bloques de mensajes.In the public section, define the constructors that are common to many message block types. Defina también el destructor.Also define the destructor.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. En la public sección, defina los métodos enqueue y dequeue .In the public section, define the methods enqueue and dequeue. Estos métodos del asistente proporcionan una manera alternativa de enviar mensajes a y recibir mensajes de un objeto priority_buffer.These helper methods provide an alternative way to send messages to and receive messages from a priority_buffer object.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. En la protected sección, defina el propagate_to_any_targets método.In the protected section, define the propagate_to_any_targets method.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    El método propagate_to_any_targets transfiere a la cola de salida el mensaje que está al principio de la cola de entrada y propaga todos los mensajes de la cola de salida.The propagate_to_any_targets method transfers the message that is at the front of the input queue to the output queue and propagates out all messages in the output queue.

  10. En la protected sección, defina el accept_message método.In the protected section, define the accept_message method.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    Cuando un bloque de destino llama al método accept_message, la clase priority_buffer transfiere la propiedad del mensaje al primer bloque de destino que lo acepta.When a target block calls the accept_message method, the priority_buffer class transfers ownership of the message to the first target block that accepts it. (Esto es similar al comportamiento de unbounded_buffer.)(This resembles the behavior of unbounded_buffer.)

  11. En la protected sección, defina el reserve_message método.In the protected section, define the reserve_message method.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    La clase priority_buffer permite a un bloque de destino reservar un mensaje cuando el identificador de mensaje proporcionado coincide con el identificador de mensaje que está en el principio de la cola.The priority_buffer class permits a target block to reserve a message when the provided message identifier matches the identifier of the message that is at the front of the queue. Es decir, un destino puede reservar el mensaje si el objeto priority_buffer aún no ha recibido un mensaje adicional y aún no ha propagado el actual.In other words, a target can reserve the message if the priority_buffer object has not yet received an additional message and has not yet propagated out the current one.

  12. En la protected sección, defina el consume_message método.In the protected section, define the consume_message method.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    Un bloque de destino llama a consume_message para transferir la propiedad del mensaje reservado.A target block calls consume_message to transfer ownership of the message that it reserved.

  13. En la protected sección, defina el release_message método.In the protected section, define the release_message method.

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    Un bloque de destino llama a release_message para cancelar la reserva un mensaje.A target block calls release_message to cancel its reservation to a message.

  14. En la protected sección, defina el resume_propagation método.In the protected section, define the resume_propagation method.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    El runtime llama a resume_propagation después de que un bloque de destino use o libere un mensaje reservado.The runtime calls resume_propagation after a target block either consumes or releases a reserved message. Este método propaga cualquier mensaje que esté en la cola de salida.This method propagates out any messages that are in the output queue.

  15. En la protected sección, defina el link_target_notification método.In the protected section, define the link_target_notification method.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    La variable miembro _M_pReservedFor está definida por la clase base, source_block.The _M_pReservedFor member variable is defined by the base class, source_block. Esta variable miembro apunta al bloque de destino, si existe, que mantiene una reserva del mensaje que está al principio de la cola de salida.This member variable points to the target block, if any, that is holding a reservation to the message that is at the front of the output queue. El runtime llama a link_target_notification cuando un nuevo destino se vincula al objeto priority_buffer.The runtime calls link_target_notification when a new target is linked to the priority_buffer object. Este método propaga cualquier mensaje que está en la cola de salida si ningún destino mantiene una reserva.This method propagates out any messages that are in the output queue if no target is holding a reservation.

  16. En la private sección, defina el propagate_priority_order método.In the private section, define the propagate_priority_order method.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    Este método propaga todos los mensajes de la cola de salida.This method propagates out all messages from the output queue. Todos los mensajes de la cola se ofrecen a todos los bloques de destino hasta que uno de los bloques de destino acepta el mensaje.Every message in the queue is offered to every target block until one of the target blocks accepts the message. La clase priority_buffer conserva el orden de los mensajes salientes.The priority_buffer class preserves the order of the outgoing messages. Por tanto, un bloque de destino debe aceptar el primer mensaje de la cola de salida antes de que este método ofrezca ningún otro mensaje a los bloques de destino.Therefore, the first message in the output queue must be accepted by a target block before this method offers any other message to the target blocks.

  17. En la protected sección, defina el propagate_message método.In the protected section, define the propagate_message method.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    El método propagate_message permite que la clase priority_buffer actúe como receptor de mensajes o destino.The propagate_message method enables the priority_buffer class to act as a message receiver, or target. Este método recibe el mensaje ofrecido por el bloque de origen proporcionado e inserta ese mensaje en la cola de prioridad.This method receives the message that is offered by the provided source block and inserts that message into the priority queue. El método propagate_message envía de forma asincrónica todos los mensajes de salida a los bloques de destino.The propagate_message method then asynchronously sends all output messages to the target blocks.

    El Runtime llama a este método cuando se llama a la función Concurrency:: Asend o cuando el bloque de mensajes está conectado a otros bloques de mensajes.The runtime calls this method when you call the concurrency::asend function or when the message block is connected to other message blocks.

  18. En la protected sección, defina el send_message método.In the protected section, define the send_message method.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    El método send_message es similar a propagate_message.The send_message method resembles propagate_message. Sin embargo, envía los mensajes de salida sincrónicamente en lugar de hacerlo de forma asincrónica.However it sends the output messages synchronously instead of asynchronously.

    El Runtime llama a este método durante una operación de envío sincrónica, como cuando se llama a la función Concurrency:: Send .The runtime calls this method during a synchronous send operation, such as when you call the concurrency::send function.

La clase priority_buffer contiene sobrecargas del constructor que son típicas en muchos tipos de bloques de mensajes.The priority_buffer class contains constructor overloads that are typical in many message block types. Algunas sobrecargas de constructor toman objetos Concurrency:: Scheduler o Concurrency:: ScheduleGroup , que permiten que un programador de tareas específico administre el bloque de mensajes.Some constructor overloads take concurrency::Scheduler or concurrency::ScheduleGroup objects, which enable the message block to be managed by a specific task scheduler. Otras sobrecargas del constructor toman una función de filtro.Other constructor overloads take a filter function. Las funciones de filtro permiten a los bloques de mensajes aceptar o rechazar un mensaje en función de su carga.Filter functions enable message blocks to accept or reject a message on the basis of its payload. Para obtener más información acerca de los filtros de mensajes, consulte bloques de mensajes asincrónicos.For more information about message filters, see Asynchronous Message Blocks. Para obtener más información sobre los programadores de tareas, vea programador de tareas.For more information about task schedulers, see Task Scheduler.

Dado que la priority_buffer clase ordena los mensajes por prioridad y, a continuación, por el orden en que se reciben los mensajes, esta clase es muy útil cuando recibe mensajes de forma asincrónica, por ejemplo, cuando se llama a la función Concurrency:: Asend o cuando el bloque de mensajes está conectado a otros bloques de mensajes.Because the priority_buffer class orders messages by priority and then by the order in which messages are received, this class is most useful when it receives messages asynchronously, for example, when you call the concurrency::asend function or when the message block is connected to other message blocks.

[Arriba][Top]

El ejemplo completoThe Complete Example

En el ejemplo siguiente se muestra la definición completa de la clase priority_buffer.The following example shows the complete definition of the priority_buffer class.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

En el ejemplo siguiente se realizan simultáneamente una serie de asend operaciones and Concurrency:: Receive en un priority_buffer objeto.The following example concurrently performs a number of asend and concurrency::receive operations on a priority_buffer object.

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

Este ejemplo genera la siguiente salida de ejemplo.This example produces the following sample output.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

La clase priority_buffer ordena los mensajes primero por prioridad y, a continuación, por el orden en que recibe los mensajes.The priority_buffer class orders messages first by priority and then by the order in which it receives messages. En este ejemplo, los mensajes con mayor prioridad numérica se insertan al principio de la cola.In this example, messages with greater numerical priority are inserted towards the front of the queue.

[Arriba][Top]

Compilar el códigoCompiling the Code

Copie el código de ejemplo y péguelo en un proyecto de Visual Studio o pegue la definición de la priority_buffer clase en un archivo denominado priority_buffer.h y el programa de prueba en un archivo denominado priority_buffer.cpp y, a continuación, ejecute el siguiente comando en una ventana del símbolo del sistema de Visual Studio.Copy the example code and paste it in a Visual Studio project, or paste the definition of the priority_buffer class in a file that is named priority_buffer.h and the test program in a file that is named priority_buffer.cpp and then run the following command in a Visual Studio Command Prompt window.

cl.exe/EHsc priority_buffer. cppcl.exe /EHsc priority_buffer.cpp

Consulte tambiénSee also

Tutoriales Runtime de simultaneidadConcurrency Runtime Walkthroughs
Bloques de mensajes asincrónicosAsynchronous Message Blocks
Funciones de paso de mensajesMessage Passing Functions