Padrão de Consumidores ConcorrentesCompeting Consumers pattern

Permita que vários consumidores em simultâneo processem mensagens recebidas no mesmo canal de mensagens.Enable multiple concurrent consumers to process messages received on the same messaging channel. Um sistema pode, assim, processar várias mensagens em simultâneo para otimizar o débito, melhorar a escalabilidade e disponibilidade e balancear a carga de trabalho.This enables a system to process multiple messages concurrently to optimize throughput, to improve scalability and availability, and to balance the workload.

Contexto e problemaContext and problem

Espera-se que uma aplicação em execução na nuvem processe um grande número de pedidos.An application running in the cloud is expected to handle a large number of requests. Em vez de processar cada pedido de forma síncrona, uma técnica comum passa por colocar a aplicação a transmiti-los através de um sistema de mensagens para outro serviço (um serviço de consumidor) que os processa de forma assíncrona.Rather than process each request synchronously, a common technique is for the application to pass them through a messaging system to another service (a consumer service) that handles them asynchronously. Esta estratégia ajuda a garantir que a lógica de negócio na aplicação não é bloqueada enquanto os pedidos estão a ser processados.This strategy helps to ensure that the business logic in the application isn't blocked while the requests are being processed.

O número de pedidos pode variar significativamente ao longo do tempo por diversos motivos.The number of requests can vary significantly over time for many reasons. Um aumento repentino da atividade do utilizador ou dos pedidos agregados provenientes de vários inquilinos pode produzir uma carga de trabalho imprevisível.A sudden increase in user activity or aggregated requests coming from multiple tenants can cause an unpredictable workload. Nas horas de pico, um sistema pode ter de processar várias centenas de pedidos por segundo, enquanto noutras alturas o número pode ser muito pequeno.At peak hours a system might need to process many hundreds of requests per second, while at other times the number could be very small. Além disso, a natureza do trabalho realizado para processar estes pedidos pode ser altamente variável.Additionally, the nature of the work performed to handle these requests might be highly variable. A utilização de uma única instância do serviço de consumidor pode fazer com que esta seja inundada com pedidos ou o sistema de mensagens pode ficar sobrecarregado por uma afluência de mensagens provenientes da aplicação.Using a single instance of the consumer service can cause that instance to become flooded with requests, or the messaging system might be overloaded by an influx of messages coming from the application. Para processar esta carga de trabalho flutuante, o sistema pode executar várias instâncias do serviço de consumidor.To handle this fluctuating workload, the system can run multiple instances of the consumer service. No entanto, estes consumidores têm de estar coordenados para garantir que cada mensagem é apenas enviada para um único consumidor.However, these consumers must be coordinated to ensure that each message is only delivered to a single consumer. A carga de trabalho também tem de ser balanceada entre os consumidores para impedir que uma instância se torne num estrangulamento.The workload also needs to be load balanced across consumers to prevent an instance from becoming a bottleneck.

SoluçãoSolution

Utilize uma fila de mensagens para implementar o canal de comunicação entre a aplicação e as instâncias do serviço de consumidor.Use a message queue to implement the communication channel between the application and the instances of the consumer service. A aplicação publica pedidos sob a forma de mensagens na fila. Por sua vez, as instâncias do serviço de consumidor recebem as mensagens da fila e processam-nas.The application posts requests in the form of messages to the queue, and the consumer service instances receive messages from the queue and process them. Esta abordagem permite que o mesmo conjunto de instâncias do serviço de consumidor processe mensagens de qualquer instância da aplicação.This approach enables the same pool of consumer service instances to handle messages from any instance of the application. A figura mostra a utilização de uma fila de mensagens para distribuir o trabalho para as instâncias de um serviço.The figure illustrates using a message queue to distribute work to instances of a service.

Utilização de uma fila de mensagens para distribuir trabalho para instâncias de um serviço

Esta solução possui os benefícios seguintes:This solution has the following benefits:

  • Fornece um sistema de carga redistribuída que pode processar grandes variações no volume de pedidos enviados pelas instâncias da aplicação.It provides a load-leveled system that can handle wide variations in the volume of requests sent by application instances. A fila funciona como uma memória intermédia entre as instâncias da aplicação e as instâncias do serviço de consumidor.The queue acts as a buffer between the application instances and the consumer service instances. Este procedimento pode ajudar a minimizar o impacto sobre a disponibilidade e sobre a capacidade de resposta tanto para a aplicação como para as instâncias do serviço, conforme descrito pelo padrão de Redistribuição de Carga Baseada na Fila.This can help to minimize the impact on availability and responsiveness for both the application and the service instances, as described by the Queue-based Load Leveling pattern. O processamento de uma mensagem que precisa de um processamento de execução longa não impede que as outras mensagens sejam processadas em simultâneo por outras instâncias do serviço de consumidor.Handling a message that requires some long-running processing doesn't prevent other messages from being handled concurrently by other instances of the consumer service.

  • Melhora a fiabilidade.It improves reliability. Se um produtor comunicar diretamente com um consumidor em vez de utilizar este padrão, mas não monitorizar o consumidor, existirá uma grande probabilidade de as mensagens se perderem ou não serem processadas se o consumidor falhar.If a producer communicates directly with a consumer instead of using this pattern, but doesn't monitor the consumer, there's a high probability that messages could be lost or fail to be processed if the consumer fails. Neste padrão, as mensagens não são enviadas para uma instância de serviço específica.In this pattern, messages aren't sent to a specific service instance. Uma instância de serviço que falha não bloqueia um produtor e as mensagens podem ser processadas por qualquer instância de serviço em funcionamento.A failed service instance won't block a producer, and messages can be processed by any working service instance.

  • Não requer uma coordenação complexa entre os consumidores ou entre o produtor e as instâncias do consumidor.It doesn't require complex coordination between the consumers, or between the producer and the consumer instances. A fila de mensagens assegura que cada mensagem é entregue pelo menos uma vez.The message queue ensures that each message is delivered at least once.

  • É escalável.It's scalable. O sistema pode aumentar ou reduzir de forma dinâmica o número de instâncias do serviço de consumidor à medida que o volume de mensagens varia.The system can dynamically increase or decrease the number of instances of the consumer service as the volume of messages fluctuates.

  • Poderá melhorar a resiliência se a fila de mensagens fornecer operações de leitura transacionais.It can improve resiliency if the message queue provides transactional read operations. Se uma instância do serviço de consumidor ler e processar a mensagem como parte de uma operação transacional e se a instância do serviço de consumidor falhar, este padrão poderá garantir que a mensagem será devolvida à fila para que seja recolhida e processada por outra instância do serviço de consumidor.If a consumer service instance reads and processes the message as part of a transactional operation, and the consumer service instance fails, this pattern can ensure that the message will be returned to the queue to be picked up and handled by another instance of the consumer service.

Problemas e consideraçõesIssues and considerations

Na altura de decidir como implementar este padrão, considere os seguintes pontos:Consider the following points when deciding how to implement this pattern:

  • Ordenação de mensagens.Message ordering. A ordem pela qual as instâncias do serviço de consumidor recebem as mensagens não é garantida nem reflete necessariamente a ordem pela qual as mensagens foram criadas.The order in which consumer service instances receive messages isn't guaranteed, and doesn't necessarily reflect the order in which the messages were created. Crie o sistema para garantir que o processamento das mensagens é idempotente. Eliminará assim qualquer dependência relativamente à ordem de processamento das mensagens.Design the system to ensure that message processing is idempotent because this will help to eliminate any dependency on the order in which messages are handled. Para obter mais informações, veja Idempotency Patterns (Padrões de Idempotência) no blogue de Jonathan Oliver.For more information, see Idempotency Patterns on Jonathon Oliver’s blog.

    As Filas do Microsoft Azure Service Bus podem implementar uma ordenação de mensagens first-in-first-out garantida com sessões de mensagens.Microsoft Azure Service Bus Queues can implement guaranteed first-in-first-out ordering of messages by using message sessions. Para obter mais informações, veja Messaging Patterns Using Sessions (Padrões de Mensagens com Sessões).For more information, see Messaging Patterns Using Sessions.

  • Criar serviços para fins de resiliência.Designing services for resiliency. Se o sistema tiver sido concebido para detetar e reiniciar instâncias de serviço que falham, poderá ser necessário implementar o processamento realizado pelas instâncias do serviço como operações idempotentes para minimizar os efeitos de uma única mensagem a ser obtida e processada mais do que uma vez.If the system is designed to detect and restart failed service instances, it might be necessary to implement the processing performed by the service instances as idempotent operations to minimize the effects of a single message being retrieved and processed more than once.

  • Detetar mensagens não processáveis.Detecting poison messages. Uma mensagem incorretamente formada ou uma tarefa que requer acesso a recursos indisponíveis pode fazer com que uma instância de serviço falhe.A malformed message, or a task that requires access to resources that aren't available, can cause a service instance to fail. O sistema deve evitar que estas mensagens sejam devolvidas à fila. Em vez disso, deve capturar e armazenar os detalhes destas mensagens noutro local para que possam ser analisadas, se necessário.The system should prevent such messages being returned to the queue, and instead capture and store the details of these messages elsewhere so that they can be analyzed if necessary.

  • Processar resultados.Handling results. A instância de serviço a processar uma mensagem está totalmente desacoplada da lógica aplicacional que gera a mensagem e pode não ter a capacidade de comunicar diretamente.The service instance handling a message is fully decoupled from the application logic that generates the message, and they might not be able to communicate directly. Se a instância de serviço gerar resultados que têm de ser transmitidos para a lógica aplicacional, estas informações terão de ser armazenadas numa localização que esteja acessível para ambos.If the service instance generates results that must be passed back to the application logic, this information must be stored in a location that's accessible to both. Para evitar que a lógica aplicacional obtenha dados incompletos, o sistema tem de indicar quando é que o processamento é concluído.In order to prevent the application logic from retrieving incomplete data the system must indicate when processing is complete.

    Se estiver a utilizar o Azure, um processo de trabalho poderá transmitir os resultados de volta para a lógica aplicacional com uma fila de respostas de mensagens dedicada.If you're using Azure, a worker process can pass results back to the application logic by using a dedicated message reply queue. A lógica aplicacional tem de conseguir correlacionar estes resultados com a mensagem original.The application logic must be able to correlate these results with the original message. Este cenário é descrito com maior detalhe no Asynchronous Messaging Primer (Manual Básico de Mensagens Assíncronas).This scenario is described in more detail in the Asynchronous Messaging Primer.

  • Dimensionar o sistema de mensagens.Scaling the messaging system. Numa solução em grande escala, uma fila de mensagens única pode ficar sobrecarregada pelo número de mensagens e tornar-se num estrangulamento no sistema.In a large-scale solution, a single message queue could be overwhelmed by the number of messages and become a bottleneck in the system. Nesta situação, considere a criação de partições do sistema de mensagens para enviar mensagens de produtores específicos a uma fila específica ou utilize o balanceamento de carga para distribuir as mensagens por várias filas de mensagens.In this situation, consider partitioning the messaging system to send messages from specific producers to a particular queue, or use load balancing to distribute messages across multiple message queues.

  • Garantir a fiabilidade do sistema de mensagens.Ensuring reliability of the messaging system. É necessário um sistema de mensagens fiável para assegurar que, após a aplicação colocar uma mensagem em fila, esta não é perdida.A reliable messaging system is needed to guarantee that after the application enqueues a message it won't be lost. Esta é uma condição é essencial para garantir que todas as mensagens são fornecidas pelo menos uma vez.This is essential for ensuring that all messages are delivered at least once.

Quando utilizar este padrãoWhen to use this pattern

Utilize este padrão quando:Use this pattern when:

  • A carga de trabalho de uma aplicação está dividida em tarefas que podem ser executadas de forma assíncrona.The workload for an application is divided into tasks that can run asynchronously.
  • As tarefas são independentes e podem ser executadas em paralelo.Tasks are independent and can run in parallel.
  • O volume de trabalho é altamente variável e requer uma solução dimensionável.The volume of work is highly variable, requiring a scalable solution.
  • A solução tem de fornecer uma elevada disponibilidade e tem de ser resiliente caso ocorra uma falha no processamento de uma tarefa.The solution must provide high availability, and must be resilient if the processing for a task fails.

Este padrão poderá não ser prático quando:This pattern might not be useful when:

  • Não é fácil separar a carga de trabalho da aplicação em tarefas discretas ou há um elevado grau de dependência entre as tarefas.It's not easy to separate the application workload into discrete tasks, or there's a high degree of dependence between tasks.
  • As tarefas devem ser executadas de forma síncrona e a lógica aplicacional tem de aguardar pela conclusão de uma tarefa antes de continuar.Tasks must be performed synchronously, and the application logic must wait for a task to complete before continuing.
  • As tarefas têm de ser executadas numa sequência específica.Tasks must be performed in a specific sequence.

Alguns sistemas de mensagens suportam sessões que permitem a um produtor agrupar mensagens e asseguram que estas são processadas pelo mesmo consumidor.Some messaging systems support sessions that enable a producer to group messages together and ensure that they're all handled by the same consumer. Este mecanismo pode ser utilizado com mensagens prioritárias (se suportadas) para implementar uma forma de ordenação de mensagens em sequência de um produtor para um único consumidor.This mechanism can be used with prioritized messages (if they are supported) to implement a form of message ordering that delivers messages in sequence from a producer to a single consumer.

ExemploExample

O Azure fornece filas de armazenamento e filas do Service Bus que podem funcionar como um mecanismo de implementação deste padrão.Azure provides storage queues and Service Bus queues that can act as a mechanism for implementing this pattern. A lógica aplicacional pode publicar mensagens numa fila e os consumidores implementados como tarefas numa ou mais funções podem obter mensagens desta fila e processá-las.The application logic can post messages to a queue, and consumers implemented as tasks in one or more roles can retrieve messages from this queue and process them. No que concerne à resiliência, uma fila do Service Bus permite ao consumidor utilizar o modo PeekLock quando obtém uma mensagem da fila.For resiliency, a Service Bus queue enables a consumer to use PeekLock mode when it retrieves a message from the queue. Na verdade, este modo não remove a mensagem, apenas a oculta dos outros consumidores.This mode doesn't actually remove the message, but simply hides it from other consumers. O consumidor original poderá eliminar a mensagem quando o seu processamento for concluído.The original consumer can delete the message when it's finished processing it. Se o consumidor falhar, o bloqueio de pré-visualização excederá o limite de tempo e a mensagem ficará novamente visível e acessível a outros consumidores.If the consumer fails, the peek lock will time out and the message will become visible again, allowing another consumer to retrieve it.

Para obter informações detalhadas sobre a utilização das filas do Azure Service Bus, veja Filas, tópicos e subscrições do Service Bus.For detailed information on using Azure Service Bus queues, see Service Bus queues, topics, and subscriptions.

Para obter mais informações sobre a utilização das filas de armazenamento do Azure, veja Introdução ao Armazenamento de filas do Azure através do .NET.For information on using Azure storage queues, see Get started with Azure Queue storage using .NET.

O seguinte código da classe QueueManager na solução CompetingConsumers disponível no GitHub mostra como pode criar uma fila com uma instância QueueClient no processador de eventos Start numa função Web ou de trabalho.The following code from the QueueManager class in CompetingConsumers solution available on GitHub shows how you can create a queue by using a QueueClient instance in the Start event handler in a web or worker role.

private string queueName = ...;
private string connectionString = ...;
...

public async Task Start()
{
  // Check if the queue already exists.
  var manager = NamespaceManager.CreateFromConnectionString(this.connectionString);
  if (!manager.QueueExists(this.queueName))
  {
    var queueDescription = new QueueDescription(this.queueName);

    // Set the maximum delivery count for messages in the queue. A message
    // is automatically dead-lettered after this number of deliveries. The
    // default value for dead letter count is 10.
    queueDescription.MaxDeliveryCount = 3;

    await manager.CreateQueueAsync(queueDescription);
  }
  ...

  // Create the queue client. By default the PeekLock method is used.
  this.client = QueueClient.CreateFromConnectionString(
    this.connectionString, this.queueName);
}

O fragmento de código seguinte mostra como uma aplicação pode criar e enviar um lote de mensagens para a fila.The next code snippet shows how an application can create and send a batch of messages to the queue.

public async Task SendMessagesAsync()
{
  // Simulate sending a batch of messages to the queue.
  var messages = new List<BrokeredMessage>();

  for (int i = 0; i < 10; i++)
  {
    var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() };
    messages.Add(message);
  }
  await this.client.SendBatchAsync(messages);
}

O código seguinte mostra como uma instância de serviço de consumidor pode receber mensagens da fila ao seguir uma abordagem condicionada por eventos.The following code shows how a consumer service instance can receive messages from the queue by following an event-driven approach. O parâmetro processMessageTask para o método ReceiveMessages é um delegado que referencia o código a ser executado quando é recebida uma mensagem.The processMessageTask parameter to the ReceiveMessages method is a delegate that references the code to run when a message is received. Este código é executado de forma assíncrona.This code is run asynchronously.

private ManualResetEvent pauseProcessingEvent;
...

public void ReceiveMessages(Func<BrokeredMessage, Task> processMessageTask)
{
  // Set up the options for the message pump.
  var options = new OnMessageOptions();

  // When AutoComplete is disabled it's necessary to manually
  // complete or abandon the messages and handle any errors.
  options.AutoComplete = false;
  options.MaxConcurrentCalls = 10;
  options.ExceptionReceived += this.OptionsOnExceptionReceived;

  // Use of the Service Bus OnMessage message pump.
  // The OnMessage method must be called once, otherwise an exception will occur.
  this.client.OnMessageAsync(
    async (msg) =>
    {
      // Will block the current thread if Stop is called.
      this.pauseProcessingEvent.WaitOne();

      // Execute processing task here.
      await processMessageTask(msg);
    },
    options);
}
...

private void OptionsOnExceptionReceived(object sender,
  ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
  ...
}

Tenha em atenção que as funcionalidades de dimensionamento automático, como as disponíveis no Azure, podem ser utilizadas para iniciar e parar as instâncias de funções à medida que o comprimento da fila varia.Note that autoscaling features, such as those available in Azure, can be used to start and stop role instances as the queue length fluctuates. Para obter mais informações, veja Orientações de Dimensionamento Automático.For more information, see Autoscaling Guidance. Além disso, não é necessário manter uma correspondência de um para um entre as instâncias de funções e os processos de trabalho (uma instância de função única pode implementar vários processos de trabalho).Also, it's not necessary to maintain a one-to-one correspondence between role instances and worker processes—a single role instance can implement multiple worker processes. Para obter mais informações, veja Padrão de Consolidação de Recursos de Computação.For more information, see Compute Resource Consolidation pattern.

Os padrões e as orientações que se seguem podem ser relevantes ao implementar este padrão:The following patterns and guidance might be relevant when implementing this pattern:

  • Asynchronous Messaging Primer (Manual Básico de Mensagens Assíncronas).Asynchronous Messaging Primer. As Filas de mensagens são um mecanismo de comunicação assíncrono.Message queues are an asynchronous communications mechanism. Se um serviço de consumidor precisar de enviar uma resposta para uma aplicação, poderá ser necessário implementar alguma forma de mensagem de resposta.If a consumer service needs to send a reply to an application, it might be necessary to implement some form of response messaging. O Manual Básico de Mensagens Assíncronas fornece informações sobre como implementar mensagens de pedido/resposta com filas de mensagens.The Asynchronous Messaging Primer provides information on how to implement request/reply messaging using message queues.

  • Orientações de Dimensionamento Automático.Autoscaling Guidance. Pode ser possível iniciar e parar instâncias de um serviço de consumidor, uma vez que o comprimento da fila na qual as aplicações publicam mensagens varia.It might be possible to start and stop instances of a consumer service since the length of the queue applications post messages on varies. O dimensionamento automático pode ajudar a manter o débito durante os períodos de pico de processamento.Autoscaling can help to maintain throughput during times of peak processing.

  • Padrão de Consolidação de Recursos de Computação.Compute Resource Consolidation pattern. Pode ser possível consolidar várias instâncias de um serviço de consumidor num único processo para reduzir os gastos e os custos de gestão.It might be possible to consolidate multiple instances of a consumer service into a single process to reduce costs and management overhead. O padrão de Consolidação de Recursos de Computação descreve os benefícios e os compromissos desta abordagem.The Compute Resource Consolidation pattern describes the benefits and tradeoffs of following this approach.

  • Padrão de Nivelamento de Carga Baseado na Fila.Queue-based Load Leveling pattern. A introdução de uma fila de mensagens pode adicionar resiliência ao sistema, o que permite que as instâncias de serviço processem diversos e variados volumes de pedidos de instâncias da aplicação.Introducing a message queue can add resiliency to the system, enabling service instances to handle widely varying volumes of requests from application instances. A fila de mensagens funciona como uma memória intermédia que redistribui a carga.The message queue acts as a buffer, which levels the load. O padrão de Redistribuição de Carga Baseada na Fila descreve este cenário mais detalhadamente.The Queue-based Load Leveling pattern describes this scenario in more detail.

  • Este padrão tem uma aplicação de exemplo associada.This pattern has a sample application associated with it.