Подсистемы Foundations

Буферы шины сервисов

Джувел Лоуи

Загрузка образца кода

В своей статье «Routers in the Service Bus» за октябрь 2009 г.(msdn.microsoft.com/magazine/ee335696) я представил вероятное направление развития Windows Azure AppFabric Service Bus на ближайшее будущее:она станет универсальным перехватчиком. Я также рассказал о функциональности маршрутизации и обещал написать об очередях в следующей статье.

С тех пор и маршрутизаторы, и очереди были отложены до второго выпуска шины сервисов, а пока вместо них шина сервисов будет предоставлять буферы. В будущих выпусках, весьма вероятно, появятся различные варианты протоколирования, диагностики и оснащения средствами мониторинга. Об этих аспектах мы поговорим, когда придет время. А в этой статье я расскажу о буферах и покажу некоторые «продвинутые» приемы программирования Windows Communication Foundation (WCF).

Буферы шины сервисов

В шине сервисов каждый URI в пространстве имен сервиса на самом деле является адресуемым узлом передачи сообщений (messaging junction). Клиент может послать сообщение этому узлу, а тот может ретранслировать его сервисам. Однако каждый узел также может функционировать в качестве буфера (рис. 1).

Figure 1 Buffers in the Service Bus

Рис. 1 Буферы в шине сервисов

Сообщения хранятся в буфере в течение заданного времени, даже если никакой сервис не проверяет этот буфер. Заметьте, что буфер может проверяться несколькими сервисами, но, если вы явным образом не выбираете и не блокируете сообщение, только один из них сможет извлечь сообщение.

Клиент отделен от сервисов за буфером; кроме того, клиенту и сервису нет нужды выполняться одновременно. Поскольку клиент взаимодействует с буфером, а не с реальной конечной точкой сервиса, все сообщения являются односторонними, и нет никакого (готового) способа получить результаты передачи сообщения или ошибки.

Буферы шины сервисов не следует равнять с очередями вроде Microsoft Message Queuing (MSMQ) или WCF-сервисов; между ними есть ряд значимых различий:

  • буферы шины сервисов ненадежны, так как сообщения хранятся в памяти. Это влечет риск потери сообщений в случае какого-либо катастрофического сбоя (пусть и маловероятного) самой шины сервисов;
  • буферы не поддерживают транзакции — ни отправлять, ни получать сообщения в рамках транзакции нельзя;
  • буферы не позволяют работать с сообщениями, которые должны храниться длительное время. Сервис должен извлечь сообщение из буфера в течение 10 минут — иначе сообщение будет отброшено. Хотя WCF-сообщения на основе MSMQ тоже имеют срок существования, он гораздо больше и по умолчанию равен суткам. Это открывает куда более широкие возможности в поддержке действительно разъединенных операций и отсоединенных приложений;
  • буферы ограничены в размере и не могут хранить более 50 сообщений;
  • буферизованные сообщения имеют фиксированный размер — по 64 Кб каждое. Хотя в MSMQ тоже накладываются ограничения на максимальный размер сообщения, он значительно больше (4 Мб на сообщение).

Таким образом, буферы на деле не поддерживают настоящие очереди вызовов, адресованных облаку; они скорее служат для некоторого повышения гибкости соединения, и при этом занимают нишу где-то между вызовами в настоящих очередях и асинхронными вызовами наподобие «выстрелил и забыл».

Существует две ситуации, в которых буферы полезны. Одна из них — клиент и сервис взаимодействуют по ненадежному, часто разрываемому соединению; если время в автономном режиме не превышает срок хранения сообщений, буферы позволяют нормализовать работу в таких условиях. Вторая (и более распространенная) ситуация — клиент инициирует асинхронные односторонние вызовы и использует буфер ответов (как будет описано позже в разделе «сервис ответов») для обработки результатов вызовов. Подобное взаимодействие осуществляется так, будто сетевое соединение в большей мере является амортизирующим тросом.

Работа с буферами

Адрес буфера должен быть уникальным; с одним адресом можно сопоставить только один буфер, и этот адрес не должен быть уже занят другим буфером или сервисом. Однако сообщения из одного буфера могут получать несколько сторон. Кроме того, адрес буфера должен соответствовать схеме протокола HTTP или HTTPS. Для отправки сообщений в буфер и получения их из него шина сервисов поддерживает API, аналогичный System.Messaging, т. е. требует от вас обмена неструктурированными сообщениями (raw messages). Администратор шины сервисов управляет буферами независимо от сервисов или клиентов. К каждому буферу применяется политика, управляющая его поведением и жизненным циклом. Изначально администратор шины сервисов должен выполнить ряд программных вызовов для создания и контроля буферов.

Политика каждого буфера выражается экземпляром класса MessageBufferPolicy, как показано на рис. 2.

Рис. 2 Класс MessageBufferPolicy

[DataContract]
public class MessageBufferPolicy : ...
{
  public MessageBufferPolicy();
  public MessageBufferPolicy(MessageBufferPolicy policyToCopy);

  public DiscoverabilityPolicy Discoverability
  {get;set;}

  public TimeSpan ExpiresAfter
  {get;set;}

  public int MaxMessageCount
  {get;set;}

  public OverflowPolicy OverflowPolicy
  {get;set;}

  public AuthorizationPolicy Authorization
  {get;set;} 
  
  public TransportProtectionPolicy TransportProtection
  {get;set;}
}

Свойство Discoverability политики — это перечислимое типа DiscoverabilityPolicy, которое управляет тем, будет ли буфер включен в реестр шины сервисов (ATOM-канал):

public enum DiscoverabilityPolicy
{
  Managers,
  ManagersListeners,
  ManagersListenersSenders,
  Public 
}

Значение Discoverability по умолчанию — DiscoverabilityPolicy.Managers, которое требует заявку на управляемую авторизацию (managed authorization claim). Значение DiscoverabilityPolicy.Public приводит к публикации буфера в канале без всякой авторизации.

Свойство ExpiresAfter контролирует жизненный цикл сообщений в буфере. По умолчанию он равен пяти минутам; минимальное значение — 1, а максимальное — 10. Любая попытка задать большее время, автоматически игнорируется.

Свойство MaxMessageCount определяет размер буфера. Политика по умолчанию — 10 сообщений, а минимум, естественно, равен 1. Как уже упоминалось, максимальный размер буфера — 50 сообщений, и попытки задать большее значение автоматически игнорируются.

Свойство OverflowPolicy является перечислимым с единственным значением:

public enum OverflowPolicy
{
  RejectIncomingMessage
}

OverflowPolicy определяет, что делать с сообщением, когда буфер уже заполнен (размер определяется MaxMessageCount). Единственный возможный вариант — отклонение сообщения и возврат отправителю с уведомлением об ошибке.

Перечислимое с единственным значением служит шаблоном подстановки для будущих вариантов, например отбрасывания сообщения без уведомления отправителя или удаления сообщений из буфера и приема нового.

Последние два свойства отвечают за конфигурацию защиты. Свойство AuthorizationPolicy указывает шине сервисов, следует ли авторизовать маркер клиента:

public enum AuthorizationPolicy
{
  NotRequired,
  RequiredToSend,
  RequiredToReceive,
  Required
}

Значение по умолчанию — AuthorizationPolicy.Required — требует авторизации клиентов как при передаче, так и при приеме сообщений.

Наконец, свойство TransportProtection определяет минимальный уровень защиты передачи сообщения в буфер, используя перечислимое типа TransportProtectionPolicy:

public enum TransportProtectionPolicy
{
  None,
  AllPaths,
}

По умолчанию транспорт защищается через TransportProtectionPolicy.AllPaths во всех политиках буферов; это значение требует использования HTTPS-адреса.

С помощью класса MessageBufferClient вы можете администрировать свой буфер, как показано на рис. 3.

Рис. 3 Класс MessageBufferClient

public sealed class MessageBufferClient
{
  public Uri MessageBufferUri
  {get;}

  public static MessageBufferClient CreateMessageBuffer(
    TransportClientEndpointBehavior credential,
    Uri messageBufferUri,MessageBufferPolicy policy);

  public static MessageBufferClient GetMessageBuffer(
    TransportClientEndpointBehavior credential,Uri messageBufferUri);
  public MessageBufferPolicy GetPolicy();
  public void DeleteMessageBuffer();

  // More members   
}

Чтобы получить аутентифицированный экземпляр MessageBufferClient, вы используете его статические методы, передавая им удостоверения шины сервисов (типа TransportClientEndpointBehavior). При каждом использовании MessageBufferClient обычно нужно проверять, существует ли данный буфер в шине сервисов, и для этого вызывается метод GetMessageBuffer. Если буфера нет, GetMessageBuffer генерирует исключение.

Вот как программно создать буфер:

Uri bufferAddress = 
  new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer/");

TransportClientEndpointBehavior credential = ...

MessageBufferPolicy bufferPolicy = new MessageBufferPolicy();

bufferPolicy.MaxMessageCount = 12;
bufferPolicy.ExpiresAfter = TimeSpan.FromMinutes(3);
bufferPolicy.Discoverability = DiscoverabilityPolicy.Public;

MessageBufferClient.CreateMessageBuffer(credential,bufferAddress,
  bufferPolicy);

В этом примере вы создаете экземпляр объекта политики буфера и задаете нужные значения. Для установки буфера достаточно вызвать метод CreateMessageBuffer класса MessageBufferClient с настроенной политикой и правильными удостоверениями.

В качестве альтернативы программным вызовам для просмотра и модификации буферов можно задействовать мой Service Bus Explorer (я опубликовал его в своей статье по маршрутизации, а полный исходный код содержится в наборе для скачивания к той статье). На рис. 4 показано, как создать новый буфер, указав его адрес и различные свойства политики. Во многом так же вы можете удалить все буферы в пространстве имен сервиса.

Figure 4 Creating a Buffer Using the Service Bus Explorer
Рис. 4 Создание буфера с помощью Service Bus Explorer

Вы также можете просматривать и изменять политики существующих буферов, сбрасывать сообщения в буфере и даже удалять буфер, выбирая его в дереве пространства имен сервиса и указывая нужные свойства в правой секции, как показано на рис. 5.

Figure 5 A Buffer in the Service Bus Explorer
Рис. 5 Буфер в Service Bus Explorer

Упрощение администрирования

Создавая буферы, лучше всего задать максимальные размер и срок хранения сообщений, чтобы у клиентов и сервисов было больше времени для взаимодействия. Более того, неплохо сделать буфер распознаваемым, чтобы он был виден в реестре шины сервисов. Когда дело доходит до использования буфера, клиент и сервис должны проверять, что буфер существует, — в ином случае они должны создавать его.

Чтобы автоматизировать эти операции, я создал класс ServiceBusHelper:

public static partial class ServiceBusHelper
{    
  public static void CreateBuffer(string bufferAddress,string secret);
  public static void CreateBuffer(string bufferAddress,string issuer,
    string secret);

  public static void VerifyBuffer(string bufferAddress,string secret);
  public static void VerifyBuffer(string bufferAddress,string issuer,
    string secret);
  public static void PurgeBuffer(Uri bufferAddress,
    TransportClientEndpointBehavior credential);
  public static void DeleteBuffer(Uri bufferAddress,
    TransportClientEndpointBehavior credential); 
}

Метод CreateBuffer создает новый распознаваемый буфер с максимальной емкостью в 50 сообщений и длительностью их хранения 10 минут. Если буфер уже имеется, CreateBuffer удаляет старый буфер. Метод VerifyBuffer проверяет, существует ли буфер, и, если его нет, создает новый. PurgeBuffer полезен для очистки всех буферизованных сообщений в ходе диагностики или отладки. DeleteBuffer просто удаляет буфер. На рис. 6 показан сокращенный листинг реализации этих методов.

Рис. 6 Сокращенный листинг вспомогательных методов буфера

public static partial class ServiceBusHelper
{    
  public static void CreateBuffer(string bufferAddress,
    string issuer,string secret)
  {
    TransportClientEndpointBehavior credentials = ...;
    CreateBuffer(bufferAddress,credentials);
  }
  static void CreateBuffer(string bufferAddress,
    TransportClientEndpointBehavior credentials)
  {
    MessageBufferPolicy policy = CreateBufferPolicy();
    CreateBuffer(bufferAddress,policy,credentials);
  }
  static internal MessageBufferPolicy CreateBufferPolicy()
  {
    MessageBufferPolicy policy = new MessageBufferPolicy();                
    policy.Discoverability = DiscoverabilityPolicy.Public;
    policy.ExpiresAfter = TimeSpan.Fromminutes(10);
    policy.MaxMessageCount = 50;

    return policy;
  }
   public static void PurgeBuffer(Uri bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     Debug.Assert(BufferExists(bufferAddress,credentials));
     MessageBufferClient client = 
       MessageBufferClient.GetMessageBuffer(credentials,bufferAddress);
     MessageBufferPolicy policy = client.GetPolicy();
     client.DeleteMessageBuffer();
        
     MessageBufferClient.CreateMessageBuffer(credential,bufferAddress,policy);
   }
   public static void VerifyBuffer(string bufferAddress,
     string issuer,string secret)
   {
     TransportClientEndpointBehavior credentials = ...;
     VerifyBuffer(bufferAddress,credentials);
   }
   internal static void VerifyBuffer(string bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     if(BufferExists(bufferAddress,credentials))
     {
       return;
     }
     CreateBuffer(bufferAddress,credentials);
   }
   internal static bool BufferExists(Uri bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     try
     {
       MessageBufferClient client = 
         MessageBufferClient.GetMessageBuffer(credentials,bufferAddress);
       client.GetPolicy();
       return true;
     }
     catch(FaultException)
     {}
      
     return false;
   }
   static void CreateBuffer(string bufferAddress,
     MessageBufferPolicy policy,
     TransportClientEndpointBehavior credentials)
   {   
     Uri address = new Uri(bufferAddress);
     if(BufferExists(address,credentials))
     {
       MessageBufferClient client = 
         MessageBufferClient.GetMessageBuffer(credentials,address);
       client.DeleteMessageBuffer();
     }  
     MessageBufferClient.CreateMessageBuffer(credentials,address,policy);
   }
}

Метод BufferExists использует метод GetPolicy объекта MessageBufferClient для проверки наличия буфера и интерпретирует ошибку как указание на то, что буфера нет. Очистка буфера осуществляется копирование его политики, удалением самого буфера и созданием нового с тем же адресом и с той же политикой.

Передача и получение сообщений

Как уже упоминалось, буферы шины сервисов требуют взаимодействия на основе неструктурированных WCF-сообщений. Для этого служат методы Send и Retrieve объекта MessageBufferClient (получаемого при создании или обнаружении буфера):

public sealed class MessageBufferClient
{
  public void Send(Message message);
  public void Send(Message message,TimeSpan timeout);

  public Message Retrieve();
  public Message Retrieve(TimeSpan timeout);

  // More members
}

Оба метода по умолчанию используют заданный период ожидания, а в версиях без параметров этот интервал устанавливается равным одной минуте. Для отправителя этот период означает, сколько времени он будет ждать в том случае, если буфер уже заполнен, а для получателя — сколько времени он будет ждать, если буфер пуст.

Вот код (на стороне отправителя) для передачи неструктурированных сообщений в буфер:

TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri(@"sb://MyNamespace.servicebus.windows.net/MyBuffer/");

MessageBufferClient client =   
  MessageBufferClient.GetMessageBuffer(credential,bufferUri);

Message message = Message.CreateMessage(MessageVersion.Default,"Hello");

client.Send(message,TimeSpan.MaxValue);

Отправитель сначала создает объект удостоверений и с его помощью получает экземпляр MessageBufferClient. Затем создает WCF-сообщение и посылает его в буфер. Ниже приведен код (на стороне получателя) для извлечения сообщений из буфера:

TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri(@"sb://MyNamespace.servicebus.windows.net/MyBuffer/");

MessageBufferClient client = 
  MessageBufferClient.GetMessageBuffer(credential,bufferUri);
Message message = client.Retrieve();

Debug.Assert(message.Headers.Action == "Hello");

Сервисы с буферизацией

Пока что шина сервисов может предложить лишь использование неструктурированных WCF-сообщений, как в предыдущем фрагменте кода. И такая модель программирования оставляет желать много лучшего. Она громоздка, нудна, требует много ручного труда, не структурирована, не ориентирована на объекты и не обеспечивает строгую типизацию. Фактически это возврат к прошлому — к тем дням, когда и WCF-то еще не было и приходилось заниматься программированием MSMQ с применением System.Messaging API. Содержимое сообщений нужно разбирать самостоятельно.

К счастью, эту ситуацию можно улучшить. Вместо взаимодействия через неструктурированные сообщения следует поднять уровень коммуникаций до структурированных вызовов между клиентами и сервисами. Хотя это требует довольно большого объема работы, я сумел инкапсулировать ее в небольшом наборе вспомогательных классов.

Чтобы создать возможность структурированных буферизуемые вызовы на стороне сервиса, я написал класс BufferedServiceBusHost<T>:

// Generic type parameter based host
public class ServiceHost<T> : ServiceHost
{...}

public class BufferedServiceBusHost<T> : ServiceHost<T>,...
{
  public BufferedServiceBusHost(params Uri[] bufferAddresses);
  public BufferedServiceBusHost(
    T singleton,params Uri[] bufferAddresses);

  /* Additional constructors */
}

Я смоделировал BufferedServiceBusHost<T> на основе использования WCF с привязкой MSMQ. Вам нужно передать его конструктору адрес или адреса буферов, из которых вы будете извлекать сообщения. Остальное делается так же, как и в случае обычного WCF-хоста сервисов:

Uri buffer = new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer");
ServiceHost host = new BufferedServiceBusHost<MyService>(buffer);
host.Open();

Заметьте, что вы можете передавать конструкторам адреса нескольких буферов для мониторинга точно так же, как WCF-хост сервисов может открывать несколько конечных точек с разными очередями. Указывать адреса этих буферов в разделе service конфигурационного файла конечной точки нет нужды (или способа, хотя их можно брать из раздела настроек приложения, если вы спроектируете именно так).

Хотя реальное взаимодействие с буфером шины сервисов выполняется с помощью неструктурированных WCF-сообщений, эта работа инкапсулирована. BufferedServiceBusHost<T> будет проверять, существуют ли переданные буферы, и создавать эти буферы, если их нет, используя политику ServiceBusHelper.VerifyBuffer, показанную на рис. 6. BufferedServiceBusHost<T> будет применять безопасность транспорта по умолчанию, защищая все пути. Он также будет проверять, чтобы все контракты сервиса, переданного в виде обобщенного параметра-типа T были односторонними, т. е. чтобы выполнялись лишь односторонние операции (как это делает привязка односторонней ретрансляции [one-way relay binding]). И последняя особенность: при закрытии хоста (только в режиме отладки) BufferedServiceBusHost<T> будет очищать все свои буферы, чтобы гарантировать нормальный запуск следующего сеанса отладки.

BufferedServiceBusHost<T> размещает указанный сервис локально. Для каждого контракта сервиса в параметре-типе T класс BufferedServiceBusHost<T> добавляет конечную точку по IPC (именованным каналам). Привязка IPC к этим конечным точкам конфигурируется на бесконечное время ожидания.

Хотя IPC всегда создает транспортный сеанс, чтобы имитировать поведение MSMQ, даже сервисы, индивидуальные для каждого сеанса (per-session services), интерпретируются как индивидуальные для каждого вызова (per-call services). Каждое извлеченное из очереди WCF-сообщение обрабатывается в новом экземпляре сервиса, потенциально параллельно с предыдущими сообщениями — как в случае привязки MSMQ. Если переданный тип сервиса является «одноэкземплярным» (singleton), BufferedServiceBusHost<T> подчиняется такому условию и передает все сообщения через все буферы и конечные точки одному и тому же экземпляру сервиса — тоже как в случае привязки MSMQ.

BufferedServiceBusHost<T> отслеживает каждый указанный буфер отдельным фоновым рабочим потоком. Когда сообщение помещается в буфер, BufferedServiceBusHost<T> извлекает его и преобразует в вызов соответствующей конечной точки по IPC.

На рис. 7 показан сокращенный листинг BufferedServiceBusHost<T>, из которого удалена большая часть обработки ошибок и кода, относящегося к защите.

Рис. 7 Сокращенный листинг BufferedServiceBusHost<T>

public class BufferedServiceBusHost<T> : 
  ServiceHost<T>,IServiceBusProperties 
{
  Uri[] m_BufferAddresses;
  List<Thread> m_RetrievingThreads;
  IChannelFactory<IDuplexSessionChannel>
    m_Factory;
  Dictionary<string,IDuplexSessionChannel> 
    m_Proxies;

  const string CloseAction = 
    "BufferedServiceBusHost.CloseThread";

  public BufferedServiceBusHost(params Uri[] 
    bufferAddresses)
  {
    m_BufferAddresses = bufferAddresses;
    Binding binding = new NetNamedPipeBinding();
    binding.SendTimeout = TimeSpan.MaxValue;

    Type[] interfaces = 
      typeof(T).GetInterfaces();

    foreach(Type interfaceType in interfaces)
    {         
      VerifyOneway(interfaceType);
      string address = 
        @"net.pipe://localhost/" + Guid.NewGuid();
      AddServiceEndpoint(interfaceType,binding,
        address);
    }
    m_Factory = 
      binding.BuildChannelFactory
      <IDuplexSessionChannel>();
    m_Factory.Open();
  }
  protected override void OnOpened()
  {
    CreateProxies();                       
    CreateListeners();
    base.OnOpened();
  }
  protected override void OnClosing()
  {
    CloseListeners();

    foreach(IDuplexSessionChannel proxy in 
      m_Proxies.Values)
    {
      proxy.Close();
    }

    m_Factory.Close();

    PurgeBuffers();

    base.OnClosing();
  }

  // Verify all operations are one-way
  
  void VerifyOneway(Type interfaceType)
  {...}
  void CreateProxies()
  {
    m_Proxies = 
      new Dictionary
      <string,IDuplexSessionChannel>();

    foreach(ServiceEndpoint endpoint in 
      Description.Endpoints)
    {
      IDuplexSessionChannel channel = 
        m_Factory.CreateChannel(endpoint.Address);
      channel.Open();
      m_Proxies[endpoint.Contract.Name] = 
        channel;
    }
  }

  void CreateListeners()
  {
    m_RetrievingThreads = new List<Thread>();

    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {         ?      ServiceBusHelper.VerifyBuffer(
        bufferAddress.AbsoluteUri,m_Credential);
         
      Thread thread = new Thread(Dequeue);

      m_RetrievingThreads.Add(thread);
      thread.IsBackground = true;
      thread.Start(bufferAddress);
    }
  }

  void Dequeue(object arg)
  {
    Uri bufferAddress = arg as Uri;

    MessageBufferClient bufferClient = ?      MessageBufferClient.GetMessageBuffer(
        m_Credential,bufferAddress);      
    while(true)
    {
      Message message = 
        bufferClient.Retrieve(TimeSpan.MaxValue);
      if(message.Headers.Action == CloseAction)
      {
        return;
      }
      else
      {
        Dispatch(message);
      }      
    }
  }
   
  
  
  void Dispatch(Message message)
  {
    string contract = ExtractContract(message);
    m_Proxies[contract].Send(message);
  }
  string ExtractContract(Message message)
  {
    string[] elements = 
      message.Headers.Action.Split('/');
    return elements[elements.Length-2];         
  }
  protected override void OnClosing()
  {
    CloseListeners();
    foreach(IDuplexSessionChannel proxy in 
      m_Proxies.Values)
    {
      proxy.Close();
    }
    m_Factory.Close();

    PurgeBuffers();
    base.OnClosing();
  }
  void SendCloseMessages()
  {
    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {
      MessageBufferClient bufferClient =                 ?        MessageBufferClient.GetMessageBuffer(
        m_Credential,bufferAddress);
      Message message =   
        Message.CreateMessage(
        MessageVersion.Default,CloseAction);
      bufferClient.Send(message);
    }   
  }
  void CloseListeners()
  {
    SendCloseMessages();

    foreach(Thread thread in m_RetrievingThreads)
    {
      thread.Join();
    }
  }   

  [Conditional("DEBUG")]
  void PurgeBuffers()
  {
    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {
      ServiceBusHelper.PurgeBuffer(
        bufferAddress,m_Credential);
    }
  } 
}

BufferedServiceBusHost<T> хранит прокси для локально размещенных конечных точек IPC в словаре с именем m_Proxies:

Dictionary<string,IDuplexSessionChannel> m_Proxies;

Ключ в словаре — имя типа контракта конечных точек.

Конструкторы сохраняют переданные адреса буферов, а затем через отражение получают набор всех интерфейсов типа сервиса. BufferedServiceBusHost<T> проверяет, что в каждом интерфейсе есть только односторонние операции, потом вызывает базовый AddServiceEndpoint для добавления конечной точки для этого типа контракта. Адрес - это адрес IPC, использующий GUID для имени канала. Конструкторы используют привязку IPC для создания фабрики канала типа IChannelFactory<IDuplexSessionChannel>. С помощью IChannelFactory<T> создается нестрого типизированный канал через привязку:

public interface IChannelFactory<T> : IChannelFactory
{
  T CreateChannel(EndpointAddress to);
  // More members
}

После открытия внутреннего хоста со всеми его конечными точками IPC метод OnOpened создает внутренние прокси для этих конечных точек и слушателей с буферизацией. Эти две операции являются центральными в BufferedServiceBusHost<T>. Чтобы создать прокси, он перебирает набор конечных точек. При этом извлекается адрес каждой конечной точки и используется IChannelFactory<IDuplexSessionChannel> для создания канала к этому адресу. Затем канал (или прокси) сохраняется в словаре. Метод CreateListeners перебирает указанные адреса буферов и проверяет каждый из этих буферов, а затем создает рабочий поток для извлечения из него сообщений.

Метод Dequeue использует MessageBufferClient для получения сообщений в бесконечном цикле и диспетчеризует их с помощью метода Dispatch. Последний извлекает из сообщения имя целевого контракта и на его основе получает IDuplexChannel из словаря прокси, а затем пересылает сообщение по IPC. IDuplexChannel поддерживается нижележащим IPC-каналом и позволяет передавать неструктурированные сообщения:

public interface IOutputChannel : ...
{
  void Send(Message message,TimeSpan timeout);
  // More members
}
public interface IDuplexSessionChannel : IOutputChannel,...
{}

Если при IPC-вызове произошла ошибка, BufferedServiceBusHost<T> заново создаст канал к соответствующей конечной точке (на рис. 7 это не показано). Закрыв хост, вы должны закрыть и все прокси. Это позволит корректно дождаться завершения всех вызовов. Проблема в том, как корректно закрыть все считывающие потоки, так как MessageBufferClient.Retrieve является блокирующей операцией и встроенного способа ее отмены нет. Решение — асинхронно отправлять в каждый отслеживаемый буфер специальное закрытое сообщение, которое уведомляет считывающий поток о необходимости выхода. Именно это и делает метод SendCloseMessages. Метод CloseListeners посылает закрытое сообщение буферам, а затем ждет завершения всех слушающих потоков. Закрытие слушающих потоков прекращает передачу сообщений во внутренние прокси, и, как только прокси закрываются (после возврата из всех текущих вызовов), хост готов к завершению работы. BufferedServiceBusHost<T> также поддерживает «грубую» отмену через метод Abort, который просто отменяет все потоки (на рис. 7 это не показано).

Наконец, заметьте, что BufferedServiceBusHost<T> поддерживает интерфейс IServiceBusProperties, который я определил так:

public interface IServiceBusProperties
{
  TransportClientEndpointBehavior Credential
  {get;set;}

  Uri[] Addresses
  {get;}
}

Такой интерфейс мне нужен был в нескольких местах своей инфраструктуры, особенно для упрощения буферизации. Для клиента я написал такой класс BufferedServiceBusClient<T>:

public abstract class BufferedServiceBusClient<T> :                          
  HeaderClientBase<T,ResponseContext>,IServiceBusProperties 
{
  // Buffer address from config
  public BufferedServiceBusClient() 
  {}
  // No need for config file
  public BufferedServiceBusClient(Uri bufferAddress);


  /* Additional constructors with different credentials */  
  protected virtual void Enqueue(Action action);
}

BufferedServiceBusClient<T> наследует от моего HeaderClientBase<T, H> (вспомогательного прокси, используемого для передачи информации в заголовках сообщения; см. мою статью «Synchronization Contexts in WCF» за ноябрь 2007 г. по ссылке msdn.microsoft.com/magazine/cc163321):

public abstract class HeaderClientBase<T,H> : InterceptorClientBase<T> 
                                              where T : class
{
  protected H Header
  {get;set;}

  // More members
}

Предназначение этого базового класса — поддержка сервиса ответов (о нем мы поговорим в следующем разделе). Для обычного клиента сервиса с буферизацией это наследование не имеет значения.

Вы можете использовать BufferedServiceBusClient<T> как с конфигурационным файлом клиента, так и без. Конструкторы, принимающие адрес буфера, не требуют конфигурационного файла. Конструктор без параметров или конструкторы, принимающие имя конечной точки, ожидают, что в конфигурационном файле содержится конечная точка, подходящая типу контракта с односторонней привязкой ретрансляции (one-way relay binding) (хотя эта привязка напрочь игнорируется BufferedServiceBusClient<T>).

Наследуя свой прокси от BufferedServiceBusClient<T>, вы должны использовать защищенный метод Enqueue, а не обращаться напрямую к свойству Channel:

[ServiceContract]
interface IMyContract
{
  [OperationContract(IsOneWay = true)]
  void MyMethod(int number);
}

class MyContractClient : BufferedServiceBusClient<IMyContract>,IMyContract
{
  public void MyMethod(int number)
  {
    Enqueue(()=>Channel.MyMethod(number));
  }
}

Enqueue принимает делегат (или лямбда-выражение), который обертывает все использование свойства Channel. В результате удается сохранить строгую типизацию. Сокращенный листинг реализации класса BufferedServiceBusClient<T> показан на рис. 8.

Рис. 8 Сокращенный листинг BufferedServiceBusClient<T>

public abstract class BufferedServiceBusClient<T> :                               
  HeaderClientBase<T,ResponseContext>,IServiceBusProperties where T : class
{
  MessageBufferClient m_BufferClient;

  public BufferedServiceBusClient(Uri bufferAddress) : 
    base(new NetOnewayRelayBinding(),new EndpointAddress(bufferAddress)) 
  {}

  protected virtual void Enqueue(Action action) 
  {
    try
    {
      action();
    }
    catch(InvalidOperationException exception)
    {
      Debug.Assert(exception.Message ==
        "This message cannot support the operation " +
        "because it has been written.");
    }
  }
  protected override T CreateChannel()
  {    
    ServiceBusHelper.VerifyBuffer(Endpoint.Address.Uri.AbsoluteUri,Credential);
    m_BufferClient =  ?      MessageBufferClient.GetMessageBuffer(Credential,m_BufferAddress);

    return base.CreateChannel();   
  }
  protected override void PreInvoke(ref Message request)
  {
    base.PreInvoke(ref request);       
           
    m_BufferClient.Send(request);
  }
  protected TransportClientEndpointBehavior Credential
  {
    get
    {...}
    set
    {...}
  }
}

Конструкторы BufferedServiceBusClient<T> передают конструктору базового класса адрес буфера и привязку, которая всегда является односторонней привязкой ретрансляции, чтобы ввести в действие проверку на односторонние операции. Метод CreateChannel проверяет наличие целевого буфера и получает MessageBufferClient, представляющий его. Сердцевиной BufferedServiceBusClient<T> является метод PreInvoke. Это виртуальный метод, предоставляемый InterceptorClientBase<T> — базовым классом HeaderClientBase<T, H>:

public abstract class InterceptorClientBase<T> : ClientBase<T> where T : class
{
  protected virtual void PreInvoke(ref Message request);
  // Rest of the implementation 
}

PreInvoke позволяет легко обрабатывать WCF-сообщения до их диспетчеризации клиенту. BufferedServiceBusClient<T> переопределяет PreInvoke и использует клиент для отправки сообщения в буфер. Тем самым на клиенте поддерживается модель структурного программирования, и BufferedServiceBusClient<T> инкапсулирует операции с WCF-сообщением. Недостаток в том, что сообщение можно отправить лишь раз и, когда это пытается сделать корневой класс для ClientBase, генерируется исключение InvalidOperationException. И тут очень полезен Enqueue, который обрабатывает это исключение.

Сервис ответов

В своей статье «Build a Queued WCF Response Service» за февраль 2007 г.(msdn.microsoft.com/magazine/cc163482) я объяснял, что единственный способ получить результат (или ошибки) от вызова в очереди — использовать сервис ответов с поддержкой очереди. Я показал, как передать заголовки сообщения в объект контекста ответа, который содержит логический идентификатор метода и адрес ответа:

[DataContract]
public class ResponseContext
{
  [DataMember]
  public readonly string ResponseAddress;

  [DataMember]
  public readonly string MethodId;

  public ResponseContext(string responseAddress,string methodId);

  public static ResponseContext Current
  {get;set;}

  // More members 
}

Тот же проектировочный шаблон годится и для работы с буферами. Клиенту нужно передать выделенный для ответов буфер сервису, чтобы последний помещал ответ в этот буфер. Клиенту также требуется передать адрес ответа и идентификатор метода в заголовках сообщения — точно так же, как в вызовах на основе MSMQ. Основное различие между сервисов ответов на основе MSMQ и шиной сервисов заключается в том, что буфер ответов должен находиться в шине сервисов, как показано на рис. 9.

Figure 9 Service Bus Buffered Response Service Рис. 9 Cервис ответов шины сервисов с буферизацией

Для упрощения операций на клиентской стороне я написал класс ClientBufferResponseBase<T>:

 

public abstract class ClientBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class
{
  protected readonly Uri ResponseAddress;

  public ClientBufferResponseBase(Uri responseAddress);

  /* Additional constructors with different credentials */
     
  protected virtual string GenerateMethodId();
}

ClientBufferResponseBase<T> — специализированный подкласс BufferedServiceBusClient<T>, и он добавляет контекст ответа в заголовки сообщения. Вот почему я сделал BufferedServiceBusClient<T> производным от HeaderClientBase<T, H>, а не от InterceptorClientBase<T>. Вы можете использовать ClientBufferResponseBase<T> так же, как BufferedServiceBusClient (рис. 10).

Рис. 10 Упрощение клиентской стороны

[ServiceContract]
interface ICalculator
{
  [OperationContract(IsOneWay = true)]
  void Add(int number1,int number2);
}

class CalculatorClient : ClientBufferResponseBase<ICalculator>,ICalculator
{
  public CalculatorClient(Uri responseAddress) : base(responseAddress)
  {}
   
  public void Add(int number1,int number2)
  {
     Enqueue(()=>Channel.Add(number1,number2));
  }
}

Использовать подкласс ClientBufferResponseBase<T> достаточно легко:

Uri resposeAddress = 
  new Uri(@"sb://MyNamespace.servicebus.windows.net/MyResponseBuffer/");

CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
proxy.Close();

При управлении ответами на клиентской стороне удобно, когда вызвавший клиент получает идентификатор метода, используемый для диспетчеризации вызова. Это легко сделать через свойство Header:

CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
string methodId = proxy.Header.MethodId;

Реализация ClientBufferResponseBase<T> приведена на рис. 11.ClientBufferResponseBase<T> переопределяет метод PreInvoke класса HeaderClientBase<T, H> так, чтобы можно было генерировать новый идентификатор метода для каждого вызова и помещать его в заголовки.

Рис. 11 Реализация ClientBufferResponseBase<T>

public abstract class ClientBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class
{
  public readonly Uri ResponseAddress;

  public ClientBufferResponseBase(Uri responseAddress)
  {
    ResponseAddress = responseAddress;
  }
   
  /* More Constructors */

  protected override void PreInvoke(ref Message request)
  {
    string methodId = GenerateMethodId();
    Header = new ResponseContext(ResponseAddress.AbsoluteUri,methodId);         
    base.PreInvoke(ref request);
  }

  protected virtual string GenerateMethodId()
  {
    return Guid.NewGuid().ToString();
  }

  // Rest of the implementation 
}

Для упрощения работы, требуемой сервисом с буферизацией для вызова сервиса ответов, я написал класс ServiceBufferResponseBase<T>, показанный на рис. 12.

Рис. 12 Класс ServiceBufferResponseBase<T>

public abstract class ServiceBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class 
{
  public ServiceBufferResponseBase() : 
   base(new Uri(ResponseContext.Current.ResponseAddress))
 {
   Header = ResponseContext.Current;
               
   // Grab the credentials the host was using 

   IServiceBusProperties properties = 
     OperationContext.Current.Host as IServiceBusProperties;
   Credential = properties.Credential;
  }
}

Хотя для вставки ответа в очередь сервис мог бы использовать просто BufferedServiceBusClient<T>, вам пришлось бы извлекать адрес буфера из заголовков и каким-то образом получать удостоверения для входа в буфер шины сервисов. Вам также понадобилось бы предоставлять заголовки исходящего вызова с контекстом ответа. Все это можно упростить с помощью ServiceBufferResponseBase<T>. Он предоставляет конструктору своего базового класса адрес из контекста ответа, а также помещает этот контекст в заголовки исходящего сообщения.

Еще одно допущение, которое делается в ServiceBufferResponseBase<T> для дальнейшего упрощения, заключается в том, что отвечающий сервис может использовать для передачи сообщений в буфер ответов те же удостоверения, что и его хост при получении сообщений из собственного буфера. Поэтому ServiceBufferResponseBase<T> получает ссылку на свой хост из контекста операции и считывает удостоверения с помощью реализации IServiceBusProperties хоста. ServiceBufferResponseBase<T> копирует эти удостоверения для собственного использования (в BufferedServiceBusClient<T>). Это, конечно, обязывает применять BufferedServiceBusHost<T> для хостинга сервиса. Ваш сервис должен создавать прокси-класс производным от ServiceBufferResponseBase<T> и использовать его для ответа. Например, при данном контракте ответа:

[ServiceContract]
interface ICalculatorResponse
{
  [OperationContract(IsOneWay = true)]
  void OnAddCompleted(int result,ExceptionDetail error);
}
This would be the definition of the proxy to the response service:
class CalculatorResponseClient :    
  ServiceBufferResponseBase<ICalculatorResponse>,ICalculatorResponse
{
  public void OnAddCompleted(int result,ExceptionDetail error)
  {
    Enqueue(()=>Channel.OnAddCompleted(result,error));
  }
}

На рис. 13 показан простой сервис с буферизацией, отвечающий своему клиенту.

Рис. 13 Использование ServiceBufferResponseBase<T>

class MyCalculator : ICalculator
{
  [OperationBehavior(TransactionScopeRequired = true)]
  public void Add(int number1,int number2)
  {
    int result = 0;
    ExceptionDetail error = null;
    try
    {
      result = number1 + number2;
    }
     // Don’t rethrow 
    catch(Exception exception)
    {
      error = new ExceptionDetail(exception);
    }
    finally
    {
      CalculatorResponseClient proxy = new CalculatorResponseClient();
      proxy.OnAddCompleted(result,error);
      proxy.Close();
    }
  }
}

Сервису ответов нужен лишь доступ к идентификатору метода из заголовков сообщения:

class MyCalculatorResponse : ICalculatorResponse
{
  public void OnAddCompleted(int result,ExceptionDetail error)
  {
    string methodId = ResponseContext.Current.MethodId;
    ...
  }
}

В будущем мы продолжим наши исследования шины сервисов.

Джувел Лоуи (Juval Lowy) — архитектор ПО в IDesign, консультант и преподаватель по архитектуре WCF. Автор книги «Programming WCF Services, Third Edition» (O'Reilly, 2010). Также является региональным директором корпорации Microsoft в Силиконовой долине. С ним можно связаться через сайт idesign.net.

Выражаю благодарность за рецензирование статьи эксперту: Джину Бейкеру (Jeanne Baker)