Mönstret Rör och filter

Dela upp en aktivitet som utför komplex bearbetningen i en serie olika element som kan återanvändas. Detta kan förbättra prestanda, skalbarhet och återanvändningsmöjligheter genom att tillåta att aktivitetselement som utför bearbetningen kan distribueras och skalas oberoende av varandra.

Kontext och problem

Ett program krävs för att utföra olika uppgifter av varierande komplexitet för den information som bearbetas. En enkel men oflexibel metod för att implementera ett program är att utföra denna bearbetning som en monolitisk modul. Den här metoden minskar dock troligtvis möjligheterna att omstrukturera koden, optimera den eller återanvända den om delar av samma bearbetning behövs någon annanstans i programmet.

Problemen med bearbetning av data med hjälp av den monolitiska metoden visas på bilden. Ett program tar emot och bearbetar data från två källor. Data från varje källa bearbetas av en separat modul som utför en serie aktiviteter för att omvandla data, innan resultatet skickas till affärslogiken i programmet.

Bild 1 – En lösning som implementeras med hjälp av monolitiska moduler

En del av de uppgifter som de monolitiska modulerna utför fungerar på ungefär samma sätt, men modulerna har utformats separat. Den kod som implementerar aktiviteterna är nära hopkopplad i en modul och har utvecklats med liten eller ingen hänsyn tagen till återanvändning och skalbarhet.

De bearbetningsuppgifter som utförs av varje modul eller distributionskraven för varje aktivitet kan dock förändras i takt med att företagets krav uppdateras. Vissa aktiviteter kan vara datorkraftsintensiva och kan dra nytta av att köras på kraftfull maskinvara, medan andra inte kanske kräver så dyra resurser. Dessutom kan ytterligare bearbetning krävas i framtiden, eller också kan den ordning som uppgifterna som utförs i under bearbetningen förändras. Det krävs en lösning som åtgärdar problemen och ökar möjligheterna till återanvändning av kod.

Lösning

Bryt ned den bearbetning som krävs för varje dataström till en uppsättning med olika komponenter (eller filter) som var och en utför en enskild uppgift. Genom att standardisera formatet för de data som varje komponent tar emot och skickar kan dessa filter kombineras tillsammans i en pipeline. Detta hjälper dig för att undvika att duplicera koden och gör det enkelt att ta bort, ersätta eller integrera ytterligare komponenter om kraven för bearbetning förändras. På nästa bild visas en lösning som implementeras med hjälp av rör och filter.

Bild 2 – En lösning som implementeras med hjälp av rör och filter

Hur lång tid det tar att bearbeta en enskild begäran beror på hastigheten på långsammaste filtret i pipelinen. Ett eller flera filter kan utgöra en flaskhals, särskilt om ett stort antal begäranden visas i en ström från en viss datakälla. En stor fördel med pipeline-strukturen är att det ger möjligheter att köra parallella instanser av långsamma filter, och gör det möjligt för systemet att sprida belastningen och förbättra genomflödet.

Filter som utgör en pipeline kan köras på olika datorer, så att de kan skalas oberoende av varandra och dra nytta av den elasticitet som många molnmiljöer erbjuder. Ett filter som är beräkningsintensivt kan köras på högpresterande maskinvara, medan andra mindre krävande filter kan finnas på billigare maskinvara. Filtren behöver inte ens finnas i samma datacenter eller geografiska plats, vilket gör att varje element i en pipeline kan köras i en miljö nära de resurser som krävs. Nästa figur visar ett exempel som tillämpas på pipelinen för data från källa 1.

Bild 3 visar ett exempel som tillämpas på pipelinen för data från källa 1

Om indata och utdata för ett filter är strukturerade som en dataström, är det möjligt att utföra bearbetningen för varje filter parallellt. Det första filtret i pipelinen startar sitt arbete och matar ut resultatet som skickas direkt till nästa filter i sekvensen innan det första filtret har slutfört sitt arbete.

En annan fördel är den återhämtningsförmåga som den här modellen innebär. Om fel uppstår i ett filter, eller om den dator den körs på inte längre är tillgänglig, kan pipelinen omfördela det arbete som utfördes av filtret och dirigera arbetet till en annan instans av komponenten. Fel i ett enda filter resulterar inte nödvändigtvis i fel på hela pipelinen.

Med hjälp av mönstret Rör och filter tillsammans med mönstret för kompenserande transaktioner är en annan metod för att implementera distribuerade transaktioner. En distribuerad transaktion kan delas upp i separata, kompenserbara uppgifter, som kan implementeras med hjälp av ett filter som även implementerar kompenserande transaktionsmönster. Filtren i en pipeline kan implementeras som separata värdbaserade aktiviteter som körs nära de data som de underhåller.

Problem och överväganden

När du bestämmer hur det här mönstret ska implementeras bör du överväga följande punkter:

  • Komplexitet. Den ökade flexibilitet som det här mönstret ger kan också introducera komplexitet, i synnerhet om filtren i en pipeline är fördelade på olika servrar.

  • Tillförlitlighet. Använd en infrastruktur som garanterar att data som flödar mellan filter i en pipeline inte går förlorade.

  • Idempotens. Om det uppstår ett fel i ett filter i en pipeline när det har tagit emot ett meddelande och arbetet omfördelas till en annan instans av filtret, kanske en del av arbetet redan har slutförts. Om detta arbete uppdaterar någon aspekt av det globala tillståndet (till exempel information som lagras i en databas), kan samma uppdatering upprepas. Ett liknande problem kan uppstå om ett fel uppstår i ett filter efter befordran av resultaten till nästa filter i pipelinen, men innan det anger att arbetet har slutförts. I dessa fall kan samma arbete upprepas av en annan instans av filtret, och göra att samma resultat publiceras två gånger. Detta kan resultera i att efterföljande filter i pipelinen bearbetar samma data två gånger. Filtren i en pipeline bör därför utformas ska vara idempotenta. Mer information finns i Idempotency Patterns (Idempotensmönster) på Jonathan Jonathans blogg.

  • Upprepade meddelanden. Om ett fel uppstår i ett filter i en pipeline när du skickar ett meddelande till nästa steg i pipeline kan en annan instans av filtret köras och den postar en kopia av samma meddelande till pipelinen. Det kan leda till att två instanser av samma meddelande skickas till nästa filter. Om du vill undvika detta ska pipelinen identifiera och eliminera dubbletter av meddelanden.

    Om du implementerar pipelinen med hjälp av meddelandeköer (till exempel Microsoft Azure Service Bus-köer), kan infrastrukturen hos meddelandekön ange automatiskt identifiering och borttagning av dubblettmeddelanden.

  • Kontext och tillstånd. I en pipeline körs varje filter i stort sett i isolerat och bör inte göra några antaganden om hur den anropades. Detta innebär att varje filter ska anges med tillräcklig kontext för att utföra sitt arbete. Den här kontexten kan omfatta en stor mängd information om tillstånd.

När du ska använda det här mönstret

Använd det här mönstret i sådana här scenarier:

  • Den bearbetning som krävs av ett program kan enkelt delas upp i en uppsättning fristående steg.

  • De bearbetningssteg som utförs av ett program har olika skalbarhetskrav.

    Det är möjligt att gruppera filter som ska skalas tillsammans i samma process. Mer information finns i Compute Resource Consolidation pattern (Mönster för konsolidering av beräkningsresurser).

  • Flexibilitet krävs för att ändra ordningen på de bearbetningssteg som utförs av ett program eller möjligheten att lägga till och ta bort steg.

  • Systemet kan dra nytta av att distribuera bearbetningen för steg mellan olika servrar.

  • En tillförlitlig lösning som minimerar effekterna av fel i ett steg medan data bearbetas krävs.

Det här mönstret är kanske inte användbart om:

  • De bearbetningssteg som utförs av ett program är inte fristående, utan måste utföras tillsammans som en del av samma transaktion.

  • Den mängd sammanhangs- eller tillståndsinformation som krävs av ett steg gör den här metoden ineffektiv. Det kan vara möjligt att bevara statusinformation i en databas i stället, men använd inte den här strategin om ytterligare belastning på databasen orsakar stor konkurrens.

Exempel

Du kan använda en sekvens av meddelandeköer för att tillhandahålla den infrastruktur som krävs för att implementera en pipeline. En inledande meddelandekö får obearbetade meddelanden. En komponent som implementeras som en filteraktivitet lyssnar efter ett meddelande i den här kön, utför arbetet och skickar sedan det omvandlade meddelandet till nästa kö i sekvensen. En annan aktivitet i filtret kan lyssna efter meddelanden i den här kön, bearbeta dem, lägga upp resultaten i en annan kö och så vidare tills fullständigt omvandlade data visas i det sista meddelandet i kön. På nästa bild visas implementering av en pipeline med hjälp av meddelandeköer.

Bild 4 – Implementera en pipeline med hjälp av meddelandeköer

Du kan använda Service Bus-köer för att tillhandahålla en tillförlitlig och skalbar köhanteringsmekanism om du bygger en lösning på Azure. Klassen ServiceBusPipeFilter som visas nedan i C# illustrerar hur du kan implementera ett filter som tar emot inkommande meddelanden från en kö, bearbetar dessa meddelanden och skickar resultatet till en annan kö.

Klassen ServiceBusPipeFilter definieras i projektet PipesAndFilters.Shared som finns på ServiceBusPipeFilter.

public class ServiceBusPipeFilter
{
  ...
  private readonly string inQueuePath;
  private readonly string outQueuePath;
  ...
  private QueueClient inQueue;
  private QueueClient outQueue;
  ...

  public ServiceBusPipeFilter(..., string inQueuePath, string outQueuePath = null)
  {
     ...
     this.inQueuePath = inQueuePath;
     this.outQueuePath = outQueuePath;
  }

  public void Start()
  {
    ...
    // Create the outbound filter queue if it doesn't exist.
    ...
    this.outQueue = QueueClient.CreateFromConnectionString(...);

    ...
    // Create the inbound and outbound queue clients.
    this.inQueue = QueueClient.CreateFromConnectionString(...);
  }

  public void OnPipeFilterMessageAsync(
    Func<BrokeredMessage, Task<BrokeredMessage>> asyncFilterTask, ...)
  {
    ...

    this.inQueue.OnMessageAsync(
      async (msg) =>
    {
      ...
      // Process the filter and send the output to the
      // next queue in the pipeline.
      var outMessage = await asyncFilterTask(msg);

      // Send the message from the filter processor
      // to the next queue in the pipeline.
      if (outQueue != null)
      {
        await outQueue.SendAsync(outMessage);
      }

      // Note: There's a chance that the same message could be sent twice
      // or that a message gets processed by an upstream or downstream
      // filter at the same time.
      // This would happen in a situation where processing of a message was
      // completed, it was sent to the next pipe/queue, and then failed
      // to complete when using the PeekLock method.
      // Idempotent message processing and concurrency should be considered
      // in a real-world implementation.
    },
    options);
  }

  public async Task Close(TimeSpan timespan)
  {
    // Pause the processing threads.
    this.pauseProcessingEvent.Reset();

    // There's no clean approach for waiting for the threads to complete
    // the processing. This example simply stops any new processing, waits
    // for the existing thread to complete, then closes the message pump
    // and finally returns.
    Thread.Sleep(timespan);

    this.inQueue.Close();
    ...
  }

  ...
}

Metoden Start i ServiceBusPipeFilter-klassen ansluter till ett par av inkommande och utgående köer och Close-metoden kopplar från indatakön. Metoden OnPipeFilterMessageAsync utför den faktiska bearbetningen av meddelanden, parametern asyncFilterTask till den här metoden anger att bearbetningen ska utföras. Metoden OnPipeFilterMessageAsync väntar på inkommande meddelanden i indatakön, kör den kod som anges av asyncFilterTask-parametern över varje meddelande allt eftersom det anländer och skickar resultatet till den utgående kön. Köerna själva anges av konstruktören.

Exempellösningen implementerar filter i en uppsättning arbetsroller. Varje arbetsroll kan skalas oberoende av varandra, beroende på komplexiteten i den verksamhetsbearbetning som den utför eller de resurser som krävs för bearbetningen. Dessutom kan flera instanser av varje arbetsroll köras parallellt för att förbättra genomflödet.

Följande kod visar en Azure-arbetsroll som heter PipeFilterARoleEntry, som har definierats i PipeFilterA-projektet i exempellösningen.

public class PipeFilterARoleEntry : RoleEntryPoint
{
  ...
  private ServiceBusPipeFilter pipeFilterA;

  public override bool OnStart()
  {
    ...
    this.pipeFilterA = new ServiceBusPipeFilter(
      ...,
      Constants.QueueAPath,
      Constants.QueueBPath);

    this.pipeFilterA.Start();
    ...
  }

  public override void Run()
  {
    this.pipeFilterA.OnPipeFilterMessageAsync(async (msg) =>
    {
      // Clone the message and update it.
      // Properties set by the broker (Deliver count, enqueue time, ...)
      // aren't cloned and must be copied over if required.
      var newMsg = msg.Clone();

      await Task.Delay(500); // DOING WORK

      Trace.TraceInformation("Filter A processed message:{0} at {1}",
        msg.MessageId, DateTime.UtcNow);

      newMsg.Properties.Add(Constants.FilterAMessageKey, "Complete");

      return newMsg;
    });

    ...
  }

  ...
}

Den här rollen innehåller ett ServiceBusPipeFilter-objekt. OnStart-metoden i rollen ansluter till köerna för att ta emot inkommande meddelanden och skicka utgående meddelanden (namnen på köerna definieras i Constants-klassen). Metoden Run anropar metod OnPipeFilterMessagesAsync för att utföra viss bearbetning av varje meddelande som tas emot (i det här exemplet simuleras bearbetningen genom en kort paus). När bearbetningen har slutförts skapas ett nytt meddelande som innehåller resultatet (i det här fallet har det inkommande meddelandet en anpassad egenskap tillagd), och det här meddelandet skickas till den utgående kön.

Exempelkoden innehåller en annan arbetsroll med namnet PipeFilterBRoleEntry i PipeFilterB-projektet. Den här rollen liknar PipeFilterARoleEntry, förutom att det fungerar i olika vid bearbetning med Run-metoden. I exempellösningen kombineras dessa två roller för att skapa en pipeline; den utgående kön för rollen PipeFilterARoleEntry är indatakö för rollen PipeFilterBRoleEntry.

Exempellösningen innehåller också två ytterligare roller med namnet InitialSenderRoleEntry (i projektet InitialSender) och FinalReceiverRoleEntry (i projektet FinalReceiver). Rollen InitialSenderRoleEntry ger det första meddelandet i pipelinen. Metoden OnStart ansluter till en enskild kö och metoden Run skickar en metod till den här kön. Den här kön är den indatakö som används av rollen PipeFilterARoleEntry, och om ett e-postmeddelande skickas till den tas meddelandet emot och bearbetas av PipeFilterARoleEntry-rollen. Det bearbetade meddelandet passerar sedan rollen PipeFilterBRoleEntry.

Indatakön för rollen FinalReceiveRoleEntry är utdatakön för rollen PipeFilterBRoleEntry. Metoden Run i rollen FinalReceiveRoleEntry som visas nedan tar emot meddelandet och utför viss slutlig behandling. Sedan skriver den värdena för de anpassade egenskaper som har lagts till av filtren i pipeline till spårningsutdata.

public class FinalReceiverRoleEntry : RoleEntryPoint
{
  ...
  // Final queue/pipe in the pipeline to process data from.
  private ServiceBusPipeFilter queueFinal;

  public override bool OnStart()
  {
    ...
    // Set up the queue.
    this.queueFinal = new ServiceBusPipeFilter(...,Constants.QueueFinalPath);
    this.queueFinal.Start();
    ...
  }

  public override void Run()
  {
    this.queueFinal.OnPipeFilterMessageAsync(
      async (msg) =>
      {
        await Task.Delay(500); // DOING WORK

        // The pipeline message was received.
        Trace.TraceInformation(
          "Pipeline Message Complete - FilterA:{0} FilterB:{1}",
          msg.Properties[Constants.FilterAMessageKey],
          msg.Properties[Constants.FilterBMessageKey]);

        return null;
      });
    ...
  }

  ...
}

Nästa steg

Följande riktlinjer kan även vara relevanta när du implementerar det här mönstret:

  • Ett exempel som visar det här mönstret finns på GitHub.

Följande mönster kan också vara relevanta när du implementerar det här mönstret:

  • Mönster för konkurrerande konsumenter. En pipeline kan innehålla flera instanser av ett eller flera filter. Den här metoden är användbar för att köra parallella instanser av långsamma filter, vilket gör det möjligt för systemet att sprida belastningen och förbättra genomflödet. Varje instans av ett filter konkurrerar om indata med de andra instanserna; två instanser av ett filter ska inte kunna behandla samma data. Innehåller en förklaring av den här metoden.
  • Mönster för konsolidering av beräkningsresurser. Det kan vara möjligt att gruppera filter som ska skalas tillsammans i samma process. Tillhandahåller mer information om för- och nackdelar med denna strategi.
  • Kompenserande transaktionsmönster. Ett filter kan implementeras som en åtgärd som kan ångras eller som har en kompenserande åtgärd som återställer tillståndet till en tidigare version vid fel. Förklarar hur detta kan implementeras för att underhålla eller uppnå slutlig konsekvens.
  • Idempotensmönster på Jonathan Jonathans blogg.