天气预报:多云

以 AppFabric 队列结束探索之旅

Joseph Fultz

下载代码示例

在 10 月刊中,我介绍了 Windows Azure AppFabric 服务总线中的一些新功能 (msdn.microsoft.com/magazine/hh456395)。在本月刊中,我将接着上一次的方案,继续介绍该流程的返回过程。目前在此方案中,商店通过向某个主题发布库存检查请求,以此请求对临近店铺执行库存检查。订阅该主题的商店将基于订阅上的筛选器接收请求,该筛选器将消息限制为这些商店所在区域内并且不是由商店自身发送的消息。

对于返回过程,我将依赖于 Windows Azure AppFabric 服务总线队列的几个功能。首先,在发送到主题的 BrokeredMessage 上,我需要为两个重要的属性赋值。第一个要赋值的属性是 ReplyTo,该属性告知收件人要将消息发送到何处。这是在发件人发送消息时由发件人创建的特定队列。在本示例中,我希望使用队列而非主题,因为此时响应将发送到一个收件人,这一点与请求模式不同。请求模式等同于广播,用于向正在监听的任何人请求提供帮助。

图 1 中显示了消息流的新部分,该部分流向右侧一排的商店。


图 1 查询和响应的往返流程

更新出站请求代码

在第一个阶段中,我向主题发出了消息,并演示了通过适当的订阅来选取这条消息。但是,要轻松地实现良好的响应,还有两项重要的工作需要完成。第一项是设置 BrokeredMessage 的 CorrelationId 属性。该属性是 API 的一部分,因此无需作为我的数据类或架构的一部分包括在内。因为该属性是字符串,所以应该使用有意义的内容,同时还应该是可以唯一标识消息的字符串,这样与 CorrelationId 匹配的任何响应都不会造成混淆,即不会与系统中的其他请求匹配。在我的示例中使用了 GUID,在实际应用中,这也是一个相当稳妥的选择。代码如下:

BrokeredMessage msg = BrokeredMessage.CreateMessage(data);
// Add props to message for filtering
msg.Properties["Region"] = data.Region;
msg.Properties["RequesterID"] = data.RequesterID;
// Set properties for message
msg.TimeToLive = TimeSpan.FromSeconds(30);
msg.ReplyTo = returnQueueName;
msg.CorrelationId = Guid.NewGuid().ToString();

BrokeredMessage 同时还有一个名为 SessionId 的属性,在此处我并未使用。 SessionId 也是一个用于逻辑分组的属性,适合在消息封包级别使用,因为该属性可帮助对所有相关的消息进行分组。 现在的问题是其用途与 CorrelationId 有什么不同。 SessionId 在两种方案中尤为有用。 第一种是在一个系统中,多个后台程序发出各种全都相关的请求。 使用 CorrelationId 将响应路由到请求处理程序。 SessionId 可用于对处理节点之间发送和接收的所有消息进行分组。 这种分组方法在确定系统中的处理状态以便分析和调试时非常有用。 由于相同的原因,这是系统中一种非常有用的构造,可以将多个请求作为整个流程的一部分(例如,采购处理、检查库存、付款验证、发送以履行订单等等),但确切的流和时间无法保证。

在出站消息上设置 CorrelationId 之后,需要进行的下一项更改是设置 ReplyTo 属性。 我可以创建另一个主题,并且让所有商店监视响应或者使用单个队列,但这会造成不必要的流量,在高负载时导致瓶颈的可能性更高。 因此,只是在收到请求时创建响应队列并让收件人知道队列位置就非常有意义。 这个字符串可以是任意内容,不过我建议使用完全限定名称,以免在今后的软件革新时造成混淆或冲突。 开始时您可以只使用队列名称,不过在维护和扩展时,这会导致混淆,因为系统将开始支持多个服务总线命名空间和子队列。 此外,对于未使用 Microsoft .NET Framework 的收件人,使用完全限定地址是更好的选择。

在发送消息之前,我需要完成的最后两件事是创建响应队列并启动计时器来检查响应。 我从 GetQueue 方法开始着手。 在撰写本文时,关于 GetQueue 的文档 (bit.ly/pnByFw) 中声明了“返回值”的“类型”等于 Microsoft.ServiceBus.Messaging.Queue,这是队列的 Queue 句柄;如果服务命名空间中不存在队列,则为 null。 然而,实际情况并非如此。 事实上它将引发异常错误:

// Check if-exists and create response queue
try
{
  returnQ = this.ServiceBusNSClient.GetQueue(returnQueueName);
}
catch (System.Exception ex)
{
  Debug.WriteLine(ex.Message);
}
if (returnQ == null)
{
  returnQ = this.ServiceBusNSClient.CreateQueue(returnQueueName);
}
checkQTimer.Enabled = true;

因此,我将 GetQueue 方法包装到 try-catch 代码块中并继续。 我的示例代码非常典型,仅仅输出错误而已。 在创建队列之后,我将队列分配给一个变量,这样便可以引用它来检查队列,然后启用我在应用程序启动时设置的计时器。

更新收件人和响应

在发件人一端进行适当的修改后,我必须添加一些内容,从而对请求做出响应。 我将使用几乎完全硬编码的响应,但我打算在网格中列出消息,这样便可以选择要回复的消息。 我已设置了事件机制用于通知 UI 有新的消息。 在收到消息之后,我将当前收件人的商店 ID 设置为响应方,然后通知 UI 要显示的消息:

recvSuccess = msgReceiver.TryReceive(TimeSpan.FromSeconds(3), out NewMsg);
if (recvSuccess)
{
  CreateInquiry.InventoryQueryData qryData = 
    NewMsg.GetBody<CreateInquiry.InventoryQueryData>();
  NewMsg.Properties.Add("ResponderId", this.StoreID);
  EventContainer.RaiseMessageReceived(NewMsg);
}

在 UI 中,我已订阅了将接收消息的事件,然后将它传递到一个方法,以便更新将执行必要的 InvokeRequired 方法检查的 UI 元素:

void EventContainer_MessageReceivedEvent(object sender, MessagingEventArgs e)
{
  BrokeredMessage msg = (BrokeredMessage)e.MessageData;
  var RequestObject =
    msg.GetBody<CreateInquiry.InventoryQueryData>();
  RequestObject.ResponderID = msg.Properties["ResponderId"].ToString();
  this.ReceivedMessages.Add(RequestObject);
  UpdateUIElements(msg.MessageId, RequestObject);
}

完成用于提取主题的消息以及更新 UI 的代码之后,现在我可以使传入的消息可视化(请参见图 2)。


图 2 库存主题监视器

我可以使用此 UI(毫无疑问,用户体验设计技能并不是我的专长)来选择其中一条消息,并使用在底部文本框中设置的数量进行回复。响应代码是一个相当简短和精炼的任务,因为我只需要添加一点代码在消息对象上设置数量,然后将它发送到在消息的 ReplyTo 属性中指定的队列。

在本示例中,我只是将从主题收到的消息添加到 Dictionary<string, BrokeredMessage> 对象,在其中使用 BrokeredMessage.MessageId 作为字典的关键字。我只是使用网格中的消息 ID 从字典中检索消息,然后创建新的 BrokeredMessage,分配相同的 CorrelationId 并为 ResponderId 和 Quantity 属性赋值。就像从主题接收时一样,我将使用 MessagingFactory 创建 QueueClient。通过该对象,MessageSender 可以完成以下操作:

// Send message
  SharedSecretCredential credential =
    TransportClientCredentialBase.CreateSharedSecretCredential(
    Constants.issuerName, Constants.issuerKey);
  Uri sbUri = ServiceBusEnvironment.CreateServiceUri(
           "sb", Constants.sbNamespace, String.Empty);
MessagingFactory Factory =  MessagingFactory.Create(sbUri, credential);
QueueClient QClient = Factory.CreateQueueClient(msg.ReplyTo);
MessageSender Sender = QClient.CreateSender();
Sender.Send(ResponseMsg);

上面的代码将发送回响应,现在我们需要将注意力转回到发起应用程序来处理响应。

接收响应

在本示例中,我只想从队列中提取响应。 因为发送请求的代码已经过修改,可以开始监视 ReplyTo 队列,我应该真正添加实际的代码来检查队列。 我开始创建 MessageReceiver 并设置简单的 While 循环,以便获取此时队列上的全部可用消息:

void checkQTimer_Tick(object sender, EventArgs e)
{
  MessageReceiver receiver =
    QClient.CreateReceiver(ReceiveMode.ReceiveAndDelete);
  BrokeredMessage NewMessage = null;
  while (receiver.TryReceive(TimeSpan.FromSeconds(1), out NewMessage))
  {
    InquiryResponse resp = new InquiryResponse();
    resp.CorrelationID = NewMessage.CorrelationId;
    resp.Message = NewMessage;
    InquiryResponses.Add(resp);
  }
 }

与前面一样,我将使用 TryReceive 方法。虽然这个方法在本例中一样适用,不过我考虑在实际 UI 中采用稍有不同的方法,因为该方法会阻止线程,这可以使它更适于在不同的线程上执行。我希望按 CorrelationId 从列表提取完整 BrokeredMessage,因此我创建了对象并随后使用 CorrelationId 来筛选出该对象。我希望将消息作为 BrokeredMessage,因为我希望属于 BrokeredMessage 封包的数据现在封装我的 InquiryData 对象(请参见图 3)。


图 3 查询请求和响应

修改 ListBox 的 SelectedIndexChanged 代码时,我只需捕获用作项的 CorrelationId,然后在作为队列中的响应而构建的列表中,使用它来获取感兴趣的响应:

string correlationId =
  lbRequests.SelectedItem.ToString();
List<InquiryResponse> CorrelatedList =
  InquiryResponses.FindAll(
  delegate(InquiryResponse resp)
{
  return resp.CorrelationID == correlationId;
});

最后一步是将响应添加到 DataGrid 中,查看对我的库存查询做出响应的商店。 作为对单调的 UI 进行美化的一点说明,您可以看到我使用了 GUID,不过我希望读者能够明白,此处可以使用用户友好的说明来代替,而烦人的 GUID 和 ID 可以留到扩展的详细信息页面上处理。

回顾

我在军人家庭中长大,有许多东西在我脑海中根深蒂固。 其中一点就是“军事化”教育:我们应该说到做到,而且勇于承认。 那么,我现在开始“勇于承认”自己所做的事情。 最初我只想写一篇文章,但发现说明完整往返流程的内容对于一篇专栏而言太多了。 因此我分成了两篇: 一篇用于说明发出请求,另一篇用于说明接收响应。 我的目标是展现 Windows Azure AppFabric ServiceBus 的基本功能,并且提供使用它的基本感受。 在涉及到技术时,我通常希望放在某种类型的环境中予以说明,这一次我使用了商店间的产品查询,因为这是一种切实可行的方案,并且能够说明主题和队列的结合使用情况。

在撰写本文的过程中,五月份 CTP 的 Windows Azure App­Fabric ServiceBus 已经发布,其中一些信息请参见 bit.ly/it5Wo2。 此外,您可以通过 bit.ly/oJyCYx 访问论坛。

Joseph Fultz 是 Hewlett-Packard Co. 的软件架构师,参与 HP.com 全球 IT 的工作。 以前他是 Microsoft 的软件架构师,协助 Microsoft 顶层企业和 ISV 客户定义体系结构和设计解决方案。

衷心感谢以下技术专家对本文的审阅: Jim Keane