Patroon Pijpen en filters
Splits een taak waarmee een complexe verwerking wordt uitgevoerd, op in een reeks afzonderlijke elementen die kunnen worden hergebruikt. Zo kunt u de prestaties, schaalbaarheid en herbruikbaarheid verbeteren door het mogelijk te maken dat taakelementen die de verwerking uitvoeren, onafhankelijk worden geïmplementeerd en geschaald.
Context en probleem
Een toepassing moet een verscheidenheid aan taken van uiteenlopende complexiteit uitvoeren met de informatie die wordt verwerkt. Een eenvoudige, maar niet-flexibele benadering van de implementatie van een toepassing is deze verwerking uit te voeren als een monolithische module. Maar met deze benadering bestaat de kans dat de mogelijkheden afnemen voor het herstructureren van de code, voor het optimaliseren ervan of de code opnieuw te gebruiken als onderdelen van dezelfde verwerking elders in de toepassing zijn vereist.
De afbeelding illustreert de problemen met de verwerking van gegevens bij gebruik van de monolithische aanpak. Een toepassing ontvangt en verwerkt gegevens van twee bronnen. De gegevens van elke bron worden verwerkt door een afzonderlijke module die een reeks taken uitvoert om deze gegevens te transformeren voordat het resultaat wordt doorgegeven aan de bedrijfslogica van de toepassing.

Enkele van de taken die de monolithische modules uitvoeren, zijn functioneel zeer vergelijkbaar, maar de modules zijn afzonderlijk ontworpen. De code die de taken implementeert, is nauw gekoppeld binnen een module en is ontwikkeld met weinig of geen aandacht voor hergebruik of schaalbaarheid.
De verwerkingstaken die worden uitgevoerd door elke module of de implementatievereisten voor elke taak kunnen echter veranderen naarmate de zakelijke vereisten worden aangepast. Sommige taken kunnen rekenintensief zijn en kunnen profiteren van krachtige hardware, terwijl andere mogelijk geen dure resources vereisen. Ook kan in de toekomst extra verwerking nodig zijn of kan de volgorde veranderen waarin de taken door de verwerking worden uitgevoerd. Er is een oplossing vereist die deze problemen aanpakt en de mogelijkheden voor hergebruik van code vergroot.
Oplossing
Splits de vereiste verwerking voor elke stroom in een set afzonderlijke onderdelen (of filters), die elk één enkele taak uitvoeren. Als u de indeling van de gegevens die elk onderdeel verzendt en ontvangt standaardiseert, kunnen deze filters worden gecombineerd in een pijplijn. Zo wordt voorkomen dat code moet worden gedupliceerd en kunnen extra onderdelen gemakkelijker worden verwijderd, vervangen of geïntegreerd als de verwerkingsvereisten veranderen. In de volgende afbeelding ziet u een oplossing die is geïmplementeerd met pijpen en filters.

De tijd die nodig is om één aanvraag te verwerken, is afhankelijk van de snelheid van het traagste filter in de pijplijn. Een of meer filters kunnen een knelpunt vormen, met name als er een groot aantal aanvragen verschijnt in een stroom van een bepaalde gegevensbron. Een groot voordeel van de pijplijnstructuur is dat deze mogelijkheden biedt om parallelle exemplaren van trage filters uit te voeren, waardoor het systeem de belasting kan spreiden en de doorvoer kan verbeteren.
De filters waaruit een pijplijn bestaat, kunnen worden uitgevoerd op verschillende computers, waardoor ze onafhankelijk van elkaar kunnen worden geschaald en kunnen profiteren van de elasticiteit die veel cloudomgevingen bieden. Een filter dat rekenintensief is, kan worden uitgevoerd op krachtige hardware, terwijl andere minder veeleisende filters kunnen worden gehost op goedkopere basishardware. De filters hoeven zich niet eens in hetzelfde datacenter of dezelfde geografische locatie te bevinden, waardoor elk element in een pijplijn kan worden uitgevoerd in een omgeving dicht bij de resources die nodig zijn. In de volgende afbeelding ziet u een voorbeeld dat is toegepast op de pijplijn voor de gegevens van bron 1.

Als de invoer en uitvoer van een filter zijn gestructureerd als een stroom, is het mogelijk de verwerking voor elk filter parallel uit te voeren. Het eerste filter in de pijplijn kan met zijn werk beginnen en de resultaten uitvoeren. Deze worden vervolgens rechtstreeks doorgegeven aan het volgende filter in de reeks voordat het eerste filter zijn werk heeft voltooid.
Een ander voordeel is de tolerantie die dit model kan bieden. Als een filter mislukt of als de computer waarop dit wordt uitgevoerd, niet meer beschikbaar is, kan de pijplijn het werk dat het filter uitvoerde opnieuw plannen en dit werk sturen naar een ander exemplaar van het onderdeel. Uitval van één filter leidt niet noodzakelijkerwijs tot uitval van de hele pijplijn.
Het gebruik van het patroon Pijpen en filters in combinatie met het patroon Compenserende transactie is een alternatieve methode om gedistribueerde transacties te implementeren. Een gedistribueerde transactie kan worden opgesplitst in afzonderlijke, compenseerbare taken, die elk kunnen worden geïmplementeerd met behulp van een filter dat ook het patroon Compenserende transactie implementeert. De filters in een pijplijn kunnen worden geïmplementeerd als afzonderlijke gehoste taken die worden uitgevoerd dicht bij de gegevens die ze onderhouden.
Problemen en overwegingen
U moet de volgende punten overwegen wanneer u besluit hoe u dit patroon wilt implementeren:
Complexiteit. De grotere flexibiliteit die dit patroon biedt, kan ook leiden tot extra complexiteit, met name als de filters in een pijplijn zijn verspreid over verschillende servers.
Betrouwbaarheid. Gebruik een infrastructuur die ervoor zorgt dat de gegevens die tussen filters in een pijplijn stromen, niet verloren gaan.
Idempotentie. Als een filter in een pijplijn mislukt nadat een bericht is ontvangen en het werk opnieuw is gepland voor een ander exemplaar van het filter, is een deel van het werk mogelijk al voltooid. Als dit werk een bepaald aspect van de globale status bijwerkt (zoals de informatie die in een database is opgeslagen), kan dezelfde update worden herhaald. Een soortgelijk probleem kan optreden als een filter mislukt nadat de resultaten naar het volgende filter in de pijplijn zijn gestuurd, maar voordat het filter heeft aangegeven dat het werk met succes is voltooid. In deze gevallen kan hetzelfde werk worden herhaald door een ander exemplaar van het filter, waardoor de dezelfde resultaten tweemaal worden geplaatst. Dit kan ertoe leiden dat volgende filters in de pijplijn dezelfde gegevens tweemaal verwerken. Filters in een pijplijn moeten daarom zo worden ontwerpen dat ze idempotent zijn. Zie Idempotency Patterns (Idempotentiepatronen) op de blog van Jonathan Jonathan voor meer informatie.
Herhaalde berichten. Als een filter in een pijplijn mislukt nadat een bericht naar de volgende fase van de pijplijn is verzonden, kan een ander exemplaar van het filter worden uitgevoerd en zal dit een kopie van hetzelfde bericht in de pijplijn plaatsen. Dit kan ertoe leiden dat twee exemplaren van hetzelfde bericht worden doorgegeven aan het volgende filter. Om dit te voorkomen, moet de pijplijn dubbele berichten detecteren en verwijderen.
Als u de pijplijn implementeert met behulp van berichtenwachtrijen (zoals Microsoft Azure Service Bus-wachtrijen), kan de infrastructuur voor berichtenwachtrijen dubbele berichten mogelijk automatisch detecteren en verwijderen.
Context en status. In een pijplijn wordt elk filter in feite geïsoleerd uitgevoerd en mag dit geen aannames doen over hoe het is aangeroepen. Dit betekent dat elk filter moet worden voorzien van voldoende context om zijn werk te kunnen uitvoeren. Deze context kan een grote hoeveelheid statusinformatie bevatten.
Wanneer dit patroon gebruiken
Gebruik dit patroon wanneer:
De verwerking die een toepassing vereist, kan eenvoudig worden opgesplitst in een reeks onafhankelijke stappen.
De verwerkingsstappen die worden uitgevoerd door een toepassing, hebben verschillende schaalbaarheidsvereisten.
Het is mogelijk filters die samen moeten worden geschaald, te groeperen in hetzelfde proces. Zie Patroon Consolidatie van rekenresources voor meer informatie.
Flexibiliteit is vereist om de volgorde van de stappen die door een toepassing worden uitgevoerd te kunnen wijzigen of de mogelijkheid te kunnen bieden stappen toe te voegen en te verwijderen.
Het systeem kan ervan profiteren als de verwerking van stappen over verschillende servers wordt verdeeld.
Er is een betrouwbare oplossing vereist die de gevolgen van een fout in een stap terwijl gegevens worden verwerkt, tot het minimum beperkt.
In de volgende gevallen is dit patroon mogelijk niet geschikt:
De verwerkingsstappen die worden uitgevoerd door een toepassing, zijn niet onafhankelijk of moeten samen worden uitgevoerd als onderdeel van dezelfde transactie.
Deze benadering is inefficiënt vanwege de hoeveelheid context of statusinformatie die vereist is voor een stap. Het kan mogelijk zijn statusinformatie in plaats daarvan vast te leggen in een database, maar u mag deze strategie niet gebruiken als de extra belasting van de database zeer veel conflicten veroorzaakt.
Voorbeeld
U kunt een reeks berichtenwachtrijen gebruiken om de infrastructuur te bieden die vereist is om een pijplijn te implementeren. Een eerste berichtenwachtrij ontvangt niet-verwerkte berichten. Een onderdeel dat is geïmplementeerd als een filtertaak, luistert naar een bericht in deze wachtrij, voert het werk uit en plaatst vervolgens het getransformeerde bericht in de volgende wachtrij in de reeks. Een andere filtertaak kan luisteren naar berichten in deze wachtrij, deze verwerken, de resultaten in een andere wachtrij plaatsen enzovoort, totdat de volledig getransformeerde gegevens verschijnen in het laatste bericht in de wachtrij. De volgende afbeelding toont een voorbeeld van de implementatie van een pijplijn met berichtenwachtrijen.

Als u een oplossing in Azure maakt, kunt u Service Bus-wachtrijen gebruiken om een betrouwbaar en schaalbaar wachtrijmechanisme te bieden. De klasse ServiceBusPipeFilter in C# hieronder laat zien hoe u een filter kunt implementeren dat invoerberichten ontvangt van een wachtrij, deze berichten verwerkt en de resultaten in een andere wachtrij plaatst.
De klasse
ServiceBusPipeFilteris gedefinieerd in het project PipesAndFilters.Shared, dat beschikbaar is via GitHub.
public class ServiceBusPipeFilter
{
...
private readonly string inQueuePath;
private readonly string outQueuePath;
...
private QueueClient inQueue;
private QueueClient outQueue;
...
public ServiceBusPipeFilter(..., string inQueuePath, string outQueuePath = null)
{
...
this.inQueuePath = inQueuePath;
this.outQueuePath = outQueuePath;
}
public void Start()
{
...
// Create the outbound filter queue if it doesn't exist.
...
this.outQueue = QueueClient.CreateFromConnectionString(...);
...
// Create the inbound and outbound queue clients.
this.inQueue = QueueClient.CreateFromConnectionString(...);
}
public void OnPipeFilterMessageAsync(
Func<BrokeredMessage, Task<BrokeredMessage>> asyncFilterTask, ...)
{
...
this.inQueue.OnMessageAsync(
async (msg) =>
{
...
// Process the filter and send the output to the
// next queue in the pipeline.
var outMessage = await asyncFilterTask(msg);
// Send the message from the filter processor
// to the next queue in the pipeline.
if (outQueue != null)
{
await outQueue.SendAsync(outMessage);
}
// Note: There's a chance that the same message could be sent twice
// or that a message gets processed by an upstream or downstream
// filter at the same time.
// This would happen in a situation where processing of a message was
// completed, it was sent to the next pipe/queue, and then failed
// to complete when using the PeekLock method.
// Idempotent message processing and concurrency should be considered
// in a real-world implementation.
},
options);
}
public async Task Close(TimeSpan timespan)
{
// Pause the processing threads.
this.pauseProcessingEvent.Reset();
// There's no clean approach for waiting for the threads to complete
// the processing. This example simply stops any new processing, waits
// for the existing thread to complete, then closes the message pump
// and finally returns.
Thread.Sleep(timespan);
this.inQueue.Close();
...
}
...
}
De methode Start in de klasse ServiceBusPipeFilter maakt verbinding met een combinatie van een invoer- en een uitvoerwachtrij, en de methode Close verbreekt de verbinding met de invoerwachtrij. De methode OnPipeFilterMessageAsync voert de daadwerkelijke verwerking van berichten uit, waarbij de parameter asyncFilterTask voor deze methode aangeeft welke verwerking moet worden uitgevoerd. De methode OnPipeFilterMessageAsync wacht op inkomende berichten in de invoerwachtrij, voert de code uit die is opgegeven door de parameter asyncFilterTask voor elk bericht terwijl dit binnenkomt en plaatst de resultaten in de uitvoerwachtrij. De wachtrijen zelf worden opgegeven door de constructor.
De voorbeeldoplossing implementeert filters in een reeks werkrollen. Elke werkrol kan onafhankelijk worden geschaald, afhankelijk van de complexiteit van de bedrijfsverwerking die wordt uitgevoerd of de resources die zijn vereist voor de verwerking. Daarnaast kunnen meerdere exemplaren van elke werkrol parallel worden uitgevoerd om de doorvoer te verbeteren.
De volgende code toont een Azure-werkrol met de naam PipeFilterARoleEntry, die is gedefinieerd in het project PipeFilterA in de voorbeeldoplossing.
public class PipeFilterARoleEntry : RoleEntryPoint
{
...
private ServiceBusPipeFilter pipeFilterA;
public override bool OnStart()
{
...
this.pipeFilterA = new ServiceBusPipeFilter(
...,
Constants.QueueAPath,
Constants.QueueBPath);
this.pipeFilterA.Start();
...
}
public override void Run()
{
this.pipeFilterA.OnPipeFilterMessageAsync(async (msg) =>
{
// Clone the message and update it.
// Properties set by the broker (Deliver count, enqueue time, ...)
// aren't cloned and must be copied over if required.
var newMsg = msg.Clone();
await Task.Delay(500); // DOING WORK
Trace.TraceInformation("Filter A processed message:{0} at {1}",
msg.MessageId, DateTime.UtcNow);
newMsg.Properties.Add(Constants.FilterAMessageKey, "Complete");
return newMsg;
});
...
}
...
}
Deze rol omvat het object ServiceBusPipeFilter. De methode OnStart in de rol maakt verbinding met de wachtrijen voor het ontvangen van invoerberichten en het plaatsen van uitvoerberichten (de namen van de wachtrijen zijn gedefinieerd in de klasse Constants). De methode Run roept de methode OnPipeFilterMessagesAsync aan om elk ontvangen bericht te verwerken (in dit voorbeeld wordt de verwerking gesimuleerd door even te wachten). Wanneer de verwerking is voltooid, wordt een nieuw bericht samengesteld met de resultaten (in dit geval wordt aan het invoerbericht een aangepaste eigenschap toegevoegd) en wordt dit bericht in de uitvoerwachtrij geplaatst.
De voorbeeldcode bevat een andere werkrol met de naam PipeFilterBRoleEntry in het project PipeFilterB. Deze rol is vergelijkbaar met PipeFilterARoleEntry, behalve dat deze een andere bewerking uitvoert in de methode Run. In de voorbeeldoplossing worden deze twee rollen gecombineerd om een pijplijn te maken. De uitvoerwachtrij voor de rol PipeFilterARoleEntry is de invoerwachtrij voor de rol PipeFilterBRoleEntry.
De oplossing biedt ook twee aanvullende rollen genaamd InitialSenderRoleEntry (in het project InitialSender) en FinalReceiverRoleEntry (in het project FinalReceiver). De rol InitialSenderRoleEntry biedt het aanvankelijke bericht in de pijplijn. De methode OnStart maakt verbinding met één wachtrij en de methode Run plaatst een methode in deze wachtrij. Deze wachtrij is de invoerwachtrij die wordt gebruikt door de rol PipeFilterARoleEntry. Als een bericht wordt geplaatst, wordt het bericht daarom ontvangen en verwerkt door de rol PipeFilterARoleEntry. Het verwerkte bericht wordt vervolgens doorgegeven via de rol PipeFilterBRoleEntry.
De wachtrij voor de rol FinalReceiveRoleEntry is de uitvoerwachtrij voor de rol PipeFilterBRoleEntry. De methode Run in de rol FinalReceiveRoleEntry, hieronder weergegeven, ontvangt het bericht en voert de laatste verwerking uit. Vervolgens worden de waarden van de aangepaste eigenschappen die door de filters in de pijplijn zijn toegevoegd, naar de trace-uitvoer geschreven.
public class FinalReceiverRoleEntry : RoleEntryPoint
{
...
// Final queue/pipe in the pipeline to process data from.
private ServiceBusPipeFilter queueFinal;
public override bool OnStart()
{
...
// Set up the queue.
this.queueFinal = new ServiceBusPipeFilter(...,Constants.QueueFinalPath);
this.queueFinal.Start();
...
}
public override void Run()
{
this.queueFinal.OnPipeFilterMessageAsync(
async (msg) =>
{
await Task.Delay(500); // DOING WORK
// The pipeline message was received.
Trace.TraceInformation(
"Pipeline Message Complete - FilterA:{0} FilterB:{1}",
msg.Properties[Constants.FilterAMessageKey],
msg.Properties[Constants.FilterBMessageKey]);
return null;
});
...
}
...
}
Volgende stappen
De volgende richtlijnen zijn mogelijk ook relevant bij de implementatie van dit patroon:
- Een voorbeeld waarin dit patroon wordt gedemonstreerd, is beschikbaar op GitHub.
Verwante informatie
De volgende patronen kunnen ook relevant zijn bij het implementeren van dit patroon:
- Patroon Concurrerende consumenten. Een pijplijn kan meerdere exemplaren van een of meer filters bevatten. Deze benadering is handig voor het uitvoeren van parallelle exemplaren van trage filters, waardoor het systeem de belasting kan spreiden en de doorvoer kan verbeteren. Elk exemplaar van een filter concurreert om invoer met de andere exemplaren. Het zou niet mogelijk moeten zijn dat twee exemplaren van een filter dezelfde gegevens verwerken. Biedt een uitleg van deze benadering.
- Patroon Consolidatie van rekenresources. Het kan mogelijk zijn filters die samen moeten worden geschaald, te groeperen in hetzelfde proces. Biedt meer informatie over de voor- en nadelen van deze strategie.
- Patroon Compenserende transactie. Een filter kan worden geïmplementeerd als een bewerking die kan worden teruggedraaid of die een compensatiebewerking heeft die de status herstelt naar een eerdere versie in het geval van een storing. Legt uit hoe dit kan worden geïmplementeerd om de uiteindelijke consistentie te handhaven of te bereiken.
- Idempotentiepatronen in de blog van Jonathan Jonathan.