.NET Matters

ThreadPool을 사용한 순차 실행

Stephen Toub

Q 시스템에 작업을 비동기적으로 실행해야 하는 구성 요소가 많이 있는데, 제 생각에는 Microsoft .NET Framework ThreadPool이 적절한 해결책 같습니다. 다만 독특한 요구 사항이 있습니다. 즉, 각 구성 요소에서 작업 항목은 순서에 따라 처리되어야 하고, 동시에 두 개의 작업 항목이 실행되면 안 됩니다. 다만 여러 구성 요소가 동시에 실행되는 것은 문제 없으며 사실 그렇게 되는 것이 좋습니다. 어떻게 해야 좋을까요?

A 이는 메시지 전달을 기반을 하는 시나리오를 포함한 다양한 중요 시나리오에서 볼 수 있는 상황으로, 생각만큼 어려운 문제는 아닐 수도 있습니다. 항상 파이프라인의 여러 단계를 활성 상태로 유지하는 방법으로 병렬 처리의 장점을 활용하는 파이프라인 구현을 생각해 보십시오.

예를 들어 파일에서 데이터를 읽고 이를 압축 및 암호화하여 새 파일로 쓰는 파이프라인이 있습니다. 압축과 암호화를 동시에 수행할 수는 있지만 이 둘을 동시에 같은 데이터에 대해 수행할 수는 없습니다. 둘 중 하나의 출력이 다른 하나의 입력이 되어야 하기 때문입니다. 압축 루틴은 일부 데이터를 압축하여 이를 암호화 루틴으로 보내고, 암호화 루틴에서 데이터를 처리하는 시점에 다음 데이터 부분에 대한 압축 작업을 실행할 수 있습니다.

많은 압축 및 암호화 알고리즘이 상태를 유지 관리하고, 이 상태는 이후 데이터의 압축 및 암호화 방식에 영향을 미치므로 순서를 유지하는 것이 중요합니다. 이 예에서 파일을 다룬다는 점은 개의치 마십시오. 모든 데이터가 올바른 순서대로 배치된 원래 상태로 돌아가도록 결과물의 암호를 해독하고 압축을 해제할 수 있다면 좋을 것입니다.

여러 가지 가능한 해결책이 있습니다. 첫 번째는 각 구성 요소당 하나의 스레드를 할애하는 것입니다. 이 DedicatedThread는 실행할 작업 항목의 FIFO(선입선출) 큐와 이 큐를 서비스하는 하나의 스레드를 사용합니다. 구성 요소를 실행해야 하면 DedicatedThread는 해당 작업을 큐에 넣고, 최종적으로 스레드가 이 작업을 꺼내 실행하게 됩니다. 스레드는 하나뿐이므로 한 번에 하나의 항목만 실행됩니다. 또한 FIFO 큐를 사용하므로 작업 항목은 생성된 순서에 따라 처리됩니다.

2009년 1월호 .NET Matters 칼럼에 나온 예와 마찬가지로 간단한 WorkItem 클래스를 사용하여 실행할 작업을 나타냅니다(그림 1 참조). 이 WorkItem 형식을 사용하는 DedicatedThread 구현은 그림 2에 나와 있습니다. 구현의 대부분은 네이티브 BlockingQueue<T> 구현입니다(.NET Framework 4.0에는 이러한 구현에 적합한 BlockingCollection<T> 형식이 포함되어 있음). DedicatedThread 생성자는 BlockingQueue<T> 인스턴스를 만든 다음, 큐에서 다른 항목을 계속 기다렸다가 실행하는 스레드를 준비합니다.

그림 1 작업 항목 캡처

internal class WorkItem {
  public WaitCallback Callback;
  public object State;
  public ExecutionContext Context;

  private static ContextCallback _contextCallback = s => {
    var item = (WorkItem)s;
    item.Callback(item.State);
 };

  public void Execute() {
    if (Context != null) 
      ExecutionContext.Run(Context, _contextCallback, this);
    else Callback(State);
  }
}

그림 2 DedicatedThread 구현

public class DedicatedThread {
  private BlockingQueue<WorkItem> _workItems = 
    new BlockingQueue<WorkItem>();

  public DedicatedThread() {
    new Thread(() => {
      while (true) { workItems.Dequeue().Execute(); }
    }) { IsBackground = true }.Start();
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _workItems.Enqueue(new WorkItem { 
      Callback = callback, State = state, 
      Context = ExecutionContext.Capture() });
  }

  private class BlockingQueue<T> {
    private Queue<T> _queue = new Queue<T>();
    private Semaphore _gate = new Semaphore(0, Int32.MaxValue);

    public void Enqueue(T item) {
      lock (_queue) _queue.Enqueue(item);
      _gate.Release();
    }

    public T Dequeue() {
      _gate.WaitOne();
      lock (_queue) return _queue.Dequeue();
    }
  }
}

이는 문제의 시나리오를 위한 기본적인 기능을 제공하고 요구 사항도 충족할 수 있겠지만 중요한 단점이 있습니다. 첫째, 스레드가 각 구성 요소에 예약됩니다. 구성 요소가 한두 개라면 별 문제가 되지 않지만 구성 요소가 많은 경우에는 스레드의 수가 폭발적으로 증가할 수 있습니다. 그렇게 되면 성능은 떨어지게 됩니다.

또한 이 구현은 그다지 견고하지 않습니다. 예를 들어 구성 요소 하나를 종료해야 하는 경우, 스레드에 차단을 중지하라는 지시를 어떻게 내려야 할까요? 그리고 작업 항목에서 예외가 발생하는 경우에는 어떻게 될까요?

이와는 별개로 이 방법은 Windows가 일반적인 메시지 펌프에서 사용하는 방법과 비슷합니다. 메시지 펌프는 메시지 도착을 기다렸다가 발송(처리)한 후 다시 메시지를 기다리는 상태로 돌아가는 루프입니다. 특정 창에 대한 메시지는 하나의 스레드에 의해 처리됩니다. 이 둘의 유사점은 그림 3의 코드를 통해 볼 수 있습니다. 이 코드의 동작은 그림 2의 코드와 거의 동일합니다. Control을 만드는 새 스레드가 준비되어 핸들이 초기화되었음을 확인하고 Application.Run을 사용하여 메시지 루프를 실행합니다. 작업 항목을 이 스레드에 대한 큐에 넣으려면 Control의 BeginInvoke 메서드를 사용하면 됩니다. 이 방법을 권장하지는 않지만 어쨌든 높은 수준에서 볼 때 기본적인 개념은 DedicatedThread 해결책에서 이미 살펴본 내용과 동일합니다.

그림 3 UI 메시지 루프와의 유사성

public class WindowsFormsDedicatedThread {
  private Control _control;

  public WindowsFormsDedicatedThread() {
    using (var mre = new ManualResetEvent(false)) {
      new Thread(() => {
        _control = new Control();
        var forceHandleCreation = _control.Handle;
        mre.Set();
        Application.Run();
      }) { IsBackground = true }.Start();
      mre.WaitOne();
    }
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _control.BeginInvoke(callback, state);
  }
} 

두 번째 해결책은 실행에 ThreadPool을 사용하는 것입니다. 전용 큐를 서비스하는 새로운 사용자 지정 스레드를 준비하는 대신 구성 요소별로 큐를 유지함으로써 같은 큐에서 두 개의 요소가 동시에 서비스되는 일이 없도록 합니다. 이렇게 하면 ThreadPool이 알아서 필요한 스레드의 수를 제어하고 스레드 주입 및 폐기를 처리하고 안정성 문제를 처리하고 작업자가 새 스레드를 준비(대부분의 경우 바람직하지 않음)할 필요가 없도록 해 줍니다.

이 해결책의 구현은 그림 4에 나와 있습니다. FifoExecution 클래스는 두 개의 필드만 유지 관리합니다. 하나는 처리할 작업 항목의 큐, 다른 하나는 작업 항목 처리 요청이 ThreadPool로 전달되었는지 여부를 나타내는 부울 값 필드입니다. 두 필드 모두 작업 항목 목록의 잠금에 의해 보호됩니다. 구현의 나머지 부분은 두 메서드입니다.

그림 4 FifoExecution 구현

public class FifoExecution {
  private Queue<WorkItem> _workItems = new Queue<WorkItem>();
  private bool _delegateQueuedOrRunning = false;

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    var item = new WorkItem { 
      Callback = callback, State = state, 
      Context = ExecutionContext.Capture() };
    lock (_workItems) {
      _workItems.Enqueue(item);
      if (!_delegateQueuedOrRunning) {
        _delegateQueuedOrRunning = true;
        ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems, null);
      }
    }
  }

  private void ProcessQueuedItems(object ignored) {
    while (true) {
      WorkItem item;
      lock (_workItems) {
        if (_workItems.Count == 0) {
          _delegateQueuedOrRunning = false;
          break;
        }
        item = _workItems.Dequeue();
      }
      try { item.Execute(); }
      catch {
        ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems,
          null);
        throw;
      }
    }
  }
}

첫 번째 메서드는 QueueUserWorkItem입니다. 이 메서드 시그니처는 ThreadPool에 의해 노출된 것과 일치합니다(ThreadPool은 WaitCallback만 받는 편리한 오버로드도 제공함). 이 메서드는 먼저 저장될 WorkItem을 만든 다음 잠금을 획득합니다. WorkItem을 만드는 동안에는 공유 상태에 액세스하지 않습니다. 따라서 잠금을 최소한으로 유지하려면 잠금을 획득하기 전에 이 항목 캡처를 수행해야 합니다. 잠금이 확보되면 만들어진 작업 항목은 작업 항목 큐에 들어갑니다.

그런 다음 이 메서드는 큐에 저장된 작업 항목의 처리 요청이 ThreadPool에 전달되었는지 여부를 확인하고, 아직 전달되지 않은 경우 요청을 전달하고 나중을 위해 이를 기록해 둡니다. ThreadPool에 대한 이 요청은 ProcessQueuedItems 메서드를 실행하기 위해 ThreadPool 스레드 중 하나를 사용하기 위한 요청입니다.

ThreadPool 스레드에 의해 호출된 ProcessQueuedItems는 루프로 들어갑니다. 루프에서 잠금을 획득하고, 잠금을 유지하는 동안 처리할 다른 작업 항목이 있는지 여부를 확인합니다. 더 이상 처리할 작업 항목이 없으면 요청 플래그를 재설정하고(이후 큐 항목이 다시 풀 처리를 요청할 수 있도록) 루프를 빠져나옵니다. 처리할 작업 항목이 더 있으면 다음 항목을 잡고 잠금을 해제하고 처리를 실행한 후 처음부터 다시 시작합니다. 큐에 더 이상 항목이 없을 때까지 이 과정이 반복됩니다.

단순하지만 강력한 구현입니다. 이제 구성 요소에서 FifoExecution 인스턴스를 만들고 이를 사용하여 작업 항목을 예약할 수 있습니다. FifoExecution 인스턴스별로 한 번에 하나의 큐 작업 항목만 실행될 수 있으며, 큐의 작업 항목은 큐에 저장된 순서대로 실행됩니다. 또한 다른 FifoExecution 인스턴스의 작업 항목은 동시에 실행될 수 있습니다. 가장 좋은 부분은 이제 스레드 관리에서 벗어나 스레드 관리의 어려우면서도 중요한 작업을 ThreadPool에 맡길 수 있다는 점입니다.

모든 구성 요소의 풀이 작업으로 포화된 상태가 유지되는 극단적인 경우 ThreadPool은 원래의 DedicatedThread와 마찬가지로 구성 요소당 하나의 스레드를 갖게 될 가능성이 높습니다. 그러나 이러한 현상은 ThreadPool에 의해 적절하다고 판단되는 경우에만 발생합니다. 구성 요소의 풀이 포화된 상태로 유지되지 않으면 필요한 스레드의 수는 훨씬 더 적습니다.

ThreadPool이 예외를 적절히 처리할 수 있다는 부가적인 이점도 있습니다. DedicatedThread 구현에서 항목 처리에 예외가 발생하는 경우 어떻게 될까요? 스레드는 중단되지만 응용 프로그램 구성에 따라 프로세스는 종료되지 않을 수 있습니다. 이 경우 작업 항목은 DedicatedThread 큐에 저장되기 시작하지만 어떤 작업 항목도 처리되지 않습니다. FifoExecution을 사용할 경우 ThreadPool은 사라진 스레드를 보상하기 위해 더 많은 스레드를 추가하게 됩니다.

그림 5는 FifoExecution 클래스를 사용하는 간단한 데모 응용 프로그램을 보여 줍니다. 이 응용 프로그램의 파이프라인에는 세 가지 단계가 있습니다. 각 단계는 현재 작업 중인 데이터 조각의 ID를 기록합니다(루프 반복). 그런 다음 몇 가지 작업을 수행하고(여기에서는 Thread.SpinWait로 나타냄) 다음 단계로 데이터를 전달합니다(역시 루프 반복). 각 단계는 결과를 분리하여 보기 쉽도록 여러 탭으로 정보를 출력합니다. 그림 6의 출력에서 볼 수 있듯이 각 단계(열)는 올바른 순서에 따라 작업을 유지합니다.

그림 5 FifoExecution 데모

static void Main(string[] args) {
  var stage1 = new FifoExecution();
  var stage2 = new FifoExecution();
  var stage3 = new FifoExecution();

  for (int i = 0; i < 100; i++) {
    stage1.QueueUserWorkItem(one => {
      Console.WriteLine("" + one);
      Thread.SpinWait(100000000);

      stage2.QueueUserWorkItem(two => {
        Console.WriteLine("\t\t" + two);
        Thread.SpinWait(100000000);

        stage3.QueueUserWorkItem(three => {
          Console.WriteLine("\t\t\t\t" + three);
          Thread.SpinWait(100000000);
        }, two);
      }, one);
    }, i);
  }

   Console.ReadLine();
}

그림 6 데모 응용 프로그램의 출력

파이프라인의 단계 사이에 형평성이 부족하다는 부분도 흥미롭습니다. 예를 들어 그림 6의 stage1은 이미 21번째 반복으로 들어가고 있지만 stage2는 13번째, stage3은 9번째 반복까지만 진행된 것을 볼 수 있습니다. 주 원인은 ProcessQueuedItems 구현입니다. 샘플 응용 프로그램은 100개의 작업 항목을 stage1에 매우 신속하게 밀어넣고, 그 결과 stage1에 사용되는 풀이 ProcessQueuedItems 루프를 점유한 채 더 이상 stage1 작업이 없을 때까지 반환되지 않습니다. 이는 다른 단계 관점에서 공정하지 않습니다. 사용 중인 응용 프로그램에서 이와 비슷한 현상이 발생하고 이것이 문제가 된다면 다음과 같이 ProcessQueuedItems 구현을 수정하여 단계 간의 형평성을 높일 수 있습니다.

private void ProcessQueuedItems(object ignored) {
  WorkItem item;
  lock (_workItems) {
    if (_workItems.Count == 0) {
      _delegateQueuedOrRunning = false;
      return;
    }
    item = _workItems.Dequeue();
  }
  try { item.Execute(); }
  finally {
    ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems,
      null);
  }
}

이제 처리할 항목이 더 있더라도 ProcessQueuedItems는 루프를 돌지 않고, 대신 재귀적으로 자신을 ThreadPool 큐에 저장하여 자신의 우선 순위를 다른 단계의 항목 뒤로 설정합니다. 이렇게 수정하면 그림 5에 나온 응용 프로그램의 출력은 이제 그림 7과 같이 됩니다. 새 출력을 보면 예약에서 stage2와 stage3을 전보다 더 공정하게 처리함을 확인할 수 있습니다(단계 간에 아직 약간의 차이가 있지만 파이프라인임을 감안하면 어쩔 수 없는 현상임).

그림 7 보다 공평한 예약을 볼 수 있는 새로운 출력

물론 형평성을 높이기 위해서는 대가가 따릅니다. 이제 각 작업 항목은 스케줄러를 추가로 거쳐야 하며 여기에서 비용이 발생합니다. 응용 프로그램에서 이 타협을 수용할 것인지 여부를 결정해야 합니다. 예를 들어 작업 항목에서 수행 중인 작업의 규모가 크다면 이 오버헤드는 무시해도 되며 사실 인지할 수도 없을 것입니다.

이는 ThreadPool을 기반으로 직접 사용자 지정 스레드 풀을 만들지 않고도 기능을 추가하는 시스템을 구축하는 방법의 한 가지 예일 뿐입니다. 다른 예는 MSDN Magazine이전 .NET Matters 칼럼을 참조하십시오.

질문이나 의견이 있으면 netqa@microsoft.com으로 보내시기 바랍니다.

Stephen Toub은 Microsoft Parallel Computing Platform 팀의 선임 프로그램 관리자이며, MSDN Magazine의 기고 편집자이기도 합니다.