基础内容

服务总线缓冲区

Juval Lowy

下载示例代码

在我 2009 年 10 月的专栏文章“服务总线中的路由器”(msdn.microsoft.com/magazine/ee335696) 中,我提出 Windows Azure AppFabric 服务总线未来可能的发展方向:成为最终的侦听器。我提出了路由器功能,并承诺下一步将写写队列。

自那之后,路由器和队列已被推迟到服务总线的第二个版本,暂时代之以由服务总线提供缓冲区。未来版本可能会增加日志记录、诊断和各种检测选项。我会在以后的文章中讲述这些方面。在本文中,我将对缓冲区加以说明,也会向您展示一些先进的 Windows Communication Foundation (WCF) 编程技术。

服务总线缓冲区

在服务总线中,服务命名空间的每一个 URI 实际上都是一个可寻址的消息系统交接点。客户端可以将消息发送到这个交接点,交接点可以将其转发到服务。不过,每个交接点也可以作为一个缓冲区(请参见图 1)。

Figure 1 Buffers in the Service Bus

图 1 服务总线中的缓冲区

即使没有服务正在监控缓冲区,该消息也会在缓冲区内存储配置的时间段。请注意,有多种服务可以监控缓冲区,但除非您明确查看并锁定消息,否则只有其中一种服务可以检索消息。

客户端会在缓冲区后面与服务分离,并且客户端和服务可不必在同一时间运行。由于客户端与缓冲区交互,并没有与实际的服务端点交互,因此所有的消息都是单向的,也没有现成的办法获取消息调用的结果或任何错误。

该服务总线缓冲区不应等同于 Microsoft 消息队列 (MSMQ) 等队列或 WCF 排队服务,它们之间有一系列关键性差异:

  • 该服务总线缓冲区不持久,而且消息存储在内存中。也就是说,如果服务总线本身发生灾难性故障(虽然有点不太可能),消息会有丢失的风险。
  • 该服务总线缓冲区不是事务性的,不可以作为事务处理的一部分来完成对消息的发送或检索。
  • 缓冲区无法处理持久消息。服务必须在 10 分钟内从缓冲区检索消息,否则消息会被丢弃。尽管基于 WCF MSMQ 的消息也有生存时间,不过这个时间段要长得多,默认为一天。这极大地增加了真正脱节的操作和断开的应用程序的范围。
  • 缓冲区的大小有限,所保留的消息不能超过 50 条。
  • 所缓冲消息的大小也有上限,每个 64KB。 虽然 MSMQ 也对消息规定了最大尺寸,它的上限却要大得多(每条消息 4MB)。

因此,缓冲区无法通过云系统提供真正的排队调用,不过,如果连接会在排队调用和“即发即弃”的异步调用之间的某处丢失调用,在某种程度上缓冲区可以恢复这些连接。

缓冲区在两种情况下会有用。一种是应用程序的客户端和服务之间进行交互的连接不稳固,而只要消息在短期离线期间得到缓冲,连接的丢弃和重新连接是可以容忍的。第二个(和更常见)的情况是客户端发出异步单向调用,并利用响应缓冲区(如后面的“响应服务”一节所述)来处理该调用的结果。这样的互动更多地是将网络连接视为有弹性的绳索,而不是没有存储容量的刚性网络连接线。

使用缓冲区

缓冲区地址必须是唯一的;您只能将一个缓冲区与一个地址相关联,且此地址不能为缓冲区或服务所使用。然而,多方都可以从同一缓冲区检索消息。此外,缓冲区地址对于方案必须使用 HTTP 或 HTTPS。为发送消息并从缓冲区检索消息,服务总线提供了类似于 System.Messaging API 的 API;也就是说,它要求您与原始消息进行互动。服务总线管理器独立于服务或客户端管理缓冲区。每个缓冲区必须有一个策略管理其行为 和生存期。默认情况下,服务总线管理器必须执行编程调用,以创建和管理缓冲区。

图 2 所示,每个缓冲区策略都通过 MessageBufferPolicy 类的实例表达。

图 2 MessageBufferPolicy 类

[DataContract]
public class MessageBufferPolicy : ...
{
  public MessageBufferPolicy();
  public MessageBufferPolicy(MessageBufferPolicy policyToCopy);

  public DiscoverabilityPolicy Discoverability
  {get;set;}

  public TimeSpan ExpiresAfter
  {get;set;}

  public int MaxMessageCount
  {get;set;}

  public OverflowPolicy OverflowPolicy
  {get;set;}

  public AuthorizationPolicy Authorization
  {get;set;} 
  
  public TransportProtectionPolicy TransportProtection
  {get;set;}
}

Discoverability 策略属性是类型 DiscoverabilityPolicy 的枚举,控制服务总线注册表(Atom 源)是否包括缓冲区:

public enum DiscoverabilityPolicy
{
  Managers,
  ManagersListeners,
  ManagersListenersSenders,
  Public 
}

Discoverability 默认为 DiscoverabilityPolicy.Managers,这意味着它需要有托管授权声明。将其设置为 DiscoverabilityPolicy.Public 会在没有任何授权的情况下将其发布到源。

ExpiresAfter 属性控制消息在缓冲区中的生存期。默认为 5 分钟,最低值是 1 分钟,允许的最大值是 10 分钟。任何配置更长生存期的尝试都会被忽略且无提示。

MaxMessageCount 属性设置了缓冲区大小的上限。该策略的默认值为 10 条消息,最低值当然设置为 1。如前所述,缓冲区大小最大为 50,配置更大尺寸的尝试都会被忽略且无提示。

OverflowPolicy 属性属枚举类型,其单个值定义为:

public enum OverflowPolicy
{
  RejectIncomingMessage
}

OverflowPolicy 控制当缓冲区消息数达到最大值(由 MaxMessageCount 定义)时,如何处理消息。唯一可行的选择是拒绝此消息:将其发送回发件人,并提示错误。

单值枚举可作为未来选项的占位符:如在不通知发件人的情况下丢弃此消息,或自缓冲区删除信息,接受新消息。

最后两个属性负责安全配置。AuthorizationPolicy 属性指示服务总线是否批准客户端的令牌:

public enum AuthorizationPolicy
{
  NotRequired,
  RequiredToSend,
  RequiredToReceive,
  Required
}

该 AuthorizationPolicy.Required 的默认值要求同时授权发送和接收客户端。 

最后,TransportProtection 属性使用类型 TransportProtectionPolicy 的枚举值,规定了消息传输到缓冲区的传输安全的最低水平:

public enum TransportProtectionPolicy
{
  None,
  AllPaths,
}

通过 TransportProtectionPolicy.AllPaths 实现传输安全是所有缓冲区策略的默认值,它会强制使用 HTTPS 地址。

图 3 所示,您可以使用 MessageBufferClient 类管理缓冲区。

图 3 MessageBufferClient 类

public sealed class MessageBufferClient
{
  public Uri MessageBufferUri
  {get;}

  public static MessageBufferClient CreateMessageBuffer(
    TransportClientEndpointBehavior credential,
    Uri messageBufferUri,MessageBufferPolicy policy);

  public static MessageBufferClient GetMessageBuffer(
    TransportClientEndpointBehavior credential,Uri messageBufferUri);
  public MessageBufferPolicy GetPolicy();
  public void DeleteMessageBuffer();

  // More members   
}

通过向静态方法提供类型 TransportClientEndpointBehavior 的服务总线凭据,您使用 MessageBufferClient 的静态方法来获取 MessageBufferClient 经过身份验证的实例。每次使用 MessageBufferClient 时,您通常都需要调用 GetMessageBuffer 方法,检查缓冲区是否已经存在于服务总线中。如果没有缓冲区,GetMessageBuffer 会引发异常。

以下说明如何通过编程方式创建缓冲区:

Uri bufferAddress = 
  new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer/");

TransportClientEndpointBehavior credential = ...

MessageBufferPolicy bufferPolicy = new MessageBufferPolicy();

bufferPolicy.MaxMessageCount = 12;
bufferPolicy.ExpiresAfter = TimeSpan.FromMinutes(3);
bufferPolicy.Discoverability = DiscoverabilityPolicy.Public;

MessageBufferClient.CreateMessageBuffer(credential,bufferAddress,
  bufferPolicy);

在这个例子中,您对缓冲区策略对象进行了实例化并将策略设置为某些所需的值。要安装缓冲区,所需的操作就是提供策略和某些有效凭据,调用 MessageBufferClient 的 CreateMessageBuffer 方法。

作为编程调用的替代方案,您可以使用服务总线资源管理器(在我的路由器文章中有所说明;也可在线查看并找到此文章的示例代码)查看和修改缓冲区。图 4 说明如何通过指定地址和各项策略属性创建新的缓冲区。您还可以删除服务命名空间中的所有缓冲区,方式大致相同。

Figure 4 Creating a Buffer Using the Service Bus Explorer
图 4 使用服务总线资源管理器创建缓冲区

图 5 所示,您还可以通过在服务命名空间树中选择缓冲区,并与右窗格中的缓冲区属性进行互动,查看和修改现有缓冲区的策略,从缓冲区中清除消息,甚至删除缓冲区。

Figure 5 A Buffer in the Service Bus Explorer
图 5 服务总线资源管理器中的缓冲区

简化管理

在创建缓冲区时,最好尽可能扩大缓冲区的大小和使用寿命,给客户端和服务更多的交互时间。此外,使缓冲区可发现是一个好主意,这样您就可以在服务总线注册表中查看缓冲区。涉及到缓冲区的使用时,客户端和服务应验证缓冲区是否已创建,如果未创建会着手创建它。

为自动完成这些步骤,我创建了 ServiceBusHelper 类:

public static partial class ServiceBusHelper
{    
  public static void CreateBuffer(string bufferAddress,string secret);
  public static void CreateBuffer(string bufferAddress,string issuer,
    string secret);

  public static void VerifyBuffer(string bufferAddress,string secret);
  public static void VerifyBuffer(string bufferAddress,string issuer,
    string secret);
  public static void PurgeBuffer(Uri bufferAddress,
    TransportClientEndpointBehavior credential);
  public static void DeleteBuffer(Uri bufferAddress,
    TransportClientEndpointBehavior credential); 
}

该 CreateBuffer 方法创建了一个可发现的新缓冲区,最高容量有 50 条消息,持续时间为 10 分钟。如果缓冲区已经存在,CreateBuffer 会删除旧的缓冲区。该 VerifyBuffer 方法会验证缓冲区是否存在,如果没有,则创建新的缓冲区。PurgeBuffer 可用于在诊断或调试过程中清除所有缓存的信息。DeleteBuffer 会完全删除此缓冲区。图 6 显示了这些方法的部分实现列表。

图 6 缓冲区帮助程序方法的部分列表

public static partial class ServiceBusHelper
{    
  public static void CreateBuffer(string bufferAddress,
    string issuer,string secret)
  {
    TransportClientEndpointBehavior credentials = ...;
    CreateBuffer(bufferAddress,credentials);
  }
  static void CreateBuffer(string bufferAddress,
    TransportClientEndpointBehavior credentials)
  {
    MessageBufferPolicy policy = CreateBufferPolicy();
    CreateBuffer(bufferAddress,policy,credentials);
  }
  static internal MessageBufferPolicy CreateBufferPolicy()
  {
    MessageBufferPolicy policy = new MessageBufferPolicy();                
    policy.Discoverability = DiscoverabilityPolicy.Public;
    policy.ExpiresAfter = TimeSpan.Fromminutes(10);
    policy.MaxMessageCount = 50;

    return policy;
  }
   public static void PurgeBuffer(Uri bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     Debug.Assert(BufferExists(bufferAddress,credentials));
     MessageBufferClient client = 
       MessageBufferClient.GetMessageBuffer(credentials,bufferAddress);
     MessageBufferPolicy policy = client.GetPolicy();
     client.DeleteMessageBuffer();
        
     MessageBufferClient.CreateMessageBuffer(credential,bufferAddress,policy);
   }
   public static void VerifyBuffer(string bufferAddress,
     string issuer,string secret)
   {
     TransportClientEndpointBehavior credentials = ...;
     VerifyBuffer(bufferAddress,credentials);
   }
   internal static void VerifyBuffer(string bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     if(BufferExists(bufferAddress,credentials))
     {
       return;
     }
     CreateBuffer(bufferAddress,credentials);
   }
   internal static bool BufferExists(Uri bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     try
     {
       MessageBufferClient client = 
         MessageBufferClient.GetMessageBuffer(credentials,bufferAddress);
       client.GetPolicy();
       return true;
     }
     catch(FaultException)
     {}
      
     return false;
   }
   static void CreateBuffer(string bufferAddress,
     MessageBufferPolicy policy,
     TransportClientEndpointBehavior credentials)
   {   
     Uri address = new Uri(bufferAddress);
     if(BufferExists(address,credentials))
     {
       MessageBufferClient client = 
         MessageBufferClient.GetMessageBuffer(credentials,address);
       client.DeleteMessageBuffer();
     }  
     MessageBufferClient.CreateMessageBuffer(credentials,address,policy);
   }
}

该 BufferExists 方法使用 MessageBufferClient 的 GetPolicy 方法查看是否存在缓冲区,如果缓冲区不存在,它会以一条错误消息来表征。清除缓冲区的做法是:复制其策略,删除缓冲区,使用 旧的策略创建新的缓冲区(地址相同)。

发送和检索消息

如前所述,服务总线缓冲区需要与原始 WCF 消息互动。这是通过 MessageBufferClient(创建或得到缓冲区时获得)的 Send 和 Retrieve 方法实现的:

public sealed class MessageBufferClient
{
  public void Send(Message message);
  public void Send(Message message,TimeSpan timeout);

  public Message Retrieve();
  public Message Retrieve(TimeSpan timeout);

  // More members
}

这两种方法都有超时设置,对于无参数版本,默认为 1 分钟。对于发送者而言,超时是指在缓冲区已满的情况下等待多长时间。对于检索者而言,超时是指在缓冲区已空的情况下等待多长时间。

下面是将原始信息发送到缓冲区的发送端代码:

TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri(@"sb://MyNamespace.servicebus.windows.net/MyBuffer/");

MessageBufferClient client =   
  MessageBufferClient.GetMessageBuffer(credential,bufferUri);

Message message = Message.CreateMessage(MessageVersion.Default,"Hello");

client.Send(message,TimeSpan.MaxValue);

发送者首先创建凭据对象,并使用它来获得 MessageBufferClient 的实例。然后发送者会创建 WCF 消息并将其发送到缓冲区。下面是从缓冲区检索原始信息的检索端代码:

TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri(@"sb://MyNamespace.servicebus.windows.net/MyBuffer/");

MessageBufferClient client = 
  MessageBufferClient.GetMessageBuffer(credential,bufferUri);
Message message = client.Retrieve();

Debug.Assert(message.Headers.Action == "Hello");

缓冲的服务

像前面的代码段一样使用原始 WCF 消息是服务总线必须提供的功能。但是,这种编程模型有很多不足。它繁琐、冗长、非结构化、非面向对象、非类型安全。这是倒退到以前没有 WCF 的时期,使用 System.Messaging API,编程方式明显背离 MSMQ。您需要解析消息内容并开启其元素。

幸运的是,您可以改善这项基本功能。您不应与原始消息进行互动,而是应该将互动提升为客户端与服务之间的结构化调用。虽然这需要提前做相当程度的低级别工作,不过,我可以通过一小组帮助程序类实现这项功能。

为在服务端提供结构化的缓冲调用,我编写了 BufferedServiceBusHost<T>,定义如下:

// Generic type parameter based host
public class ServiceHost<T> : ServiceHost
{...}

public class BufferedServiceBusHost<T> : ServiceHost<T>,...
{
  public BufferedServiceBusHost(params Uri[] bufferAddresses);
  public BufferedServiceBusHost(
    T singleton,params Uri[] bufferAddresses);

  /* Additional constructors */
}

在随 MSMQ 绑定使用了 WCF 之后,我构建了 BufferedServiceBusHost<T>。您需要向构造函数提供要从中检索消息的缓冲区地址。其余工作与处理常规 WCF 服务主机类似:

Uri buffer = new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer");
ServiceHost host = new BufferedServiceBusHost<MyService>(buffer);
host.Open();

请注意,您可以为构造函数提供多个要监控的缓冲区地址,就像 WCF 服务主机可以打开不同队列的多个端点。不需要(也无法)在配置文件的服务端点部分提供这些缓冲区的任何地址(尽管缓冲区地址可以来自应用程序设置部分,如果您如此设计的话)。

虽然与服务总线缓冲区的实际通信是通过原始 WCF 消息完成的,不过,对这项工作已进行了封装。BufferedServiceBusHost<T> 将验证是否实际存在所提供的缓冲区,不存在的话,会利用如图 6 所示的 ServiceBusHelper.VerifyBuffer 缓冲区策略加以创建。BufferedServiceBusHost<T> 将使用保护所有路径的默认传输安全策略。它还将验证所提供的服务一般类型参数 T 的约定都是单向的,即它们都只有单向操作(与单项中继绑定一样)。最后一项功能:当关闭主机,只采用调试版本时,BufferedServiceBusHost<T> 将清除所有缓冲区,以确保顺利启动下一个调试会话。

BufferedServiceBusHost<T> 的操作方法是,在本地承载指定服务。对于类型参数 T 的每个服务约定,BufferedServiceBusHost<T> 增加了 IPC(命名管道)上的端点。IPC 同这些端点的绑定被配置为永不超时。

即便是在每次会话服务被看作每次调用服务的情况下,IPC 也总是有一个传输会话模拟 MSMQ 的行为。每个被从队列中取消的 WCF 消息都将被播放到此服务的新实例,可能会与以前的消息同时运行,这一点与 MSMQ 绑定一样。如果所提供的服务类型是一个单例,BufferedServiceBusHost<T> 会认可这一点,将所有消息发送到同一服务实例的所有缓冲区和端点,就像对待 MSMQ 绑定一样。

BufferedServiceBusHost<T> 会在各个后台工作线程上监控每个指定的缓冲区。如果有消息被存放在缓冲区中,BufferedServiceBusHost<T> 会检索该消息,并将原始 WCF 消息转换成对 IPC 上相应端点的调用。

图 7 提供了 BufferedServiceBusHost<T> 的部分列表,删除了大部分错误处理和安全措施。

图 7 BufferedServiceBusHost<T> 的部分列表

public class BufferedServiceBusHost<T> : 
  ServiceHost<T>,IServiceBusProperties 
{
  Uri[] m_BufferAddresses;
  List<Thread> m_RetrievingThreads;
  IChannelFactory<IDuplexSessionChannel>
    m_Factory;
  Dictionary<string,IDuplexSessionChannel> 
    m_Proxies;

  const string CloseAction = 
    "BufferedServiceBusHost.CloseThread";

  public BufferedServiceBusHost(params Uri[] 
    bufferAddresses)
  {
    m_BufferAddresses = bufferAddresses;
    Binding binding = new NetNamedPipeBinding();
    binding.SendTimeout = TimeSpan.MaxValue;

    Type[] interfaces = 
      typeof(T).GetInterfaces();

    foreach(Type interfaceType in interfaces)
    {         
      VerifyOneway(interfaceType);
      string address = 
        @"net.pipe://localhost/" + Guid.NewGuid();
      AddServiceEndpoint(interfaceType,binding,
        address);
    }
    m_Factory = 
      binding.BuildChannelFactory
      <IDuplexSessionChannel>();
    m_Factory.Open();
  }
  protected override void OnOpened()
  {
    CreateProxies();                       
    CreateListeners();
    base.OnOpened();
  }
  protected override void OnClosing()
  {
    CloseListeners();

    foreach(IDuplexSessionChannel proxy in 
      m_Proxies.Values)
    {
      proxy.Close();
    }

    m_Factory.Close();

    PurgeBuffers();

    base.OnClosing();
  }

  // Verify all operations are one-way
  
  void VerifyOneway(Type interfaceType)
  {...}
  void CreateProxies()
  {
    m_Proxies = 
      new Dictionary
      <string,IDuplexSessionChannel>();

    foreach(ServiceEndpoint endpoint in 
      Description.Endpoints)
    {
      IDuplexSessionChannel channel = 
        m_Factory.CreateChannel(endpoint.Address);
      channel.Open();
      m_Proxies[endpoint.Contract.Name] = 
        channel;
    }
  }

  void CreateListeners()
  {
    m_RetrievingThreads = new List<Thread>();

    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {         ?      ServiceBusHelper.VerifyBuffer(
        bufferAddress.AbsoluteUri,m_Credential);
         
      Thread thread = new Thread(Dequeue);

      m_RetrievingThreads.Add(thread);
      thread.IsBackground = true;
      thread.Start(bufferAddress);
    }
  }

  void Dequeue(object arg)
  {
    Uri bufferAddress = arg as Uri;

    MessageBufferClient bufferClient = ?      MessageBufferClient.GetMessageBuffer(
        m_Credential,bufferAddress);      
    while(true)
    {
      Message message = 
        bufferClient.Retrieve(TimeSpan.MaxValue);
      if(message.Headers.Action == CloseAction)
      {
        return;
      }
      else
      {
        Dispatch(message);
      }      
    }
  }
   
  
  
  void Dispatch(Message message)
  {
    string contract = ExtractContract(message);
    m_Proxies[contract].Send(message);
  }
  string ExtractContract(Message message)
  {
    string[] elements = 
      message.Headers.Action.Split('/');
    return elements[elements.Length-2];         
  }
  protected override void OnClosing()
  {
    CloseListeners();
    foreach(IDuplexSessionChannel proxy in 
      m_Proxies.Values)
    {
      proxy.Close();
    }
    m_Factory.Close();

    PurgeBuffers();
    base.OnClosing();
  }
  void SendCloseMessages()
  {
    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {
      MessageBufferClient bufferClient =                 ?        MessageBufferClient.GetMessageBuffer(
        m_Credential,bufferAddress);
      Message message =   
        Message.CreateMessage(
        MessageVersion.Default,CloseAction);
      bufferClient.Send(message);
    }   
  }
  void CloseListeners()
  {
    SendCloseMessages();

    foreach(Thread thread in m_RetrievingThreads)
    {
      thread.Join();
    }
  }   

  [Conditional("DEBUG")]
  void PurgeBuffers()
  {
    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {
      ServiceBusHelper.PurgeBuffer(
        bufferAddress,m_Credential);
    }
  } 
}

BufferedServiceBusHost<T> 将代理存储在本地 IPC 端点名为 m_Proxies 的字典中:

Dictionary<string,IDuplexSessionChannel> m_Proxies;

进入字典的钥匙是端点的约定类型名称。

构造函数会存储所提供的缓冲区地址,然后使用反射来获取此服务类型上所有接口的集合。对于每一个接口,BufferedServiceBusHost<T> 会验证它只有单向操作,然后调用基 AddServiceEndpoint 为该约定类型添加端点。地址是 IPC 地址,将 GUID 用作管道名称。该构造函数使用 IPC 绑定建立类型 IChannelFactory<IDuplexSessionChannel> 的通道工厂。IChannelFactory<T> 用于创建绑定上的非强类型化通道:

public interface IChannelFactory<T> : IChannelFactory
{
  T CreateChannel(EndpointAddress to);
  // More members
}

打开内部主机及其所有 IPC 端点后,OnOpened 方法将创建这些端点的内部代理和缓冲的侦听器。这两个步骤是 BufferedServiceBusHost<T> 的核心部分。为创建代理,它将循环访问端点的集合。它会获取每个端点的地址,并使用 IChannelFactory<IDuplexSessionChannel> 针对该地址创建通道。然后该通道(或代理)将存储在字典中。CreateListeners 方法会循环访问指定的缓冲区地址。对于每一个地址,它会验证缓冲区,并创建工作线程,从队列中取消消息。

该 Dequeue 方法使用 MessageBufferClient 检索无限循环中的消息,并使用 Dispatch 方法对它们进行调度。Dispatch 从消息中提取目标约定名称,并用它来从代理字典中查找 IDuplexChannel,然后通过 IPC 发送消息。IDuplexChannel 受底层的 IPC 通道支持,为发送原始消息提供了途径:

public interface IOutputChannel : ...
{
  void Send(Message message,TimeSpan timeout);
  // More members
}
public interface IDuplexSessionChannel : IOutputChannel,...
{}

如果在 IPC 调用过程中发生了错误,BufferedServiceBusHost<T> 会针对该端点重建它所管理的通道(这一点未在图 7 中显示出来)。当您关闭主机时,您需要关闭代理。该操作将正常等待进展中的调用完成。因为 MessageBufferClient.Retrieve 是阻塞操作,没有内置的方法来中止它,所以如何正常关闭所有检索线程是个问题。解决方案是将特殊的私有消息张贴到每个监控的缓冲区,其操作会指示检索线程退出。这正是 SendCloseMessages 方法要进行的操作。该 CloseListeners 方法会将私有消息张贴到缓冲区,然后通过加入它们等待所有侦听线程终止。关闭侦听线程会停止将消息馈送到内部代理,一旦代理被关闭(当所有目前正在进行的调用已返回时),主机将准备关闭。BufferedServiceBusHost<T> 还支持非正常 Abort 方法,该方法将完全中止所有线程(未在图 7 中显示出来)。

最后,请注意 BufferedServiceBusHost<T> 支持接口 IServiceBusProperties,我的定义如下:

public interface IServiceBusProperties
{
  TransportClientEndpointBehavior Credential
  {get;set;}

  Uri[] Addresses
  {get;}
}

在构建我的框架中的几个地方(尤其是在简化缓冲方面),我需要这类接口。我针对客户端,编写了类 BufferedServiceBusClient<T>,定义如下:

public abstract class BufferedServiceBusClient<T> :                          
  HeaderClientBase<T,ResponseContext>,IServiceBusProperties 
{
  // Buffer address from config
  public BufferedServiceBusClient() 
  {}
  // No need for config file
  public BufferedServiceBusClient(Uri bufferAddress);


  /* Additional constructors with different credentials */  
  protected virtual void Enqueue(Action action);
}

BufferedServiceBusClient<T> 派生自我的 HeaderClientBase<T,H>(用来在消息头中传递信息的帮助程序代理。请参阅我 2007 年 11 月的文章“WCF 中的同步环境”,网址为 msdn.microsoft.com/magazine/cc163321): 

public abstract class HeaderClientBase<T,H> : InterceptorClientBase<T> 
                                              where T : class
{
  protected H Header
  {get;set;}

  // More members
}

该基类的目的是支持响应服务,如下面一节中所述。对于纯客户端的缓冲服务,这种推导无关大局。

无论有没有客户端配置文件,您都可以使用 BufferedServiceBusClient<T>。接受缓冲区地址的构造函数不需要配置文件。该无参数构造函数或接受端点名称的构造函数要求配置文件包含与带有单向中继绑定的约定类型相匹配的端点(尽管该绑定会被 BufferedServiceBusClient<T> 完全忽略)。

如果您的代理派生于 BufferedServiceBusClient<T>,您需要使用受保护的 Enqueue 方法,而不是直接使用 Channel 属性:

[ServiceContract]
interface IMyContract
{
  [OperationContract(IsOneWay = true)]
  void MyMethod(int number);
}

class MyContractClient : BufferedServiceBusClient<IMyContract>,IMyContract
{
  public void MyMethod(int number)
  {
    Enqueue(()=>Channel.MyMethod(number));
  }
}

Enqueue 接受覆盖了对 Channel 属性使用的委托(或 lambda 表达式)。结果仍属于类型安全。图 8 显示了 BufferedServiceBusClient<T> 类的部分列表。

图 8 BufferedServiceBusClient<T> 的部分列表

public abstract class BufferedServiceBusClient<T> :                               
  HeaderClientBase<T,ResponseContext>,IServiceBusProperties where T : class
{
  MessageBufferClient m_BufferClient;

  public BufferedServiceBusClient(Uri bufferAddress) : 
    base(new NetOnewayRelayBinding(),new EndpointAddress(bufferAddress)) 
  {}

  protected virtual void Enqueue(Action action) 
  {
    try
    {
      action();
    }
    catch(InvalidOperationException exception)
    {
      Debug.Assert(exception.Message ==
        "This message cannot support the operation " +
        "because it has been written.");
    }
  }
  protected override T CreateChannel()
  {    
    ServiceBusHelper.VerifyBuffer(Endpoint.Address.Uri.AbsoluteUri,Credential);
    m_BufferClient =  ?      MessageBufferClient.GetMessageBuffer(Credential,m_BufferAddress);

    return base.CreateChannel();   
  }
  protected override void PreInvoke(ref Message request)
  {
    base.PreInvoke(ref request);       
           
    m_BufferClient.Send(request);
  }
  protected TransportClientEndpointBehavior Credential
  {
    get
    {...}
    set
    {...}
  }
}

对 BufferedServiceBusClient<T> 的构造函数向其基构造函数提供缓冲区地址和绑定,这一直是执行单向操作验证的单向中继绑定。该 CreateChannel 方法会验证目标缓冲区存在,并且获得了代表它的 MessageBufferClient。BufferedServiceBusClient<T> 的核心是 PreInvoke 方法。PreInvoke 是一个由 HeaderClientBase<T,H> 的基类 InterceptorClientBase<T> 提供的虚拟方法:

public abstract class InterceptorClientBase<T> : ClientBase<T> where T : class
{
  protected virtual void PreInvoke(ref Message request);
  // Rest of the implementation 
}

PreInvoke 让您在 WCF 消息由客户端调度之前,轻松处理消息。BufferedServiceBusClient<T> 覆盖了 PreInvoke,并使用缓冲区客户端将消息发送到缓冲区。这样,客户端会维护结构化的编程模型,且 BufferedServiceBusClient<T> 会封装与 WCF 消息的互动。缺点是该消息只能发送一次,当 ClientBase 根类尝试发送它时,会引发 InvalidOperationException。这正是 Enqueue 大显身手的时候,它可以平息该异常。 

响应服务

在我 2007 年 2 月的专栏文章“构建排队 WCF 响应服务”(msdn.microsoft.com/magazine/cc163482) 中,我解释说,收到排队调用的结果(或错误)的唯一方法是使用排队响应服务。我展示了如何在消息头中传递包含逻辑方法 ID 和响应地址的响应上下文对象:

[DataContract]
public class ResponseContext
{
  [DataMember]
  public readonly string ResponseAddress;

  [DataMember]
  public readonly string MethodId;

  public ResponseContext(string responseAddress,string methodId);

  public static ResponseContext Current
  {get;set;}

  // More members 
}

处理缓冲时适用同一设计模式。客户端需要为缓冲响应的服务提供专门的响应缓冲区。客户端还需要通过消息头传递响应地址和方法 ID,如同对基于 MSMQ 的调用一样。如图 9 所示,基于 MSMQ 的响应服务与服务总线之间的主要区别是,响应缓冲区必须也驻留在服务总线中。

Figure 9 Service Bus Buffered Response Service 图 9 服务总线缓冲的响应服务

为了简化客户端,我编写了类 ClientBufferResponseBase<T>,定义如下:

 

public abstract class ClientBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class
{
  protected readonly Uri ResponseAddress;

  public ClientBufferResponseBase(Uri responseAddress);

  /* Additional constructors with different credentials */
     
  protected virtual string GenerateMethodId();
}

ClientBufferResponseBase<T> 是专属于 BufferedServiceBusClient<T> 的子类,它还在消息头中增加了响应上下文。正是基于此原因,我的 BufferedServiceBusClient<T> 是派生自 HeaderClientBase<T,H>,而不仅仅是派生自 InterceptorClientBase<T>。如图 10 所示,您可以像使用 BufferedServiceBusClient 一样来使用 ClientBufferResponseBase<T>。

图 10 简化客户端

[ServiceContract]
interface ICalculator
{
  [OperationContract(IsOneWay = true)]
  void Add(int number1,int number2);
}

class CalculatorClient : ClientBufferResponseBase<ICalculator>,ICalculator
{
  public CalculatorClient(Uri responseAddress) : base(responseAddress)
  {}
   
  public void Add(int number1,int number2)
  {
     Enqueue(()=>Channel.Add(number1,number2));
  }
}

使用 ClientBufferResponseBase<T> 的子类非常简单:

Uri resposeAddress = 
  new Uri(@"sb://MyNamespace.servicebus.windows.net/MyResponseBuffer/");

CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
proxy.Close();

在管理客户端的响应方面,让调用客户端获取用来调度此呼叫的方法 ID 很方便。通过 Header 属性很容易完成这一点:

CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
string methodId = proxy.Header.MethodId;

图 11 列出了对 ClientBufferResponseBase<T> 的实施方式。ClientBufferResponseBase<T> 覆盖了 HeaderClientBase<T,H> 的 PreInvoke 方便,这样它就可以为每个呼叫生成新的方法 ID,并将其设置到标头中。

图 11 实现 ClientBufferResponseBase<T>

public abstract class ClientBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class
{
  public readonly Uri ResponseAddress;

  public ClientBufferResponseBase(Uri responseAddress)
  {
    ResponseAddress = responseAddress;
  }
   
  /* More Constructors */

  protected override void PreInvoke(ref Message request)
  {
    string methodId = GenerateMethodId();
    Header = new ResponseContext(ResponseAddress.AbsoluteUri,methodId);         
    base.PreInvoke(ref request);
  }

  protected virtual string GenerateMethodId()
  {
    return Guid.NewGuid().ToString();
  }

  // Rest of the implementation 
}

图 12 所示,为了简化缓冲服务调用响应服务所需的工作,我编写了类 ServiceBufferResponseBase<T>。

图 12 ServiceBufferResponseBase<T> 类

public abstract class ServiceBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class 
{
  public ServiceBufferResponseBase() : 
   base(new Uri(ResponseContext.Current.ResponseAddress))
 {
   Header = ResponseContext.Current;
               
   // Grab the credentials the host was using 

   IServiceBusProperties properties = 
     OperationContext.Current.Host as IServiceBusProperties;
   Credential = properties.Credential;
  }
}

虽然这项服务可以完全使用 BufferedServiceBusClient<T> 将响应排入队列,您还是需要从标头提取响应缓冲区地址,并以某种方式获取登录到服务总线缓冲区的凭据。您还需要向传出调用的标头提供响应上下文。所有这些步骤都可以通过 ServiceBufferResponseBase<T> 得到简化。ServiceBufferResponseBase<T> 会从响应上下文中向基本构造函数提供地址,还会将上下文设置到传出标头中。

ServiceBufferResponseBase<T> 作出的另一种简化假设是响应服务可以使用它的主机用于从自己的缓冲区检索信息的同一凭据将消息发送到响应缓冲。为此,ServiceBufferResponseBase<T> 从操作上下文获得对自己主机的引用,并使用主机的 IServiceBusProperties 实现读取凭据。ServiceBufferResponseBase<T> 会复制这些凭据以供自用(在 BufferedServiceBusClient<T> 内完成)。当然,这首先会强制使用 BufferedServiceBusHost<T>。您的服务需要从 ServiceBufferResponseBase<T> 派生服务代理类并将其用于响应。例如,假设此服务约定如下所示:

[ServiceContract]
interface ICalculatorResponse
{
  [OperationContract(IsOneWay = true)]
  void OnAddCompleted(int result,ExceptionDetail error);
}
This would be the definition of the proxy to the response service:
class CalculatorResponseClient :    
  ServiceBufferResponseBase<ICalculatorResponse>,ICalculatorResponse
{
  public void OnAddCompleted(int result,ExceptionDetail error)
  {
    Enqueue(()=>Channel.OnAddCompleted(result,error));
  }
}

图 13 说明了响应其客户端的简单缓冲服务。

图 13 使用 ServiceBufferResponseBase<T>

class MyCalculator : ICalculator
{
  [OperationBehavior(TransactionScopeRequired = true)]
  public void Add(int number1,int number2)
  {
    int result = 0;
    ExceptionDetail error = null;
    try
    {
      result = number1 + number2;
    }
     // Don’t rethrow 
    catch(Exception exception)
    {
      error = new ExceptionDetail(exception);
    }
    finally
    {
      CalculatorResponseClient proxy = new CalculatorResponseClient();
      proxy.OnAddCompleted(result,error);
      proxy.Close();
    }
  }
}

如下所示,所有的响应服务都需要从消息头访问方法 ID:

class MyCalculatorResponse : ICalculatorResponse
{
  public void OnAddCompleted(int result,ExceptionDetail error)
  {
    string methodId = ResponseContext.Current.MethodId;
    ...
  }
}

我们会进一步研究服务总线,敬请关注。  

Juval Lowy 是 IDesign 的一名软件架构师,该公司提供 WCF 培训和 WCF 体系结构咨询。本文中的部分内容摘自他最近编写的《Programming WCF Services, Third Edition》(O’Reilly,2010)一书。另外,他还是 Microsoft 硅谷地区的区域总监。您可以通过 idesign.net 与 Lowy 联系。

衷心感谢以下技术专家审阅本文:Jeanne Baker