Share via


Procedura: Implementare vari modelli producer-consumer

Questo argomento descrive come implementare il modello producer-consumer nell'applicazione. In questo modello, il producer invia messaggi a un blocco di messaggi e il consumer legge i messaggi dal blocco.

L'argomento illustra due scenari. Nel primo scenario, il consumer deve ricevere ogni messaggio inviato dal producer. Nel secondo scenario, il consumer esegue periodicamente il polling dei dati e pertanto non deve ricevere ogni messaggio.

Entrambi gli esempi in questo argomento usano agenti, blocchi di messaggi e funzioni di passaggio dei messaggi per trasmettere messaggi dal producer al consumer. L'agente producer usa la funzione concurrency::send per scrivere messaggi in un oggetto concurrency::ITarget . L'agente consumer usa la funzione concurrency::receive per leggere i messaggi da un oggetto concurrency::ISource . Entrambi gli agenti contengono un valore sentinel per coordinare la fine dell'elaborazione.

Per altre informazioni sugli agenti asincroni, vedere Agenti asincroni. Per altre informazioni sui blocchi di messaggi e sulle funzioni di passaggio dei messaggi, vedere Blocchi di messaggi asincroni e funzioni di passaggio di messaggi.

Esempio: Inviare serie di numeri all'agente consumer

In questo esempio, l'agente producer invia una serie di numeri all'agente consumer. Il consumer riceve ognuno di questi numeri e calcola la media. L'applicazione scrive la media nella console.

Questo esempio usa un oggetto concurrency::unbounded_buffer per consentire al producer di accodare i messaggi. La unbounded_buffer classe implementa ITarget e ISource in modo che il producer e il consumer possano inviare e ricevere messaggi da e verso un buffer condiviso. Le send funzioni e receive coordinano l'attività di propagazione dei dati dal producer al consumer.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }
      
      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

Questo esempio produce il seguente output:

The average is 50.

Esempio: Inviare una serie di quotazioni azionarie all'agente consumer

In questo esempio, l'agente producer invia una serie di quotazioni azionarie all'agente consumer. L'agente consumer legge periodicamente l'offerta corrente e la stampa nella console.

Questo esempio è simile a quello precedente, ad eccezione del fatto che usa un oggetto concurrency::overwrite_buffer per consentire al producer di condividere un messaggio con il consumer. Come nell'esempio precedente, overwrite_buffer la classe implementa ITarget e ISource in modo che il producer e il consumer possano agire su un buffer di messaggi condiviso.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

In questo esempio viene generato l'output di esempio seguente.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

A differenza di un unbounded_buffer oggetto, la receive funzione non rimuove il messaggio dall'oggetto overwrite_buffer . Se il consumer legge dal buffer dei messaggi più di una volta prima che il producer sovrascriva tale messaggio, il ricevitore ottiene lo stesso messaggio ogni volta.

Compilazione del codice

Copiare il codice di esempio e incollarlo in un progetto di Visual Studio oppure incollarlo in un file denominato producer-consumer.cpp e quindi eseguire il comando seguente in una finestra del prompt dei comandi di Visual Studio.

cl.exe /EHsc producer-consumer.cpp

Vedi anche

Libreria di agenti asincroni
Agenti asincroni
Blocchi dei messaggi asincroni
Funzioni di passaggio dei messaggi