Kanallar ve Filtreler düzeniPipes and Filters pattern

Karmaşık bir işlem gerçekleştiren bir görevi yeniden kullanılabilecek bir dizi ayrı öğe şeklinde parçalara ayırın.Decompose a task that performs complex processing into a series of separate elements that can be reused. Bu sayede, işlemleri gerçekleştiren görev öğeleri bağımsız olarak dağıtılabileceği ve ölçeklendirilebileceği için performans, ölçeklenebilirlik ve yeniden kullanılabilirlik iyileştirebilir.This can improve performance, scalability, and reusability by allowing task elements that perform the processing to be deployed and scaled independently.

Bağlam ve sorunContext and problem

Uygulamaların, işledikleri bilgiler üzerinde farklı karmaşıklık düzeyinde çeşitli görevler gerçekleştirmesi gerekir.An application is required to perform a variety of tasks of varying complexity on the information that it processes. Bir uygulamayı kullanıma geçirmeye yönelik basit ancak esnek olmayan bir yaklaşım, işleme faaliyetini tek parça bir modül olarak gerçekleştirmektir.A straightforward but inflexible approach to implementing an application is to perform this processing as a monolithic module. Ancak, aynı işlemin parçaları uygulama içinde başka bir yerde gerekliyse bu yaklaşım kodu yeniden düzenlemeye, iyileştirmeye veya yeniden kullanmaya yönelik fırsatları azaltabilir.However, this approach is likely to reduce the opportunities for refactoring the code, optimizing it, or reusing it if parts of the same processing are required elsewhere within the application.

Aşağıdaki şekilde, tek parçalı bir yaklaşım kullanarak veri işlemeden doğan sorunlar görülebilir.The figure illustrates the issues with processing data using the monolithic approach. Bir uygulama iki farklı kaynaktan veri alıp işlemektedir.An application receives and processes data from two sources. Her bir kaynaktan gelen veriler, ayrı bir modül tarafından işlenir. Modül, gerçekleştirdiği bir dizi görevle bu verileri dönüştürerek elde edilen sonucu uygulamanın iş mantığına geçirir.The data from each source is processed by a separate module that performs a series of tasks to transform this data, before passing the result to the business logic of the application.

Şekil 1 - Tek parçalı modüller kullanılarak uygulanan bir çözüm

Tek parçalı modüllerin gerçekleştirdiği görevlerin bazıları birbirine çok benzeyen işlevleri yerine getirir, ancak modüller ayrı olarak tasarlanmıştır.Some of the tasks that the monolithic modules perform are functionally very similar, but the modules have been designed separately. Görevleri uygulayan kod, modüle sıkı bir şekilde bağlıdır. Kod geliştirilirken yeniden kullanılabilirlik ve ölçeklenebilirlik konuları pek dikkate alınmamıştır.The code that implements the tasks is closely coupled in a module, and has been developed with little or no thought given to reuse or scalability.

Ancak, iş gereksinimleri güncelleştirildikçe her modülün gerçekleştirdiği işleme görevleri ya da her bir göreve ilişkin dağıtım gereksinimleri de değişebilir.However, the processing tasks performed by each module, or the deployment requirements for each task, could change as business requirements are updated. Bazı görevlerin işlem gücü kullanımı yoğun olabilir ve güçlü bir donanımda çalışması gerekebilir, bazı görevler için de böyle pahalı kaynaklara gerek olmayabilir.Some tasks might be compute intensive and could benefit from running on powerful hardware, while others might not require such expensive resources. Ayrıca, gelecekte ek işlemler yapılması gerekebilir veya işleme esnasında gerçekleştirilen görevlerin sırası değişebilir.Also, additional processing might be required in the future, or the order in which the tasks performed by the processing could change. Bu sorunları gideren ve kodun yeniden kullanılma olasılığını artıran bir çözüm gereklidir.A solution is required that addresses these issues, and increases the possibilities for code reuse.

ÇözümSolution

Her akış için gereken işleme faaliyetini her biri tek bir görevi gerçekleştiren bir dizi ayrı bileşene (yani filtreler) ayırın.Break down the processing required for each stream into a set of separate components (or filters), each performing a single task. Her bir bileşenin aldığı ve gönderdiği verilerin biçimi standart hale getirilirse bu filtreler birleştirilerek bir işlem hattı oluşturulabilir.By standardizing the format of the data that each component receives and sends, these filters can be combined together into a pipeline. Böylece aynı kodun yeniden oluşturulması önlenebilir ve işleme gereksinimlerinin değişmesi halinde bileşenlerin eklenmesi, kaldırılması veya yeni bileşenlerin tümleştirilmesi kolaylaşır.This helps to avoid duplicating code, and makes it easy to remove, replace, or integrate additional components if the processing requirements change. Aşağıdaki şekilde kanallar ve filtreler kullanılarak uygulanan bir çözüm gösterilmektedir.The next figure shows a solution implemented using pipes and filters.

Şekil 2 - Kanallar ve filtreler kullanılarak uygulanan bir çözüm

Tek bir isteğin işlenmesi için gereken süre, işlem hattındaki en yavaş filtrenin hızına bağlıdır.The time it takes to process a single request depends on the speed of the slowest filter in the pipeline. Bir veya daha fazla filtre bir performans sorunu oluşturabilir. Bu durum, özellikle bir akışta belirli bir veri kaynağından çok sayıda istek olduğunda ortaya çıkabilir.One or more filters could be a bottleneck, especially if a large number of requests appear in a stream from a particular data source. İşlem hattı yapısının sağladığı önemli avantajlardan biri, yavaş filtrelerin paralel örneklerinin çalıştırılmasına fırsat vermesidir. Sistem bu sayede yükü dağıtabilir ve işleme miktarını artırabilir.A key advantage of the pipeline structure is that it provides opportunities for running parallel instances of slow filters, enabling the system to spread the load and improve throughput.

Bir işlem hattını oluşturan filtreler farklı makinelerde çalıştırılabilir. Böylece bu filtreler bağımsız olarak ölçeklendirilebilir ve birçok bulut ortamının sağladığı esneklikten yararlanabilir.The filters that make up a pipeline can run on different machines, enabling them to be scaled independently and take advantage of the elasticity that many cloud environments provide. İşlem gücü kullanımı yoğun olan bir filtre yüksek performanslı bir donanımda çalıştırılırken daha sınırlı kaynak gerektiren filtreler daha ucuz ticari donanımlarda barındırılabilir.A filter that is computationally intensive can run on high performance hardware, while other less demanding filters can be hosted on less expensive commodity hardware. Filtrelerin aynı veri merkezinde veya coğrafi konumda olmasına bile gerek yoktur. Bu sayede, işlem hattının her öğesi gerek duyduğu kaynaklara yakın bir ortamda çalıştırılabilir.The filters don't even have to be in the same data center or geographical location, which allows each element in a pipeline to run in an environment that is close to the resources it requires. Sıradaki şekilde, bahsedilenlerin Kaynak 1’den gelen verilere yönelik işlem hattına uygulanmasının bir örneği gösterilmektedir.The next figure shows an example applied to the pipeline for the data from Source 1.

Şekil 3, bahsedilenlerin Kaynak 1’den gelen verilere yönelik işlem hattına uygulanmasının bir örneğini göstermektedir

Filtreye ait giriş ve çıkış bir akış olarak yapılandırılmışsa her bir filtrenin gerçekleştirdiği işlemeyi paralel olarak yürütmek mümkündür.If the input and output of a filter are structured as a stream, it's possible to perform the processing for each filter in parallel. İşlem hattındaki ilk filtre çalışmaya ve sonuçları çıktı olarak vermeye başlayabilir. Bu çıktı, daha ilk filtre çalışmasını tamamlamadan önce doğrudan sıradaki filtreye iletilir.The first filter in the pipeline can start its work and output its results, which are passed directly on to the next filter in the sequence before the first filter has completed its work.

Elde edilen bir diğer avantaj da bu modelin sağlayabileceği dayanıklılıktır.Another benefit is the resiliency that this model can provide. Bir filtre başarısız olursa veya üzerinde çalıştırıldığı makine kullanılamaz hale gelirse, işlem hattı filtrenin yaptığı işi yeniden zamanlayabilir ve bu işi bileşenin başka bir örneğine yönlendirebilir.If a filter fails or the machine it's running on is no longer available, the pipeline can reschedule the work that the filter was performing and direct this work to another instance of the component. Tek bir filtrede yaşanan sorun tüm işlem hattının başarısız olmasına neden olmaz.Failure of a single filter doesn't necessarily result in failure of the entire pipeline.

Kanallar ve Filtreler düzeninin Telafi İşlemi düzeni ile birlikte kullanılması, dağıtılmış işlemlerin uygulanmasına yönelik alternatif bir yaklaşımdır.Using the Pipes and Filters pattern in conjunction with the Compensating Transaction pattern is an alternative approach to implementing distributed transactions. Dağıtılmış işlem ayrı, telafi edilebilir görevlere bölünebilir. Bu görevlerin her biri, Telafi İşlemi düzenini de uygulayan bir filtre kullanılarak uygulanabilir.A distributed transaction can be broken down into separate, compensable tasks, each of which can be implemented by using a filter that also implements the Compensating Transaction pattern. İşlem hattındaki filtreler, kullandıkları verilerin yakınında çalıştırılan, birbirinden ayrı barındırılan görevler şeklinde uygulanabilir.The filters in a pipeline can be implemented as separate hosted tasks running close to the data that they maintain.

Sorunlar ve dikkat edilmesi gerekenlerIssues and considerations

Bu düzeni nasıl uygulayacağınıza karar verirken aşağıdaki noktaları dikkate almalısınız:You should consider the following points when deciding how to implement this pattern:

  • Karmaşıklık.Complexity. Bu düzen esnekliği artırırken, özellikle işlem hattındaki filtreler farklı sunuculara dağıtılmışsa karmaşıklığın ortaya çıkmasına da neden olabilir.The increased flexibility that this pattern provides can also introduce complexity, especially if the filters in a pipeline are distributed across different servers.

  • Güvenilirlik.Reliability. İşlem hattındaki filtreler arasında akan verilerin kaybolmamasını sağlayacak bir altyapı kullanın.Use an infrastructure that ensures that data flowing between filters in a pipeline won't be lost.

  • Tek Sefer Etkili Olma.Idempotency. İşlem hattındaki filtrelerden biri, bir iletiyi aldıktan sonra başarısız olursa ve iş filtrenin başka bir örneği için yeniden zamanlanırsa işin bir kısmı zaten tamamlanmış olabilir.If a filter in a pipeline fails after receiving a message and the work is rescheduled to another instance of the filter, part of the work might have already been completed. Bu iş, genel durumun bir unsurunu (örneğin, bir veritabanında depolanan bilgiler) güncelleştiriyorsa aynı güncelleştirme yinelenebilir.If this work updates some aspect of the global state (such as information stored in a database), the same update could be repeated. Bir filtre sonuçlarını işlem hattında sıradaki filtreye gönderdikten sonra, ancak işinin başarıyla tamamlandığını belirtmeden önce başarısız olursa yine benzer bir sorun ortaya çıkabilir.A similar issue might occur if a filter fails after posting its results to the next filter in the pipeline, but before indicating that it's completed its work successfully. Böyle durumlarda, aynı iş filtrenin başka bir örneği tarafından yinelenebilir ve bu nedenle aynı sonuçlar iki kez gönderilebilir.In these cases, the same work could be repeated by another instance of the filter, causing the same results to be posted twice. Bu da işlem hattında sonraki filtrelerin aynı verileri iki kez işlemesine yol açabilir.This could result in subsequent filters in the pipeline processing the same data twice. Bu nedenle, işlem hattındaki filtreler tek sefer etkili olacak şekilde tasarlanmalıdır.Therefore filters in a pipeline should be designed to be idempotent. Daha fazla bilgi için Jonathan Oliver’ın blogunda yer alan Tek Sefer Etkili Olma Düzenleri başlıklı makaleye bakın.For more information see Idempotency Patterns on Jonathan Oliver’s blog.

  • Yinelenen iletiler.Repeated messages. İşlem hattındaki bir filtre işlem hattının sonraki aşamasına bir ileti gönderdikten sonra başarısız olursa filtrenin başka bir örneği çalıştırılabilir ve bu örnek de işlem hattına aynı iletinin bir kopyasını gönderecektir.If a filter in a pipeline fails after posting a message to the next stage of the pipeline, another instance of the filter might be run, and it'll post a copy of the same message to the pipeline. Bunun sonucunda, sıradaki filtreye aynı iletinin iki örneği geçirilebilir.This could cause two instances of the same message to be passed to the next filter. Böyle bir durumu önlemek için işlem hattının yinelenen iletileri algılayıp ortadan kaldırması gerekir.To avoid this, the pipeline should detect and eliminate duplicate messages.

    İşlem hattını ileti kuyrukları (Microsoft Azure Service Bus kuyrukları gibi) kullanarak uyguluyorsanız ileti sıraya alma altyapısı, yinelenen iletilerin otomatik olarak algılanıp kaldırılması hizmetini sağlayabilir.If you're implementing the pipeline by using message queues (such as Microsoft Azure Service Bus queues), the message queuing infrastructure might provide automatic duplicate message detection and removal.

  • Bağlam ve durum.Context and state. İşlem hattındaki her filtre temel olarak yalıtılmış bir şekilde çalışır ve nasıl çağrıldığı hakkında varsayımlarda bulunmamalıdır.In a pipeline, each filter essentially runs in isolation and shouldn't make any assumptions about how it was invoked. Bu nedenle, her bir filtreye işini yapması için yeterli bağlam sağlanmalıdır.This means that each filter should be provided with sufficient context to perform its work. Söz konusu bağlam, yüksek miktarda durum bilgisi içerebilir.This context could include a large amount of state information.

Bu düzenin kullanılacağı durumlarWhen to use this pattern

Bu düzeni aşağıdaki durumlarda kullanın:Use this pattern when:

  • Uygulama için gereken işlem kolayca bir dizi bağımsız adım şeklinde ayrılabiliyor.The processing required by an application can easily be broken down into a set of independent steps.

  • Uygulama tarafından gerçekleştirilen işleme adımlarının farklı ölçeklenebilirlik gereksinimleri var.The processing steps performed by an application have different scalability requirements.

    Birlikte ölçeklendirilmesi gereken filtreler aynı işlemde bir araya getirilebilir.It's possible to group filters that should scale together in the same process. Daha fazla bilgi için bkz. İşlem Kaynağı Birleştirme düzeni.For more information, see the Compute Resource Consolidation pattern.

  • Uygulama tarafından gerçekleştirilen işleme adımlarının yeniden sıralanabilmesi veya adım eklenip çıkarılabilmesi için esneklik gerekiyor.Flexibility is required to allow reordering of the processing steps performed by an application, or the capability to add and remove steps.

  • Adımların işlenmesinin farklı sunuculara dağıtılması sisteme yarar sağlayacak.The system can benefit from distributing the processing for steps across different servers.

  • Veriler işlenirken bir adımda yaşanabilecek bir başarısızlığın etkisini en aza indiren güvenilir bir çözüm gerekiyor.A reliable solution is required that minimizes the effects of failure in a step while data is being processed.

Bu düzen aşağıdaki durumlarda kullanışlı olmayabilir:This pattern might not be useful when:

  • Uygulama tarafından gerçekleştirilen işleme adımları birbirinden bağımsız değil veya aynı işlemin bir parçası olarak birlikte gerçekleştirilmeleri gerekiyor.The processing steps performed by an application aren't independent, or they have to be performed together as part of the same transaction.

  • Bir adım için gerekli olan bağlam ya da durum bilgisi miktarı bu yaklaşımı verimsiz kılacak kadar yüksek.The amount of context or state information required by a step makes this approach inefficient. Bu yaklaşımın yerine durum bilgilerini bir veritabanında kalıcı hale getirmek mümkün olabilir. Ancak, veritabanında oluşan ek yük çok fazla çekişmeye neden oluyorsa bu stratejiyi kullanmayın.It might be possible to persist state information to a database instead, but don't use this strategy if the additional load on the database causes excessive contention.

ÖrnekExample

İşlem hattının uygulanmasında gereken altyapıyı sağlamak için bir dizi ileti kuyruğu kullanabilirsiniz.You can use a sequence of message queues to provide the infrastructure required to implement a pipeline. İlk ileti kuyruğu, işlenmemiş iletiler alır.An initial message queue receives unprocessed messages. Filtre görevi olarak uygulanan bir bileşen, bu kuyruktaki iletileri dinler, kendi işini yapar ve ardından dönüştürülmüş iletiyi sırada yer alan bir sonraki kuyruğa gönderir.A component implemented as a filter task listens for a message on this queue, performs its work, and then posts the transformed message to the next queue in the sequence. Başka bir filtre görevi bu kuyruktaki iletileri dinler, bu iletileri işler ve sonuçları başka bir kuyruğa gönderir. Tamamen dönüştürülmüş veriler kuyruktaki son iletide görünene kadar süreç bu şekilde devam eder.Another filter task can listen for messages on this queue, process them, post the results to another queue, and so on until the fully transformed data appears in the final message in the queue. Aşağıdaki şekilde, ileti kuyrukları kullanılarak bir işlem hattının uygulanması gösterilmektedir.The next figure illustrates implementing a pipeline using message queues.

Şekil 4 - İleti kuyrukları kullanarak bir işlem hattı uygulama

Azure’da bir çözüm oluşturuyorsanız Service Bus kuyruklarını kullanarak güvenilir ve ölçeklenebilir bir sıraya alma mekanizması elde edebilirsiniz.If you're building a solution on Azure you can use Service Bus queues to provide a reliable and scalable queuing mechanism. Aşağıda C# dilinde sunulan ServiceBusPipeFilter sınıfı, bir kuyruktan giriş iletileri alan, bu iletileri işleyen ve sonuçları başka bir kuyruğa gönderen bir filtrenin nasıl uygulanacağını göstermektedir.The ServiceBusPipeFilter class shown below in C# demonstrates how you can implement a filter that receives input messages from a queue, processes these messages, and posts the results to another queue.

ServiceBusPipeFilter sınıfı, GitHub’dan edinebileceğiniz PipesAndFilters.Shared projesinde tanımlanmıştır.The ServiceBusPipeFilter class is defined in the PipesAndFilters.Shared project available from GitHub.

public class ServiceBusPipeFilter
{
  ...
  private readonly string inQueuePath;
  private readonly string outQueuePath;
  ...
  private QueueClient inQueue;
  private QueueClient outQueue;
  ...

  public ServiceBusPipeFilter(..., string inQueuePath, string outQueuePath = null)
  {
     ...
     this.inQueuePath = inQueuePath;
     this.outQueuePath = outQueuePath;
  }

  public void Start()
  {
    ...
    // Create the outbound filter queue if it doesn't exist.
    ...
    this.outQueue = QueueClient.CreateFromConnectionString(...);

    ...
    // Create the inbound and outbound queue clients.
    this.inQueue = QueueClient.CreateFromConnectionString(...);
  }

  public void OnPipeFilterMessageAsync(
    Func<BrokeredMessage, Task<BrokeredMessage>> asyncFilterTask, ...)
  {
    ...

    this.inQueue.OnMessageAsync(
      async (msg) =>
    {
      ...
      // Process the filter and send the output to the
      // next queue in the pipeline.
      var outMessage = await asyncFilterTask(msg);

      // Send the message from the filter processor
      // to the next queue in the pipeline.
      if (outQueue != null)
      {
        await outQueue.SendAsync(outMessage);
      }

      // Note: There's a chance that the same message could be sent twice
      // or that a message gets processed by an upstream or downstream
      // filter at the same time.
      // This would happen in a situation where processing of a message was
      // completed, it was sent to the next pipe/queue, and then failed
      // to complete when using the PeekLock method.
      // Idempotent message processing and concurrency should be considered
      // in a real-world implementation.
    },
    options);
  }

  public async Task Close(TimeSpan timespan)
  {
    // Pause the processing threads.
    this.pauseProcessingEvent.Reset();

    // There's no clean approach for waiting for the threads to complete
    // the processing. This example simply stops any new processing, waits
    // for the existing thread to complete, then closes the message pump
    // and finally returns.
    Thread.Sleep(timespan);

    this.inQueue.Close();
    ...
  }

  ...
}

ServiceBusPipeFilter sınıfındaki Start yöntemi, bir giriş ve bir çıkış kuyruğuna bağlanır ve Close yöntemi giriş kuyruğuyla bağlantıyı keser.The Start method in the ServiceBusPipeFilter class connects to a pair of input and output queues, and the Close method disconnects from the input queue. OnPipeFilterMessageAsync yöntemi iletilerin işlenmesini gerçekleştirir. Bu yöntemin asyncFilterTask parametresi hangi işlemenin gerçekleştirileceğini belirtir.The OnPipeFilterMessageAsync method performs the actual processing of messages, the asyncFilterTask parameter to this method specifies the processing to be performed. OnPipeFilterMessageAsync yöntemi giriş kuyruğundan ileti gelmesini bekler, gelen her ileti üzerinde asyncFilterTask parametresi tarafından belirtilen kodu çalıştırır ve sonuçları çıkış kuyruğuna gönderir.The OnPipeFilterMessageAsync method waits for incoming messages on the input queue, runs the code specified by the asyncFilterTask parameter over each message as it arrives, and posts the results to the output queue. Kuyrukların kendileri, oluşturucu tarafından belirtilir.The queues themselves are specified by the constructor.

Örnek çözüm, filtreleri bir grup çalışan rolünde uygular.The sample solution implements filters in a set of worker roles. Her çalışan rolü, gerçekleştirdiği iş işleminin karmaşıklığına veya işlem için gereken kaynaklara bağlı bir şekilde bağımsız olarak ölçeklendirilebilir.Each worker role can be scaled independently, depending on the complexity of the business processing that it performs or the resources required for processing. Ayrıca, her çalışan rolünün birden çok örneği paralel biçimde çalıştırılarak işleme miktarı artırılabilir.Additionally, multiple instances of each worker role can be run in parallel to improve throughput.

Aşağıdaki kod, örnek çözümdeki PipeFilterA projesinde tanımlanan PipeFilterARoleEntry adlı bir Azure çalışan rolü göstermektedir.The following code shows an Azure worker role named PipeFilterARoleEntry, defined in the PipeFilterA project in the sample solution.

public class PipeFilterARoleEntry : RoleEntryPoint
{
  ...
  private ServiceBusPipeFilter pipeFilterA;

  public override bool OnStart()
  {
    ...
    this.pipeFilterA = new ServiceBusPipeFilter(
      ...,
      Constants.QueueAPath,
      Constants.QueueBPath);

    this.pipeFilterA.Start();
    ...
  }

  public override void Run()
  {
    this.pipeFilterA.OnPipeFilterMessageAsync(async (msg) =>
    {
      // Clone the message and update it.
      // Properties set by the broker (Deliver count, enqueue time, ...)
      // aren't cloned and must be copied over if required.
      var newMsg = msg.Clone();

      await Task.Delay(500); // DOING WORK

      Trace.TraceInformation("Filter A processed message:{0} at {1}",
        msg.MessageId, DateTime.UtcNow);

      newMsg.Properties.Add(Constants.FilterAMessageKey, "Complete");

      return newMsg;
    });

    ...
  }

  ...
}

Bu rol, bir ServiceBusPipeFilter nesnesi içerir.This role contains a ServiceBusPipeFilter object. Roldeki OnStart yöntemi, giriş iletileri almaya ve çıkış iletileri göndermeye yönelik kuyruklara (kuyrukların adları Constants sınıfında tanımlanır) bağlanır.The OnStart method in the role connects to the queues for receiving input messages and posting output messages (the names of the queues are defined in the Constants class). Run yöntemi, alınan her ileti üzerinde bir miktar işleme yapmak için OnPipeFilterMessagesAsync yöntemini çağırır (bu örnekte, işlemeye örnek olarak kısa bir süre bekleme işlemi kullanılmıştır).The Run method invokes the OnPipeFilterMessagesAsync method to perform some processing on each message that's received (in this example, the processing is simulated by waiting for a short period of time). İşleme tamamlandığında sonuçları içeren yeni bir ileti oluşturulur (bu örnekte, giriş iletisine özel bir özellik eklenmektedir) ve bu ileti çıkış kuyruğuna gönderilir.When processing is complete, a new message is constructed containing the results (in this case, the input message has a custom property added), and this message is posted to the output queue.

Örnek kod, PipeFilterB projesinde PipeFilterBRoleEntry adlı başka bir çalışan rolü içermektedir.The sample code contains another worker role named PipeFilterBRoleEntry in the PipeFilterB project. Bu rol, PipeFilterARoleEntry rolüne benzer ancak Run yönteminde farklı bir işlem gerçekleştirir.This role is similar to PipeFilterARoleEntry except that it performs different processing in the Run method. Örnek çözümde bu iki rol birleştirilerek bir işlem hattı oluşturulmaktadır. PipeFilterARoleEntry rolünün çıkış kuyruğu, PipeFilterBRoleEntry rolünün giriş kuyruğudur.In the example solution, these two roles are combined to construct a pipeline, the output queue for the PipeFilterARoleEntry role is the input queue for the PipeFilterBRoleEntry role.

Örnek çözüm, ayrıca InitialSenderRoleEntry (InitialSender projesinde) ve FinalReceiverRoleEntry (FinalReceiver projesinde) adlı iki ek rol sağlamaktadır.The sample solution also provides two additional roles named InitialSenderRoleEntry (in the InitialSender project) and FinalReceiverRoleEntry (in the FinalReceiver project). InitialSenderRoleEntry rolü işlem hattındaki ilk iletiyi sağlar.The InitialSenderRoleEntry role provides the initial message in the pipeline. OnStart yöntemi tek bir kuyruğa bağlanır ve Run yöntemi bu kuyruğa bir ileti gönderir.The OnStart method connects to a single queue and the Run method posts a method to this queue. Bu kuyruk, PipeFilterARoleEntry rolü tarafından giriş kuyruğu olarak kullanılır. Bu nedenle, bu kuyruğa bir ileti gönderildiğinde söz konusu ileti PipeFilterARoleEntry rolü tarafından alınıp işlenir.This queue is the input queue used by the PipeFilterARoleEntry role, so posting a message to it causes the message to be received and processed by the PipeFilterARoleEntry role. İşlenen ileti daha sonra PipeFilterBRoleEntry rolünden geçer.The processed message then passes through the PipeFilterBRoleEntry role.

FinalReceiveRoleEntry rolünün giriş kuyruğu, PipeFilterBRoleEntry rolünün çıkış kuyruğudur.The input queue for the FinalReceiveRoleEntry role is the output queue for the PipeFilterBRoleEntry role. Aşağıda gösterilen FinalReceiveRoleEntry rolündeki Run yöntemi iletiyi alır ve son işlemleri gerçekleştirir.The Run method in the FinalReceiveRoleEntry role, shown below, receives the message and performs some final processing. Ardından, işlem hattındaki filtreler tarafından eklenen özel özelliklerin değerlerini izleme çıktısına yazar.Then it writes the values of the custom properties added by the filters in the pipeline to the trace output.

public class FinalReceiverRoleEntry : RoleEntryPoint
{
  ...
  // Final queue/pipe in the pipeline to process data from.
  private ServiceBusPipeFilter queueFinal;

  public override bool OnStart()
  {
    ...
    // Set up the queue.
    this.queueFinal = new ServiceBusPipeFilter(...,Constants.QueueFinalPath);
    this.queueFinal.Start();
    ...
  }

  public override void Run()
  {
    this.queueFinal.OnPipeFilterMessageAsync(
      async (msg) =>
      {
        await Task.Delay(500); // DOING WORK

        // The pipeline message was received.
        Trace.TraceInformation(
          "Pipeline Message Complete - FilterA:{0} FilterB:{1}",
          msg.Properties[Constants.FilterAMessageKey],
          msg.Properties[Constants.FilterBMessageKey]);

        return null;
      });
    ...
  }

  ...
}

Bu düzen uygulanırken aşağıdaki düzenler ve yönergeler de yararlı olabilir:The following patterns and guidance might also be relevant when implementing this pattern:

  • Bu düzeni gösteren bir örnek GitHub’dan edinilebilir.A sample that demonstrates this pattern is available on GitHub.
  • Rakip Tüketiciler düzeni.Competing Consumers pattern. Bir işlem hattı, bir veya daha fazla filtrenin birden fazla örneğini içerebilir.A pipeline can contain multiple instances of one or more filters. Bu yaklaşım, yavaş filtrelerin paralel örneklerini çalıştırmak için kullanışlıdır. Sistem bu sayede yükü dağıtabilir ve işleme miktarını artırabilir.This approach is useful for running parallel instances of slow filters, enabling the system to spread the load and improve throughput. Filtrenin her örneği, girişler için diğer örneklerle rekabete girer. Bir filtrenin iki örneği aynı verileri işleyememelidir.Each instance of a filter will compete for input with the other instances, two instances of a filter shouldn't be able to process the same data. Bu yaklaşımın bir açıklaması sağlanmaktadır.Provides an explanation of this approach.
  • İşlem Kaynağı Birleştirme düzeni.Compute Resource Consolidation pattern. Birlikte ölçeklendirilmesi gereken filtrelerin aynı işlemde bir araya getirilmesi mümkün olabilir.It might be possible to group filters that should scale together into the same process. Bu stratejinin avantajları ve dezavantajları hakkında daha fazla bilgi sağlamaktadır.Provides more information about the benefits and tradeoffs of this strategy.
  • Telafi İşlemi düzeni.Compensating Transaction pattern. Bir filtre, ters çevrilebilen bir işlem ya da bir arıza olması halinde daha önceki bir durumu geri yükleyen bir telafi işlemine sahip olan bir işlem olarak uygulanabilir.A filter can be implemented as an operation that can be reversed, or that has a compensating operation that restores the state to a previous version in the event of a failure. Nihai tutarlılığı elde etmek veya sürdürmek için bu uygulamanın nasıl gerçekleştirileceği açıklanmaktadır.Explains how this can be implemented to maintain or achieve eventual consistency.
  • Tek Sefer Etkili Olma Düzenleri. Jonathan Oliver’ın blogundan bir makaledir.Idempotency Patterns on Jonathan Oliver’s blog.