预测: 多云

Windows Azure AppFabric 服务总线: 主题

Joseph Fultz

 

这篇文章基于 Windows Azure AppFabric CTP 6 月更新。文中的所有信息均有可能发生变更。

Joseph Fultz
它是在我的 Windows Azure 服务总线功能确实没有获得很多支持从我的同事之间没有那么神秘。但是,与 Windows Azure AppFabric CTP 6 月更新,Microsoft 已最后添加足够功能将从什么我认为不远不只是占位符,以真正有用技术是移动服务总线。为我达到目的,重要的 AppFabric 服务总线现在提供了消息传递技术是主题,丰富的发行和预订功能。我将重点主题在本文章和局,我因此经常清空操作,从我零售行业经验,看看如何使用技术促进 inter-store 库存检查状态。

您是否曾转到购买东西,并且找到刚才已售出的最后一个,或所需的项目是以某种方式搞乱了吗?这种情况下,售货员经常将进入 POS 系统,并检查附近商店库存。而多数情况,该检查有违都保存在中央数据库或企业资源规划 (ERP) 系统的某些类型的库存盘点,职员通常检查存储区使用按编号附近商店她经验知识。通常情况下,数据是上载和由公司系统处理有点过时,因为事务日志和其他数据仅有一部分结束日处理时刷新它。

更理想的方案应为存储可能在任何时候日期引发到经不起考验的产品可用性有关请求和附近商店会响应,指示是否他们拥有它。这就是我要使用设置 Windows Azure AppFabric 服务总线,如中所示图 1

An Inventory-Check Message
图 1 的库存检查邮件

主题提供了持久的机制,使我能将出内容推送到每个主题的最多 2000 个订阅服务器。订阅服务器上限制是否很不幸,因为它可能会强制解决方案体系结构 (如我将介绍一个) 来解决它以某种方式创建主题中的段。例如,美国代替库存检查主题筛选依据地区的订阅服务器,我可能要创建 SouthCentral 美国清单检查主题并进一步筛选到特定的本地分支。与该警告,我将继续我单个库存检查主题,如我可以保证,我将不会在我 wonderland 个以上的少数几个特许经营位置有。

 

 

正在获取的邮件

您可以下载 Windows Azure AppFabric CTP 6 月更新从 bit.ly/p3DhAU,并查找管理门户网站,网址 portal.appfabriclabs.com。我将访问管理门户网站检索几条我需要我的代码中使用的信息 (请参阅图 2)。

Getting Information from the Azure Management Portal
图 2 从 Windows Azure AppFabric 管理门户中获取信息

我需要抓住服务网关、 默认颁发者 (始终"所有者"中 CTP) 和默认密钥。因为我还需要这些整个我的示例中,我将在对象作用域来创建它们:

private string sbNamespace = "jofultz";
private string issuerName = "owner";
private string issuerKey = "25vtsCkUPBtB39RdiBbUtCQglYRPOdHHC4MvX5fvxxxx";

我将我的函数中使用这些变量来创建该主题,然后将邮件投递到该主题。 没有为那些希望使用它,谁或谁不允许客户端库使用的平台上部署其余界面。 因为我正在生成 Windows 客户端,我将使用磁带库。 要做到这一点,我需要将引用添加到 Microsoft.ServiceBus、 Microsoft.ServiceBus.Message 和 System.ServiceModel,如中所示图 3

Adding References
图 3 添加引用

完成后,几个简单行代码将是所有所需设置的主题:

SharedSecretCredential credential =
  TransportClientCredentialBase.CreateSharedSecretCredential(
  issuerName, issuerKey);
 
Uri sbUri = ServiceBusEnvironment.CreateServiceUri(
  "sb", sbNamespace, String.Empty);
ServiceBusNamespaceClient sbNSClient =
  new ServiceBusNamespaceClient(sbUri, credential);
Topic newTopic = sbNSClient.CreateTopic(topicName);

什么将成为重要,因为我开始世上还有引发消息是要过滤的消息并获得只与基于地区的接收位置相关的那些存储的能力。 API 支持关联和订购 Id 作为一等公民,但我想使用我的知识的数据和基于请求的内容筛选进行筛选。 因此,我想要寻找并响应对方的请求根据区域的库存的地理位置关闭存储。 同时,要确保原始存储区不会拿自己的请求。 请求将包括相关项,在其中进行查询,该区域,而且 RequesterID 的 SKU,以便在原始存储可以过滤掉自己请求的主题:

[Serializable]
class InventoryQueryData
{
  public string Region;
  public string ResponderID;
  public string RequesterID;
  public string Sku;
  public int Count;
}

请注意我已经向该类添加 [可序列化] 属性。 您可以将属性标记为 DataMembers,同样,使用 DataContract 点但很快便会发送到该主题的任何类型的需要是可序列化。

我创建了一个简单的表单,允许我的 sku 输入任意字符串,然后从过帐存储的列表中选择的。 我的查询按钮背后的代码看起来类似于连接并创建主题的代码,类似于您发现时使用消息传递的 Api,如中所示的典型构造图 4

图 4 获取的数据值

// Assign data values.
InventoryQueryData data = new InventoryQueryData();
data.Sku = txtSKU.Text;
data.StoreID = cboStore.SelectedText;
data.Region = cboStore.SelectedValue.ToString();
 
Uri sbUri = ServiceBusEnvironment.CreateServiceUri("sb", sbNamespace, string.Empty);
SharedSecretCredential credential = TransportClientCredentialBase.CreateSharedSecretCredential(issuerName, issuerKey);
 
MessagingFactory msgFactory = MessagingFactory.Create(sbUri, credential);
TopicClient topicClient = msgFactory.CreateTopicClient(topicName);
MessageSender MsgSender = topicClient.CreateSender();
 
BrokeredMessage msg = BrokeredMessage.CreateMessage(data);
// Add props to message for filtering.
msg.Properties["Region"] = data.Region;
msg.Properties["RequesterID"] = data.RequesterID;
 
msg.TimeToLive = TimeSpan.FromSeconds(60);
 
MsgSender.Send(data);

有此处需注意两件事。 首先,我已经添加了需要使用筛选来 BrokeredMessage.Properties 集合的名称-值对。 其次,作为一种运行时维护,我已赋予 TimeToLive (TTL) 值为 60 秒,应防止订阅主题获取过备份。 当然,您通常需要一种更为明智的做法为领料 TTL,但我图如果请求不会到达订阅服务器上的任何时期内,则可能是太长因为没有一个客户那里竖起等待。 再说,这是一个示例。

有一个工厂方法 CreateMessage BrokeredMessage 的形式发送到总线的任何邮件。 这只是包装到一个包含所有所需的充分发挥作用的消息传递系统的构造 BrokeredMessage 类型的实例的数据。

这却显示我只需要到库存检查主题的订阅服务器收到一条消息,因此,现在我将开始设置订阅客户端获取消息并作出响应。

点击主题流和响应

与我的客户端在的地方,我已准备好发送请求,稳定的数据流,但右现在这将是很少超过使位松动中野生经不起考验。 我创建 Windows 窗体应用程序,并重新使用 XML 存储列表,并从第一个 (发件人) 应用程序 InventoryQueryData。 我需要每个客户端侦听到主题创建唯一的订阅。 通过订阅名称结合存储数我想聆听有足够轻松完成此操作。 我很小的测试应用程序让我从组合框中,选择存储编号,因此我只是跟踪到基订阅名称的值来创建唯一的订阅名称。 务必要确保每个客户端都有唯一的订阅; 两个或多个使用相同的订阅将创建竞态条件位置的那些订阅服务器消息和接收并删除第一个将争夺将获胜:

Topic invTopic = sbNSClient.GetTopic(topicName);
SubscriptionName = "InventoryQuerySubscription" + this.cboStore.Text;
SqlFilterExpression RegionFilter = new SqlFilterExpression("Region = '" +
  cboStore.SelectedValue + "' AND RequesterID <> '" + cboStore.Text + "'");
 
Subscription sub = invTopic.AddSubscription(SubscriptionName, RegionFilter);

如将订阅添加到该主题,我还可以传递一个筛选器中,以便每个存储接收仅库存检查请求自己的区域。 请注意您可以开发自己的 FilterExpression 类型的基类型,但该 API 包含四种类型,应涵盖大多数情况下,尤其是当一起使用: CorrelationFilterExpression,MatchAllFilterExpression,MatchNoneFilterExpression 和 SqlFilterExpression。 我使用了 SqlFilterExpression,这很容易地允许我 instinctively 写入邮件获取德克萨斯区域,例如,此表达式并排除来自我自己的存储区的消息:

"Region = '[Region]' AND RequesterID <> '[StoreID]'"

我只需要进行筛选,通过的传入请求,但在某些情况下我无法从理论上讲"整理"中使用 RuleDescription,说,组合 SqlFilterAction 与 SqlFilterExpression 的方法上的某些数据。 前者标识我正在目标,而后者定义应采取的操作的消息。 适用于收件人和总线的两面数据进出需要转化成 massaged 不能或不会更改时,这种类型的功能非常有用。

一旦设置订阅时它将在最多保留即使客户端将关闭。 这非常适合这种情况下; 每次启动监视,而它将连接到现有的连接时,我只需将创建 SubscriptionClient。 但是,并非所有人将需要这种行为。 我可以设想一下,您可能希望删除的预订,客户端关闭时的情况:

SubscriptionClient subClient = msgFactory.CreateSubscriptionClient(
  topicName, SubscriptionName);
MessageReceiver msgReceiver = subClient.CreateReceiver(ReceiveMode.ReceiveAndDelete);
msgReceiver.Open();

请注意,在调用 CreateReceiver 我已经将 ReceiveMode 设置为 ReceiveAndDelete。 您也可以使用 PeekLock 的 ReceiveMode。 在这里,我只是想抓取的消息和过程,并且我也无需确保正确接收和处理才能删除,因为它将丢失该消息,如果不是完成一大笔交易。 如果我需要更多保证,行为和可靠,我很可能会做两件事。 我不为消息、 主题或订阅中,设置 TTL,而是让无限期地实时的消息。 或者,我将给它一个非常高的 TTL,以便接收方有足够的时间来处理或移动到异常队列,以便只有真正未送达的邮件会到头来死信队列中。 此外,我将使用 PeekLock 接收器上读取和处理数据和仅删除相关的过程完成后的消息。 手动创建此分布式的事务行为会很快导致其他问题,如病毒队列,但我们将简要介绍该行为和问题设置另一个时间。

我已经打开接收器后输入一个循环来检查邮件。 API 具有将返回 BrokeredMessage 直接接收方法。 但是,我将使用 TryReceive 方法中,将返回一个布尔值,用于指示成功 (请参阅图 5)。 我会将相对较短的超时传递给此方法,应足够长,要检查并接收消息。 如果收到一条消息,我将使用它,并立即检查另一条消息。 如果不收到任何消息,我将睡眠状态位的线程,并再次检查。

正在检查邮件的图 5

while (MonitorOn)
{
  BrokeredMessage NewMsg;
  bool recvSuccess = msgReceiver.TryReceive(TimeSpan.FromSeconds(3), out NewMsg);
 
  if (recvSuccess)
  {
    this.lbMessages.Items.Add(NewMsg.MessageId);
    InventoryQueryData RequestObject =
      NewMsg.GetBody<CreateInquiry.InventoryQueryData>();
    ProcessMessage(RequestObject);
  }
  else
  {
    System.Threading.Thread.Sleep(3000);
  }
}

我请求对象包含在 BrokeredMessage,并检索它,我将调用传递对象类型 GetBody <T>。 此处的含意是对象类型需要与总线的两面上匹配。 您可以使用代理服务器类型实现此类情况的目的,或在这种不确定性或在随后的普通旧 XML,无法将对象作为字符串传递和处理为 XML。

直到下一次 …

此时,所有的工作是为了创建邮件并将其取出所有感兴趣的节点,并为它们从主题获取消息。 我已经证明已订阅的接收器适当地允许我不仅广播消息,而且还进行筛选的功能。 下个月我来解决通过以展示互为补充的队列和相关函数来完成该方案并基本了解 Windows Azure AppFabric 服务总线中的新功能,完善的行程。 我还将有供您下载的完整代码。

约瑟夫山 Fultz是软件架构师在惠普有限公司,工作作为一部分 HP.com 全球 IT 组。 之前,他是 Microsoft 的软件架构师,协助 Microsoft 顶层企业和 ISV 客户定义体系结构和设计解决方案。

这要归功于以下的技术专家审阅这篇文章: Jim Keane