Kanallar ve Filtreler düzeni

Karmaşık bir işlem gerçekleştiren bir görevi yeniden kullanılabilecek bir dizi ayrı öğe şeklinde parçalara ayırın. Bu sayede, işlemleri gerçekleştiren görev öğeleri bağımsız olarak dağıtılabileceği ve ölçeklendirilebileceği için performans, ölçeklenebilirlik ve yeniden kullanılabilirlik iyileştirebilir.

Bağlam ve sorun

Uygulamaların, işledikleri bilgiler üzerinde farklı karmaşıklık düzeyinde çeşitli görevler gerçekleştirmesi gerekir. Bir uygulamayı kullanıma geçirmeye yönelik basit ancak esnek olmayan bir yaklaşım, işleme faaliyetini tek parça bir modül olarak gerçekleştirmektir. Ancak, aynı işlemin parçaları uygulama içinde başka bir yerde gerekliyse bu yaklaşım kodu yeniden düzenlemeye, iyileştirmeye veya yeniden kullanmaya yönelik fırsatları azaltabilir.

Aşağıdaki şekilde, tek parçalı bir yaklaşım kullanarak veri işlemeden doğan sorunlar görülebilir. Bir uygulama iki farklı kaynaktan veri alıp işlemektedir. Her bir kaynaktan gelen veriler, ayrı bir modül tarafından işlenir. Modül, gerçekleştirdiği bir dizi görevle bu verileri dönüştürerek elde edilen sonucu uygulamanın iş mantığına geçirir.

Şekil 1 - Tek parçalı modüller kullanılarak uygulanan bir çözüm

Tek parçalı modüllerin gerçekleştirdiği görevlerin bazıları birbirine çok benzeyen işlevleri yerine getirir, ancak modüller ayrı olarak tasarlanmıştır. Görevleri uygulayan kod, modüle sıkı bir şekilde bağlıdır. Kod geliştirilirken yeniden kullanılabilirlik ve ölçeklenebilirlik konuları pek dikkate alınmamıştır.

Ancak, iş gereksinimleri güncelleştirildikçe her modülün gerçekleştirdiği işleme görevleri ya da her bir göreve ilişkin dağıtım gereksinimleri de değişebilir. Bazı görevlerin işlem gücü kullanımı yoğun olabilir ve güçlü bir donanımda çalışması gerekebilir, bazı görevler için de böyle pahalı kaynaklara gerek olmayabilir. Ayrıca, gelecekte ek işlemler yapılması gerekebilir veya işleme esnasında gerçekleştirilen görevlerin sırası değişebilir. Bu sorunları gideren ve kodun yeniden kullanılma olasılığını artıran bir çözüm gereklidir.

Çözüm

Her akış için gereken işleme faaliyetini her biri tek bir görevi gerçekleştiren bir dizi ayrı bileşene (yani filtreler) ayırın. Her bir bileşenin aldığı ve gönderdiği verilerin biçimi standart hale getirilirse bu filtreler birleştirilerek bir işlem hattı oluşturulabilir. Böylece aynı kodun yeniden oluşturulması önlenebilir ve işleme gereksinimlerinin değişmesi halinde bileşenlerin eklenmesi, kaldırılması veya yeni bileşenlerin tümleştirilmesi kolaylaşır. Aşağıdaki şekilde kanallar ve filtreler kullanılarak uygulanan bir çözüm gösterilmektedir.

Şekil 2 - Kanallar ve filtreler kullanılarak uygulanan bir çözüm

Tek bir isteğin işlenmesi için gereken süre, işlem hattındaki en yavaş filtrenin hızına bağlıdır. Bir veya daha fazla filtre bir performans sorunu oluşturabilir. Bu durum, özellikle bir akışta belirli bir veri kaynağından çok sayıda istek olduğunda ortaya çıkabilir. İşlem hattı yapısının sağladığı önemli avantajlardan biri, yavaş filtrelerin paralel örneklerinin çalıştırılmasına fırsat vermesidir. Sistem bu sayede yükü dağıtabilir ve işleme miktarını artırabilir.

Bir işlem hattını oluşturan filtreler farklı makinelerde çalıştırılabilir. Böylece bu filtreler bağımsız olarak ölçeklendirilebilir ve birçok bulut ortamının sağladığı esneklikten yararlanabilir. Yoğun performans gerektiren bir filtre yüksek performanslı donanımlar üzerinde çalışabilir, öte bir deyişle daha az maliyetli olan filtreler daha ucuz bir daha ucuz emtia donanımında barındırılabilir. Filtrelerin aynı veri merkezinde veya coğrafi konumda olması gerekmez, bu da bir işlem hattındaki her öğenin gerektirdiği kaynaklara yakın bir ortamda çalışmasına izin verir. Sıradaki şekilde, bahsedilenlerin Kaynak 1’den gelen verilere yönelik işlem hattına uygulanmasının bir örneği gösterilmektedir.

Şekil 3, bahsedilenlerin Kaynak 1’den gelen verilere yönelik işlem hattına uygulanmasının bir örneğini göstermektedir

Filtreye ait giriş ve çıkış bir akış olarak yapılandırılmışsa her bir filtrenin gerçekleştirdiği işlemeyi paralel olarak yürütmek mümkündür. İşlem hattındaki ilk filtre çalışmaya ve sonuçları çıktı olarak vermeye başlayabilir. Bu çıktı, daha ilk filtre çalışmasını tamamlamadan önce doğrudan sıradaki filtreye iletilir.

Elde edilen bir diğer avantaj da bu modelin sağlayabileceği dayanıklılıktır. Bir filtre başarısız olursa veya üzerinde çalıştırıldığı makine kullanılamaz hale gelirse, işlem hattı filtrenin yaptığı işi yeniden zamanlayabilir ve bu işi bileşenin başka bir örneğine yönlendirebilir. Tek bir filtrede yaşanan sorun tüm işlem hattının başarısız olmasına neden olmaz.

Kanallar ve Filtreler düzeninin Telafi İşlemi düzeni ile birlikte kullanılması, dağıtılmış işlemlerin uygulanmasına yönelik alternatif bir yaklaşımdır. Dağıtılmış işlem ayrı, telafi edilebilir görevlere bölünebilir. Bu görevlerin her biri, Telafi İşlemi düzenini de uygulayan bir filtre kullanılarak uygulanabilir. İşlem hattındaki filtreler, kullandıkları verilerin yakınında çalıştırılan, birbirinden ayrı barındırılan görevler şeklinde uygulanabilir.

Sorunlar ve dikkat edilmesi gerekenler

Bu düzeni nasıl uygulayacağınıza karar verirken aşağıdaki noktaları dikkate almalısınız:

  • Karmaşıklık. Bu düzen esnekliği artırırken, özellikle işlem hattındaki filtreler farklı sunuculara dağıtılmışsa karmaşıklığın ortaya çıkmasına da neden olabilir.

  • Güvenilirlik. İşlem hattındaki filtreler arasında akan verilerin kaybolmamasını sağlayacak bir altyapı kullanın.

  • Tek Sefer Etkili Olma. İşlem hattındaki filtrelerden biri, bir iletiyi aldıktan sonra başarısız olursa ve iş filtrenin başka bir örneği için yeniden zamanlanırsa işin bir kısmı zaten tamamlanmış olabilir. Bu iş, genel durumun bir unsurunu (örneğin, bir veritabanında depolanan bilgiler) güncelleştiriyorsa aynı güncelleştirme yinelenebilir. Bir filtre sonuçlarını işlem hattında sıradaki filtreye gönderdikten sonra, ancak işinin başarıyla tamamlandığını belirtmeden önce başarısız olursa yine benzer bir sorun ortaya çıkabilir. Böyle durumlarda, aynı iş filtrenin başka bir örneği tarafından yinelenebilir ve bu nedenle aynı sonuçlar iki kez gönderilebilir. Bu da işlem hattında sonraki filtrelerin aynı verileri iki kez işlemesine yol açabilir. Bu nedenle, işlem hattındaki filtreler tek sefer etkili olacak şekilde tasarlanmalıdır. Daha fazla bilgi için bkz. Jonathan zeytin bloguna göre Pampotlik desenleri .

  • Yinelenen iletiler. İşlem hattındaki bir filtre işlem hattının sonraki aşamasına bir ileti gönderdikten sonra başarısız olursa filtrenin başka bir örneği çalıştırılabilir ve bu örnek de işlem hattına aynı iletinin bir kopyasını gönderecektir. Bunun sonucunda, sıradaki filtreye aynı iletinin iki örneği geçirilebilir. Böyle bir durumu önlemek için işlem hattının yinelenen iletileri algılayıp ortadan kaldırması gerekir.

    İşlem hattını ileti kuyrukları (Microsoft Azure Service Bus kuyrukları gibi) kullanarak uyguluyorsanız ileti sıraya alma altyapısı, yinelenen iletilerin otomatik olarak algılanıp kaldırılması hizmetini sağlayabilir.

  • Bağlam ve durum. İşlem hattındaki her filtre temel olarak yalıtılmış bir şekilde çalışır ve nasıl çağrıldığı hakkında varsayımlarda bulunmamalıdır. Bu nedenle, her bir filtreye işini yapması için yeterli bağlam sağlanmalıdır. Söz konusu bağlam, yüksek miktarda durum bilgisi içerebilir.

Bu düzenin kullanılacağı durumlar

Bu düzeni aşağıdaki durumlarda kullanın:

  • Uygulama için gereken işlem kolayca bir dizi bağımsız adım şeklinde ayrılabiliyor.

  • Uygulama tarafından gerçekleştirilen işleme adımlarının farklı ölçeklenebilirlik gereksinimleri var.

    Birlikte ölçeklendirilmesi gereken filtreler aynı işlemde bir araya getirilebilir. Daha fazla bilgi için bkz. İşlem Kaynağı Birleştirme düzeni.

  • Uygulama tarafından gerçekleştirilen işleme adımlarının yeniden sıralanabilmesi veya adım eklenip çıkarılabilmesi için esneklik gerekiyor.

  • Adımların işlenmesinin farklı sunuculara dağıtılması sisteme yarar sağlayacak.

  • Veriler işlenirken bir adımda yaşanabilecek bir başarısızlığın etkisini en aza indiren güvenilir bir çözüm gerekiyor.

Bu düzen aşağıdaki durumlarda kullanışlı olmayabilir:

  • Uygulama tarafından gerçekleştirilen işleme adımları birbirinden bağımsız değil veya aynı işlemin bir parçası olarak birlikte gerçekleştirilmeleri gerekiyor.

  • Bir adım için gerekli olan bağlam ya da durum bilgisi miktarı bu yaklaşımı verimsiz kılacak kadar yüksek. Bu yaklaşımın yerine durum bilgilerini bir veritabanında kalıcı hale getirmek mümkün olabilir. Ancak, veritabanında oluşan ek yük çok fazla çekişmeye neden oluyorsa bu stratejiyi kullanmayın.

Örnek

İşlem hattının uygulanmasında gereken altyapıyı sağlamak için bir dizi ileti kuyruğu kullanabilirsiniz. İlk ileti kuyruğu, işlenmemiş iletiler alır. Filtre görevi olarak uygulanan bir bileşen, bu kuyruktaki iletileri dinler, kendi işini yapar ve ardından dönüştürülmüş iletiyi sırada yer alan bir sonraki kuyruğa gönderir. Başka bir filtre görevi bu kuyruktaki iletileri dinler, bu iletileri işler ve sonuçları başka bir kuyruğa gönderir. Tamamen dönüştürülmüş veriler kuyruktaki son iletide görünene kadar süreç bu şekilde devam eder. Aşağıdaki şekilde, ileti kuyrukları kullanılarak bir işlem hattının uygulanması gösterilmektedir.

Şekil 4 - İleti kuyrukları kullanarak bir işlem hattı uygulama

Azure’da bir çözüm oluşturuyorsanız Service Bus kuyruklarını kullanarak güvenilir ve ölçeklenebilir bir sıraya alma mekanizması elde edebilirsiniz. Aşağıda C# dilinde sunulan ServiceBusPipeFilter sınıfı, bir kuyruktan giriş iletileri alan, bu iletileri işleyen ve sonuçları başka bir kuyruğa gönderen bir filtrenin nasıl uygulanacağını göstermektedir.

ServiceBusPipeFilter sınıfı, ServiceBusPipeFilter’dan edinebileceğiniz PipesAndFilters.Shared projesinde tanımlanmıştır.

public class ServiceBusPipeFilter
{
  ...
  private readonly string inQueuePath;
  private readonly string outQueuePath;
  ...
  private QueueClient inQueue;
  private QueueClient outQueue;
  ...

  public ServiceBusPipeFilter(..., string inQueuePath, string outQueuePath = null)
  {
     ...
     this.inQueuePath = inQueuePath;
     this.outQueuePath = outQueuePath;
  }

  public void Start()
  {
    ...
    // Create the outbound filter queue if it doesn't exist.
    ...
    this.outQueue = QueueClient.CreateFromConnectionString(...);

    ...
    // Create the inbound and outbound queue clients.
    this.inQueue = QueueClient.CreateFromConnectionString(...);
  }

  public void OnPipeFilterMessageAsync(
    Func<BrokeredMessage, Task<BrokeredMessage>> asyncFilterTask, ...)
  {
    ...

    this.inQueue.OnMessageAsync(
      async (msg) =>
    {
      ...
      // Process the filter and send the output to the
      // next queue in the pipeline.
      var outMessage = await asyncFilterTask(msg);

      // Send the message from the filter processor
      // to the next queue in the pipeline.
      if (outQueue != null)
      {
        await outQueue.SendAsync(outMessage);
      }

      // Note: There's a chance that the same message could be sent twice
      // or that a message gets processed by an upstream or downstream
      // filter at the same time.
      // This would happen in a situation where processing of a message was
      // completed, it was sent to the next pipe/queue, and then failed
      // to complete when using the PeekLock method.
      // Idempotent message processing and concurrency should be considered
      // in a real-world implementation.
    },
    options);
  }

  public async Task Close(TimeSpan timespan)
  {
    // Pause the processing threads.
    this.pauseProcessingEvent.Reset();

    // There's no clean approach for waiting for the threads to complete
    // the processing. This example simply stops any new processing, waits
    // for the existing thread to complete, then closes the message pump
    // and finally returns.
    Thread.Sleep(timespan);

    this.inQueue.Close();
    ...
  }

  ...
}

ServiceBusPipeFilter sınıfındaki Start yöntemi, bir giriş ve bir çıkış kuyruğuna bağlanır ve Close yöntemi giriş kuyruğuyla bağlantıyı keser. OnPipeFilterMessageAsync yöntemi iletilerin işlenmesini gerçekleştirir. Bu yöntemin asyncFilterTask parametresi hangi işlemenin gerçekleştirileceğini belirtir. OnPipeFilterMessageAsync yöntemi giriş kuyruğundan ileti gelmesini bekler, gelen her ileti üzerinde asyncFilterTask parametresi tarafından belirtilen kodu çalıştırır ve sonuçları çıkış kuyruğuna gönderir. Kuyrukların kendileri, oluşturucu tarafından belirtilir.

Örnek çözüm, filtreleri bir grup çalışan rolünde uygular. Her çalışan rolü, gerçekleştirdiği iş işleminin karmaşıklığına veya işlem için gereken kaynaklara bağlı bir şekilde bağımsız olarak ölçeklendirilebilir. Ayrıca, her çalışan rolünün birden çok örneği paralel biçimde çalıştırılarak işleme miktarı artırılabilir.

Aşağıdaki kod, örnek çözümdeki PipeFilterA projesinde tanımlanan PipeFilterARoleEntry adlı bir Azure çalışan rolü göstermektedir.

public class PipeFilterARoleEntry : RoleEntryPoint
{
  ...
  private ServiceBusPipeFilter pipeFilterA;

  public override bool OnStart()
  {
    ...
    this.pipeFilterA = new ServiceBusPipeFilter(
      ...,
      Constants.QueueAPath,
      Constants.QueueBPath);

    this.pipeFilterA.Start();
    ...
  }

  public override void Run()
  {
    this.pipeFilterA.OnPipeFilterMessageAsync(async (msg) =>
    {
      // Clone the message and update it.
      // Properties set by the broker (Deliver count, enqueue time, ...)
      // aren't cloned and must be copied over if required.
      var newMsg = msg.Clone();

      await Task.Delay(500); // DOING WORK

      Trace.TraceInformation("Filter A processed message:{0} at {1}",
        msg.MessageId, DateTime.UtcNow);

      newMsg.Properties.Add(Constants.FilterAMessageKey, "Complete");

      return newMsg;
    });

    ...
  }

  ...
}

Bu rol, bir ServiceBusPipeFilter nesnesi içerir. Roldeki OnStart yöntemi, giriş iletileri almaya ve çıkış iletileri göndermeye yönelik kuyruklara (kuyrukların adları Constants sınıfında tanımlanır) bağlanır. Run yöntemi, alınan her ileti üzerinde bir miktar işleme yapmak için OnPipeFilterMessagesAsync yöntemini çağırır (bu örnekte, işlemeye örnek olarak kısa bir süre bekleme işlemi kullanılmıştır). İşleme tamamlandığında sonuçları içeren yeni bir ileti oluşturulur (bu örnekte, giriş iletisine özel bir özellik eklenmektedir) ve bu ileti çıkış kuyruğuna gönderilir.

Örnek kod, PipeFilterB projesinde PipeFilterBRoleEntry adlı başka bir çalışan rolü içermektedir. Bu rol, PipeFilterARoleEntry rolüne benzer ancak Run yönteminde farklı bir işlem gerçekleştirir. Örnek çözümde bu iki rol birleştirilerek bir işlem hattı oluşturulmaktadır. PipeFilterARoleEntry rolünün çıkış kuyruğu, PipeFilterBRoleEntry rolünün giriş kuyruğudur.

Örnek çözüm, ayrıca InitialSenderRoleEntry (InitialSender projesinde) ve FinalReceiverRoleEntry (FinalReceiver projesinde) adlı iki ek rol sağlamaktadır. InitialSenderRoleEntry rolü işlem hattındaki ilk iletiyi sağlar. OnStart yöntemi tek bir kuyruğa bağlanır ve Run yöntemi bu kuyruğa bir ileti gönderir. Bu kuyruk, PipeFilterARoleEntry rolü tarafından giriş kuyruğu olarak kullanılır. Bu nedenle, bu kuyruğa bir ileti gönderildiğinde söz konusu ileti PipeFilterARoleEntry rolü tarafından alınıp işlenir. İşlenen ileti daha sonra PipeFilterBRoleEntry rolünden geçer.

FinalReceiveRoleEntry rolünün giriş kuyruğu, PipeFilterBRoleEntry rolünün çıkış kuyruğudur. Aşağıda gösterilen FinalReceiveRoleEntry rolündeki Run yöntemi iletiyi alır ve son işlemleri gerçekleştirir. Ardından, işlem hattındaki filtreler tarafından eklenen özel özelliklerin değerlerini izleme çıktısına yazar.

public class FinalReceiverRoleEntry : RoleEntryPoint
{
  ...
  // Final queue/pipe in the pipeline to process data from.
  private ServiceBusPipeFilter queueFinal;

  public override bool OnStart()
  {
    ...
    // Set up the queue.
    this.queueFinal = new ServiceBusPipeFilter(...,Constants.QueueFinalPath);
    this.queueFinal.Start();
    ...
  }

  public override void Run()
  {
    this.queueFinal.OnPipeFilterMessageAsync(
      async (msg) =>
      {
        await Task.Delay(500); // DOING WORK

        // The pipeline message was received.
        Trace.TraceInformation(
          "Pipeline Message Complete - FilterA:{0} FilterB:{1}",
          msg.Properties[Constants.FilterAMessageKey],
          msg.Properties[Constants.FilterBMessageKey]);

        return null;
      });
    ...
  }

  ...
}

Sonraki adımlar

Bu düzeni uygularken aşağıdaki yönergeler de yararlı olabilir:

  • Bu düzeni gösteren bir örnek GitHub’dan edinilebilir.

Bu düzeni uygularken aşağıdaki desenler de ilgili olabilir:

  • Rekabet tüketicilere yönelik desenler. Bir işlem hattı, bir veya daha fazla filtrenin birden fazla örneğini içerebilir. Bu yaklaşım, yavaş filtrelerin paralel örneklerini çalıştırmak için kullanışlıdır. Sistem bu sayede yükü dağıtabilir ve işleme miktarını artırabilir. Filtrenin her örneği, girişler için diğer örneklerle rekabete girer. Bir filtrenin iki örneği aynı verileri işleyememelidir. Bu yaklaşımın bir açıklaması sağlanmaktadır.
  • Işlem kaynağı birleştirme stili. Birlikte ölçeklendirilmesi gereken filtrelerin aynı işlemde bir araya getirilmesi mümkün olabilir. Bu stratejinin avantajları ve dezavantajları hakkında daha fazla bilgi sağlamaktadır.
  • Telafi işlem kriteri. Bir filtre, ters çevrilebilen bir işlem ya da bir arıza olması halinde daha önceki bir durumu geri yükleyen bir telafi işlemine sahip olan bir işlem olarak uygulanabilir. Nihai tutarlılığı elde etmek veya sürdürmek için bu uygulamanın nasıl gerçekleştirileceği açıklanmaktadır.
  • Jonathan zeytin bloguna göre en küçük desenler .