Compartir a través de


Tutorial: Crear un agente de flujo de datos

En este documento se muestra cómo crear aplicaciones basadas en agentes que recurren al flujo de datos, no al flujo de control.

Flujo de control hace referencia al orden de ejecución de las operaciones de un programa. El flujo de control se regula con las estructuras de control, como instrucciones condicionales, bucles, etc. Como alternativa, el flujo de datos hace referencia a un modelo de programación en el que los cálculos solo se realizan cuando están disponibles todos los datos necesarios. El modelo de programación de flujo de datos está relacionado con el concepto paso de mensajes, en el que los componentes independientes de un programa se comunican entre sí mediante el envío de mensajes.

Los agentes asincrónicos admiten los modelos de programación de flujo de control y flujo de datos. Aunque el modelo de flujo de control es adecuado en muchos casos, el modelo de flujo de datos es adecuado en otros, por ejemplo, cuando un agente recibe los datos y realiza una acción que se basa en la carga de esos datos.

Requisitos previos

Lea los documentos siguientes antes de iniciar este tutorial:

Secciones

Este tutorial contiene las siguientes secciones:

Creación de un agente básico de flujo de control

Considere el ejemplo siguiente que define la clase control_flow_agent. La clase control_flow_agent actúa en tres búferes de mensajes: un búfer de entrada y dos búferes de salida. El método run lee del búfer de mensajes de origen en un bucle y usa una instrucción condicional para dirigir el flujo de ejecución del programa. El agente incrementa un contador para los valores negativos distintos de cero e incrementa otro contador para los valores positivos distintos de cero. Después de que el agente recibe el valor centinela de cero, envía los valores de los contadores a los búferes de mensajes de salida. Los métodos negatives y positives permiten que la aplicación lea los recuentos de valores positivos y negativos del agente.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

Aunque en este ejemplo se realiza un uso básico del flujo de control en un agente, muestra la naturaleza de serie de la programación basada en control de flujo. Cada mensaje se debe procesar secuencialmente, aunque haya varios mensajes disponibles en el búfer de mensajes de entrada. El modelo de flujo de datos permite que ambas bifurcaciones de las instrucciones condicionales se evalúen simultáneamente. El modelo de flujo de datos también permite crear redes de mensajería más complejas que actúan sobre los datos cuando están disponibles.

[Arriba]

Creación de un agente de flujo de datos básico

En esta sección se muestra cómo convertir la clase control_flow_agent para usar el modelo de flujo de datos con el fin de realizar la misma tarea.

El agente de flujo de datos crea una red de búferes de mensajes, cada uno de los cuales sirve para un propósito concreto. Ciertos bloques de mensajes usan una función de filtro para aceptar o rechazar un mensaje en función de su carga. Una función de filtro garantiza que un bloque de mensajes reciba solo ciertos valores.

Para convertir el agente de flujo de control en un agente de flujo de datos

  1. Copie el cuerpo de la clase control_flow_agent en otra clase, por ejemplo, dataflow_agent. Como alternativa, puede cambiar el nombre de la clase control_flow_agent.

  2. Quite el cuerpo del bucle que llama a receive desde el método run.

void run()
{
   // Counts the number of negative and positive values that
   // the agent receives.
   size_t negative_count = 0;
   size_t positive_count = 0;

   // Write the counts to the message buffers.
   send(_negatives, negative_count);
   send(_positives, positive_count);

   // Set the agent to the completed state.
   done();
}
  1. En el método run, después de la inicialización de las variables negative_count y positive_count, agregue un objeto countdown_event que siga el contador de operaciones activo.
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;

La clase countdown_event se muestra más adelante en este tema.

  1. Cree los objetos del búfer de mensajes que participarán en la red de flujo de datos.
 //
 // Create the members of the dataflow network.
 //

 // Increments the active counter.
 transformer<int, int> increment_active(
    [&active](int value) -> int {
       active.add_count();
       return value;
    });

 // Increments the count of negative values.
 call<int> negatives(
    [&](int value) {
       ++negative_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value < 0;
    });

 // Increments the count of positive values.
 call<int> positives(
    [&](int value) {
       ++positive_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value > 0;
    });

 // Receives only the sentinel value of 0.
 call<int> sentinel(
    [&](int value) {            
       // Decrement the active counter.
       active.signal();
       // Set the sentinel event.
       received_sentinel.set();
    },
    [](int value) -> bool { 
       return value == 0; 
    });

 // Connects the _source message buffer to the rest of the network.
 unbounded_buffer<int> connector;
  1. Conecte los búferes de mensajes para formar una red.
//
// Connect the network.
//

// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);

// Connect the _source buffer to the internal network to 
// begin data flow.
_source.link_target(&increment_active);
  1. Espere a que se establezcan los objetos event y countdown event. Estos eventos señalan que el agente ha recibido el valor centinela y que todas las operaciones han finalizado.
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();

En el diagrama siguiente se muestra la red de flujo de datos completa de la clase dataflow_agent:

The dataflow network.

En la tabla siguiente se describen los miembros de la red.

Miembro Descripción
increment_active Un objeto concurrency::transformer que incrementa el contador de eventos activos y pasa el valor de entrada al resto de la red.
negatives, positives Objetos concurrency::call que aumentan el recuento de números y reducen el contador de eventos activos. Cada uno de objetos usa un filtro de aceptar los números negativos o los números positivos.
sentinel Un objeto concurrency::call que solo acepta el valor centinela de cero y reduce el contador de eventos activos.
connector Un objeto concurrency::unbounded_buffer que conecta el búfer de mensajes de origen con la red interna.

Dado que se llama al método run en un subproceso independiente, otros subprocesos pueden enviar mensajes a la red antes de que esté conectada totalmente. El miembro de datos _source es un objeto unbounded_buffer que almacena en búfer todas las entradas que se envían desde la aplicación al agente. Para asegurarse de que la red procesa todos los mensajes de entrada, el agente primero vincula los nodos internos de la red y, a continuación, vincula el inicio de esa red, connector, con el miembro de datos _source. Esto garantiza que los mensajes no se procesen mientras se esté formando la red.

Dado que la red de este ejemplo se basa en el flujo de datos, no en el flujo de control, la red debe comunicar al agente que ha finalizado el procesamiento de cada valor de entrada y que el nodo centinela ha recibido su valor. En este ejemplo se usa un objeto countdown_event para indicar que se han procesado todos los valores de entrada y un objeto concurrency::event para indicar que el nodo centinela ha recibido su valor. La clase countdown_event usa un objeto event para señalar cuando un valor de contador llega a cero. El encabezado de la red de flujo de datos incrementa el contador cada vez que recibe un valor. Cada nodo terminal de la red disminuye el contador después de procesar el valor de entrada. Una vez que el agente forma la red de flujo de datos, espera a que el nodo centinela establezca el objeto event y a que el objeto countdown_event señale que el contador ha llegado a cero.

En el ejemplo siguiente, se muestran las clases control_flow_agent, dataflow_agent y countdown_event. La función wmain crea un control_flow_agent y un objeto dataflow_agent, y usa la función send_values para enviar una serie de valores aleatorios a los agentes.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }
     
   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }
   
   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }
 
private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;
      
      //
      // Create the members of the dataflow network.
      //
     
      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) -> bool { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;
       
      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();
           
      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;
   
   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&cf_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&df_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

Este ejemplo genera la siguiente salida de ejemplo:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Compilar el código

Copie el código de ejemplo y péguelo en un proyecto de Visual Studio o en un archivo denominado dataflow-agent.cpp y, después, ejecute el siguiente comando en una ventana del símbolo del sistema de Visual Studio.

cl.exe /EHsc dataflow-agent.cpp

[Arriba]

Creación de un agente de registro de mensajes

En el ejemplo siguiente se muestra la clase log_agent, que es similar a la clase dataflow_agent. La clase log_agent implementa un agente de registro asincrónico que escribe los mensajes de registro en un archivo y en la consola. La clase log_agent permite que la aplicación categorice los mensajes como informativos, de advertencia o de error. También permite que la aplicación especifique si cada categoría de registro se escribe en un archivo, en la consola o en ambos. En este ejemplo se escriben todos los mensajes de registro en un archivo y solo los mensajes de error en la consola.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count = 0L)
        : _current(static_cast<long>(count)) 
    {
        // Set the event if the initial count is zero.
        if (_current == 0L)
        {
            _event.set();
        }
    }

    // Decrements the event counter.
    void signal()
    {
        if(InterlockedDecrement(&_current) == 0L)
        {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count()
    {
        if(InterlockedIncrement(&_current) == 1L)
        {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait()
    {
        _event.wait();
    }

private:
    // The current count.
    volatile long _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
    log_info    = 0x1,
    log_warning = 0x2,
    log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
    // Holds a message string and its logging type.
    struct log_message
    {
        wstring message;
        log_message_type type;
    };

public:
    log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
        : _file(file_path)
        , _file_messages(file_messages)
        , _console_messages(console_messages)
        , _active(0)
    {
        if (_file.bad())
        {
            throw invalid_argument("Unable to open log file.");
        }
    }

    // Writes the provided message to the log.
    void log(const wstring& message, log_message_type type)
    {  
        // Increment the active message count.
        _active.add_count();

        // Send the message to the network.
        log_message msg = { message, type };
        send(_log_buffer, msg);
    }

    void close()
    {
        // Signal that the agent is now closed.
        _closed.set();
    }

protected:

    void run()
    {
        //
        // Create the dataflow network.
        //

        // Writes a log message to file.
        call<log_message> writer([this](log_message msg)
        {
            if ((msg.type & _file_messages) != 0)
            {
                // Write the message to the file.
                write_to_stream(msg, _file);
            }
            if ((msg.type & _console_messages) != 0)
            {
                // Write the message to the console.
                write_to_stream(msg, wcout);
            }
            // Decrement the active counter.
            _active.signal();
        });

        // Connect _log_buffer to the internal network to begin data flow.
        _log_buffer.link_target(&writer);

        // Wait for the closed event to be signaled.
        _closed.wait();

        // Wait for all messages to be processed.
        _active.wait();

        // Close the log file and flush the console.
        _file.close();
        wcout.flush();

        // Set the agent to the completed state.
        done();
    }

private:
    // Writes a logging message to the specified output stream.
    void write_to_stream(const log_message& msg, wostream& stream)
    {
        // Write the message to the stream.
        wstringstream ss;

        switch (msg.type)
        {
        case log_info:
            ss << L"info: ";
            break;
        case log_warning:
            ss << L"warning: ";
            break;
        case log_error:
            ss << L"error: ";
        }

        ss << msg.message << endl;
        stream << ss.str();
    }

private:   
    // The file stream to write messages to.
    wofstream _file;

    // The log message types that are written to file.
    log_message_type _file_messages;

    // The log message types that are written to the console.
    log_message_type _console_messages;

    // The head of the network. Propagates logging messages
    // to the rest of the network.
    unbounded_buffer<log_message> _log_buffer;

    // Counts the number of active messages in the network.
    countdown_event _active;

    // Signals that the agent has been closed.
    event _closed;
};

int wmain()
{
    // Union of all log message types.
    log_message_type log_all = log_message_type(log_info | log_warning  | log_error);

    // Create a logging agent that writes all log messages to file and error 
    // messages to the console.
    log_agent logger(L"log.txt", log_all, log_error);

    // Start the agent.
    logger.start();

    // Log a few messages.

    logger.log(L"===Logging started.===", log_info);

    logger.log(L"This is a sample warning message.", log_warning);
    logger.log(L"This is a sample error message.", log_error);

    logger.log(L"===Logging finished.===", log_info);

    // Close the logger and wait for the agent to finish.
    logger.close();
    agent::wait(&logger);
}

En este ejemplo se escribe el resultado siguiente en la consola.

error: This is a sample error message.

Este ejemplo también muestra el archivo log.txt, que contiene el texto siguiente.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Compilar el código

Copie el código de ejemplo y péguelo en un proyecto de Visual Studio o en un archivo denominado log-filter.cpp y, después, ejecute el siguiente comando en una ventana del símbolo del sistema de Visual Studio.

cl.exe /EHsc log-filter.cpp

[Arriba]

Consulte también

Tutoriales del Runtime de simultaneidad