Assuntos .NET

Acesso round robin ao ThreadPool

Stephen Toub

 

P Estou usando o ThreadPool do Microsoft .NET Framework e me deparei com uma situação que não sei ao certo como resolver. Começo com um lote grande de itens de trabalho, que são colocados na fila, e então um segundo lote menor chega, após o início do processamento do primeiro lote. Inicialmente, parte do trabalho no lote maior é expedida para todos os threads de trabalho no ThreadPool. Contudo, quando chega o segundo lote, quero uma distribuição justa, de forma que cada lote seja atendido igualmente, em vez de o primeiro lote receber toda a atenção por ter chegado primeiro.

Quando um dos lotes for concluído, quero que o lote que ainda precisar de processamento receba a atenção de todos os threads de trabalho. Há algo que eu possa fazer para criar uma camada com essa funcionalidade de lotes sobre o ThreadPool?

R Em colunas anteriores, mostrei como criar camadas com vários tipos de funções sobre o ThreadPool existente do .NET. Na edição de outubro de 2004 da MSDN Magazine, mostrei como adicionar ao ThreadPool suporte para a espera por itens de trabalho na fila (consulte "ThreadPoolWait e HandleLeakTracker"). Na edição de novembro de 2004, mostrei como adicionar suporte para prioridades de itens de trabalho (consulte "ThreadPoolPriority e MethodImplAttribute"). E, na edição de março de 2006, mostrei como adicionar suporte ao cancelamento (consulte "ThreadPool anulável"). No futuro, poderei mencionar também a edição de janeiro de 2009 como aquela em que mostrei como adicionar suporte ao agendamento round robin sobre o ThreadPool.

O problema que você deseja solucionar requer, primeiro, a compreensão de como o ThreadPool expede trabalho. Ele mantém internamente uma fila dos itens de trabalho colocados em sua fila. Quando um thread do pool está disponível para a execução de trabalho, ele retorna à fila de trabalho e capta o próximo item. A ordem na qual esse processamento ocorre não está documentada e com certeza não é confiável (pois pode mudar e provavelmente mudará em versões futuras).

Atualmente, isso é implementado de forma muito simples: uma fila FIFO (primeiro a entrar, primeiro a sair). Assim, o primeiro trabalho colocado na fila será o primeiro captado por um thread. No seu cenário de lotes, isso significa que todo o trabalho do primeiro lote estará na fila, na frente de todo o trabalho do segundo lote. Assim, todo o trabalho do primeiro lote será expedido antes do trabalho do segundo lote. Em alguns cenários, isso é o ideal. No seu caso, você precisa de mais controle.

Uma das formas mais fáceis para obter esse tipo de controle no ThreadPool é substituir seu próprio delegado por aquele que o usuário realmente quer que seja executado. Digamos, por exemplo, que você queira capturar todas as exceções sem tratamento lançadas pelo trabalho na fila e gerar um evento para cada uma delas. Para fazer isso, você pode escrever código como o mostrado na Figura 1. Então, em vez de usar ThreadPool.QueueUserWorkItem, você usaria ExceptionThreadPool.QueueUserWorkItem. O trabalho ainda seria executado pelo ThreadPool, mas os threads do pool executariam, na verdade, o delegado que você colocou na fila, em vez do fornecido pelo usuário. A invocação ao seu delegado invocaria o delegado fornecido pelo usuário, capturando as exceções e gerando o evento de destino.

Figura 1 Ajustando o ThreadPool

public static class ExceptionThreadPool {
  public static void QueueUserWorkItem(
      WaitCallback callback, object state) {
    ThreadPool.QueueUserWorkItem(delegate {
      try { callback(state); }
      catch (Exception exc) {
        var handler = UnhandledException;
        if (handler != null) 
          handler(null, 
            new UnhandledExceptionEventArgs(exc, false));
      }
    });
  }

  public static event 
    UnhandledExceptionEventHandler UnhandledException;
}

Observe que essa técnica, embora eficaz, tem um custo: um delegado extra precisa ser alocado, invocado etc. Se o custo será ou não proibitivo, somente você e seus cenários poderão determinar. Mas esse tipo de estrutura de camadas em geral é mais econômico do que escrever seu próprio pool de threads a partir do zero.

Claro que este é um exemplo muito simples, mas você pode realizar procedimentos mais complexos. O exemplo do pool de prioridades que mencionei anteriormente armazena os delegados fornecidos pelo usuário em suas próprias estruturas de dados. Depois, coloca na fila do pool um delegado substituto, que retorna e pesquisa nessas estruturas o delegado certo a ser executado, preferindo executar primeiro aqueles com as prioridades mais altas. Você pode adotar uma técnica semelhante para lidar com a sua dificuldade no caso dos lotes.

Imagine por um momento que, em vez de ter uma só fila, você tenha uma fila por lote. Cada lote coloca trabalho na fila correspondente. Nesse cenário, você usa os threads do pool para fazer round robin entre todas as filas. O delegado colocado por você na fila do ThreadPool real retorna às suas estruturas de dados e procura por trabalho, começando pela próxima fila a ser examinada. No caso de encontrar trabalho, executa aquela fila. Caso contrário, procura na próxima fila.

Dessa forma, você é capaz de realizar um agendamento justo entre os lotes. Se há apenas um lote, os threads sempre retiram trabalho daquela fila. Se há vários lotes, eles visitam cada uma das filas, fornecendo-lhes atenção praticamente igual. A Figura 2 apresenta uma visão geral da aparência dessa solução.

netmatters,fig02.gif

Figura 2 Abordagem de RoundRobinThreadPool

Para colocar isso em andamento, primeiro você precisa de uma estrutura de dados para armazenar o delegado fornecido pelo usuário. A minha representação é mostrada na Figura 3. Essa estrutura de dados contém três propriedades. As duas primeiras devem parecer familiares. São o estado que o usuário fornece a QueueUserWorkItem, que logicamente precisa ser armazenado em cache. No entanto, talvez a terceira propriedade não seja tão familiar. Associado a todos os threads de execução no .NET, há um System.Threading.ExecutionContext que representa informações como o usuário atual, qualquer estado associado ao thread lógico de execução, informações de segurança para acesso ao código, e assim por diante. É importante que esse contexto flua entre pontos assíncronos de execução.

Figura 3 Captura de um item de trabalho

internal class WorkItem {
  public WaitCallback WaitCallback;
  public object State;
  public ExecutionContext Context;

  private static ContextCallback _contextCallback = state => {
    var item = (WorkItem)state;
    item.WaitCallback(item.State);
  };

  public void Execute() {
    if (Context != null) 
      ExecutionContext.Run(Context, _contextCallback, this);
    else WaitCallback(State);
  }
}

Por exemplo, se você estiver representando uma identidade específica do Windows e chamar ThreadPool.QueueUserWorkItem, o trabalho que você colocou na fila deverá ser executado na mesma identidade do Windows. Se não for, isso indicará uma possível falha de segurança. A partir do .NET Framework 2.0, esse contexto flui automaticamente, por padrão, entre todos os pontos assíncronos no seu código: ThreadPool.QueueUserWorkItem, criação de um novo Thread, invocação de delegado assíncrona, e assim por diante.

Contudo, você precisa seguir uma série de regras diferentes com a implementação discutida aqui. Quando a ordem de colocação dos delegados na fila é alterada a partir da ordem na qual eles são executados, não há mais uma correspondência direta entre esse fluxo de ExecutionContext e os itens de trabalho fornecidos pelo usuário. Assim, sua implementação precisa armazenar corretamente em cache o ExecutionContext com o delegado fornecido pelo usuário, e depois usar o contexto capturado para executar esse delegado.

Agora que você tem um item de trabalho, vejamos a fila na qual ele será mantido (mostrada na Figura 4). A estrutura de dados de RoundRobinThreadPool.Queue em si é bastante simples. Internamente, ela contém um Queue<WorkItem> para armazenar todos os itens de trabalho fornecidos, uma referência à instância de RoundRobinThreadPool, à qual a fila é associada, e um valor booleano que denota se o método Dispose foi chamado na fila. Ela também fornece métodos QueueUserWorkItem com a mesma assinatura do ThreadPool.

Figura 4 Uma fila round robin

public sealed class RoundRobinThreadPool {  
  private List<Queue> _queues;
  ...

  public sealed class Queue : IDisposable {
    internal Queue(RoundRobinThreadPool pool) { _pool = pool; }

    internal Queue<WorkItem> _workItems = new Queue<WorkItem>();
    private RoundRobinThreadPool _pool;
    internal bool _disposed;

    public void QueueUserWorkItem(WaitCallback callback) { 
      QueueUserWorkItem(callback, null); 
    }

    public void QueueUserWorkItem(WaitCallback callback, object state) {
      if (_disposed) 
        throw new ObjectDisposedException(GetType().Name);
      var item = new WorkItem { 
        Context = ExecutionContext.Capture(), 
        WaitCallback = callback, State = state };
      lock (_pool._queues) _workItems.Enqueue(item);
      _pool.NotifyNewWorkItem();
    }

    public void Dispose() {
      if (!_disposed)  {
        lock (_pool._queues) {
          if (_workItems.Count == 0) 
            _pool.RemoveQueueNeedsLock(this);
        }
        _disposed = true;
      }
    }
  }
}

Quando QueueUserWorkItem é chamado, o retorno de chamada e o estado fornecidos pelo usuário (juntamente com o ExecutionContext atual) são capturados em um WorkItem. Então, esse trabalho é armazenado na fila genérica. Depois, o pool correspondente é notificado sobre a chegada de um novo trabalho. É importante observar que um bloqueio é usado para proteger a fila de itens de trabalho, pois QueueUserWorkItem pode ser chamado simultaneamente por vários threads, e você precisa garantir a retenção de constantes.

Observe também que o objeto bloqueado é uma lista de filas global vinda do pool. Estou utilizando um bloqueio de granularidade relativamente alta em toda a implementação. Provavelmente, uma implementação mais eficiente utilizaria um bloqueio mais refinado; por exemplo, com o uso de bloqueios individuais por fila, em vez de um para todo o RoundRobinThreadPool. Para maior facilidade de implementação e simplicidade, optei pelo bloqueio único.

O método Dispose é usado quando essa fila não é mais necessária. Em um cenário típico de lotes, uma fila é criada, o trabalho é colocado nela, e depois a fila é descartada. Se o método Dispose simplesmente removesse a fila do pool, provavelmente ela seria removida ainda contendo itens de trabalho a serem processados.

Assim, Dispose executa duas ações. Primeiro, verifica se ainda há itens de trabalho restantes. Se não houver, a fila chama o pool para ser removida. Depois, o método marca a si próprio como descartado. Você verá logo a seguir como o pool lida com a situação na qual se depara com um pool descartado que não foi removido.

A Figura 5 mostra o restante da implementação, da classe RoundRobinThreadPool propriamente dita. O pool contém quatro campos:

  • Uma lista das filas individuais mantidas pelo pool (que também funciona como o bloqueio mencionado anteriormente).
  • Uma fila padrão para o pool.
  • Um inteiro representando a fila seguinte onde procurar trabalho.
  • O delegado de retorno de chamada efetivamente colocado na fila do ThreadPool.

Figura 5 RoundRobinThreadPool

public sealed class RoundRobinThreadPool {
  private List<Queue> _queues;
  private Queue _defaultQueue;
  private int _nextQueue;
  private WaitCallback _callback;

  public RoundRobinThreadPool() {
    _queues = new List<Queue>();
    _callback = DequeueAndExecuteWorkItem;
    _nextQueue = 0;
    _defaultQueue = CreateQueue();
  }

  public Queue CreateQueue() {
    var createdQueue = new Queue(this);
    lock (_queues) _queues.Add(createdQueue);
    return createdQueue;
  }

  public void QueueUserWorkItem(WaitCallback callback)  { 
    QueueUserWorkItem(callback, null); 
  }

  public void QueueUserWorkItem(WaitCallback callback, object state) {
    _defaultQueue.QueueUserWorkItem(callback, state);
  }

  private void RemoveQueueNeedsLock(Queue queue) {
    int index = _queues.IndexOf(queue);
    if (_nextQueue >= index) _nextQueue--;
    _queues.RemoveAt(index);
  }

  private void NotifyNewWorkItem() { 
    ThreadPool.UnsafeQueueUserWorkItem(_callback, null); 
  }

  private void DequeueAndExecuteWorkItem(object ignored) {
    WorkItem item = null;

    lock (_queues) {
      var searchOrder = 
        Enumerable.Range(_nextQueue, _queues.Count - _nextQueue).
        Concat(Enumerable.Range(0, _nextQueue));

      foreach (int i in searchOrder) {
        var items = _queues[i]._workItems;
        if (items.Count > 0) {
          item = items.Dequeue();
          _nextQueue = i; 
          if (queue._disposed && 
              items.Count == 0) RemoveQueueNeedsLock(_queues[i]);
          break;
        }
      }
      _nextQueue = (_nextQueue + 1) % _queues.Count;
    }
    if (item != null) item.Execute();
  }

  ... // RoundRobinThreadPool.Queue and .WorkItem, already shown
}

Quando um RoundRobinThreadPool é inicializado, todo esse estado é configurado. A fila padrão, especificamente, é inicializada com uma chamada para o método CreateQueue. Esse método CreateQueue é o mesmo exposto publicamente para permitir que um desenvolvedor adicione outra fila ao pool (por exemplo, quando chega um novo lote de trabalho que precisa de uma fila isolada). Ele simplesmente gera uma nova instância de RoundRobinThreadPool.Queue (o tipo analisado na Figura 3), adiciona-o à lista de filas e retorna-o.

Para facilitar o uso, RoundRobinThreadPool expõe seus próprios métodos QueueUserWorkItem, que simplesmente se destinam à fila padrão criada quando o pool foi instanciado.

O próximo método a ser analisado é NotifyNewWorkItem. Você deve se lembrar de que, quando QueueUserWorkItem foi chamado em uma fila, depois de armazenar o item de trabalho, a fila chamou o método NotifyNewWorkItem no pool. Esse método simplesmente faz delegação ao verdadeiro ThreadPool, enviando um delegado que retornará ao método DequeueAndExecuteWorkItem (para ser examinado ligeiramente) e que, como seu nome indica, irá retirar da fila e executar um item de trabalho do pool round robin.

Observe que NotifyNewWorkItem chama ThreadPool.UnsafeQueueUserWorkItem, em vez de ThreadPool.QueueUserWorkItem. O prefixo "Unsafe" indica apenas que não há fluxo de ExecutionContext; isso representa vantagem em termos de desempenho. E, como a implementação já manipula manualmente o fluxo de ExecutionContext, não é preciso que o ThreadPool tente fazer o mesmo.

DequeueAndExecuteWorkItem é onde acontece a verdadeira "mágica". Esse método primeiro gera uma ordem para a pesquisa nas filas. A ordem de pesquisa vai da fila seguinte a ser examinada até o final da lista e depois faz um movimento circular, começando pelo princípio da lista e seguindo até a fila onde a pesquisa começou. Para simplificar a implementação, o método LINQ Enumerable.Range é usado para gerar as duas listas, que depois são concatenadas com o uso do método LINQ Enumerable.Concat.

Depois de obter a ordem de pesquisa, o método sai em busca de itens de trabalho. Cada fila é examinada na ordem especificada e, assim que um item de trabalho é encontrado, ele é removido, e o ponteiro seguinte é atualizado. Então, o item de trabalho é invocado usando o método Execute, mostrado na Figura 3.

Há uma linha de código especialmente interessante aqui: a que serve para verificar se a fila da qual um item acaba de ser recuperado foi descartada e está vazia. Se o pool encontra uma fila assim, sabe que nenhum item será adicionado a ela (pois foi descartada), e portanto não é mais preciso mantê-la. Neste ponto, RemoveQueueNeedsLock é usado para remover a fila de destino da lista de filas, e possivelmente para atualizar o próximo ponteiro de fila, caso esteja agora fora de alcance.

Observe que esse método não usa um bloqueio internamente, mas acessa o estado compartilhado; por isso, nomeei o método com um sufixo "NeedsLock", para lembrar-me de que ele precisa ser chamado enquanto o bloqueio é retido. Você perceberá que os dois sites de chamada para RemoveQueueNeedLock — um no método Dispose da fila e outro no método DequeueAndExecuteWorkItem do pool — chamam esse método enquanto retêm o bloqueio de filas.

Com a implementação concluída, você já pode fazer o teste no seu código. No exemplo a seguir, criei uma única instância estática do RoundRobinThreadPool. Quando um lote de trabalho chega para ser processado, uma nova fila é criada, todo o trabalho é colocado nela, e depois a fila é descartada:

 

private static RoundRobinThreadPool _pool = 
  new RoundRobinThreadPool();
...
private void ProcessBatch(Batch b) {
  using(var queue = _pool.CreateQueue()) {
    foreach(var item in b) {
      queue.QueueUserWorkItem(() => ProcessItem(item));
    }
  }
}

Embora todo o trabalho do primeiro lote a chegar seja agendado primeiro, assim que chega o segundo lote, ele começa a obter uma parcela praticamente igual dos recursos de processamento.

Poderíamos incrementar mais essa implementação, que provavelmente seria aperfeiçoada sob o ponto de vista do desempenho. Contudo, com muito pouco código, você conseguiu criar uma abstração que tira proveito do ThreadPool do .NET e de todos os recursos que ele fornece, obtendo, ainda assim, suporte para o equilíbrio entre lotes desejado.

Envie dúvidas e comentários para Stephen no email netqa@microsoft.com.

Stephen Toub é gerente sênior de programa da equipe da plataforma de computação paralela da Microsoft. Além disso, é editor colaborador da MSDN Magazine.