방법: 초과 구독을 사용하여 대기 오프셋

초과 구독은 대기 시간이 많은 작업을 포함하는 일부 애플리케이션의 전반적인 효율성을 향상시킬 수 있습니다. 이 항목에서는 초과 구독을 사용하여 네트워크 연결에서 데이터를 읽음으로 인해 발생하는 대기 시간을 오프셋하는 방법을 보여 줍니다.

예시

이 예제에서는 비동기 에이전트 라이브러리를 사용하여 HTTP 서버에서 파일을 다운로드합니다. 클래스는 http_reader 동시성::agent에서 파생되며, 메시지를 전달하여 다운로드할 URL 이름을 비동기적으로 읽습니다.

클래스는 http_reader 동시성::task_group 클래스를 사용하여 각 파일을 동시에 읽습니다. 각 태스크는 현재 컨텍스트에서 초과 구독을 사용하도록 설정하기 위해 true 매개 변수가 설정된 동시성::Context::Oversubscribe 메서드 _BeginOversubscription 를 호출합니다. 그런 다음 각 태스크는 MFC(Microsoft Foundation Classs) CInternetSessionCHttpFile 클래스를 사용하여 파일을 다운로드합니다. 마지막으로, 각 태스크는 매개 변수가 설정된 상태에서 호출 Context::Oversubscribe_BeginOversubscription 하여 false 초과 구독을 사용하지 않도록 설정합니다.

초과 구독을 사용하도록 설정하면 런타임에서 작업을 실행할 스레드를 하나 더 만듭니다. 이러한 각 스레드는 현재 컨텍스트를 초과 구독하여 추가 스레드를 만들 수도 있습니다. 클래스는 http_reader 동시성::unbounded_buffer 개체를 사용하여 애플리케이션에서 사용하는 스레드 수를 제한합니다. 에이전트는 고정된 수의 토큰 값을 사용하여 버퍼를 초기화합니다. 각 다운로드 작업에 대해 에이전트는 작업이 시작되기 전에 버퍼에서 토큰 값을 읽은 다음 작업이 완료된 후 해당 값을 버퍼에 다시 씁니다. 버퍼가 비어 있으면 에이전트는 다운로드 작업 중 하나가 버퍼에 값을 다시 쓸 때까지 기다립니다.

다음 예제에서는 동시 작업 수를 사용 가능한 하드웨어 스레드 수의 두 배로 제한합니다. 이 값은 초과 구독을 실험할 때 사용하기에 좋은 시작점입니다. 특정 처리 환경에 맞는 값을 사용하거나 이 값을 동적으로 변경하여 실제 워크로드에 응답할 수 있습니다.

// download-oversubscription.cpp
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);

// Reads files from HTTP servers.
class http_reader : public agent
{
public:
   explicit http_reader(CInternetSession& session,      
      ISource<string>& source,
      unsigned int& total_bytes,
      unsigned int max_concurrent_reads)
      : _session(session)
      , _source(source)
      , _total_bytes(total_bytes)
   {
      // Add one token to the available tasks buffer for each 
      // possible concurrent read operation. The value of each token 
      // is not important, but can be useful for debugging.
      for (unsigned int i = 0; i < max_concurrent_reads; ++i)
         send(_available_tasks, i);
   }

   // Signals to the agent that there are no more items to download.
   static const string input_sentinel;
 
protected:
   void run()
   {
      // A task group. Each task in the group downloads one file.
      task_group tasks;

      // Holds the total number of bytes downloaded.
      combinable<unsigned int> total_bytes;

      // Read from the source buffer until the application 
      // sends the sentinel value.
      string url;
      while ((url = receive(_source)) != input_sentinel)
      {
         // Wait for a task to release an available slot.
         unsigned int token = receive(_available_tasks);

         // Create a task to download the file.
         tasks.run([&, token, url] {

            // Print a message.
            wstringstream ss;
            ss << L"Downloading " << url.c_str() << L"..." << endl;
            wcout << ss.str();

            // Download the file.
            string content = download(url);

            // Update the total number of bytes downloaded.
            total_bytes.local() += content.size();

            // Release the slot for another task.
            send(_available_tasks, token);
         });
      }

      // Wait for all tasks to finish.
      tasks.wait();
      
      // Compute the total number of bytes download on all threads.
      _total_bytes = total_bytes.combine(plus<unsigned int>());

      // Set the status of the agent to agent_done.
      done();
   }

   // Downloads the file at the given URL.
   string download(const string& url)
   {
      // Enable oversubscription.
      Context::Oversubscribe(true);

      // Download the file.
      string content = GetHttpFile(_session, url.c_str());
      
      // Disable oversubscription.
      Context::Oversubscribe(false);

      return content;
   }

private:
   // Manages the network connection.
   CInternetSession& _session;
   // A message buffer that holds the URL names to download.
   ISource<string>& _source;
   // The total number of bytes downloaded
   unsigned int& _total_bytes;
   // Limits the agent to a given number of simultaneous tasks.
   unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");

int wmain()
{
   // Create an array of URL names to download.
   // A real-world application might read the names from user input.
   array<string, 21> urls = {
      "http://www.adatum.com/",
      "http://www.adventure-works.com/", 
      "http://www.alpineskihouse.com/",
      "http://www.cpandl.com/", 
      "http://www.cohovineyard.com/",
      "http://www.cohowinery.com/",
      "http://www.cohovineyardandwinery.com/", 
      "http://www.contoso.com/",
      "http://www.consolidatedmessenger.com/",
      "http://www.fabrikam.com/", 
      "http://www.fourthcoffee.com/",
      "http://www.graphicdesigninstitute.com/",
      "http://www.humongousinsurance.com/",
      "http://www.litwareinc.com/",
      "http://www.lucernepublishing.com/",
      "http://www.margiestravel.com/",
      "http://www.northwindtraders.com/",
      "http://www.proseware.com/", 
      "http://www.fineartschool.net",
      "http://www.tailspintoys.com/",
      http_reader::input_sentinel,
   };
      
   // Manages the network connection.
   CInternetSession session("Microsoft Internet Browser");

   // A message buffer that enables the application to send URL names to the 
   // agent.
   unbounded_buffer<string> source_urls;

   // The total number of bytes that the agent has downloaded.
   unsigned int total_bytes = 0u;

   // Create an http_reader object that can oversubscribe each processor by one.
   http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());

   // Compute the amount of time that it takes for the agent to download all files.
   __int64 elapsed = time_call([&] {
      
      // Start the agent.
      reader.start();
      
      // Use the message buffer to send each URL name to the agent.
      for_each(begin(urls), end(urls), [&](const string& url) {
         send(source_urls, url);
      });

      // Wait for the agent to finish downloading.
      agent::wait(&reader);      
   });

   // Print the results.
   wcout << L"Downloaded " << total_bytes
         << L" bytes in " << elapsed << " ms." << endl;
}

// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
   CString strResult;

   // Reads data from an HTTP server.
   CHttpFile* pHttpFile = NULL;

   try
   {
      // Open URL.
      pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1, 
         INTERNET_FLAG_TRANSFER_ASCII | 
         INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);

      // Read the file.
      if(pHttpFile != NULL)
      {           
         UINT uiBytesRead;
         do
         {
            char chBuffer[10000];
            uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
            strResult += chBuffer;
         }
         while (uiBytesRead > 0);
      }
    }
   catch (CInternetException)
   {
      // TODO: Handle exception
   }

   // Clean up and return.
   delete pHttpFile;

   return strResult;
}

이 예제에서는 4개의 프로세서가 있는 컴퓨터에서 다음 출력을 생성합니다.

Downloading http://www.adatum.com/...
Downloading http://www.adventure-works.com/...
Downloading http://www.alpineskihouse.com/...
Downloading http://www.cpandl.com/...
Downloading http://www.cohovineyard.com/...
Downloading http://www.cohowinery.com/...
Downloading http://www.cohovineyardandwinery.com/...
Downloading http://www.contoso.com/...
Downloading http://www.consolidatedmessenger.com/...
Downloading http://www.fabrikam.com/...
Downloading http://www.fourthcoffee.com/...
Downloading http://www.graphicdesigninstitute.com/...
Downloading http://www.humongousinsurance.com/...
Downloading http://www.litwareinc.com/...
Downloading http://www.lucernepublishing.com/...
Downloading http://www.margiestravel.com/...
Downloading http://www.northwindtraders.com/...
Downloading http://www.proseware.com/...
Downloading http://www.fineartschool.net...
Downloading http://www.tailspintoys.com/...
Downloaded 1801040 bytes in 3276 ms.

다른 태스크가 대기 작업이 완료될 때까지 기다리는 동안 추가 작업이 실행되기 때문에 초과 구독을 사용하도록 설정하면 이 예제가 더 빠르게 실행될 수 있습니다.

코드 컴파일

예제 코드를 복사하여 Visual Studio 프로젝트에 붙여넣거나 이름이 지정된 download-oversubscription.cpp 파일에 붙여넣은 다음 Visual Studio 명령 프롬프트 창에서 다음 명령 중 하나를 실행합니다.

cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp
cl.exe /EHsc /MT download-oversubscription.cpp

강력한 프로그래밍

더 이상 필요하지 않은 경우 항상 초과 구독을 사용하지 않도록 설정합니다. 다른 함수에서 throw되는 예외를 처리하지 않는 함수를 고려합니다. 함수가 반환되기 전에 초과 구독을 사용하지 않도록 설정하지 않으면 추가 병렬 작업도 현재 컨텍스트를 초과 구독합니다.

RAII(리소스 취득 초기화) 패턴을 사용하여 초과 구독을 지정된 범위로 제한할 수 있습니다. RAII 패턴에 따라 데이터 구조가 스택에 할당됩니다. 해당 데이터 구조는 생성될 때 리소스를 초기화하거나 획득하고 데이터 구조가 소멸될 때 해당 리소스를 삭제하거나 해제합니다. RAII 패턴은 바깥쪽 범위가 종료되기 전에 소멸자가 호출되도록 보장합니다. 따라서 예외가 throw되거나 함수에 여러 return 문이 포함된 경우 리소스가 올바르게 관리됩니다.

다음 예제에서는 이름이 지정된 scoped_blocking_signal구조를 정의합니다. 구조체의 scoped_blocking_signal 생성자는 초과 구독을 사용하도록 설정하고 소멸자는 초과 구독을 사용하지 않도록 설정합니다.

struct scoped_blocking_signal
{
    scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(true);
    }
    ~scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(false);
    }
};

다음 예제에서는 함수가 반환되기 전에 RAII를 사용하도록 메서드 본 download 문을 수정하여 초과 구독을 사용하지 않도록 설정합니다. 이 기술은 메서드가 download 예외로부터 안전한지 확인합니다.

// Downloads the file at the given URL.
string download(const string& url)
{
   scoped_blocking_signal signal;

   // Download the file.
   return string(GetHttpFile(_session, url.c_str()));
}

참고 항목

컨텍스트
Context::Oversubscribe 메서드