May 2010

Volume 25 Number 05

Foundations - Service Bus Buffers

By Juval Lowy | May 2010

In my October 2009 column, “Routers in the Service Bus” (msdn.microsoft.com/magazine/ee335696), I presented the likely future direction of the Azure Service Bus—becoming the ultimate interceptor. I presented the routers feature and promised to write about queues next.

Since then, both routers and queues have been postponed to the second release of the service bus, and instead—for now—the service bus will provide buffers. Future releases will likely add logging, diagnostic and various instrumentation options. I will visit those aspects in a future article. In this article, I’ll describe the buffers aspect, and also show you some advanced Windows Communication Foundation (WCF) programming techniques.

Service Bus Buffers

In the service bus, every URI in the service namespace is actually an addressable messaging junction. The client can send a message to that junction, and the junction can relay it to the services. However, each junction can also function as a buffer (see Figure 1).

Figure 1 Buffers in the Service Bus

The messages are stored in the buffer for a configurable period of time, even when no service is monitoring the buffer. Note that multiple services can monitor the buffer, but unless you explicitly peek and lock the message, only one of them will be able to retrieve a message.

The client is decoupled from the services behind the buffer, and the client and service need not be running at the same time. Because the client interacts with a buffer and not with an actual service endpoint, all the messages are one-way, and there is no way (out of the box) to obtain the results of the message invocation or any errors.

The service bus buffers should not be equated with queues, such as Microsoft Message Queuing (MSMQ) queues or WCF queued services; they have a number of crucial differences:

  • The service bus buffers are not durable, and the messages are stored in memory. This implies a risk of losing messages in the (somewhat unlikely) event of a catastrophic failure of the service bus itself.
  • The service bus buffers are not transactional; neither sending nor retrieving messages can be done as part of a transaction.
  • The buffers can’t handle long-lasting messages. The service must retrieve a message from the buffer within 10 minutes or the message is discarded. Although the WCF MSMQ-based messages also feature a time-to-live, that period is much longer, defaulting to one day. This enables a far broader range of truly disjointed operations and disconnected applications.
  • The buffers are limited in size and can’t hold more than 50 messages.
  • The buffered messages are capped in size, at 64KB each. Although MSMQ also imposes its own maximum message size, it’s substantially larger (4MB per message).

Thus buffers do not provide true queued calls over the cloud; rather, they provide for elasticity in the connection, with calls falling somewhere in between queued calls and fire-and-forget asynchronous calls.

There are two scenarios where buffers are useful. One is an application where the client and the service are interacting over a shaky connection, and dropping the connection and picking it up again is tolerated as long as the messages are buffered during the short offline period. A second (and more common) scenario is a client issuing asynchronous one-way calls and utilizing a response buffer (as described later in the Response Service section) to handle the results of the calls. Such interaction treats the network connection as a bungee cord rather than as a rigid network wire that has no storage capacity.

Working with Buffers

The buffer address must be unique; you can have only a single buffer associated with an address and the address can’t already be used by a buffer or a service. However, multiple parties can retrieve messages from the same buffer. In addition, the buffer address must use either HTTP or HTTPS for the scheme. To send and retrieve messages from the buffer, the service bus offers an API similar to that of System.Messaging; that is, it requires you to interact with raw messages. The service bus administrator manages the buffers independently of services or clients. Each buffer must have a policy governing its behavior and lifetime. Out of the box, the service bus administrator must perform programmatic calls to create and manage buffers.

Each buffer policy is expressed via an instance of the MessageBufferPolicy class as shown in Figure 2.

Figure 2 The MessageBufferPolicy Class

[DataContract]
public class MessageBufferPolicy : ...
{
  public MessageBufferPolicy();
  public MessageBufferPolicy(MessageBufferPolicy policyToCopy);

  public DiscoverabilityPolicy Discoverability
  {get;set;}

  public TimeSpan ExpiresAfter
  {get;set;}

  public int MaxMessageCount
  {get;set;}

  public OverflowPolicy OverflowPolicy
  {get;set;}

  public AuthorizationPolicy Authorization
  {get;set;} 
  
  public TransportProtectionPolicy TransportProtection
  {get;set;}
}

The Discoverability policy property is an enum of the type DiscoverabilityPolicy, controlling whether or not the buffer is included in the service bus registry (the ATOM feed):

public enum DiscoverabilityPolicy
{
  Managers,
  ManagersListeners,
  ManagersListenersSenders,
  Public 
}

Discoverability defaults to DiscoverabilityPolicy.Managers, which means it requires a managed authorization claim. Setting it to DiscoverabilityPolicy.Public publishes it to the feed without any authorization.

The ExpiresAfter property controls the lifetime of messages in the buffer. The default is five minutes, the minimum value is one minute and the maximum allowed value is 10 minutes. Any attempt to configure a longer lifetime is silently ignored.

The MaxMessageCount property caps the buffer size. The policy defaults to 10 messages, and the minimum value is, of course, set to one. As mentioned already, the maximum buffer size is 50, and attempts to configure a larger size are silently ignored.

The OverflowPolicy property is an enum with a single value defined as:

public enum OverflowPolicy
{
  RejectIncomingMessage
}

OverflowPolicy controls what to do with the message when the buffer is maxed out; that is, when it’s already filled to capacity (defined by MaxMessageCount). The only possible option is to reject the message—send it back with an error to the sender.

The single-value enum serves as a placeholder for future options, such as discarding the message without informing the sender or removing messages from the buffer and accepting the new message.
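The interplay of MaxMessageCount, ExpiresAfter and the reject-on-overflow policy can be illustrated locally. The following is only an in-memory sketch, not the service bus implementation: the LocalBufferSimulation class and its members are hypothetical names invented for this illustration, the clock is passed in explicitly to keep the behavior deterministic, and a full buffer rejects immediately rather than waiting for a send timeout.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of a buffer. It only mimics the policy
// semantics described in the text and never talks to the service bus.
public class LocalBufferSimulation
{
   readonly int m_MaxMessageCount;
   readonly TimeSpan m_ExpiresAfter;
   readonly Queue<KeyValuePair<DateTime,string>> m_Messages =
      new Queue<KeyValuePair<DateTime,string>>();

   public LocalBufferSimulation(int maxMessageCount,TimeSpan expiresAfter)
   {
      m_MaxMessageCount = maxMessageCount;
      m_ExpiresAfter = expiresAfter;
   }

   // Returns false when the buffer is full (the RejectIncomingMessage case)
   public bool TrySend(string message,DateTime now)
   {
      DiscardExpired(now);
      if(m_Messages.Count >= m_MaxMessageCount)
      {
         return false;
      }
      m_Messages.Enqueue(new KeyValuePair<DateTime,string>(now,message));
      return true;
   }

   // Returns false when there is no unexpired message to retrieve
   public bool TryRetrieve(DateTime now,out string message)
   {
      DiscardExpired(now);
      if(m_Messages.Count == 0)
      {
         message = null;
         return false;
      }
      message = m_Messages.Dequeue().Value;
      return true;
   }

   // Drops every message that has been buffered for ExpiresAfter or longer
   void DiscardExpired(DateTime now)
   {
      while(m_Messages.Count > 0 &&
            now - m_Messages.Peek().Key >= m_ExpiresAfter)
      {
         m_Messages.Dequeue();
      }
   }
}

public static class BufferSimulationDemo
{
   public static void Main()
   {
      DateTime start = new DateTime(2010,5,1,12,0,0);
      LocalBufferSimulation buffer =
         new LocalBufferSimulation(2,TimeSpan.FromMinutes(10));

      Console.WriteLine(buffer.TrySend("A",start));               // True
      Console.WriteLine(buffer.TrySend("B",start.AddMinutes(1))); // True
      Console.WriteLine(buffer.TrySend("C",start.AddMinutes(2))); // False, buffer is full

      string message;
      // Ten and a half minutes after the start, "A" has expired but "B" has not
      Console.WriteLine(buffer.TryRetrieve(start.AddMinutes(10.5),out message)); // True
      Console.WriteLine(message);                                                // B
   }
}
```

With a capacity of two, the third send is rejected, and a late retriever only sees the message that is still within its lifetime.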

The last two properties are responsible for security configuration. The Authorization property, of the enum type AuthorizationPolicy, instructs the service bus whether or not to authorize the client’s token:

public enum AuthorizationPolicy
{
  NotRequired,
  RequiredToSend,
  RequiredToReceive,
  Required
}

The default value of AuthorizationPolicy.Required requires authorizing both sending and receiving clients. 

Finally, the TransportProtection property stipulates the minimum level of transfer security for the message to the buffer, using an enum of the type TransportProtectionPolicy:

public enum TransportProtectionPolicy
{
  None,
  AllPaths,
}

Transport security via TransportProtectionPolicy.AllPaths is the default for all buffer policies, and it mandates the use of an HTTPS address.

You can use the MessageBufferClient class to administer your buffer, as shown in Figure 3.

Figure 3 The MessageBufferClient Class

public sealed class MessageBufferClient
{
  public Uri MessageBufferUri
  {get;}

  public static MessageBufferClient CreateMessageBuffer(
    TransportClientEndpointBehavior credential,
    Uri messageBufferUri,MessageBufferPolicy policy);

  public static MessageBufferClient GetMessageBuffer(
    TransportClientEndpointBehavior credential,Uri messageBufferUri);
  public MessageBufferPolicy GetPolicy();
  public void DeleteMessageBuffer();

  // More members   
}

You use the static methods of MessageBufferClient to obtain an authenticated instance of MessageBufferClient, supplying them with the service bus credentials (of the type TransportClientEndpointBehavior). When using MessageBufferClient, you typically need to check whether the buffer already exists in the service bus by calling the GetMessageBuffer method. If there’s no buffer, GetMessageBuffer throws an exception.

Here’s how to create a buffer programmatically:

Uri bufferAddress = 
  new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer/");

TransportClientEndpointBehavior credential = ...

MessageBufferPolicy bufferPolicy = new MessageBufferPolicy();

bufferPolicy.MaxMessageCount = 12;
bufferPolicy.ExpiresAfter = TimeSpan.FromMinutes(3);
bufferPolicy.Discoverability = DiscoverabilityPolicy.Public;

MessageBufferClient.CreateMessageBuffer(credential,bufferAddress,
  bufferPolicy);

In this example, you instantiate a buffer policy object and set the policy to some desired values. All it takes to install the buffer is calling the CreateMessageBuffer method of MessageBufferClient with the policy and some valid credentials.

As an alternative to programmatic calls, you can use my Service Bus Explorer (presented in my routers article and also available online with the sample code for this article) to both view and modify buffers. Figure 4 shows how to create a new buffer by specifying its address and various policy properties. In much the same way, you can also delete all buffers in the service namespace.

Figure 4 Creating a Buffer Using the Service Bus Explorer

You can also review and modify the policies of existing buffers, purge messages from the buffer and even delete a buffer by selecting the buffer in the service namespace tree and interacting with the buffer properties in the right pane, as shown in Figure 5.

Figure 5 A Buffer in the Service Bus Explorer

Streamlining Administration

When creating buffers, it’s best to maximize both the buffer size and its lifespan, to give the clients and services more time to interact. Moreover, it’s a good idea to make the buffer discoverable so you can view it on the service bus registry. When it comes to using the buffer, both the client and the service should verify that the buffer is already created, or else proceed to create it.

To automate these steps, I created the ServiceBusHelper class:

public static partial class ServiceBusHelper
{    
  public static void CreateBuffer(string bufferAddress,string secret);
  public static void CreateBuffer(string bufferAddress,string issuer,
    string secret);

  public static void VerifyBuffer(string bufferAddress,string secret);
  public static void VerifyBuffer(string bufferAddress,string issuer,
    string secret);
  public static void PurgeBuffer(Uri bufferAddress,
    TransportClientEndpointBehavior credential);
  public static void DeleteBuffer(Uri bufferAddress,
    TransportClientEndpointBehavior credential); 
}

The CreateBuffer method creates a new discoverable buffer with a maximum capacity of 50 messages and a duration of 10 minutes. If the buffer already exists, CreateBuffer deletes the old buffer. The VerifyBuffer method verifies that a buffer exists and, if it doesn’t, creates a new buffer. PurgeBuffer is useful for purging all buffered messages during diagnostics or debugging. DeleteBuffer simply deletes the buffer. Figure 6 shows a partial listing of the implementation of these methods.

Figure 6 Partial Listing of the Buffer Helper Methods

public static partial class ServiceBusHelper
{    
  public static void CreateBuffer(string bufferAddress,
    string issuer,string secret)
  {
    TransportClientEndpointBehavior credentials = ...;
    CreateBuffer(bufferAddress,credentials);
  }
  static void CreateBuffer(string bufferAddress,
    TransportClientEndpointBehavior credentials)
  {
    MessageBufferPolicy policy = CreateBufferPolicy();
    CreateBuffer(bufferAddress,policy,credentials);
  }
  static internal MessageBufferPolicy CreateBufferPolicy()
  {
    MessageBufferPolicy policy = new MessageBufferPolicy();                
    policy.Discoverability = DiscoverabilityPolicy.Public;
    policy.ExpiresAfter = TimeSpan.FromMinutes(10);
    policy.MaxMessageCount = 50;

    return policy;
  }
   public static void PurgeBuffer(Uri bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     Debug.Assert(BufferExists(bufferAddress,credentials));
     MessageBufferClient client = 
       MessageBufferClient.GetMessageBuffer(credentials,bufferAddress);
     MessageBufferPolicy policy = client.GetPolicy();
     client.DeleteMessageBuffer();
        
     MessageBufferClient.CreateMessageBuffer(credentials,bufferAddress,policy);
   }
   public static void VerifyBuffer(string bufferAddress,
     string issuer,string secret)
   {
     TransportClientEndpointBehavior credentials = ...;
     VerifyBuffer(bufferAddress,credentials);
   }
   internal static void VerifyBuffer(string bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     if(BufferExists(new Uri(bufferAddress),credentials))
     {
       return;
     }
     CreateBuffer(bufferAddress,credentials);
   }
   internal static bool BufferExists(Uri bufferAddress,
     TransportClientEndpointBehavior credentials)
   {
     try
     {
       MessageBufferClient client = 
         MessageBufferClient.GetMessageBuffer(credentials,bufferAddress);
       client.GetPolicy();
       return true;
     }
     catch(FaultException)
     {}
      
     return false;
   }
   static void CreateBuffer(string bufferAddress,
     MessageBufferPolicy policy,
     TransportClientEndpointBehavior credentials)
   {   
     Uri address = new Uri(bufferAddress);
     if(BufferExists(address,credentials))
     {
       MessageBufferClient client = 
         MessageBufferClient.GetMessageBuffer(credentials,address);
       client.DeleteMessageBuffer();
     }  
     MessageBufferClient.CreateMessageBuffer(credentials,address,policy);
   }
}

The BufferExists method uses the GetPolicy method of MessageBufferClient to see if a buffer exists, and it interprets an error as an indication that the buffer does not exist. Purging a buffer is done by copying its policy, deleting the buffer and creating a new buffer (with the same address) with the old policy.

Sending and Retrieving Messages

As mentioned already, the service bus buffers require interactions with raw WCF messages. This is done with the Send and Retrieve methods of MessageBufferClient (obtained when creating or getting a buffer):

public sealed class MessageBufferClient
{
  public void Send(Message message);
  public void Send(Message message,TimeSpan timeout);

  public Message Retrieve();
  public Message Retrieve(TimeSpan timeout);

  // More members
}

Both methods are subject to a timeout that defaults to one minute for the parameter-less versions. For the sender, the timeout means how long to wait in case the buffer is full. For the retriever, the timeout means how long to wait in case the buffer is empty.

Here’s the sender-side code for sending raw messages to the buffer:

TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer/");

MessageBufferClient client =   
  MessageBufferClient.GetMessageBuffer(credential,bufferUri);

Message message = Message.CreateMessage(MessageVersion.Default,"Hello");

client.Send(message,TimeSpan.MaxValue);

The sender first creates a credentials object and uses it to obtain an instance of MessageBufferClient. The sender then creates a WCF message and sends it to the buffer. Here is the retrieving-side code for retrieving raw messages from the buffer:

TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer/");

MessageBufferClient client = 
  MessageBufferClient.GetMessageBuffer(credential,bufferUri);
Message message = client.Retrieve();

Debug.Assert(message.Headers.Action == "Hello");

Buffered Services

Using raw WCF messages as in the preceding code snippets is what the service bus has to offer. And yet, such a programming model leaves much to be desired. It’s cumbersome, tedious, non-structured, not object-oriented and not type safe. It’s a throwback to the days before WCF itself, with explicit programming against MSMQ using the System.Messaging API. You need to parse the message content and switch on its elements.
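To make the complaint concrete, here is a minimal sketch of what receiving code tends to look like when it works directly against raw messages. It uses only the standard WCF Message class (it assumes references to the System.ServiceModel and System.Runtime.Serialization assemblies); the action strings and the RawMessageDemo class are hypothetical names chosen for the illustration.

```csharp
using System;
using System.ServiceModel.Channels;

public static class RawMessageDemo
{
   // Hypothetical action names. With raw messages, the sender and the
   // receiver agree on such strings by convention only.
   const string AddAction = "urn:demo/Add";
   const string ResetAction = "urn:demo/Reset";

   static int s_Total;

   public static int Total
   {
      get { return s_Total; }
   }

   public static void Process(Message message)
   {
      // Switch on the action header, then decode the body by hand
      switch(message.Headers.Action)
      {
         case AddAction:
         {
            // The body type is a convention too; nothing checks it at compile time
            s_Total += message.GetBody<int>();
            break;
         }
         case ResetAction:
         {
            s_Total = 0;
            break;
         }
         default:
         {
            throw new InvalidOperationException(
               "Unknown action " + message.Headers.Action);
         }
      }
   }

   public static void Main()
   {
      Process(Message.CreateMessage(MessageVersion.Default,AddAction,5));
      Process(Message.CreateMessage(MessageVersion.Default,AddAction,7));
      Console.WriteLine(Total); // running total of the two Add messages
   }
}
```

Every new operation means another case label and another hand-decoded body, which is exactly the kind of code a service contract normally spares you.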

Fortunately, you can improve on the basic offering. Instead of interacting with raw messages, you should elevate the interaction to structured calls between clients and services. Although this requires a considerable degree of low-level advanced work, I was able to encapsulate it with a small set of helper classes.

To provide for structured buffered calls on the service side, I wrote BufferedServiceBusHost<T> defined as:

// Generic type parameter based host
public class ServiceHost<T> : ServiceHost
{...}

public class BufferedServiceBusHost<T> : ServiceHost<T>,...
{
  public BufferedServiceBusHost(params Uri[] bufferAddresses);
  public BufferedServiceBusHost(
    T singleton,params Uri[] bufferAddresses);

  /* Additional constructors */
}

I modeled BufferedServiceBusHost<T> after using WCF with the MSMQ binding. You need to provide its constructor with the address or addresses of the buffers to retrieve messages from. The rest is just as with a regular WCF service host:

Uri buffer = new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer");
ServiceHost host = new BufferedServiceBusHost<MyService>(buffer);
host.Open();

Note that you can provide the constructors with multiple buffer addresses to monitor, just like a WCF service host can open multiple endpoints with different queues. There’s no need (or way) to provide any of these buffer addresses in the service endpoint section in the config file (although the buffer addresses can come from the app settings section if you so design).

While the actual communication with the service bus buffer is done with raw WCF messages, that work is encapsulated. BufferedServiceBusHost<T> will verify that the buffers provided actually exist and will create them if they don’t, using the buffer policy of ServiceBusHelper.VerifyBuffer shown in Figure 6. BufferedServiceBusHost<T> will use the default transfer security of securing all paths. It will also verify that the contracts of the provided service generic type parameter T are all one-way; that is, they all have only one-way operations (just as the one-way relay binding does). One last feature: when closing the host, in debug builds only, BufferedServiceBusHost<T> will purge all its buffers to ensure a smooth start for the next debug session.

BufferedServiceBusHost<T> operates by hosting the specified service locally. For each service contract on the type parameter T, BufferedServiceBusHost<T> adds an endpoint over IPC (named pipes). The IPC binding to those endpoints is configured to never time out.

Although IPC always has a transport session, to mimic MSMQ behavior even per-session services are treated as per-call services. Each dequeued WCF message is played to a new instance of the service, potentially concurrently with previous messages, just as with the MSMQ binding. If the provided service type is a singleton, BufferedServiceBusHost<T> respects that and will send all messages across all buffers and endpoints to the same service instance, just as with the MSMQ binding.

BufferedServiceBusHost<T> monitors each specified buffer on a separate background worker thread. When a message is deposited in the buffer, BufferedServiceBusHost<T> retrieves it and converts the raw WCF message into a call to the appropriate endpoint over IPC.

Figure 7 provides a partial listing of BufferedServiceBusHost<T>, with most of the error handling and security removed.

Figure 7 Partial Listing of BufferedServiceBusHost<T>

public class BufferedServiceBusHost<T> : 
  ServiceHost<T>,IServiceBusProperties 
{
  Uri[] m_BufferAddresses;
  List<Thread> m_RetrievingThreads;
  IChannelFactory<IDuplexSessionChannel>
    m_Factory;
  Dictionary<string,IDuplexSessionChannel> 
    m_Proxies;

  const string CloseAction = 
    "BufferedServiceBusHost.CloseThread";

  public BufferedServiceBusHost(params Uri[] 
    bufferAddresses)
  {
    m_BufferAddresses = bufferAddresses;
    Binding binding = new NetNamedPipeBinding();
    binding.SendTimeout = TimeSpan.MaxValue;

    Type[] interfaces = 
      typeof(T).GetInterfaces();

    foreach(Type interfaceType in interfaces)
    {         
      VerifyOneway(interfaceType);
      string address = 
        @"net.pipe://localhost/" + Guid.NewGuid();
      AddServiceEndpoint(interfaceType,binding,
        address);
    }
    m_Factory = 
      binding.BuildChannelFactory
      <IDuplexSessionChannel>();
    m_Factory.Open();
  }
  protected override void OnOpened()
  {
    CreateProxies();                       
    CreateListeners();
    base.OnOpened();
  }
  protected override void OnClosing()
  {
    CloseListeners();

    foreach(IDuplexSessionChannel proxy in 
      m_Proxies.Values)
    {
      proxy.Close();
    }

    m_Factory.Close();

    PurgeBuffers();

    base.OnClosing();
  }

  // Verify all operations are one-way
  
  void VerifyOneway(Type interfaceType)
  {...}
  void CreateProxies()
  {
    m_Proxies = 
      new Dictionary
      <string,IDuplexSessionChannel>();

    foreach(ServiceEndpoint endpoint in 
      Description.Endpoints)
    {
      IDuplexSessionChannel channel = 
        m_Factory.CreateChannel(endpoint.Address);
      channel.Open();
      m_Proxies[endpoint.Contract.Name] = 
        channel;
    }
  }

  void CreateListeners()
  {
    m_RetrievingThreads = new List<Thread>();

    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {
      ServiceBusHelper.VerifyBuffer(
        bufferAddress.AbsoluteUri,m_Credential);
         
      Thread thread = new Thread(Dequeue);

      m_RetrievingThreads.Add(thread);
      thread.IsBackground = true;
      thread.Start(bufferAddress);
    }
  }

  void Dequeue(object arg)
  {
    Uri bufferAddress = arg as Uri;

    MessageBufferClient bufferClient =
      MessageBufferClient.GetMessageBuffer(
        m_Credential,bufferAddress);
    while(true)
    {
      Message message = 
        bufferClient.Retrieve(TimeSpan.MaxValue);
      if(message.Headers.Action == CloseAction)
      {
        return;
      }
      else
      {
        Dispatch(message);
      }      
    }
  }
   
  
  
  void Dispatch(Message message)
  {
    string contract = ExtractContract(message);
    m_Proxies[contract].Send(message);
  }
  string ExtractContract(Message message)
  {
    string[] elements = 
      message.Headers.Action.Split('/');
    return elements[elements.Length-2];         
  }
  void SendCloseMessages()
  {
    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {
      MessageBufferClient bufferClient =
        MessageBufferClient.GetMessageBuffer(
        m_Credential,bufferAddress);
      Message message =   
        Message.CreateMessage(
        MessageVersion.Default,CloseAction);
      bufferClient.Send(message);
    }   
  }
  void CloseListeners()
  {
    SendCloseMessages();

    foreach(Thread thread in m_RetrievingThreads)
    {
      thread.Join();
    }
  }   

  [Conditional("DEBUG")]
  void PurgeBuffers()
  {
    foreach(Uri bufferAddress in 
      m_BufferAddresses)
    {
      ServiceBusHelper.PurgeBuffer(
        bufferAddress,m_Credential);
    }
  } 
}

BufferedServiceBusHost<T> stores the proxies to the locally hosted IPC endpoints in a dictionary called m_Proxies:

Dictionary<string,IDuplexSessionChannel> m_Proxies;

The key into the dictionary is the endpoints’ contract type name.
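Because the dictionary is keyed by contract name, the host has to recover that name from each retrieved message. The sketch below applies the same string logic as ExtractContract in Figure 7 to a plain action string. It assumes the default WCF action format of namespace/contract/operation; the sample action and the ActionParsingDemo class are illustrative only.

```csharp
using System;

public static class ActionParsingDemo
{
   // Same logic as ExtractContract in Figure 7, applied to a plain string
   public static string ExtractContract(string action)
   {
      string[] elements = action.Split('/');
      // The last element is the operation name; the one before it is the contract
      return elements[elements.Length-2];
   }

   public static void Main()
   {
      // Assumed action: default namespace, contract IMyContract, operation MyMethod
      string action = "http://tempuri.org/IMyContract/MyMethod";
      Console.WriteLine(ExtractContract(action)); // IMyContract
   }
}
```

Note that this lookup relies on the default action format. An operation that sets a custom Action value on its OperationContract attribute would not necessarily map back to its contract name this way.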

The constructors store the provided buffer addresses and then use reflection to obtain a collection of all the interfaces on the service type. For each interface, BufferedServiceBusHost<T> verifies it has only one-way operations, then calls the base AddServiceEndpoint to add an endpoint for that contract type. The address is an IPC address using a GUID for the pipe’s name. The constructors use the IPC binding to build a channel factory of the type IChannelFactory<IDuplexSessionChannel>. IChannelFactory<T> is used to create a non-strongly typed channel over the binding:

public interface IChannelFactory<T> : IChannelFactory
{
  T CreateChannel(EndpointAddress to);
  // More members
}

After opening the internal host with all its IPC endpoints, the OnOpened method creates the internal proxies to those endpoints and the buffered listeners. These two steps are the heart of BufferedServiceBusHost<T>. To create the proxies, it iterates over the collection of endpoints. It obtains each endpoint’s address and uses the IChannelFactory<IDuplexSessionChannel> to create a channel against that address. That channel (or proxy) is then stored in the dictionary. The CreateListeners method iterates over the specified buffer addresses. For each address, it verifies the buffer and creates a worker thread to dequeue its messages.

The Dequeue method uses a MessageBufferClient to retrieve the messages in an infinite loop and dispatch them using the Dispatch method. Dispatch extracts the target contract name from the message and uses it to look up the IDuplexSessionChannel in the proxies dictionary and send the message over IPC. IDuplexSessionChannel is supported by the underlying IPC channel, and it provides a way to send raw messages:

public interface IOutputChannel : ...
{
  void Send(Message message,TimeSpan timeout);
  // More members
}
public interface IDuplexSessionChannel : IOutputChannel,...
{}

If an error occurs during the IPC call, BufferedServiceBusHost<T> recreates the channel it manages against that endpoint (not shown in Figure 7). When you close the host, you need to close the proxies, which gracefully waits for the calls in progress to complete. The problem is how to gracefully close all the retrieving threads, because MessageBufferClient.Retrieve is a blocking operation and there is no built-in way to abort it. The solution is to post to each monitored buffer a special private message whose action signals the retrieving thread to exit. This is what the SendCloseMessages method does. The CloseListeners method posts that private message to the buffers and then waits for all the listening threads to terminate by joining them. Closing the listening threads stops feeding messages to the internal proxies, and once the proxies are closed (when all current calls in progress have returned), the host is ready to shut down. BufferedServiceBusHost<T> also supports an ungraceful Abort method that just aborts all threads (not shown in Figure 7).
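The close-message technique can be tried out in isolation. The sketch below is a local simulation only: a BlockingCollection<string> stands in for each service bus buffer, its blocking Take call plays the role of Retrieve(TimeSpan.MaxValue), and the CloseMessageDemo class and its sentinel string are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class CloseMessageDemo
{
   // Hypothetical sentinel, playing the role of CloseAction in Figure 7
   const string CloseAction = "Demo.CloseThread";

   // Starts one listener per simulated buffer, posts one regular message
   // and one close message to each, and returns the number dispatched
   public static int Run(int bufferCount)
   {
      int dispatched = 0;
      List<BlockingCollection<string>> buffers =
         new List<BlockingCollection<string>>();
      List<Thread> threads = new List<Thread>();

      for(int i = 0;i < bufferCount;i++)
      {
         BlockingCollection<string> buffer = new BlockingCollection<string>();
         buffers.Add(buffer);

         Thread thread = new Thread(()=>
         {
            while(true)
            {
               // Blocks until a message arrives; there is no way to abort it
               string action = buffer.Take();
               if(action == CloseAction)
               {
                  return;
               }
               Interlocked.Increment(ref dispatched);
            }
         });
         thread.IsBackground = true;
         thread.Start();
         threads.Add(thread);
      }

      foreach(BlockingCollection<string> buffer in buffers)
      {
         buffer.Add("Hello");
      }
      // Post the close message to every buffer, then wait for the listeners
      foreach(BlockingCollection<string> buffer in buffers)
      {
         buffer.Add(CloseAction);
      }
      foreach(Thread thread in threads)
      {
         thread.Join();
      }
      return dispatched;
   }

   public static void Main()
   {
      Console.WriteLine(Run(3)); // 3
   }
}
```

Because each simulated buffer is first-in, first-out, the regular message is dispatched before the close message unblocks the listener, so joining the threads doubles as waiting for the pending work to drain.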

Finally, note that BufferedServiceBusHost<T> supports the interface IServiceBusProperties I defined as:

public interface IServiceBusProperties
{
  TransportClientEndpointBehavior Credential
  {get;set;}

  Uri[] Addresses
  {get;}
}

I needed such an interface in a few places in building my framework, especially in streamlining buffering. For the client, I wrote the class BufferedServiceBusClient<T> defined as:

public abstract class BufferedServiceBusClient<T> :                          
  HeaderClientBase<T,ResponseContext>,IServiceBusProperties 
{
  // Buffer address from config
  public BufferedServiceBusClient() 
  {}
  // No need for config file
  public BufferedServiceBusClient(Uri bufferAddress);


  /* Additional constructors with different credentials */  
  protected virtual void Enqueue(Action action);
}

BufferedServiceBusClient<T> derives from my HeaderClientBase<T,H> (a helper proxy used to pass information in the message headers; see my November 2007 article, “Synchronization Contexts in WCF,” available at msdn.microsoft.com/magazine/cc163321): 

public abstract class HeaderClientBase<T,H> : InterceptorClientBase<T> 
                                              where T : class
{
  protected H Header
  {get;set;}

  // More members
}

The purpose of that base class is to support a response service, as discussed in the following section. For a plain client of a buffered service, that derivation is immaterial.

You can use BufferedServiceBusClient<T> with or without a client config file. The constructors that accept the buffer address do not require a config file. The parameter-less constructor or the constructors that accept the endpoint name expect the config file to contain an endpoint matching the contract type with the one-way relay binding (although that binding is completely ignored by BufferedServiceBusClient<T>).

When deriving your proxy from BufferedServiceBusClient<T>, you will need to use the protected Enqueue method instead of directly using the Channel property:

[ServiceContract]
interface IMyContract
{
  [OperationContract(IsOneWay = true)]
  void MyMethod(int number);
}

class MyContractClient : BufferedServiceBusClient<IMyContract>,IMyContract
{
  public void MyMethod(int number)
  {
    Enqueue(()=>Channel.MyMethod(number));
  }
}

Enqueue accepts a delegate (or a lambda expression) that wraps the use of the Channel property. The result is still type safe. Figure 8 shows a partial listing of the BufferedServiceBusClient<T> class.

Figure 8 Partial Listing of BufferedServiceBusClient<T>

public abstract class BufferedServiceBusClient<T> :                               
  HeaderClientBase<T,ResponseContext>,IServiceBusProperties where T : class
{
  MessageBufferClient m_BufferClient;

  public BufferedServiceBusClient(Uri bufferAddress) : 
    base(new NetOnewayRelayBinding(),new EndpointAddress(bufferAddress)) 
  {}

  protected virtual void Enqueue(Action action) 
  {
    try
    {
      action();
    }
    catch(InvalidOperationException exception)
    {
      Debug.Assert(exception.Message ==
        "This message cannot support the operation " +
        "because it has been written.");
    }
  }
  protected override T CreateChannel()
  {    
    ServiceBusHelper.VerifyBuffer(Endpoint.Address.Uri.AbsoluteUri,Credential);
    m_BufferClient =
      MessageBufferClient.GetMessageBuffer(Credential,m_BufferAddress);

    return base.CreateChannel();   
  }
  protected override void PreInvoke(ref Message request)
  {
    base.PreInvoke(ref request);       
           
    m_BufferClient.Send(request);
  }
  protected TransportClientEndpointBehavior Credential
  {
    get
    {...}
    set
    {...}
  }
}

The constructors of BufferedServiceBusClient<T> supply its base constructor with the buffer address and the binding, which is always a one-way relay binding to enforce the one-way operations validation. The CreateChannel method verifies that the target buffer exists and obtains a MessageBufferClient representing it. The heart of BufferedServiceBusClient<T> is the PreInvoke method. PreInvoke is a virtual method provided by InterceptorClientBase<T>, the base class of HeaderClientBase<T,H>:

public abstract class InterceptorClientBase<T> : ClientBase<T> where T : class
{
  protected virtual void PreInvoke(ref Message request);
  // Rest of the implementation 
}

PreInvoke allows you to easily process the WCF messages before they’re dispatched by the client. BufferedServiceBusClient<T> overrides PreInvoke and uses the buffer client to send the message to the buffer. That way, the client maintains a structured programming model and BufferedServiceBusClient<T> encapsulates the interaction with the WCF message. The downside is that the message can only be sent once, and when the root class of ClientBase tries to send it, it throws an InvalidOperationException. This is where Enqueue comes in handy by snuffing out that exception. 
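The one-shot nature of a WCF message can be observed without a proxy or a buffer. The following sketch, which assumes references to the System.ServiceModel and System.Xml assemblies, writes the same Message twice and reports whether the second attempt is refused with an InvalidOperationException. The MessageWrittenDemo class and the sample action are hypothetical, and the sketch does not check the exception text that Figure 8 asserts on.

```csharp
using System;
using System.IO;
using System.ServiceModel.Channels;
using System.Xml;

public static class MessageWrittenDemo
{
   // Returns true if writing the same message a second time is refused
   public static bool SecondWriteFails()
   {
      Message message = Message.CreateMessage(
         MessageVersion.Default,"urn:demo/Hello","payload");

      Write(message); // the first write consumes the message
      try
      {
         Write(message);
         return false;
      }
      catch(InvalidOperationException)
      {
         return true;
      }
   }

   // Serializes the message to a throwaway in-memory writer
   static void Write(Message message)
   {
      using(XmlWriter writer = XmlWriter.Create(new StringWriter()))
      {
         message.WriteMessage(writer);
      }
   }

   public static void Main()
   {
      Console.WriteLine(SecondWriteFails());
   }
}
```

This is the same situation the proxy ends up in once PreInvoke has handed the request to the buffer client: the message has already been consumed, so the regular send path has nothing left to write.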

Response Service

In my February 2007 column, “Build a Queued WCF Response Service” (msdn.microsoft.com/magazine/cc163482), I explained that the only way to receive the result (or errors) of a queued call is to use a queued response service. I showed how to pass in the message headers a response context object that contains the logical method ID and the response address:

[DataContract]
public class ResponseContext
{
  [DataMember]
  public readonly string ResponseAddress;

  [DataMember]
  public readonly string MethodId;

  public ResponseContext(string responseAddress,string methodId);

  public static ResponseContext Current
  {get;set;}

  // More members 
}

The same design pattern holds true when dealing with buffers. The client needs to provide a dedicated response buffer for the service to buffer the response to. The client also needs to pass the response address and the method ID in the message headers, just as with the MSMQ-based calls. The main difference between the MSMQ-based response service and the service bus version is that the response buffer must also reside in the service bus, as shown in Figure 9.

Figure 9 Service Bus Buffered Response Service

To streamline the client side, I wrote the class ClientBufferResponseBase<T> defined as:

 

public abstract class ClientBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class
{
  protected readonly Uri ResponseAddress;

  public ClientBufferResponseBase(Uri responseAddress);

  /* Additional constructors with different credentials */
     
  protected virtual string GenerateMethodId();
}

ClientBufferResponseBase<T> is a specialized subclass of BufferedServiceBusClient<T>, and it adds the response context to the message headers. This is why I made BufferedServiceBusClient<T> derive from HeaderClientBase<T,H> and not merely from InterceptorClientBase<T>. You can use ClientBufferResponseBase<T> just like BufferedServiceBusClient<T>, as shown in Figure 10.

Figure 10 Streamlining the Client Side

[ServiceContract]
interface ICalculator
{
  [OperationContract(IsOneWay = true)]
  void Add(int number1,int number2);
}

class CalculatorClient : ClientBufferResponseBase<ICalculator>,ICalculator
{
  public CalculatorClient(Uri responseAddress) : base(responseAddress)
  {}
   
  public void Add(int number1,int number2)
  {
     Enqueue(()=>Channel.Add(number1,number2));
  }
}

Using the subclass of ClientBufferResponseBase<T> is straightforward:

Uri responseAddress = 
  new Uri(@"sb://MyNamespace.servicebus.windows.net/MyResponseBuffer/");

CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
proxy.Close();

It’s handy when managing the responses on the client side to have the invoking client obtain the method ID used to dispatch the call. This is easily done via the Header property:

CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
string methodId = proxy.Header.MethodId;

Figure 11 lists the implementation of ClientBufferResponseBase<T>. ClientBufferResponseBase<T> overrides the PreInvoke method of HeaderClientBase<T,H> so that it can generate a new method ID for each call and set it into the headers.

Figure 11 Implementing ClientBufferResponseBase<T>

public abstract class ClientBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class
{
  public readonly Uri ResponseAddress;

  public ClientBufferResponseBase(Uri responseAddress)
  {
    ResponseAddress = responseAddress;
  }
   
  /* More Constructors */

  protected override void PreInvoke(ref Message request)
  {
    string methodId = GenerateMethodId();
    Header = new ResponseContext(ResponseAddress.AbsoluteUri,methodId);         
    base.PreInvoke(ref request);
  }

  protected virtual string GenerateMethodId()
  {
    return Guid.NewGuid().ToString();
  }

  // Rest of the implementation 
}

To streamline the work required by the buffered service to call the response service, I wrote the class ServiceBufferResponseBase<T> shown in Figure 12.

Figure 12 The ServiceBufferResponseBase<T> Class

public abstract class ServiceBufferResponseBase<T> : 
  BufferedServiceBusClient<T> where T : class 
{
  public ServiceBufferResponseBase() : 
    base(new Uri(ResponseContext.Current.ResponseAddress))
  {
    Header = ResponseContext.Current;

    // Grab the credentials the host was using 

    IServiceBusProperties properties = 
      OperationContext.Current.Host as IServiceBusProperties;
    Credential = properties.Credential;
  }
}

Although the service could use a plain BufferedServiceBusClient<T> to enqueue the response, you would then need to extract the response buffer address from the headers and somehow obtain the credentials to log into the service bus buffer. You would also need to provide the headers of the outgoing call with the response context. All these steps can be streamlined with ServiceBufferResponseBase<T>. ServiceBufferResponseBase<T> provides its base constructor with the address out of the response context, and it also sets that context into the outgoing headers.

Another simplifying assumption ServiceBufferResponseBase<T> makes is that the responding service can use the same credentials its host used (to retrieve messages from its own buffer) to send messages to the response buffer. To that end, ServiceBufferResponseBase<T> obtains a reference to its own host from the operation context and reads the credentials using the IServiceBusProperties implementation of the host. ServiceBufferResponseBase<T> copies those credentials for its own use (done inside BufferedServiceBusClient<T>). This, of course, mandates the use of BufferedServiceBusHost<T> to host the service in the first place. Your service needs to derive a proxy class from ServiceBufferResponseBase<T> and use it to respond. For example, given this response contract:

[ServiceContract]
interface ICalculatorResponse
{
  [OperationContract(IsOneWay = true)]
  void OnAddCompleted(int result,ExceptionDetail error);
}

This would be the definition of the proxy to the response service:

class CalculatorResponseClient :
  ServiceBufferResponseBase<ICalculatorResponse>,ICalculatorResponse
{
  public void OnAddCompleted(int result,ExceptionDetail error)
  {
    Enqueue(()=>Channel.OnAddCompleted(result,error));
  }
}

Figure 13 shows a simple buffered service responding to its client.

Figure 13 Using ServiceBufferResponseBase<T>

class MyCalculator : ICalculator
{
  [OperationBehavior(TransactionScopeRequired = true)]
  public void Add(int number1,int number2)
  {
    int result = 0;
    ExceptionDetail error = null;
    try
    {
      result = number1 + number2;
    }
    catch(Exception exception)
    {
      // Don’t rethrow 
      error = new ExceptionDetail(exception);
    }
    finally
    {
      CalculatorResponseClient proxy = new CalculatorResponseClient();
      proxy.OnAddCompleted(result,error);
      proxy.Close();
    }
  }
}

All the response service needs is to access the method ID from the message headers as shown here:

class MyCalculatorResponse : ICalculatorResponse
{
  public void OnAddCompleted(int result,ExceptionDetail error)
  {
    string methodId = ResponseContext.Current.MethodId;
    ...
  }
}
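
One way to put the method ID to use is to have the client record each pending call under its ID and have the response service look the call up when the response arrives. The following is a minimal sketch of that bookkeeping. PendingCalls and its members are hypothetical helpers, not part of WCF or of the classes shown in this article, and the sketch assumes that the client and the response service share a process.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: tracks calls awaiting a buffered response
static class PendingCalls
{
  static readonly ConcurrentDictionary<string,string> m_Calls =
    new ConcurrentDictionary<string,string>();

  // Client side: call after dispatching, passing proxy.Header.MethodId
  public static void Register(string methodId,string description)
  {
    if(m_Calls.TryAdd(methodId,description) == false)
    {
      throw new InvalidOperationException("Duplicate method ID " + methodId);
    }
  }
  // Response side: call with ResponseContext.Current.MethodId.
  // Returns false for an unknown or already completed method ID
  public static bool TryComplete(string methodId,out string description)
  {
    return m_Calls.TryRemove(methodId,out description);
  }
  public static int Count
  {
    get
    {
      return m_Calls.Count;
    }
  }
}
```

The client would call PendingCalls.Register(proxy.Header.MethodId,"Add(2,3)") right after proxy.Add(2,3), and OnAddCompleted would pass the method ID it read from the headers to PendingCalls.TryComplete. Because the method ID is known only after the call is dispatched, a response could in principle arrive before the call is registered, so the response side should treat a false return value as a case to handle rather than as an error.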

Stay tuned for further exploration of the service bus.  


Juval Lowy is a software architect with IDesign providing WCF training and architecture consulting. This article contains excerpts from his recent book, “Programming WCF Services, Third Edition” (O’Reilly, 2010). He’s also the Microsoft regional director for the Silicon Valley. Contact Lowy at idesign.net.

Thanks to the following technical expert for reviewing this article: Jeanne Baker