Model kanálů a filtrů
Rozdělí úlohu, která provádí komplexní zpracování, do řady samostatných prvků, které je možné využít znovu. Pokud umožníte prvkům úlohy, které provádí zpracování, aby byly nasazené a škálované nezávisle, může to vylepšit výkon, škálovatelnost a opětovné použití.
Kontext a problém
Po aplikaci se požaduje, aby prováděla různé úlohy s různou složitostí na zpracovávaných informacích. Jasným, ale nepřizpůsobitelným přístupem k implementování aplikace je provést toto zpracování jako monolitický modul. Tento přístup však pravděpodobně omezí možnosti refaktorování kódu, jeho optimalizace nebo opakovaného použití v případě, že se části stejného zpracování vyžadují jinde v rámci aplikace.
Obrázek znázorňuje problémy se zpracováním dat při použití monolitického přístupu. Aplikace obdrží a zpracuje data ze dvou zdrojů. Data z každého zdroje se zpracují odděleným modulem, který provádí řadu úloh, aby tato data transformoval, než výsledek předá obchodní logice aplikace.

Některé z úloh, které monolitické moduly provádí, jsou funkčně velmi podobné, ale tyto moduly byly navrženy odděleně. Kód, který úlohy implementuje, je úzce párovaný v modulu a při jeho vývoji se na opakované použití nebo škálovatelnost prakticky nemyslelo.
Zpracování úloh, které provádí každý modul, nebo požadavky na nasazení každého úkolu by se však mohly při aktualizaci obchodních požadavků změnit. Některé úlohy mohou být náročné na výpočetní výkon a může jim prospět, když poběží na výkonném hardwaru, zatímco jiné úlohy tyto nákladné prostředky nemusí potřebovat. Navíc do budoucna může být potřeba další zpracování nebo se pořadí, ve kterém se úlohy vykonávají, může změnit. Proto je potřeba řešení, které tyto problémy zohledňuje a zvyšuje šanci na opětovné použití kódu.
Řešení
Zpracování potřebné pro každý datový proud rozdělte do sady samostatných částí (nebo filtrů), aby každá část prováděla jediný úkol. Když standardizujete formát dat, který každá část přijímá a odesílá, můžete tyto filtry zkombinovat do kanálu. To pomáhá předejít duplikování kódu a usnadňuje odebrání, nahrazení nebo integrování dalších částí, pokud se požadavky na zpracování změní. Následující obrázek znázorňuje řešení implementované pomocí kanálů a filtrů.

Doba potřebná ke zpracování jedné žádosti závisí na rychlosti nejpomalejšího filtru v kanálu. Jeden nebo více filtrů mohou být kritickým bodem, zvlášť pokud se v datovém proudu objeví z konkrétního zdroje dat velký počet žádostí. Hlavní výhodou struktury kanálu je, že umožňuje spuštění paralelních instancí pomalých filtrů, což dává systému možnost rozložit zátěž a zlepšit propustnost.
Filtry, které kanál tvoří, mohou běžet na různých počítačích, což jim umožňuje provádět škálování nezávisle a mohou tak využít flexibility, které mnoho cloudových prostředí nabízí. Filtr, který je výpočetně náročný, může běžet na vysoce výkonném hardwaru, zatímco jiné méně náročné filtry je možné hostovat na levnějším komoditním hardwaru. Filtry ani nemusí být ve stejném datacentru nebo zeměpisném umístění, což umožňuje spuštění jednotlivých prvků v kanálu v prostředí blízko prostředkům, které vyžaduje. Další obrázek ukazuje příklad použitý na kanál pro data ze Zdroje 1.

Pokud jsou vstup a výstup filtru strukturované jako datový proud, je možné provést zpracování u každého paralelního filtru. První filtr v kanálu může začít pracovat a vydávat výsledky, které budou předány přímo dalšímu filtru v pořadí, než první filtr dokončí svou práci.
Další výhodou je odolnost proti chybám, kterou tento model poskytuje. Pokud filtr selže nebo pokud počítač, na kterém běží, už není dostupný, může kanál změnit plán práce, kterou filtr prováděl, a směrovat tuto práci na jinou instanci této součásti. Selhání jednoho filtru nemusí nutně znamenat selhání celého kanálu.
Alternativním přístupem k implementaci distribuovaných transakcí je použití modelu kanálů a filtrů společně s modelem kompenzačních transakcí. Distribuovanou transakci je možné rozdělit na samostatné a nahraditelné úkoly, kdy každý úkol je možné implementovat pomocí filtru, který zároveň implementuje model kompenzačních transakcí. Filtry v kanálu je možné implementovat jako samostatné hostované úlohy spuštěné v blízkosti dat, která spravují.
Problémy a důležité informace
Když se budete rozhodovat, jak tento model implementovat, měli byste vzít v úvahu následující skutečnosti:
Složitost: zvýšená flexibilita, kterou tento model poskytuje, může mít také za následek komplikovanost, zvlášť pokud jsou filtry v kanálu distribuované napříč různými servery.
Reliability. použijte infrastrukturu, která zajistí, že nedojde mezi filtry v kanálu ke ztrátě toku dat.
Idempotence: pokud filtr v kanálu selže po přijetí zprávy a práce je naplánovaná pro jinou instanci tohoto filtru, je možné, že část této práce byla již dokončena. Pokud tato práce aktualizuje některý aspekt globálního stavu (například informaci uloženou v databázi), je možné stejnou aktualizaci opakovat. K podobnému problému může dojít, pokud filtr selže po odeslání svých výsledků dalšímu filtru v kanálu, ale před oznámením, že práci úspěšně dokončil. V těchto případech je možné stejnou práci opakovat jinou instancí filtru, což by způsobilo dvojnásobné odeslání stejných výsledků. To by mohlo mít za následek dvojnásobné zpracování stejných dat následujícími filtry v kanálu. Proto by filtry v kanálu měly být navrhované jako idempotentní. Další informace najdete v článku o vzorech idempotence na blogu Jonathana Jonathana.
Opakované zprávy: pokud filtr v kanálu selže po odeslání zprávy do další fáze kanálu, může se spustit jiná instance filtru, která do kanálu odešle kopii stejné zprávy. To by mohlo způsobit předání dvou instancí stejné zprávy dalšímu filtru. Abyste tomu předešli, kanál by měl rozpoznávat a odstraňovat duplicitní zprávy.
Pokud kanál implementujete pomocí fronty zpráv (například fronty Microsoft Azure Service Bus), infrastruktura zařazování zpráv do front může poskytnout automatické rozpoznání a odstranění duplicitní zprávy.
Kontext a stav: Každý filtr v kanálu v podstatě běží izolovaně a neměl by jakkoli předpokládat, jakým způsobem byl vyvolán. To znamená, že každý filtr by měl být vybaven dostatečným kontextem k provedení své práce. Tento kontext by mohl obsahovat velké množství informací o stavu.
Kdy se má tento model použít
Tento model použijte v těchto případech:
Zpracování vyžadované aplikací může být snadno rozloženo do sady nezávislých kroků.
Kroky zpracování prováděné aplikací mají jiné požadavky na škálovatelnost.
Filtry, které by se měly škálovat společně ve stejném procesu, je možné seskupovat. Další informace najdete v tématu o modelu výpočtu konsolidace prostředků.
Aby bylo možné povolit změnu pořadí kroků zpracování, které provádí aplikace, nebo funkci pro přidání a odebrání kroků, je nutné zajistit flexibilitu.
Systém může mít užitek z distribuování zpracování pro kroky napříč různými servery.
Vyžaduje se spolehlivé řešení, které minimalizuje následky selhání v některém kroku, zatímco se data zpracovávají.
Tento model nebude pravděpodobně vhodný v následujících případech:
Kroky zpracování provedené aplikací nejsou nezávislé nebo musí být provedené společně v rámci stejné transakce.
Množství kontextových informací nebo informací o stavu, které libovolný krok vyžaduje, činí tento přístup neefektivním. Informace o stavu je možné zachovat v databázi. Tuto strategii však nepoužívejte, pokud by další zátěž databáze způsobovala značné kolize.
Příklad
Abyste poskytli infrastrukturu potřebnou k implementaci kanálu, můžete použít sekvenci front zpráv. Úvodní fronta zpráv přijme nezpracované zprávy. Součást implementovaná jako úloha filtru čeká na zprávu pro tuto frontu, provede svou práci a potom odešle transformovanou zprávu do dalších fronty v pořadí. Další úloha filtru může čekat na zprávy pro tuto frontu, zpracovat je, odeslat výsledky do další fronty a tak dále, dokud se plně transformovaná data neobjeví v poslední zprávě fronty. Následující obrázek znázorňuje implementaci kanálu pomocí fronty zpráv.

Pokud vytváříte řešení na platformě Azure, můžete použít fronty Service Bus, které poskytují spolehlivý a škálovatelný mechanismus řízení front. Níže uvedená třída ServiceBusPipeFilter v jazyce C# znázorňuje, jak můžete implementovat filtr, který přijímá vstupní zprávy z fronty, zpracovává je a výsledky odesílá do jiné fronty.
Třída
ServiceBusPipeFilterje definovaná v projektu PipesAndFilters.Shared, který je k dispozici na GitHubu.
public class ServiceBusPipeFilter
{
...
private readonly string inQueuePath;
private readonly string outQueuePath;
...
private QueueClient inQueue;
private QueueClient outQueue;
...
public ServiceBusPipeFilter(..., string inQueuePath, string outQueuePath = null)
{
...
this.inQueuePath = inQueuePath;
this.outQueuePath = outQueuePath;
}
public void Start()
{
...
// Create the outbound filter queue if it doesn't exist.
...
this.outQueue = QueueClient.CreateFromConnectionString(...);
...
// Create the inbound and outbound queue clients.
this.inQueue = QueueClient.CreateFromConnectionString(...);
}
public void OnPipeFilterMessageAsync(
Func<BrokeredMessage, Task<BrokeredMessage>> asyncFilterTask, ...)
{
...
this.inQueue.OnMessageAsync(
async (msg) =>
{
...
// Process the filter and send the output to the
// next queue in the pipeline.
var outMessage = await asyncFilterTask(msg);
// Send the message from the filter processor
// to the next queue in the pipeline.
if (outQueue != null)
{
await outQueue.SendAsync(outMessage);
}
// Note: There's a chance that the same message could be sent twice
// or that a message gets processed by an upstream or downstream
// filter at the same time.
// This would happen in a situation where processing of a message was
// completed, it was sent to the next pipe/queue, and then failed
// to complete when using the PeekLock method.
// Idempotent message processing and concurrency should be considered
// in a real-world implementation.
},
options);
}
public async Task Close(TimeSpan timespan)
{
// Pause the processing threads.
this.pauseProcessingEvent.Reset();
// There's no clean approach for waiting for the threads to complete
// the processing. This example simply stops any new processing, waits
// for the existing thread to complete, then closes the message pump
// and finally returns.
Thread.Sleep(timespan);
this.inQueue.Close();
...
}
...
}
Metoda Start ve třídě ServiceBusPipeFilter provádí připojení k páru vstupních a výstupních front a metoda Close provádí odpojení ze vstupní fronty. Metoda OnPipeFilterMessageAsync provádí skutečné zpracování zpráv a parametr asyncFilterTask této metody určuje, jaké zpracování proběhne. Metoda OnPipeFilterMessageAsync čeká na zprávy přicházející do vstupní fronty, při přijetí každé zprávy spouští kód určený parametrem asyncFilterTask a výsledky odesílá do výstupní fronty. Samotné fronty jsou určené konstruktorem.
Ukázkové řešení implementuje filtry do sady rolí pracovních procesů. Každou roli pracovního procesu je možné nezávisle škálovat podle složitosti obchodního zpracování, které provádí, nebo podle prostředků potřebných ke zpracování. Kromě toho je možné spustit více instancí každé role pracovního procesu, aby se zlepšila propustnost.
Následující kód ukazuje roli pracovního procesu Azure pojmenovanou PipeFilterARoleEntry, která je definovaná v projektu PipeFilterA v ukázkovém řešení.
public class PipeFilterARoleEntry : RoleEntryPoint
{
...
private ServiceBusPipeFilter pipeFilterA;
public override bool OnStart()
{
...
this.pipeFilterA = new ServiceBusPipeFilter(
...,
Constants.QueueAPath,
Constants.QueueBPath);
this.pipeFilterA.Start();
...
}
public override void Run()
{
this.pipeFilterA.OnPipeFilterMessageAsync(async (msg) =>
{
// Clone the message and update it.
// Properties set by the broker (Deliver count, enqueue time, ...)
// aren't cloned and must be copied over if required.
var newMsg = msg.Clone();
await Task.Delay(500); // DOING WORK
Trace.TraceInformation("Filter A processed message:{0} at {1}",
msg.MessageId, DateTime.UtcNow);
newMsg.Properties.Add(Constants.FilterAMessageKey, "Complete");
return newMsg;
});
...
}
...
}
Tato role obsahuje objekt ServiceBusPipeFilter. Metoda OnStart v této roli se připojí k frontám pro přijímání vstupních a odesílání výstupních zpráv (názvy front určuje třída Constants). Metoda Run vyvolá metodu OnPipeFilterMessagesAsync, aby provedla nějaké zpracování u každé přijaté zprávy (v tomto příkladu je zpracování simulováno krátkým čekáním). Po dokončení zpracování se vytvoří nová zpráva s výsledky (v tomto případě se ke vstupní zprávě přidala vlastní vlastnost) a tato zpráva se odešle do výstupní fronty.
Vzorový kód obsahuje další roli pracovního procesu, která má v projektu PipeFilterB název PipeFilterBRoleEntry. Tato role je podobná roli PipeFilterARoleEntry s tím rozdílem, že v metodě Run provádí jiné zpracování. V ukázkovém řešení jsou tyto dvě role zkombinované, aby vytvořily kanál; výstupní fronta pro roli PipeFilterARoleEntry je vstupní frontou pro roli PipeFilterBRoleEntry.
Ukázkové řešení také poskytuje dvě další role s názvem InitialSenderRoleEntry (v projektu InitialSender) a FinalReceiverRoleEntry (v projektu FinalReceiver). Role InitialSenderRoleEntry poskytuje původní zprávu v kanálu. Metoda OnStart se připojí k jedné frontě a metoda Run odešle do této fronty nějakou metodu. Tato fronta je vstupní frontou, kterou používá role PipeFilterARoleEntry, takže když do ní odešlete zprávu, bude tato zpráva přijatá a zpracovaná rolí PipeFilterARoleEntry. Zpracovaná zpráva potom projde přes roli PipeFilterBRoleEntry.
Vstupní fronta pro roli FinalReceiveRoleEntry je výstupní frontou pro roli PipeFilterBRoleEntry. Metoda Run v roli FinalReceiveRoleEntry níže tuto zprávu přijme a provede konečné zpracování. Potom aby trasovala výstup, zapíše hodnoty vlastních vlastností přidaných filtry v kanálu.
public class FinalReceiverRoleEntry : RoleEntryPoint
{
...
// Final queue/pipe in the pipeline to process data from.
private ServiceBusPipeFilter queueFinal;
public override bool OnStart()
{
...
// Set up the queue.
this.queueFinal = new ServiceBusPipeFilter(...,Constants.QueueFinalPath);
this.queueFinal.Start();
...
}
public override void Run()
{
this.queueFinal.OnPipeFilterMessageAsync(
async (msg) =>
{
await Task.Delay(500); // DOING WORK
// The pipeline message was received.
Trace.TraceInformation(
"Pipeline Message Complete - FilterA:{0} FilterB:{1}",
msg.Properties[Constants.FilterAMessageKey],
msg.Properties[Constants.FilterBMessageKey]);
return null;
});
...
}
...
}
Další kroky
Při implementaci tohoto modelu můžou být relevantní také následující pokyny:
- Ukázka, která tento model předvádí, je k dispozici na GitHubu.
Související informace
Při implementaci tohoto modelu můžou být relevantní taky tyto vzory:
- Model konkurenčních spotřebitelů: Kanál může obsahovat několik instancí jednoho nebo více filtrů. Tento přístup je užitečný ke spouštění paralelních instancí pomalých filtrů, protože umožňují systému rozložit zátěž a zlepšit propustnost. Každá instance filtru bude soutěžit o vstup s ostatními instancemi, nemělo by však dojít ke zpracování stejných dat dvěma instancemi filtru. Poskytuje vysvětlení tohoto přístupu.
- Model výpočtu konsolidace prostředků: Filtry, které by se měly škálovat společně do stejného procesu, je možné seskupovat. Poskytuje další informace o výhodách a nevýhodách této strategie.
- Vzor kompenzační transakce. Filtr je možné implementovat jako operaci, kterou je možné vrátit zpět nebo která má kompenzační operace, jež v případě selhání obnoví stav do předchozí verze. Vysvětluje, jak je možné takovou implementaci provést, aby se konečná konzistence zachovala nebo aby se jí dosáhlo.
- Idempotence vzory na blogu Jonathana Olivera.