Padrão de consumidores concorrentesCompeting Consumers pattern

Habilite vários consumidores simultâneos para processar as mensagens recebidas no mesmo canal de mensagens.Enable multiple concurrent consumers to process messages received on the same messaging channel. Isso permite que um sistema processe várias mensagens simultaneamente para otimizar a taxa de transferência, melhorar a escalabilidade e a disponibilidade e equilibrar a carga de trabalho.This enables a system to process multiple messages concurrently to optimize throughput, to improve scalability and availability, and to balance the workload.

Contexto e problemaContext and problem

É esperado que um aplicativo em execução na nuvem lide com um grande número de solicitações.An application running in the cloud is expected to handle a large number of requests. Em vez de processar cada solicitação de forma síncrona, uma técnica comum é o aplicativo passá-las por meio de um sistema de mensagens para outro serviço (um serviço consumidor) que as trata de forma assíncrona.Rather than process each request synchronously, a common technique is for the application to pass them through a messaging system to another service (a consumer service) that handles them asynchronously. Essa estratégia ajuda a garantir que a lógica de negócios do aplicativo não seja bloqueada enquanto as solicitações estão sendo processadas.This strategy helps to ensure that the business logic in the application isn't blocked while the requests are being processed.

O número de solicitações pode variar significativamente com o passar do tempo por vários motivos.The number of requests can vary significantly over time for many reasons. Um aumento repentino de atividade do usuário ou solicitações agregadas provenientes de vários locatários pode causar uma carga de trabalho imprevisível.A sudden increase in user activity or aggregated requests coming from multiple tenants can cause an unpredictable workload. Nos horário de pico de um sistema, pode ser necessário processar centenas de solicitações por segundo, enquanto em outros momentos o número pode ser muito pequeno.At peak hours a system might need to process many hundreds of requests per second, while at other times the number could be very small. Além disso, a natureza do trabalho executado para lidar com essas solicitações pode ser altamente variável.Additionally, the nature of the work performed to handle these requests might be highly variable. Usar uma única instância do serviço consumidor pode fazer com que ela seja inundada por solicitações ou o sistema de mensagens pode ficar sobrecarregado por um fluxo de mensagens recebidas do aplicativo.Using a single instance of the consumer service can cause that instance to become flooded with requests, or the messaging system might be overloaded by an influx of messages coming from the application. Para lidar com essa carga de trabalho flutuante, o sistema pode executar várias instâncias do serviço do consumidor.To handle this fluctuating workload, the system can run multiple instances of the consumer service. No entanto, esses consumidores devem ser coordenados para garantir que cada mensagem seja entregue somente para um único consumidor.However, these consumers must be coordinated to ensure that each message is only delivered to a single consumer. A carga de trabalho também precisa ser balanceada entre os consumidores para impedir que uma instância se torne um gargalo.The workload also needs to be load balanced across consumers to prevent an instance from becoming a bottleneck.

SoluçãoSolution

Use uma fila de mensagens para implementar o canal de comunicação entre o aplicativo e as instâncias do serviço do consumidor.Use a message queue to implement the communication channel between the application and the instances of the consumer service. O aplicativo posta solicitações na forma de mensagens na fila e as instâncias de serviço do consumidor recebem mensagens da fila e as processa.The application posts requests in the form of messages to the queue, and the consumer service instances receive messages from the queue and process them. Essa abordagem permite que o mesmo pool de instâncias de serviço do consumidor lide com mensagens de qualquer instância do aplicativo.This approach enables the same pool of consumer service instances to handle messages from any instance of the application. A figura ilustra o uso de uma fila de mensagens para distribuir o trabalho para instâncias de um serviço.The figure illustrates using a message queue to distribute work to instances of a service.

Usar uma fila de mensagens para distribuir o trabalho para instâncias de um serviço

Essa solução oferece as seguintes vantagens:This solution has the following benefits:

  • Ele fornece um sistema para redistribuir a carga que pode lidar com grandes variações no volume de solicitações enviadas por instâncias de aplicativo.It provides a load-leveled system that can handle wide variations in the volume of requests sent by application instances. A fila atua como um buffer entre as instâncias do aplicativo e as instâncias de serviço do consumidor.The queue acts as a buffer between the application instances and the consumer service instances. Isso pode ajudar a minimizar o impacto sobre a disponibilidade e a capacidade de resposta tanto do aplicativo quanto das instâncias de serviço, conforme descrito pelo Padrão de nivelamento de carga baseado em fila.This can help to minimize the impact on availability and responsiveness for both the application and the service instances, as described by the Queue-based Load Leveling pattern. Lidar com uma mensagem que requer que processamento de longa execução não impede que outras mensagens sejam tratadas simultaneamente por outras instâncias do serviço do consumidor.Handling a message that requires some long-running processing doesn't prevent other messages from being handled concurrently by other instances of the consumer service.

  • Isso aumenta a confiabilidade.It improves reliability. Se um produtor se comunica diretamente com um consumidor em vez de usar esse padrão, mas não monitora o consumidor, há uma grande probabilidade de que as mensagens poderiam ser perdidas ou não conseguirem ser processadas se o consumidor falhar.If a producer communicates directly with a consumer instead of using this pattern, but doesn't monitor the consumer, there's a high probability that messages could be lost or fail to be processed if the consumer fails. Nesse padrão, as mensagens não são enviadas para uma instância de serviço específica.In this pattern, messages aren't sent to a specific service instance. Uma instância de serviço com falha não bloqueará um produtor e as mensagens podem ser processadas por qualquer instância de serviço do trabalho.A failed service instance won't block a producer, and messages can be processed by any working service instance.

  • Ela não requer coordenação complexa entre os consumidores ou entre as instâncias do produtor e do consumidor.It doesn't require complex coordination between the consumers, or between the producer and the consumer instances. A fila de mensagens garante que cada mensagem seja entregue pelo menos uma vez.The message queue ensures that each message is delivered at least once.

  • É escalonável.It's scalable. O sistema pode aumentar ou diminuir o número dinamicamente o número de instâncias do serviço de consumidor conforme o volume de mensagens flutua.The system can dynamically increase or decrease the number of instances of the consumer service as the volume of messages fluctuates.

  • Isso poderá melhorar a resiliência se a fila de mensagens fornecer operações de leitura transacionais.It can improve resiliency if the message queue provides transactional read operations. Se uma instância de serviço do consumidor lê e processa a mensagem como parte de uma operação transacional e a instância de serviço do consumidor falhar, esse padrão pode garantir que a mensagem seja retornada para a fila para ser coletada e tratada por outra instância e serviço do consumidor.If a consumer service instance reads and processes the message as part of a transactional operation, and the consumer service instance fails, this pattern can ensure that the message will be returned to the queue to be picked up and handled by another instance of the consumer service.

Problemas e consideraçõesIssues and considerations

Considere os seguintes pontos ao decidir como implementar esse padrão:Consider the following points when deciding how to implement this pattern:

  • Encomenda de mensagens.Message ordering. A ordem na qual as instâncias de serviço de consumidor recebem as mensagens não é garantida e não reflete necessariamente a ordem na qual as mensagens foram criadas.The order in which consumer service instances receive messages isn't guaranteed, and doesn't necessarily reflect the order in which the messages were created. Projete o sistema para garantir que o processamento de mensagens seja idempotente, pois isso ajudará a eliminar qualquer dependência em relação à ordem na qual as mensagens são manipuladas.Design the system to ensure that message processing is idempotent because this will help to eliminate any dependency on the order in which messages are handled. Para obter mais informações, consulte Padrões de idempotência no blog de Jonathon Oliver.For more information, see Idempotency Patterns on Jonathon Oliver’s blog.

    As Filas do Barramento de Serviço do Microsoft Azure podem implementar ordenação primeiro a entrar, primeiro a sair garantida de mensagens usando sessões de mensagens.Microsoft Azure Service Bus Queues can implement guaranteed first-in-first-out ordering of messages by using message sessions. Para obter mais informações, consulte Padrões de mensagens usando sessões.For more information, see Messaging Patterns Using Sessions.

  • Projetar serviços para resiliência.Designing services for resiliency. Se o sistema foi projetado para detectar e reiniciar as instâncias de serviço com falha, pode ser necessário implementar o processamento executado pelas instâncias do serviço como operações idempotentes para minimizar os efeitos de uma única mensagem sendo recuperada e processada mais de uma vez.If the system is designed to detect and restart failed service instances, it might be necessary to implement the processing performed by the service instances as idempotent operations to minimize the effects of a single message being retrieved and processed more than once.

  • Detecção de mensagens suspeitas.Detecting poison messages. Uma mensagem malformada ou uma tarefa que exige acesso a recursos que não estão disponíveis, pode causar uma falha em uma instância de serviço.A malformed message, or a task that requires access to resources that aren't available, can cause a service instance to fail. O sistema deve impedir que essas mensagens sejam retornadas para a fila, capturando-as em vez disso e armazenando os detalhes dessas mensagens em outro local para que possam ser analisados, se necessário.The system should prevent such messages being returned to the queue, and instead capture and store the details of these messages elsewhere so that they can be analyzed if necessary.

  • Lidar com resultados.Handling results. A instância de serviço que lidar com uma mensagem é totalmente separada da lógica do aplicativo que gera a mensagem e pode não ser capaz de se comunicar diretamente.The service instance handling a message is fully decoupled from the application logic that generates the message, and they might not be able to communicate directly. Se a instância de serviço gerar resultados que devem ser passados para a lógica do aplicativo, essas informações devem ser armazenadas em um local que seja acessível para ambos.If the service instance generates results that must be passed back to the application logic, this information must be stored in a location that's accessible to both. Para impedir que a lógica do aplicativo recupere dados incompletos, o sistema deve indicar quando o processamento é concluído.In order to prevent the application logic from retrieving incomplete data the system must indicate when processing is complete.

    Se você estiver usando o Azure, um processo de trabalho pode retornar resultados para a lógica do aplicativo por meio de uma fila de resposta de mensagem dedicada.If you're using Azure, a worker process can pass results back to the application logic by using a dedicated message reply queue. A lógica do aplicativo deve ser capaz de correlacionar esses resultados com a mensagem original.The application logic must be able to correlate these results with the original message. Esse cenário é descrito em mais detalhes no Primer de mensagens assíncronas.This scenario is described in more detail in the Asynchronous Messaging Primer.

  • Dimensionando o sistema de mensagens.Scaling the messaging system. Em uma solução de grande escala, uma única fila de mensagens poderia ficar sobrecarregada com o número de mensagens e se tornar um gargalo no sistema.In a large-scale solution, a single message queue could be overwhelmed by the number of messages and become a bottleneck in the system. Nessa situação, considere particionar o sistema de mensagens para enviar as mensagens de produtores específicos para determinada fila ou usar o balanceamento de carga para distribuir mensagens em várias filas de mensagens.In this situation, consider partitioning the messaging system to send messages from specific producers to a particular queue, or use load balancing to distribute messages across multiple message queues.

  • Assegurar a confiabilidade do sistema de mensagens.Ensuring reliability of the messaging system. Um sistema de mensagens confiável é necessário para garantir uma mensagem não seja perdida após o aplicativo enfileirá-la.A reliable messaging system is needed to guarantee that after the application enqueues a message it won't be lost. Isso é essencial para assegurar que todas as mensagens sejam entregues pelo menos uma vez.This is essential for ensuring that all messages are delivered at least once.

Quando usar esse padrãoWhen to use this pattern

Use esse padrão quando:Use this pattern when:

  • A carga de trabalho de um aplicativo é dividida em tarefas que podem ser executadas de forma assíncrona.The workload for an application is divided into tasks that can run asynchronously.
  • As tarefas são independentes e podem ser executados em paralelo.Tasks are independent and can run in parallel.
  • O volume de trabalho é altamente variável, o que exige uma solução escalonável.The volume of work is highly variable, requiring a scalable solution.
  • A solução deve fornecer alta disponibilidade e ser resiliente se o processamento de uma tarefa falhar.The solution must provide high availability, and must be resilient if the processing for a task fails.

Esse padrão pode não ser útil quando:This pattern might not be useful when:

  • Não for fácil separar a carga de trabalho do aplicativo em tarefas discretas ou se há um alto grau de dependência entre as tarefas.It's not easy to separate the application workload into discrete tasks, or there's a high degree of dependence between tasks.
  • As tarefas devem ser executadas de forma síncrona e a lógica do aplicativo deve esperar que uma tarefa seja concluída antes de continuar.Tasks must be performed synchronously, and the application logic must wait for a task to complete before continuing.
  • As tarefas devem ser executadas em uma sequência específica.Tasks must be performed in a specific sequence.

Alguns sistemas de mensagens dão suporte a sessões que permitem que um produtor agrupe as mensagens e verifique se elas são manipuladas pelo mesmo consumidor.Some messaging systems support sessions that enable a producer to group messages together and ensure that they're all handled by the same consumer. Esse mecanismo pode ser usado com mensagens priorizadas (se houver suporte para elas) para implementar uma forma de ordenação de mensagens que entrega as mensagens na sequência de um produtor para um único consumidor.This mechanism can be used with prioritized messages (if they are supported) to implement a form of message ordering that delivers messages in sequence from a producer to a single consumer.

ExemploExample

O Azure fornece filas de armazenamento e filas do Barramento de Serviço que podem atuar como um mecanismo para implementar esse padrão.Azure provides storage queues and Service Bus queues that can act as a mechanism for implementing this pattern. A lógica do aplicativo pode postar mensagens em uma fila e os consumidores implementados como tarefas em uma ou mais funções podem recuperá-las dessa fila e processá-las.The application logic can post messages to a queue, and consumers implemented as tasks in one or more roles can retrieve messages from this queue and process them. Para garantir a resiliência, uma fila do Barramento de Serviço permite que um consumidor use o modo PeekLock quando recuperar uma mensagem da fila.For resiliency, a Service Bus queue enables a consumer to use PeekLock mode when it retrieves a message from the queue. Na verdade, esse modo não remove a mensagem, mas simplesmente a oculta de outros consumidores.This mode doesn't actually remove the message, but simply hides it from other consumers. O consumidor original poderá excluí-la quando tiver terminado de processá-la.The original consumer can delete the message when it's finished processing it. Se o consumidor falhar, o bloqueio de inspeção atingirá o tempo limite e a mensagem se tornará visível novamente, permitindo que outro consumidor possa recuperá-la.If the consumer fails, the peek lock will time out and the message will become visible again, allowing another consumer to retrieve it.

Para ver informações detalhadas sobre como usar filas do Barramento de Serviço do Azure, consulte Filas, tópicos e assinaturas do Barramento de Serviço.For detailed information on using Azure Service Bus queues, see Service Bus queues, topics, and subscriptions.

Para obter mais informações sobre como usar filas do armazenamento do Azure, consulte Introdução ao Armazenamento de Filas do Azure usando o .NET.For information on using Azure storage queues, see Get started with Azure Queue storage using .NET.

O código a seguir da classe QueueManager na solução CompetingConsumers disponível no GitHub mostra como criar uma fila usando uma instância QueueClient no manipulador de eventos Start em uma função Web ou de trabalho.The following code from the QueueManager class in CompetingConsumers solution available on GitHub shows how you can create a queue by using a QueueClient instance in the Start event handler in a web or worker role.

private string queueName = ...;
private string connectionString = ...;
...

public async Task Start()
{
  // Check if the queue already exists.
  var manager = NamespaceManager.CreateFromConnectionString(this.connectionString);
  if (!manager.QueueExists(this.queueName))
  {
    var queueDescription = new QueueDescription(this.queueName);

    // Set the maximum delivery count for messages in the queue. A message
    // is automatically dead-lettered after this number of deliveries. The
    // default value for dead letter count is 10.
    queueDescription.MaxDeliveryCount = 3;

    await manager.CreateQueueAsync(queueDescription);
  }
  ...

  // Create the queue client. By default the PeekLock method is used.
  this.client = QueueClient.CreateFromConnectionString(
    this.connectionString, this.queueName);
}

O seguinte snippet de código mostra como um aplicativo pode criar e enviar um lote de mensagens para a fila.The next code snippet shows how an application can create and send a batch of messages to the queue.

public async Task SendMessagesAsync()
{
  // Simulate sending a batch of messages to the queue.
  var messages = new List<BrokeredMessage>();

  for (int i = 0; i < 10; i++)
  {
    var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() };
    messages.Add(message);
  }
  await this.client.SendBatchAsync(messages);
}

O código a seguir mostra como uma instância de serviço do consumidor pode receber mensagens da fila, seguindo uma abordagem controlada por evento.The following code shows how a consumer service instance can receive messages from the queue by following an event-driven approach. O parâmetro processMessageTask para o método ReceiveMessages é um delegado que referencia o código a ser executado quando uma mensagem é recebida.The processMessageTask parameter to the ReceiveMessages method is a delegate that references the code to run when a message is received. Esse código é executado de forma assíncrona.This code is run asynchronously.

private ManualResetEvent pauseProcessingEvent;
...

public void ReceiveMessages(Func<BrokeredMessage, Task> processMessageTask)
{
  // Set up the options for the message pump.
  var options = new OnMessageOptions();

  // When AutoComplete is disabled it's necessary to manually
  // complete or abandon the messages and handle any errors.
  options.AutoComplete = false;
  options.MaxConcurrentCalls = 10;
  options.ExceptionReceived += this.OptionsOnExceptionReceived;

  // Use of the Service Bus OnMessage message pump.
  // The OnMessage method must be called once, otherwise an exception will occur.
  this.client.OnMessageAsync(
    async (msg) =>
    {
      // Will block the current thread if Stop is called.
      this.pauseProcessingEvent.WaitOne();

      // Execute processing task here.
      await processMessageTask(msg);
    },
    options);
}
...

private void OptionsOnExceptionReceived(object sender,
  ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
  ...
}

Observe que os recursos de dimensionamento automático, como aqueles disponíveis no Azure, podem ser usados para iniciar e parar instâncias de função, à medida que o tamanho da fila varia.Note that autoscaling features, such as those available in Azure, can be used to start and stop role instances as the queue length fluctuates. Para obter mais informações, consulte Autoscaling Guidance.For more information, see Autoscaling Guidance. Além disso, não é necessário manter uma correspondência um para um entre as instâncias de função e os processos de trabalho — uma única instância de função pode implementar vários processos de trabalho.Also, it's not necessary to maintain a one-to-one correspondence between role instances and worker processes—a single role instance can implement multiple worker processes. Para obter mais informações, consulte o padrão de consolidação de recursos de computação.For more information, see Compute Resource Consolidation pattern.

Os padrões e diretrizes a seguir também podem ser relevantes ao implementar esse padrão:The following patterns and guidance might be relevant when implementing this pattern:

  • Prévia de mensagens assíncronas.Asynchronous Messaging Primer. Filas de mensagens são um mecanismo de comunicação assíncrona.Message queues are an asynchronous communications mechanism. Se um serviço do consumidor precisa enviar uma resposta para um aplicativo, pode ser necessário implementar alguma forma de mensagens de resposta.If a consumer service needs to send a reply to an application, it might be necessary to implement some form of response messaging. O Primer de mensagens assíncronas fornece informações sobre como implementar mensagens de solicitação/resposta usando filas de mensagens.The Asynchronous Messaging Primer provides information on how to implement request/reply messaging using message queues.

  • Orientação de autodimensionamento.Autoscaling Guidance. Pode ser possível iniciar e parar instâncias de um serviço do consumidor, visto que o tamanho da fila na qual os aplicativos postam as mensagens varia.It might be possible to start and stop instances of a consumer service since the length of the queue applications post messages on varies. O dimensionamento automático pode ajudar a manter a taxa de transferência durante horários de pico de processamento.Autoscaling can help to maintain throughput during times of peak processing.

  • Padrão de consolidação de recursos de computação.Compute Resource Consolidation pattern. Pode ser possível consolidar várias instâncias de um serviço do consumidor em um único processo para reduzir os custos e a sobrecarga de gerenciamento.It might be possible to consolidate multiple instances of a consumer service into a single process to reduce costs and management overhead. O Padrão de consolidação de recursos de computação descreve os benefícios e as vantagens e desvantagens de seguir essa abordagem.The Compute Resource Consolidation pattern describes the benefits and tradeoffs of following this approach.

  • Padrão de nivelamento de carga baseado em fila.Queue-based Load Leveling pattern. Apresentar uma fila de mensagens pode agregar resiliência ao sistema, permitindo que as instâncias de serviço lidem com volumes variáveis de solicitações de instâncias do aplicativo.Introducing a message queue can add resiliency to the system, enabling service instances to handle widely varying volumes of requests from application instances. A fila de mensagens atua como um buffer que nivela a carga.The message queue acts as a buffer, which levels the load. O Padrão de nivelamento de carga com base em fila descreve esse cenário mais detalhadamente.The Queue-based Load Leveling pattern describes this scenario in more detail.

  • Esse padrão tem um aplicativo de exemplo associado a ele.This pattern has a sample application associated with it.