Patroon Concurrerende consumenten

Meerdere gelijktijdige consumenten inschakelen voor het verwerken van berichten die op hetzelfde kanaal worden ontvangen. Hiermee kan een systeem meerdere berichten tegelijkertijd verwerken en zo de doorvoer optimaliseren, schaalbaarheid en beschikbaarheid te verbeteren, en de werkbelasting in balans te houden.

Context en probleem

Een toepassing die wordt uitgevoerd in de cloud zal naar verwachting een groot aantal aanvragen verwerken. Een veelgebruikte techniek is dat de toepassing aanvragen doorvoert via een berichtensysteem (een consumentenservice) dat ze asynchroon verwerkt, in plaats van elke aanvraag synchroon te verwerken. Met deze strategie kunt u ervoor zorgen dat de bedrijfslogica in de toepassing niet wordt geblokkeerd terwijl de aanvragen worden verwerkt.

Het aantal aanvragen kan na enige tijd om verschillende redenen aanzienlijk veranderd zijn. Een plotselinge toename van gebruikersactiviteit of geaggregeerde aanvragen die afkomstig zijn van meerdere tenants kan leiden tot een onvoorspelbare werkbelasting. Tijdens piekuren moet een systeem mogelijk vele honderden aanvragen per seconde verwerken, terwijl op andere momenten het aantal juist erg klein kan zijn. Bovendien kan de aard van de werkzaamheden voor het afhandelen van deze aanvragen erg variëren. Het gebruik van één exemplaar van de consumentenservice kan ertoe leiden dat het desbetreffende exemplaar wordt overspoeld met aanvragen of dat het berichtensysteem overbelast raakt door een grote stroom aan berichten van de toepassing. Het systeem kan meerdere exemplaren van de consumentenservice uitvoeren om deze fluctuerende werkbelasting te verwerken. Deze consumenten moeten echter worden gecoördineerd om ervoor te zorgen dat elk bericht alleen aan een enkele consument wordt geleverd. Ook moet de werkbelasting gelijkmatig verdeeld zijn over consumenten om te voorkomen dat een exemplaar een knelpunt wordt.

Oplossing

Gebruik een berichtenwachtrij voor de implementatie van het communicatiekanaal tussen de toepassing en de exemplaren van de consumentenservice. De toepassing plaatst aanvragen in de wachtrij in de vorm van berichten en de exemplaren van de consumentenservice ontvangen berichten uit de wachtrij en verwerken deze vervolgens. Met deze aanpak kan dezelfde groep exemplaren van de consumentenservice berichten van een willekeurig exemplaar van de toepassing verwerken. In de afbeelding ziet u het gebruik van een berichtenwachtrij om werk naar exemplaren van een service te distribueren.

Een berichtenwachtrij gebruiken om werk naar exemplaren van een service te distribueren

Dit patroon biedt de volgende voordelen:

  • Het biedt een systeem met een gebalanceerde belasting dat grote veranderingen in de hoeveelheid aanvragen die worden verzonden door exemplaren van een toepassing kan verwerken. De wachtrij fungeert als buffer tussen de exemplaren van de toepassing en de exemplaren van de consumentenservice. Zo kunt u de gevolgen voor de beschikbaarheid en reactiesnelheid voor de exemplaren van de service en de toepassing minimaliseren, zoals wordt beschreven door het patroon Load leveling op basis van een wachtrij. Als er voor de verwerking van een bericht lange tijd nodig is, betekent dat niet dat er in die periode geen andere berichten worden verwerkt door andere exemplaren van de consumentenservice.

  • Dit verbetert de betrouwbaarheid. Als een producent rechtstreeks met een consument communiceert in plaats van dit patroon te gebruiken, maar de consument niet wordt bewaakt, is het zeer waarschijnlijk dat er berichten verloren gaan of niet kunnen worden verwerkt als de consument mislukt. Berichten worden in dit patroon niet verzonden naar een specifiek service-exemplaar. Een producent wordt niet geblokkeerd door een mislukt service-exemplaar en berichten kunnen worden verwerkt door elk willekeurig service-exemplaar.

  • Er is geen complexe coördinatie vereist tussen de consumenten of tussen de producent en de exemplaren van de consument. De berichtenwachtrij zorgt ervoor dat elk bericht ten minste één keer wordt geleverd.

  • Het is schaalbaar. Het systeem kan het aantal exemplaren van de consumentenservice dynamisch verhogen of verlagen wanneer de hoeveelheid berichten fluctueert.

  • Als de berichtenwachtrij transactionele leesbewerkingen levert, kan dit de flexibiliteit verbeteren. Als een exemplaar van de consumentenservice het bericht leest en verwerkt als onderdeel van een transactionele bewerking, en het exemplaar van de consumentenservice mislukt, kan dit patroon ervoor zorgen dat het bericht wordt teruggestuurd naar de wachtrij, waar het wordt opgevangen en verwerkt door een ander exemplaar van de consumentenservice.

Problemen en overwegingen

Beschouw de volgende punten als u besluit hoe u dit patroon wilt implementeren:

  • Berichtvolgorde. De volgorde waarin exemplaren van de consumentenservice berichten ontvangen, staat niet vast en komt niet per se overeen met de volgorde waarin de berichten zijn gemaakt. Ontwerp het systeem zo dat de berichtverwerking idempotent is, want dit draagt bij aan het verwijderen van eventuele afhankelijkheden van de volgorde waarin berichten worden verwerkt. Zie Idempotency Patterns (Idempotentiepatronen, Engelstalig) in de blog van Jonathan Oliver voor meer informatie.

    Microsoft Azure Service Bus-wachtrijen kunnen er door middel van berichtensessies voor zorgen dat berichten op volgorde worden verwerkt. Zie voor meer informatie Messaging Patterns Using Sessions (Berichtenpatronen die gebruikmaken van sessies).

  • Services ontwerpen voor flexibiliteit. Als het systeem is ontworpen om mislukte exemplaren van de service te detecteren en opnieuw te starten, kan het nodig zijn om de verwerkingen die worden uitgevoerd door de service-exemplaren te implementeren als idempotent-bewerkingen. Zo worden de gevolgen van een enkel bericht dat meer dan eens wordt opgehaald en verwerkt beperkt.

  • Onverwerkbare berichten detecteren. Een onjuist ingedeeld bericht of een taak waarvoor toegang tot resources nodig is die niet beschikbaar zijn, kan ertoe leiden dat een exemplaar van de service mislukt. Het systeem moet voorkomen dat dergelijke berichten worden teruggestuurd naar de wachtrij en in plaats daarvan de details van deze berichten vastleggen en ergens anders opslaan, zodat deze indien nodig kunnen worden geanalyseerd.

  • Resultaten verwerken. Het service-exemplaar dat een bericht verwerkt is volledig losgekoppeld van de toepassingslogica waarmee het bericht wordt gegenereerd en kan er mogelijk niet rechtstreeks mee communiceren. Als het service-exemplaar resultaten genereert die moeten worden teruggestuurd naar de toepassingslogica, moeten deze gegevens worden opgeslagen op een locatie die voor beide toegankelijk is. Om te voorkomen dat de toepassingslogica onvolledige gegevens ophaalt, moet het systeem aangeven wanneer het verwerken is voltooid.

    Als u Azure gebruikt, kan een werkproces resultaten terugsturen naar de toepassingslogica via een speciale antwoordenwachtrij. De toepassingslogica moet deze resultaten kunnen relateren aan het oorspronkelijke bericht. Dit scenario wordt uitgebreid beschreven in de Asynchronous Messaging Primer (Inleiding in asynchrone berichtverzending).

  • Het berichtensysteem schalen. In een grootschalige oplossing kan een enkele berichtenwachtrij overbelast raken door het aantal berichten en een knelpunt worden in het systeem. Overweeg in dit geval om het berichtensysteem te partitioneren om berichten van specifieke producenten naar een bepaalde wachtrij te verzenden. U kunt ook gebruikmaken van taakverdeling om berichten te verdelen over meerdere berichtenwachtrijen.

  • Betrouwbaarheid van het berichtensysteem garanderen. Een betrouwbaar berichtensysteem is nodig om er zeker van te zijn dat een bericht niet verloren gaat nadat de toepassing het in een wachtrij heeft geplaatst. Dit is essentieel om ervoor te zorgen dat alle berichten ten minste één keer worden geleverd.

Wanneer dit patroon gebruiken

Gebruik dit patroon wanneer:

  • De werkbelasting voor een toepassing is onderverdeeld in taken die asynchroon kunnen worden uitgevoerd.
  • Taken zijn onafhankelijk en kunnen parallel worden uitgevoerd.
  • Het werkvolume is zeer variabel en vereist een schaalbare oplossing.
  • De oplossing moet een hoge beschikbaarheid bieden en flexibel zijn als de verwerking voor een taak mislukt.

In de volgende gevallen is dit patroon mogelijk niet geschikt:

  • Het is niet eenvoudig om de werkbelasting van de toepassing onder te verdelen in afzonderlijke taken of er is een hoge mate van afhankelijkheid tussen taken.
  • Taken moeten synchroon worden uitgevoerd en de toepassingslogica moet wachten tot een taak is voltooid alvorens verder te gaan.
  • Taken moeten in een bepaalde volgorde worden uitgevoerd.

Sommige berichtsystemen ondersteunen sessies die een producent in staat stellen om berichten te groeperen en ervoor zorgen dat deze allemaal worden afgehandeld door de dezelfde consument. Dit mechanisme kan worden gebruikt met berichten met prioriteit (als deze worden ondersteund) voor de implementatie van een soort berichtrangschikking die berichten op volgorde van een producent aan een enkele gebruiker levert.

Voorbeeld

Azure biedt Service Bus wachtrijen en Azure Function-wachtrijtriggers die, indien gecombineerd, een directe implementatie zijn van dit cloudontwerppatroon. Azure Functions integreren met Azure Service Bus via triggers en bindingen. Door te integreren met Service Bus kunt u functies bouwen die wachtrijberichten gebruiken die door uitgevers worden verzonden. De publicatietoepassing(en) plaatst berichten in een wachtrij en consumenten, geïmplementeerd als Azure Functions, kunnen berichten uit deze wachtrij ophalen en verwerken.

Voor tolerantie stelt een Service Bus-wachtrij een consument in staat om de modus te gebruiken wanneer een bericht uit de wachtrij wordt opgehaald. Met deze modus wordt het bericht niet daadwerkelijk verwijderd, maar wordt het gewoon verborgen voor andere PeekLock consumenten. De Azure Functions-runtime ontvangt een bericht in de PeekLock-modus. Als de functie is voltooid, wordt Voltooid aanroepen op het bericht of Wordt verlaten aanroepen als de functie mislukt, en wordt het bericht weer zichtbaar, zodat een andere consument het kan ophalen. Als de functie langer dan de time-out van PeekLock wordt uitgevoerd, wordt de vergrendeling automatisch vernieuwd zolang de functie wordt uitgevoerd.

Azure Functions kunnen in-/uitschalen op basis van de diepte van de wachtrij, die allemaal als concurrerende consumenten van de wachtrij optreden. Als er meerdere exemplaren van de functies worden gemaakt, concurreren ze allemaal door de berichten onafhankelijk op te halen en te verwerken.

Zie voor gedetailleerde informatie over het gebruik van Azure Service Bus-wachtrijen Service Bus-wachtrijen, -onderwerpen en -abonnementen.

Zie Azure Service Bus trigger for Azure Functions Azure Functions (Azure Service Bus-trigger voorAzure Functions) voor meer informatie over Azure Functions geactiveerde Azure Functions.

De volgende code laat zien hoe u een nieuw bericht kunt maken en naar een Service Bus wachtrij kunt verzenden met behulp van een QueueClient -exemplaar.

private string serviceBusConnectionString = ...;
...

  public async Task SendMessagesAsync(CancellationToken  ct)
  {
   try
   {
    var msgNumber = 0;

    var queueClient = new QueueClient(serviceBusConnectionString, "myqueue");

    while (!ct.IsCancellationRequested)
    {
     // Create a new message to send to the queue
     string messageBody = $"Message {msgNumber}";
     var message = new Message(Encoding.UTF8.GetBytes(messageBody));

     // Write the body of the message to the console
     this._logger.LogInformation($"Sending message: {messageBody}");

     // Send the message to the queue
     await queueClient.SendAsync(message);

     this._logger.LogInformation("Message successfully sent.");
     msgNumber++;
    }
   }
   catch (Exception exception)
   {
    this._logger.LogException(exception.Message);
   }
  }

In het volgende codevoorbeeld ziet u een consument, geschreven als een C# Azure-functie, die metagegevens van berichten leest en een Service Bus Queue-bericht registreert. Let op hoe ServiceBusTrigger het kenmerk wordt gebruikt om het te binden aan een Service Bus wachtrij.

[FunctionName("ProcessQueueMessage")]
public static void Run(
    [ServiceBusTrigger("myqueue", Connection = "ServiceBusConnectionString")]
    string myQueueItem,
    Int32 deliveryCount,
    DateTime enqueuedTimeUtc,
    string messageId,
    ILogger log)
{
    log.LogInformation($"C# ServiceBus queue trigger function consumed message: {myQueueItem}");
    log.LogInformation($"EnqueuedTimeUtc={enqueuedTimeUtc}");
    log.LogInformation($"DeliveryCount={deliveryCount}");
    log.LogInformation($"MessageId={messageId}");
}

De volgende patronen en richtlijnen zijn mogelijk relevant bij de implementatie van dit patroon:

  • Asynchronous Messaging Primer (Inleiding in asynchrone berichtpatronen). Berichtenwachtrijen zijn een mechanisme voor asynchrone communicatie. Als een consumentenservice een antwoord naar een toepassing moet verzenden, is het mogelijk nodig om een vorm van antwoordberichten te implementeren. De inleiding in asynchrone berichten biedt informatie over het implementeren van aanvraag-/antwoordberichten via berichtenwachtrijen.

  • Richtlijnen voor automatisch schalen. Het kan mogelijk zijn om exemplaren van een consumentenservice te starten en te stoppen, aangezien de lengte van de wachtrij waarop toepassingen berichten plaatsen varieert. Automatisch schalen kan helpen om doorvoer te behouden op piekmomenten.

  • Patroon Consolidatie van rekenresources. Het kan mogelijk zijn om meerdere exemplaren van een consumentenservice in één proces te consolideren, om kosten en beheeroverhead te verminderen. In het patroon Consolidatie van berekenbronnen worden de voor- en nadelen van deze benadering beschreven.

  • Patroon Load Leveling op basis van wachtrij. Met de introductie van een berichtenwachtrij kan het systeem flexibeler worden, waardoor service-exemplaren uiteenlopende hoeveelheden aanvragen van exemplaren van een toepassing kunnen verwerken. De berichtenwachtrij fungeert als buffer, die de belasting verdeelt. In het patroon Load leveling op basis van wachtrij wordt dit scenario uitvoeriger beschreven.