Exemplarische Vorgehensweise: Erstellen eines Datenfluss-Agent

In diesem Dokument wird das Erstellen von agentbasierten Anwendungen auf Basis eines Datenflusses anstelle eines Kontrollflusses veranschaulicht.

Der Steuerungsfluss bezieht sich auf die Ausführungsreihenfolge von Vorgängen in einem Programm. Die Ablaufsteuerung wird mit Steuerungsstrukturen, z. B. Bedingungsanweisungen, Schleifen usw., geregelt. Alternativ bezieht sich der Datenfluss auf ein Programmiermodell, bei dem Berechnungen nur vorgenommen werden, wenn alle erforderlichen Daten verfügbar sind. Das Datenflussprogrammiermodell bezieht sich auf das Konzept der Nachrichtenübergabe, bei dem unabhängige Komponenten eines Programms miteinander kommunizieren, indem sie Nachrichten senden.

Asynchrone Agents unterstützen sowohl das Ablaufsteuerungs- als auch das Datenflussprogrammiermodell. Das Ablaufsteuerungsmodell eignet sich in vielen Fällen, jedoch ist in anderen Fällen das Datenflussmodell angemessen, z. B. wenn ein Agent Daten empfängt und eine Aktion ausführt, die auf der Nutzlast dieser Daten basiert.

Voraussetzungen

Lesen Sie die folgenden Dokumente, bevor Sie mit dieser exemplarischen Vorgehensweise beginnen:

Abschnitte

Diese exemplarische Vorgehensweise enthält folgende Abschnitte:

Erstellen eines grundlegenden Kontrollfluss-Agents

Betrachten Sie das folgende Beispiel, in dem die control_flow_agent-Klasse definiert wird. Die control_flow_agent-Klasse wird auf drei Nachrichtenpuffer angewendet: einen Eingabepuffer und zwei Ausgabepuffer. Die run-Methode liest in einer Schleife aus dem Quellnachrichtenpuffer und steuert den Fluss der Programmausführung mit einer Bedingungsanweisung. Der Agent erhöht einen Zähler für negative Werte ungleich 0 (null) und einen anderen Zähler für positive Werte ungleich 0 (null). Nachdem der Agent den Sentinelwert 0 (null) empfangen hat, sendet er die Werte der Zähler an die Ausgabenachrichtenpuffer. Die negatives-Methode und die positives-Methode ermöglichen es der Anwendung, die Anzahl der negativen und positiven Werte aus dem Agent zu lesen.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

In diesem Beispiel wird zwar nur eine einfache Ablaufsteuerung in einem Agent verwendet, es veranschaulicht jedoch, dass die Programmierung mit Ablaufsteuerung serielle Vorgänge regelt. Die einzelnen Nachrichten müssen nacheinander verarbeitet werden, auch wenn im Eingabenachrichtenpuffer mehrere Nachrichten verfügbar sind. Das Datenflussmodell ermöglicht die gleichzeitige Auswertung beider Branches der Bedingungsanweisung. Das Datenflussmodell ermöglicht Ihnen außerdem das Erstellen von komplexeren Nachrichtennetzwerken, die auf Daten angewendet werden, sobald sie verfügbar sind.

[Nach oben]

Erstellen eines einfachen Dataflow-Agents

In diesem Abschnitt wird gezeigt, wie die control_flow_agent-Klasse konvertiert wird, um die gleiche Aufgabe mit dem Datenflussmodell auszuführen.

Der Datenfluss-Agent erstellt ein Netzwerk von Nachrichtenpuffern, die jeweils einem bestimmten Zweck dienen. Bestimmte Nachrichtenblöcke verwenden eine Filterfunktion, um eine Nachricht auf Grundlage ihrer Nutzlast anzunehmen oder abzulehnen. Eine Filterfunktion stellt sicher, dass ein Nachrichtenblock nur bestimmte Werte empfängt.

So konvertieren Sie einen Ablaufsteuerungs-Agent in einen Datenfluss-Agent

  1. Kopieren Sie den Text der control_flow_agent-Klasse in eine andere Klasse, z. B. dataflow_agent. Alternativ können Sie die control_flow_agent-Klasse umbenennen.

  2. Entfernen Sie den Text der Schleife, die in der receive-Methode run aufruft.

void run()
{
   // Counts the number of negative and positive values that
   // the agent receives.
   size_t negative_count = 0;
   size_t positive_count = 0;

   // Write the counts to the message buffers.
   send(_negatives, negative_count);
   send(_positives, positive_count);

   // Set the agent to the completed state.
   done();
}
  1. Fügen Sie in der run-Methode nach der Initialisierung der Variablen negative_count und positive_count ein countdown_event-Objekt hinzu, das die Anzahl der aktiven Vorgänge nachverfolgt.
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;

Die countdown_event-Klasse wird weiter unten in diesem Thema dargestellt.

  1. Erstellen Sie die Nachrichtenpufferobjekte, die am Datenflussnetzwerk beteiligt sind.
 //
 // Create the members of the dataflow network.
 //

 // Increments the active counter.
 transformer<int, int> increment_active(
    [&active](int value) -> int {
       active.add_count();
       return value;
    });

 // Increments the count of negative values.
 call<int> negatives(
    [&](int value) {
       ++negative_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value < 0;
    });

 // Increments the count of positive values.
 call<int> positives(
    [&](int value) {
       ++positive_count;
       // Decrement the active counter.
       active.signal();
    },
    [](int value) -> bool {
       return value > 0;
    });

 // Receives only the sentinel value of 0.
 call<int> sentinel(
    [&](int value) {            
       // Decrement the active counter.
       active.signal();
       // Set the sentinel event.
       received_sentinel.set();
    },
    [](int value) -> bool { 
       return value == 0; 
    });

 // Connects the _source message buffer to the rest of the network.
 unbounded_buffer<int> connector;
  1. Verbinden Sie die Nachrichtenpuffer zu einem Netzwerk.
//
// Connect the network.
//

// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);

// Connect the _source buffer to the internal network to 
// begin data flow.
_source.link_target(&increment_active);
  1. Warten Sie, bis das event-Objekt und das countdown event-Objekt festgelegt sind. Diese Ereignisse signalisieren, dass der Agent den Sentinelwert empfangen hat und dass alle Vorgänge abgeschlossen sind.
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();

Das folgende Diagramm zeigt das gesamte Datenflussnetzwerk für die dataflow_agent-Klasse:

The dataflow network.

In der folgenden Tabelle werden die Member des Netzwerks beschrieben.

Member Beschreibung
increment_active Ein Parallelitätsobjekt::transformator , das den aktiven Ereigniszähler erhöht und den Eingabewert an den Rest des Netzwerks übergibt.
negatives, positives concurrency::call objects that increment the count of numbers and decrement the active event counter. Die einzelnen Objekte verwenden einen Filter, um entweder positive Zahlen oder negative Zahlen zu akzeptieren.
sentinel Ein Parallelitätsobjekt::call-Objekt , das nur den Sentinelwert null akzeptiert und den aktiven Ereigniszähler verringert.
connector Ein Parallelitätsobjekt::unbounded_buffer , das den Quellnachrichtenpuffer mit dem internen Netzwerk verbindet.

Da die run-Methode in einem eigenen Thread aufgerufen wird, können andere Threads Nachrichten an das Netzwerk senden, bevor dieses vollständig verbunden ist. Der _source-Datenmember ist ein unbounded_buffer-Objekt, das alle Eingaben puffert, die von der Anwendung an den Agent gesendet werden. Um sicherzustellen, dass das Netzwerk alle Eingabenachrichten verarbeitet, verknüpft der Agent zunächst die internen Knoten des Netzwerks, und anschließend verknüpft er den Anfang dieses Netzwerk (connector) mit dem _source-Datenmember. Dies stellt sicher, dass keine Nachrichten verarbeitet werden, während das Netzwerk erstellt wird.

Da das Netzwerk in diesem Beispiel auf Datenfluss und nicht auf Ablaufsteuerung basiert, muss es dem Agent mitteilen, dass die Verarbeitung der einzelnen Eingabewerte abgeschlossen wurde und der Sentinelknoten den entsprechenden Wert empfangen hat. In diesem Beispiel wird ein countdown_event Objekt verwendet, um zu signalisieren, dass alle Eingabewerte verarbeitet wurden, und ein Parallelitätsobjekt::event , um anzugeben, dass der Sentinelknoten seinen Wert empfangen hat. Die countdown_event-Klasse signalisiert mit einem event-Objekt, dass ein Zähler den Wert 0 (null) erreicht hat. Der Anfang des Datenflussnetzwerks erhöht den Zähler jedes Mal, wenn bei ihm ein Wert empfangen wird. Jeder Terminalknoten des Netzwerks verringert den Zähler, nachdem er den Eingabewert verarbeitet hat. Nachdem der Agent das Datenflussnetzwerk erstellt hat, wartet er, bis der Sentinelknoten das event-Objekt festgelegt und das countdown_event-Objekt signalisiert hat, dass sein Zähler 0 (null) erreicht hat.

Im folgenden Beispiel werden die Klassen control_flow_agent, dataflow_agent und countdown_event gezeigt. Die wmain-Funktion erstellt ein control_flow_agent-Objekt und ein dataflow_agent-Objekt und sendet mit der send_values-Funktion eine Reihe von Zufallswerten an die Agents.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }
     
   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }
   
   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }
 
private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;
      
      //
      // Create the members of the dataflow network.
      //
     
      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) -> bool { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;
       
      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();
           
      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;
   
   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&cf_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();
   
   // Send values to the agent.
   send_values(source, sentinel, count);
   
   // Wait for the agent to finish.
   agent::wait(&df_agent);
   
   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

Dieses Beispiel erzeugt die folgende Beispielausgabe:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Kompilieren des Codes

Kopieren Sie den Beispielcode, fügen Sie ihn in ein Visual Studio-Projekt ein, oder fügen Sie ihn in eine Datei ein, die benannt dataflow-agent.cpp ist, und führen Sie dann den folgenden Befehl in einem Visual Studio-Eingabeaufforderungsfenster aus.

cl.exe /EHsc dataflow-agent.cpp

[Nach oben]

Erstellen eines Nachrichtenprotokollierungs-Agents

Im folgenden Beispiel wird die log_agent-Klasse veranschaulicht, die der dataflow_agent-Klasse ähnelt. Die log_agent-Klasse implementiert einen asynchronen Protokollierungs-Agent, der Protokollmeldungen in eine Datei und an die Konsole ausgibt. Mit der log_agent-Klasse kann die Anwendung Nachrichten als Informationsmeldung, Warnmeldung oder Fehlermeldung kategorisieren. Sie ermöglicht es der Anwendung außerdem anzugeben, ob die Protokollmeldungen der jeweiligen Kategorie in eine Datei, an die Konsole oder sowohl in eine Datei als auch an die Konsole ausgegeben werden. In diesem Beispiel werden alle Protokollmeldungen in eine Datei und nur Fehlermeldungen an die Konsole ausgegeben.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count = 0L)
        : _current(static_cast<long>(count)) 
    {
        // Set the event if the initial count is zero.
        if (_current == 0L)
        {
            _event.set();
        }
    }

    // Decrements the event counter.
    void signal()
    {
        if(InterlockedDecrement(&_current) == 0L)
        {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count()
    {
        if(InterlockedIncrement(&_current) == 1L)
        {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait()
    {
        _event.wait();
    }

private:
    // The current count.
    volatile long _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
    log_info    = 0x1,
    log_warning = 0x2,
    log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
    // Holds a message string and its logging type.
    struct log_message
    {
        wstring message;
        log_message_type type;
    };

public:
    log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
        : _file(file_path)
        , _file_messages(file_messages)
        , _console_messages(console_messages)
        , _active(0)
    {
        if (_file.bad())
        {
            throw invalid_argument("Unable to open log file.");
        }
    }

    // Writes the provided message to the log.
    void log(const wstring& message, log_message_type type)
    {  
        // Increment the active message count.
        _active.add_count();

        // Send the message to the network.
        log_message msg = { message, type };
        send(_log_buffer, msg);
    }

    void close()
    {
        // Signal that the agent is now closed.
        _closed.set();
    }

protected:

    void run()
    {
        //
        // Create the dataflow network.
        //

        // Writes a log message to file.
        call<log_message> writer([this](log_message msg)
        {
            if ((msg.type & _file_messages) != 0)
            {
                // Write the message to the file.
                write_to_stream(msg, _file);
            }
            if ((msg.type & _console_messages) != 0)
            {
                // Write the message to the console.
                write_to_stream(msg, wcout);
            }
            // Decrement the active counter.
            _active.signal();
        });

        // Connect _log_buffer to the internal network to begin data flow.
        _log_buffer.link_target(&writer);

        // Wait for the closed event to be signaled.
        _closed.wait();

        // Wait for all messages to be processed.
        _active.wait();

        // Close the log file and flush the console.
        _file.close();
        wcout.flush();

        // Set the agent to the completed state.
        done();
    }

private:
    // Writes a logging message to the specified output stream.
    void write_to_stream(const log_message& msg, wostream& stream)
    {
        // Write the message to the stream.
        wstringstream ss;

        switch (msg.type)
        {
        case log_info:
            ss << L"info: ";
            break;
        case log_warning:
            ss << L"warning: ";
            break;
        case log_error:
            ss << L"error: ";
        }

        ss << msg.message << endl;
        stream << ss.str();
    }

private:   
    // The file stream to write messages to.
    wofstream _file;

    // The log message types that are written to file.
    log_message_type _file_messages;

    // The log message types that are written to the console.
    log_message_type _console_messages;

    // The head of the network. Propagates logging messages
    // to the rest of the network.
    unbounded_buffer<log_message> _log_buffer;

    // Counts the number of active messages in the network.
    countdown_event _active;

    // Signals that the agent has been closed.
    event _closed;
};

int wmain()
{
    // Union of all log message types.
    log_message_type log_all = log_message_type(log_info | log_warning  | log_error);

    // Create a logging agent that writes all log messages to file and error 
    // messages to the console.
    log_agent logger(L"log.txt", log_all, log_error);

    // Start the agent.
    logger.start();

    // Log a few messages.

    logger.log(L"===Logging started.===", log_info);

    logger.log(L"This is a sample warning message.", log_warning);
    logger.log(L"This is a sample error message.", log_error);

    logger.log(L"===Logging finished.===", log_info);

    // Close the logger and wait for the agent to finish.
    logger.close();
    agent::wait(&logger);
}

In diesem Beispiel wird die folgende Ausgabe in die Konsole geschrieben.

error: This is a sample error message.

In diesem Beispiel wird außerdem die Datei log.txt erzeugt, die den folgenden Text enthält.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Kompilieren des Codes

Kopieren Sie den Beispielcode, fügen Sie ihn in ein Visual Studio-Projekt ein, oder fügen Sie ihn in eine Datei ein, die benannt log-filter.cpp ist, und führen Sie dann den folgenden Befehl in einem Visual Studio-Eingabeaufforderungsfenster aus.

cl.exe /EHsc log-filter.cpp

[Nach oben]

Siehe auch

Exemplarische Vorgehensweisen für die Concurrency Runtime