December 2011

Volume 26 Number 12

Forecast: Cloudy - Completing the Trip with AppFabric Queues

By Joseph Fultz | December 2011

In the October issue, I touched upon some of the new features in the Azure AppFabric Service Bus (msdn.microsoft.com/magazine/hh456395). This month, I’ll continue that scenario by following the response on its return trip. So far, a store has requested an inventory check from nearby stores by publishing inventory check requests to a Topic. The stores subscribed to the Topic received the requests based on a Filter on the subscription that limited messages to those within their region and not sent by them.

For the return trip, I’ll rely on a couple of features of Azure AppFabric Service Bus Queues. There are two important properties to which I’ll assign values on the BrokeredMessage that’s sent to the Topic. The first is the ReplyTo property, which tells the recipient where to send the response. This will be a specific Queue created by the sender at the time it sends the message. The second is the CorrelationId property, which lets the sender match each response to the request that prompted it. I want to use a Queue instead of a Topic for the reply, because the response is going to one recipient, unlike the request, which amounted to a broadcast for help to anyone listening.

The new part of the message flow is shown in Figure 1, to the right of the row of stores.

Figure 1 Inquiry and Response Round-Trip

Updating the Outbound Request Code

In the first pass, I got the message out to the Topic and demonstrated picking it up by the appropriate subscriptions, but two important items were left undone to support a nice, easy response. The first item is setting the CorrelationId property of the BrokeredMessage. The property is part of the API, so I don’t have to include it as a part of my data class or schema. Because it’s a string, it could be something meaningful, but it should uniquely identify the message so that a response with a matching CorrelationId can’t be mistaken for a response to some other request in the system. For my sample, I use a GUID, which is a pretty safe bet in practice, too. Here’s the code:

BrokeredMessage msg = BrokeredMessage.CreateMessage(data);
// Add props to message for filtering
msg.Properties["Region"] = data.Region;
msg.Properties["RequesterID"] = data.RequesterID;
// Set properties for message
msg.TimeToLive = TimeSpan.FromSeconds(30);
msg.ReplyTo = returnQueueName;
msg.CorrelationId = Guid.NewGuid().ToString();

The BrokeredMessage also has a property named SessionId, which I’m not using here. SessionId is another property for logical grouping that’s nice to have at the message envelope level, because it facilitates grouping of messages that are all related. The question arises of how it differs in intent from CorrelationId. There are two scenarios where the SessionId could be particularly useful. The first is a system where multiple daemons are making various requests that are all related. The CorrelationId would be used to route the response to the requesting processor, while the SessionId would be used to group all of the messages sent and received across processing nodes. Such grouping is useful in determining the state of processing in a system for analytics and debugging. The second, for the same reasons, is a system that makes many requests as part of an overall process (for example, a purchase process: checking inventory, payment verification, send to fulfillment and so on), where the exact flow and timing aren’t guaranteed.
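
To make the distinction concrete, here is a minimal sketch that uses a hypothetical in-memory Envelope class rather than the real BrokeredMessage. The class, the helper names and the ID values are assumptions made up for illustration. It shows CorrelationId picking out the replies to one request, while SessionId gathers every message that belongs to the larger process:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the envelope-level properties; not the Service Bus type.
public sealed class Envelope
{
    public string CorrelationId { get; set; }
    public string SessionId { get; set; }
    public string Body { get; set; }
}

public static class EnvelopeQueries
{
    // Replies to one specific request share that request's CorrelationId.
    public static List<Envelope> RepliesTo(IEnumerable<Envelope> log, string correlationId)
    {
        return log.Where(m => m.CorrelationId == correlationId).ToList();
    }

    // Every message in one overall process shares a SessionId,
    // regardless of which request or processing node produced it.
    public static List<Envelope> InSession(IEnumerable<Envelope> log, string sessionId)
    {
        return log.Where(m => m.SessionId == sessionId).ToList();
    }
}

public static class EnvelopeDemo
{
    public static void Main()
    {
        var log = new List<Envelope>
        {
            new Envelope { SessionId = "order-1", CorrelationId = "inv-check-1", Body = "Store A: 3 in stock" },
            new Envelope { SessionId = "order-1", CorrelationId = "inv-check-1", Body = "Store B: 0 in stock" },
            new Envelope { SessionId = "order-1", CorrelationId = "payment-1", Body = "Payment verified" },
            new Envelope { SessionId = "order-2", CorrelationId = "inv-check-2", Body = "Store A: 1 in stock" }
        };

        Console.WriteLine(EnvelopeQueries.RepliesTo(log, "inv-check-1").Count); // 2
        Console.WriteLine(EnvelopeQueries.InSession(log, "order-1").Count);     // 3
    }
}
```

The two inventory replies answer a single request, but the payment message only shows up when the log is viewed by session.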

Having set the CorrelationId on the outbound message, the next change I need to make is to set the ReplyTo property. I could create another Topic and have all stores monitor it for responses, or use a single shared queue, but that would create unnecessary traffic and, under load, be more likely to cause a bottleneck. Thus, it makes sense to simply create a response queue at the time of the request and let the recipient know where that is. Because ReplyTo is a string, it could be anything, though I would suggest a fully qualified name to prevent any confusion or collision in future evolutions of the software. You could start with only the queue name, but under maintenance and in expansion, this could lead to confusion as the system starts to support multiple service bus namespaces and subqueues. Additionally, a fully qualified address will be better for recipients not using the Microsoft .NET Framework.
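
As a rough sketch of what a fully qualified ReplyTo value could look like, the helper below builds an sb:// address from a namespace and a queue name, and recovers the queue name on the receiving side. The ReplyAddress class and its method names are hypothetical, and the servicebus.windows.net host suffix is an assumption about where the namespace is hosted. The sample itself passes only the bare queue name, which the second method also accepts:

```csharp
using System;

public static class ReplyAddress
{
    // Assumption: the namespace is hosted under servicebus.windows.net.
    private const string HostSuffix = ".servicebus.windows.net";

    // Builds a fully qualified address for a queue in the given namespace.
    public static string Build(string serviceNamespace, string queueName)
    {
        var builder = new UriBuilder("sb", serviceNamespace + HostSuffix)
        {
            Path = "/" + queueName
        };
        return builder.Uri.AbsoluteUri;
    }

    // Accepts either a fully qualified sb:// address or a bare queue name
    // and returns just the queue name.
    public static string QueueNameOf(string replyTo)
    {
        Uri uri;
        if (Uri.TryCreate(replyTo, UriKind.Absolute, out uri) && uri.Scheme == "sb")
        {
            return Uri.UnescapeDataString(uri.AbsolutePath.Trim('/'));
        }
        return replyTo;
    }
}

public static class ReplyAddressDemo
{
    public static void Main()
    {
        // "contoso" and "responses-store42" are made-up example values.
        string replyTo = ReplyAddress.Build("contoso", "responses-store42");
        Console.WriteLine(replyTo);
        Console.WriteLine(ReplyAddress.QueueNameOf(replyTo));
    }
}
```

Accepting both forms on the receiving side lets older senders that use only the queue name keep working while newer ones move to the full address.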

The last two things I do before I send the message off are to create the response queue and start a timer to check for responses. I started with the GetQueue method. At the time of this writing, the documentation (bit.ly/pnByFw) for GetQueue gives the return value as type Microsoft.ServiceBus.Messaging.Queue: a Queue handle to the queue, or null if the queue doesn’t exist in the service namespace. However, this isn’t the case. If the queue doesn’t exist, the call throws an exception:

// Check if-exists and create response queue
try
{
  returnQ = this.ServiceBusNSClient.GetQueue(returnQueueName);
}
catch (System.Exception ex)
{
  Debug.WriteLine(ex.Message);
}
if (returnQ == null)
{
  returnQ = this.ServiceBusNSClient.CreateQueue(returnQueueName);
}
checkQTimer.Enabled = true;

Thus, I’ve wrapped the GetQueue method in a try-catch block and moved on. My sample code, as is typical, does nothing more than write out the error. Once the queue is created, I assign it to a variable so I can reference it for checking the queue and then enable the timer that I set up when the app started.

Updating the Recipient and Responding

With the proper modifications on the sender’s side, I have to make some additions so I can respond to the requests. I’m going to use a mostly hardcoded response, but I’m going to list the messages in a grid so I can select the one to which I’ll reply. I’ve set up an event mechanism for notifying the UI of new messages. Once a message is received, I set the current recipient’s store ID as the responder and then notify the UI of the message for display:

recvSuccess = msgReceiver.TryReceive(TimeSpan.FromSeconds(3), out NewMsg);
if (recvSuccess)
{
  CreateInquiry.InventoryQueryData qryData = 
    NewMsg.GetBody<CreateInquiry.InventoryQueryData>();
  NewMsg.Properties.Add("ResponderId", this.StoreID);
  EventContainer.RaiseMessageReceived(NewMsg);
}

Within the UI, I’ve subscribed to the event that delivers the message. The handler passes the message on to a method that updates the UI elements and performs the necessary InvokeRequired check:

void EventContainer_MessageReceivedEvent(object sender, MessagingEventArgs e)
{
  BrokeredMessage msg = (BrokeredMessage)e.MessageData;
  var RequestObject =
    msg.GetBody<CreateInquiry.InventoryQueryData>();
  RequestObject.ResponderID = msg.Properties["ResponderId"].ToString();
  this.ReceivedMessages.Add(RequestObject);
  UpdateUIElements(msg.MessageId, RequestObject);
}

With the code complete to fetch the messages from the Topic and to update the UI, I can now visualize the messages coming in (see Figure 2).

Figure 2 The Inventory Topic Monitor

This UI (I wasn’t hired for my user experience design skills, obviously) will allow me to select one of the messages and respond to it with a quantity set in the bottom text box. The response code is short and simple, as I only have to set the quantity on the message object and then send it to the Queue specified in the message’s ReplyTo property.

For this sample, I’m simply adding received messages from the Topic to a Dictionary<string, BrokeredMessage> object, using the BrokeredMessage.MessageId as the key. I use the message ID from the grid to retrieve the message from the dictionary and then create a new BrokeredMessage, assigning the same CorrelationId and assigning values to the ResponderId and Quantity properties. Just as when receiving from the Topic, I’ll use the MessagingFactory to create a QueueClient and, from that object, a MessageSender:

// Send message
SharedSecretCredential credential =
  TransportClientCredentialBase.CreateSharedSecretCredential(
    Constants.issuerName, Constants.issuerKey);
Uri sbUri = ServiceBusEnvironment.CreateServiceUri(
  "sb", Constants.sbNamespace, String.Empty);
MessagingFactory Factory = MessagingFactory.Create(sbUri, credential);
QueueClient QClient = Factory.CreateQueueClient(msg.ReplyTo);
MessageSender Sender = QClient.CreateSender();
Sender.Send(ResponseMsg);

That sends the response back, so now the focus moves back to the originating app to process the response.

Receiving the Response

For this sample, I just want to pull the responses off of the queue. Because the code that sends the request was modified to start monitoring the ReplyTo queue, I now need to add the code that actually checks the queue. I start off by creating a MessageReceiver and set up a simple while loop to get all of the messages available on the queue this time around:

void checkQTimer_Tick(object sender, EventArgs e)
{
  MessageReceiver receiver =
    QClient.CreateReceiver(ReceiveMode.ReceiveAndDelete);
  BrokeredMessage NewMessage = null;
  while (receiver.TryReceive(TimeSpan.FromSeconds(1), out NewMessage))
  {
    InquiryResponse resp = new InquiryResponse();
    resp.CorrelationID = NewMessage.CorrelationId;
    resp.Message = NewMessage;
    InquiryResponses.Add(resp);
  }
}

As before, I use the TryReceive method. While it works for this sample, I’d consider doing it a little differently for a real UI, because the method blocks the calling thread and so is better executed on a different thread. I want to be able to fetch the entire BrokeredMessage from the list by CorrelationId, so I’ve created an object that holds both and use the CorrelationId to filter the list later. I want the message as a BrokeredMessage, because I want data that is part of the BrokeredMessage envelope that now encapsulates my InquiryData object (see Figure 3).
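
One way to keep a blocking receive off the UI thread is to run the drain loop on a thread-pool thread and hand the results back when it finishes. The sketch below is only an illustration of that idea: it uses a BlockingCollection<string> as a stand-in for the response queue, and the BackgroundDrain class and DrainAsync method are hypothetical names, not part of the Service Bus API:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BackgroundDrain
{
    // Takes items until none arrives within 'wait', running on a thread-pool
    // thread so the caller (for example, a UI thread) is not blocked.
    public static Task<List<string>> DrainAsync(BlockingCollection<string> queue, TimeSpan wait)
    {
        return Task.Run(() =>
        {
            var received = new List<string>();
            string item;
            while (queue.TryTake(out item, wait))
            {
                received.Add(item);
            }
            return received;
        });
    }
}

public static class DrainDemo
{
    public static void Main()
    {
        // Stand-in for the response queue, preloaded with two made-up replies.
        var queue = new BlockingCollection<string>();
        queue.Add("response from store A");
        queue.Add("response from store B");

        Task<List<string>> pending =
            BackgroundDrain.DrainAsync(queue, TimeSpan.FromMilliseconds(100));

        // A console demo can simply wait; a UI would await the task or attach
        // a continuation instead of blocking on Result.
        List<string> responses = pending.Result;
        Console.WriteLine(responses.Count); // 2
    }
}
```

The loop has the same shape as the timer handler above: keep receiving with a short timeout until nothing more arrives.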

Figure 3 Inquiry Request and Responses

Modifying the SelectedIndexChanged code of the ListBox, I simply grab the CorrelationId that I used as the item and use it to get the responses of interest out of the list I’m building as responses show up on the queue:

string correlationId =
  lbRequests.SelectedItem.ToString();
List<InquiryResponse> CorrelatedList =
  InquiryResponses.FindAll(
    delegate(InquiryResponse resp)
    {
      return resp.CorrelationID == correlationId;
    });
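
If the list of responses grows large, scanning it on every selection change may become wasteful. A possible alternative, sketched here with a hypothetical SimpleResponse class standing in for InquiryResponse, is to group the responses by CorrelationID once with the LINQ ToLookup method and then index into the result. The class and helper names are assumptions for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified stand-in for the InquiryResponse holder used above.
public sealed class SimpleResponse
{
    public string CorrelationID { get; set; }
    public string ResponderId { get; set; }
}

public static class ResponseIndex
{
    // Groups the responses by CorrelationID in a single pass.
    // Indexing the lookup with an unknown key yields an empty sequence.
    public static ILookup<string, SimpleResponse> Build(IEnumerable<SimpleResponse> responses)
    {
        return responses.ToLookup(r => r.CorrelationID);
    }
}

public static class ResponseIndexDemo
{
    public static void Main()
    {
        var responses = new List<SimpleResponse>
        {
            new SimpleResponse { CorrelationID = "req-1", ResponderId = "store-A" },
            new SimpleResponse { CorrelationID = "req-2", ResponderId = "store-B" },
            new SimpleResponse { CorrelationID = "req-1", ResponderId = "store-C" }
        };

        ILookup<string, SimpleResponse> byRequest = ResponseIndex.Build(responses);
        Console.WriteLine(byRequest["req-1"].Count());   // 2
        Console.WriteLine(byRequest["unknown"].Count()); // 0
    }
}
```

The lookup is a snapshot, so it would need to be rebuilt as new responses show up on the queue. For a handful of responses, the FindAll call above is simpler and perfectly adequate.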

The last bit of work is to add the responses to the DataGrid to see the stores that responded to my inventory inquiry. Adding to the beauty of my barren UI, you’ll notice that I’m using GUIDs, but I hope that it’s obvious to the reader that this would be replaced by user-friendly descriptions, leaving the nasty GUIDs and IDs for the extended details pages.

Review

After growing up as an Army brat, I have a lot of sayings and other such things burned into my head. One example is the “Military Method” of education, which is that I’ll tell you what we’re going to do, we’ll do it, and then I’ll tell you what we did. Well, I’m at the “what we did” part. I started this writing as a single entry, but found that addressing the complete round-trip was too much for a single column. So I split it up into two entries: one to get the request out and the second to get the response back. My goal was to present the basic features of the Azure AppFabric ServiceBus and give a general feel for using it. When thinking about technology, I always like to wrap it in some type of context, and in this case I wanted the context to be an inter-store product query, because it’s a feasible scenario and it makes sense to use a mix of both Topics and Queues.

As of this writing, the May CTP of the Azure AppFabric ServiceBus has been released and some information on it can be found at bit.ly/it5Wo2. Additionally, you can get involved on the forums at bit.ly/oJyCYx.


Joseph Fultz is a software architect at Hewlett-Packard Co., working as part of the HP.com Global IT. Previously he was a software architect for Microsoft working with its top-tier enterprise and ISV customers defining architecture and designing solutions.

Thanks to the following technical expert for reviewing this article: Jim Keane