Azure Service探索—存储之队列(Queues)存储

del.icio.us Tags: Windows Azure,Queues

之前的一篇探索是有关Azure服务的文件系统存储的,我们看到了LocalStorage的不可共享性和临时性,同样也是在那篇文章中我们也涉及到了Azure编程模型中两个比较重要的模型和角色,Web Role和Worker Role,了解Web Role和Worker role之间的关系,是使用和掌握队列存储的关键。

根据MSDN的官方定义:

Web role : A web role is a Web application accessible via an HTTP and/or an HTTPS endpoint. A web role is hosted in an environment designed to support a subset of ASP.NET and Windows Communication Foundation (WCF) technologies.

Worker role : A worker role is a background processing application. A worker role may communicate with storage services and with other Internet-based services. It does not expose any external endpoints. A worker role can read requests from a queue defined in the Queue service.

简单的说,一个Web Role是一个能够通过HTTP/HTTS访问的Web应用,而Worker role 则是一个无法通过HTTP/HTTPS访问的后台处理应用。一般根据Azure的定义一个Azure服务至少有一个Web Role,而Worker Role则是可选的,不一定要有。 你也可以理解成Web Role是前台应用,比如一个你熟悉的网上商店,Web Role负责展示商店的商品,进行商品的搜索,以及接受用户的订购申请。而订购申请产生后,是否被接受和处理了,则是由Worker Role这样的后台应用在进行处理,比如检查库存是否有货,用户的信用情况,货物发送到物流公司的状态更新。

就像下面这幅图显示的, WebRole是负责卖咖啡的,而Worker Role就是那个负责制造咖啡的。在Windows Azure的设想里,其实这个卖咖啡的和这个制造咖啡的可以是处于不同生产环节上的两个厂商,他们分别将其应用托管到Windows Azure上,对于这连个应用之间的协作和消息通讯,Windows Azure可以提供一个很好的解决方案,比如.NET Services 中的Service Bus 和 Workflow Service,这是针对两个独立的应用(可能是坐落在不同云朵上的两个云应用)之间,对于我们单独的一个咖啡应用之间,卖咖啡的和做咖啡之间的消息传递就可以考虑使用队列服务或队列存储,这是一个非常典型的生产者-消费者的模型。

clip_image002

也就是相对复杂一些的应用场景,我们一个单独Azure服务会同时需要Web 角色和Worker角色,甚至可能是一个Web角色,多个后台Worker角色,队列存储其实解决的就是前台角色和后台Worker角色消息共享和传递的问题。解决消息共享,管理消息的就是消息存储负责的,至于消息传递,消息的可靠性,则是由消息服务来负责的。

除了一个Azure服务的内部各个组件和角色之间通讯外,还有应用组合的场景中也可以考虑使用Azure的队列存储和服务。比如下面的这个例子:

clip_image004

Pictures source: https://blog.smarx.com/posts/windows-azure-worker-role-to-deal-with-spam

Steve Marx举的这个例子场景就是,他现在想做一个云计算版本的Weblog应用,但他也需要解决垃圾评论的问题。而TypePad AntiSpam对于垃圾评论已经实现了一个很好的解决方案,而且TypePad AntiSpam也提供了一个公开的API和接口,那么Steve Marx的Weblog应用就完全可以将TypePad的垃圾评论能力聚合到自己的Weblog应用中。

大致的流程如下:

1. 某个读者访问Steve Marx的云版Weblog应用(Web Role),并且写了自己的评论。

2. Web Role 先将读者的评论保存在Azure 的表存储中,将其状态设置为审核中不显示。

3. 然后Web Role 再通过Azure队列,放一个消息,告知一个编号的评论需要审批,看是否是恶意或垃圾评论

4. Worker Role一般在Web Role启动后不久也会开始工作,它检查队列里的消息,一旦检查到了,它便可以通过消息里的编号,去Azure 的表存储中找到具体的评论信息,用户信息以及用户是针对哪篇文章发表的评论。

5. 之后Worker Role 带着这些信息(TypePad AntiSpam服务需要的信息),调用TypePad的垃圾评论检测服务的接口,从而确定这个用户的这个评论的有效性。

6. 获得TypePad AntiSpam返回的结果,Worker Role更新Azure 的表存储中这个评论的状态,比如设置成,审核通过可显示,或审核未通过不显示。

了解了设计层面和使用的场景,技术和代码层面就比较简单了。我修改了上篇文章的项目设计一个简单的场景来模拟,并探索Azure 队列的功能。

应用界面如下:

clip_image006

前台的Send,负责产生需求和输入,并将其放到队列中,Refresh则读取队列,看看队列中有哪些需求和输入没有处理,后台的Worker Role则读取队列中的消息,并且将其处理(从队列中删除)

这次我们就直接创建一个同时包含Web role和 Worker Role的项目:

clip_image008

创建项目后,在写代码之前,我们还需要做两件事,一是配置和定义Azure 服务,另外一个是确定访问/存取Azure 队列存储的组件。

在目前的Azure运行环境中,队列存储,表存储和Blob存储服务都是以REST的协议提供了访问接口,基本上你如果要使用这些服务,只要有一个访问/存取Azure 队列存储的组件/客户端,以及访问的用户和密码就可以了。

访问的用户和密码我们可以配置在Azure 服务的配置文件中,而一个访问/存取Azure 队列存储的组件/客户端,现在流行的是直接使用SDK中的StorageClient 类库,这个类库在 Azure SDK的\StorageClient\Lib 目录下,我喜欢将其编译后,放到单独的一个目录,以后再项目中直接添加文件引用。命名空间是:Microsoft.Samples.ServiceHosting.StorageClient

首先是要在服务定义文件ServiceDefinition.csdef中定义队列相关的用户名和密码,队列访问地址,因为Web role和Work role 都要存取和访问队列,所以都需要进行定义

 <?xml version="1.0" encoding="utf-8"?>
  
 <ServiceDefinition name="WorkingWithQueues" xmlns="https://schemas.microsoft.com/ServiceHosting/2008/10/ServiceDefinition">
  
 <WebRole name="WebRole">
  
 <ConfigurationSettings>
  
 <Setting name="QueueStorageEndpoint"/>
  
 <Setting name="AccountName"/>
  
 <Setting name="AccountSharedKey"/>
  
 </ConfigurationSettings>
  
 <InputEndpoints>
  
 <!-- Must use port 80 for http and port 443 for https when running in the cloud -->
  
 <InputEndpoint name="HttpIn" protocol="http" port="80" />
  
 </InputEndpoints>
  
 </WebRole>
  
 <WorkerRole name="WorkerRole">
  
 <ConfigurationSettings>
  
 <Setting name="QueueStorageEndpoint"/>
  
 <Setting name="AccountName"/>
  
 <Setting name="AccountSharedKey"/>
  
 </ConfigurationSettings>
  
 </WorkerRole>
  
 </ServiceDefinition>

之后在服务配置文件ServiceConfiguration.cscfg中,定义在服务定义文件中各个参数的值

 <?xml version="1.0"?>
  
 <ServiceConfiguration serviceName="WorkingWithQueues" xmlns="https://schemas.microsoft.com/ServiceHosting/2008/10/ServiceConfiguration">
  
 <Role name="WebRole">
  
 <Instances count="1"/>
  
 <ConfigurationSettings>
  
 <Setting name="QueueStorageEndpoint" value="https://127.0.0.1:10001"/>
  
 <Setting name="AccountName" value="devstoreaccount1"/>
  
 <Setting name="AccountSharedKey" value="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="/>
  
 </ConfigurationSettings>
  
 </Role>
  
 <Role name="WorkerRole">
  
 <Instances count="1"/>
  
 <ConfigurationSettings>
  
 <Setting name="QueueStorageEndpoint" value="https://127.0.0.1:10001"/>
  
 <Setting name="AccountName" value="devstoreaccount1"/>
  
 <Setting name="AccountSharedKey" value="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="/>
  
 </ConfigurationSettings>
  
 </Role>
  
 </ServiceConfiguration>
  

然后分别在Web Role和Worker Role的项目中添加StorageClient的引用

clip_image010

这里可以看一下主要的几个代码

创建消息队列和添加到消息的代码:

    1: StorageAccountInfo account = StorageAccountInfo.GetDefaultQueueStorageAccountFromConfiguration();
    2:  
    3: QueueSvc = QueueStorage.Create(account);
    4:  
    5: queue = QueueSvc.GetQueue("mymessage090120");
    6:  
    7: if (!queue.DoesQueueExist())
    8:  
    9: {
   10:  
   11: queue.CreateQueue();
   12:  
   13: }
   14:  
   15: if (queue != null)
   16:  
   17: {
   18:  
   19: Message msg = new Message(InputBox.Text);
   20:  
   21: queue.PutMessage(msg); 
   22:  
   23: InputBox.Text = "";
   24:  
   25: }
   26:  

刷新功能,主要是使用PeekMessages方法来读取消息队列:

    1: MsgBox.Items.Clear();
    2:  
    3: IEnumerable<Message> msgs = queue.PeekMessages(10);
    4:  
    5: if (msgs != null)
    6:  
    7: {
    8:  
    9: foreach (Message item in msgs)
   10:  
   11: {
   12:  
   13: MsgBox.Items.Add(item.ContentAsString());
   14:  
   15: }
   16:  
   17: }

Worker Role的处理消息的代码:

    1: public override void Start()
    2:  
    3: {
    4:  
    5: // initialize the account information
    6:  
    7: Uri baseUri = new Uri(RoleManager.GetConfigurationSetting("QueueStorageEndpoint"));
    8:  
    9: string accountName = RoleManager.GetConfigurationSetting("AccountName");
   10:  
   11: string accountKey = RoleManager.GetConfigurationSetting("AccountSharedKey");
   12:  
   13: StorageAccountInfo account = new StorageAccountInfo(
   14:  
   15: baseUri,
   16:  
   17: null,
   18:  
   19: accountName,
   20:  
   21: accountKey);
   22:  
   23: //retrieve a reference to the messages queue.
   24:  
   25: QueueStorage service = QueueStorage.Create(account);
   26:  
   27: MessageQueue queue = service.GetQueue("");
   28:  
   29: while (true)
   30:  
   31: {
   32:  
   33: Thread.Sleep(10000);
   34:  
   35: if (queue.DoesQueueExist())
   36:  
   37: {
   38:  
   39: Message msg = queue.GetMessage();
   40:  
   41: if (msg != null)
   42:  
   43: {
   44:  
   45: RoleManager.WriteToLog("Information",
   46:  
   47: string.Format("Message '{0}' processed.", msg.ContentAsString()));
   48:  
   49: queue.DeleteMessage(msg);
   50:  
   51: }
   52:  
   53: }
   54:  
   55: }
   56:  
   57: }
   58:  

 

最后的有关队列的简单规则

看到“mymessage090120”这个队列名了吗? 这是我想告诉的另外一个注意事项,那就是Azure Container的命名规则,队列名,表名这些在Azure中都算是一个容器(Container)的标示,容器名必须是一个有效的DNS (Domain Name System)的名称,并遵守下面规则

 

规则1 队列、Blob容器和表的名称的命名规则

· 名称必须以数字或字母开头,并且有字母,数字,. 符号(period)和-符号(dash)组成

· 所有的字母必须是小写

· 名称的长度在3-63个字符之间

· 在. 符号后面不能直接跟-符号

简单说来,就是必须符合这个正则表达式 ^([a-z]|\d){1}([a-z]|-|\d){1,61}([a-z]|\d){1}$,另外一个是表名的约束ValidTableNameRegex = @"^([a-z]|[A-Z]){1}([a-z]|[A-Z]|\d){2,62}$"

我一开始没有注意到这些,使用了大写字母,结果 Debug了折腾了一圈才找到错误。

 

规则2 一个队列能够包括无数个消息,每个消息的大小不能超过8K(MaxMessageSize = 8 * 1024),消息在队列中的最大存活时间为7天(MaxTimeToLive = 7 * 24 * 60 * 60)

相关的信息,可以在Queue.cs 和RESThelpers.cs 中找到一些。

 

规则3:队列存储是跟随和绑定在一个Azure平台的帐号下的,队列存储中包含无限量的消息。从REST的路径上可以看出https://<Account>.queue.core.windows.net/<QueueName>

规则4:Azure队列服务提供的是可靠的队列化的消息服务,它能确保消息至少被接收到一次,适合在异步的任务分发和查询场景中,并且只要Azure平台的帐号不变,队列存储本身是可靠和持久性的存储(Durable Storage)。

 

相关的示范代码可以在我的MSDN代码库中下载获得。代码在在Azure SDK and Tools Jan 2009 CTP环境下测试通过。