Praktik Terbaik di Pustaka Agen Asinkron

Dokumen ini menjelaskan cara memanfaatkan Pustaka Agen Asinkron yang efektif. Pustaka Agen mempromosikan model pemrograman berbasis aktor dan pesan dalam proses yang diteruskan untuk aliran data dan tugas alur yang kasar.

Untuk informasi selengkapnya tentang Pustaka Agen, lihat Pustaka Agen Asinkron.

Bagian

Dokumen ini berisi bagian berikut:

Gunakan Agen untuk Mengisolasi Status

Pustaka Agen menyediakan alternatif untuk status bersama dengan memungkinkan Anda menghubungkan komponen terisolasi melalui mekanisme pengiriman pesan asinkron. Agen asinkron paling efektif ketika mereka mengisolasi status internal mereka dari komponen lain. Dengan mengisolasi status, beberapa komponen biasanya tidak bertindak pada data bersama. Isolasi status dapat memungkinkan aplikasi Anda untuk menskalakan karena mengurangi ketidakcocokan pada memori bersama. Isolasi status juga mengurangi kemungkinan kebuntuan dan kondisi balapan karena komponen tidak perlu menyinkronkan akses ke data bersama.

Anda biasanya mengisolasi status dalam agen dengan menahan anggota data di private bagian atau protected kelas agen dan dengan menggunakan buffer pesan untuk mengomunikasikan perubahan status. Contoh berikut menunjukkan basic_agent kelas, yang berasal dari konkurensi::agent. Kelas basic_agent menggunakan dua buffer pesan untuk berkomunikasi dengan komponen eksternal. Satu buffer pesan menyimpan pesan masuk; buffer pesan lainnya menyimpan pesan keluar.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }
   
   // Retrieves the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;
         
         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Untuk contoh lengkap tentang cara menentukan dan menggunakan agen, lihat Panduan: Membuat Aplikasi dan Panduan Berbasis Agen: Membuat Agen Aliran Data.

[Atas]

Menggunakan Mekanisme Pembatasan untuk Membatasi Jumlah Pesan dalam Alur Data

Banyak jenis buffer pesan, seperti konkurensi::unbounded_buffer, dapat menyimpan pesan dalam jumlah tak terbatas. Ketika produsen pesan mengirim pesan ke alur data lebih cepat daripada konsumen dapat memproses pesan ini, aplikasi dapat memasukkan status memori rendah atau kehabisan memori. Anda dapat menggunakan mekanisme pembatasan, misalnya, semaphore, untuk membatasi jumlah pesan yang secara bersamaan aktif dalam alur data.

Contoh dasar berikut menunjukkan cara menggunakan semaphore untuk membatasi jumlah pesan dalam alur data. Alur data menggunakan fungsi konkurensi::tunggu untuk mensimulasikan operasi yang membutuhkan setidaknya 100 milidetik. Karena pengirim menghasilkan pesan lebih cepat daripada yang dapat diproses konsumen pesan tersebut semaphore , contoh ini mendefinisikan kelas untuk memungkinkan aplikasi membatasi jumlah pesan aktif.

// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            (Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.  
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero.
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter.
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter.
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set.
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer.
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };
  
   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

Objek semaphore membatasi alur untuk memproses paling banyak dua pesan secara bersamaan.

Produsen dalam contoh ini mengirim pesan yang relatif sedikit kepada konsumen. Oleh karena itu, contoh ini tidak menunjukkan potensi kondisi memori rendah atau kehabisan memori. Namun, mekanisme ini berguna ketika alur data berisi jumlah pesan yang relatif tinggi.

Untuk informasi selengkapnya tentang cara membuat kelas semaphore yang digunakan dalam contoh ini, lihat Cara: Menggunakan Kelas Konteks untuk Menerapkan Semaphore Kooperatif.

[Atas]

Jangan Lakukan Pekerjaan Halus dalam Alur Data

Pustaka Agen paling berguna ketika pekerjaan yang dilakukan oleh alur data cukup kasar. Misalnya, satu komponen aplikasi mungkin membaca data dari file atau koneksi jaringan dan kadang-kadang mengirim data tersebut ke komponen lain. Protokol yang digunakan Pustaka Agen untuk menyebarluaskan pesan menyebabkan mekanisme pengiriman pesan memiliki lebih banyak overhead daripada konstruksi paralel tugas yang disediakan oleh Pustaka Pola Paralel (PPL). Oleh karena itu, pastikan bahwa pekerjaan yang dilakukan oleh alur data cukup panjang untuk mengimbangi overhead ini.

Meskipun alur data paling efektif ketika tugasnya kasar, setiap tahap alur data dapat menggunakan konstruksi PPL seperti grup tugas dan algoritma paralel untuk melakukan pekerjaan yang lebih halus. Untuk contoh jaringan data kasar yang menggunakan paralelisme terperinci pada setiap tahap pemrosesan, lihat Panduan: Membuat Jaringan Pemrosesan Gambar.

[Atas]

Jangan Lewati Payload Pesan Besar berdasarkan Nilai

Dalam beberapa kasus, runtime membuat salinan setiap pesan yang diteruskannya dari satu buffer pesan ke buffer pesan lain. Misalnya, kelas konkurensi::overwrite_buffer menawarkan salinan setiap pesan yang diterimanya ke setiap targetnya. Runtime juga membuat salinan data pesan saat Anda menggunakan fungsi pengiriman pesan seperti konkurensi::send dan konkurensi::receive untuk menulis pesan ke dan membaca pesan dari buffer pesan. Meskipun mekanisme ini membantu menghilangkan risiko penulisan bersamaan ke data bersamaan, mekanisme ini dapat menyebabkan performa memori yang buruk ketika payload pesan relatif besar.

Anda dapat menggunakan pointer atau referensi untuk meningkatkan performa memori saat Anda meneruskan pesan yang memiliki payload besar. Contoh berikut membandingkan meneruskan pesan besar menurut nilai dengan meneruskan penunjuk ke jenis pesan yang sama. Contoh mendefinisikan dua jenis agen, producer dan consumer, yang bertindak pada message_data objek. Contoh membandingkan waktu yang diperlukan bagi produsen untuk mengirim beberapa message_data objek ke konsumen dengan waktu yang diperlukan agen produsen untuk mengirim beberapa pointer ke message_data objek ke konsumen.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }
       
   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }
       
   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;
      
   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

Contoh ini menghasilkan contoh output berikut:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

Versi yang menggunakan pointer berkinerja lebih baik karena menghilangkan persyaratan untuk runtime untuk membuat salinan lengkap setiap message_data objek yang diteruskan dari produsen ke konsumen.

[Atas]

Menggunakan shared_ptr di Jaringan Data Saat Kepemilikan Tidak Ditentukan

Saat Anda mengirim pesan dengan penunjuk melalui alur atau jaringan yang melewati pesan, Anda biasanya mengalokasikan memori untuk setiap pesan di bagian depan jaringan dan membebaskan memori tersebut di akhir jaringan. Meskipun mekanisme ini sering bekerja dengan baik, ada kasus di mana sulit atau tidak mungkin untuk menggunakannya. Misalnya, pertimbangkan kasus di mana jaringan data berisi beberapa simpul akhir. Dalam hal ini, tidak ada lokasi yang jelas untuk membebaskan memori untuk pesan.

Untuk mengatasi masalah ini, Anda dapat menggunakan mekanisme, misalnya, std::shared_ptr, yang memungkinkan penunjuk dimiliki oleh beberapa komponen. Ketika objek akhir shared_ptr yang memiliki sumber daya dihancurkan, sumber daya juga dibebesarkan.

Contoh berikut menunjukkan cara menggunakan shared_ptr untuk berbagi nilai penunjuk di antara beberapa buffer pesan. Contoh menyambungkan objek konkurensi::overwrite_buffer ke tiga objek konkurensi::panggilan . Kelas ini overwrite_buffer menawarkan pesan ke setiap targetnya. Karena ada beberapa pemilik data di akhir jaringan data, contoh ini menggunakan shared_ptr untuk memungkinkan setiap call objek berbagi kepemilikan pesan.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;
      
   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

Contoh ini menghasilkan contoh output berikut:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

Baca juga

Praktik Terbaik Runtime Konkurensi
Pustaka Agen Asinkron
Panduan: Membuat Aplikasi Berbasis Agen
Panduan: Membuat Agen Aliran Data
Panduan: Membuat Jaringan Pemrosesan Gambar
Praktik Terbaik di Pustaka Pola Paralel
Praktik Terbaik Umum dalam Runtime Konkurensi