Windows Azure 开发

Windows Azure 上的 CQRS

Mark Seemann

下载代码示例

Microsoft Windows Azure 提供了独特的机会和挑战。虽然在灵活可伸缩性、成本降低和部署灵活性等方面存在机会,但同时也存在挑战,原因是 Windows Azure 环境与当前承载大多数 Microsoft .NET Framework 服务和应用程序的标准 Windows 服务器不同。

将应用程序和服务部署在云中的最有说服力的理由之一是灵活的可伸缩性:您可以在需要时提高服务规模,可以在需求减少时再降低规模。在 Windows Azure 中,影响最小的规模调整方式是外延式扩展而不是提升式扩展,即添加更多服务器而不是使现有服务器的功能更强。若要适合该可伸缩性模型,应用程序必须可以动态伸缩。本文介绍构建可伸缩服务的有效方法,并演示如何在 Windows Azure 上实现这种方法。

命令查询职责分离 (CQRS) 是一种构建可伸缩应用程序的新方法。该方法看似与您习惯的 .NET 体系结构类型不同,但是它能够以行之有效的原则和解决方案为基础来实现可伸缩性。有大量介绍如何构建可伸缩系统的知识可供使用,但这需要改变思维定势。

形象地说,CQRS 只不过是关于分离问题的一种声明,但是在软件体系结构环境中,它通常表示一组相关模式。换句话说,CQRS 这一术语可以具有两种含义:作为模式和作为体系结构风格。在本文中,我将简要介绍这两种观点,并提供基于在 Windows Azure 上运行的 Web 应用程序的示例。

了解 CQRS 模式

CQRS 的基础技术源自面向对象的模式语言。命令是更改某种状态的操作,而查询是检索有关状态的信息的操作。更通俗地说,命令就是写入,而查询是读取

CQRS 模式简单地声明必须将读取和写入显式建模为分离的职责。写入数据是一种职责,而读取数据是另一种职责。大多数应用程序需要同时执行这两种操作,但是如图 1 所示,必须单独处理每种职责。

图 1 将读取与写入分离

应用程序所写入的概念系统与它所读取的概念系统不同。

显然,应用程序写入的数据最终可用于读取。CQRS 模式并未对如何实现这一点做出任何描述,但是在最简单的实现中,读取和写入系统可以使用相同的基础数据存储。

在这种情况下,读取和写入会严格分离;写入决不会 返回数据。这一看似无关紧要的声明为创建可大规模伸缩的应用程序提供了大量机会。

CQRS 体系结构风格

除了 CQRS 模式之外,CQRS 体系结构风格还基于有关数据显示的简单但深刻的认识。请参见图 2,该图显示了一个预订应用程序的 UI(假设这是一个餐厅预订系统)。

图 2 显示数据在呈现时已过时

该日历显示出指定月份中的日期,但是某些日期已禁用,因为这些日期已经订满。

这类 UI 中的数据新旧程度如何?在呈现数据、通过网络传输数据、用户对数据进行解释并做出响应的过程中,数据可能已在基础数据存储中发生更改。用户在响应前查看数据的时间越长,数据就变得越过时。用户可能会在继续操作之前被电话或类似干扰所打断,因此考虑时间(用户仔细阅读网页所用的时间)可以用分钟度量。

解决此问题的常见方法是使用开放式并发来应对发生冲突的情况。应用程序开发人员必须编写代码来处理此类情况,但是 CQRS 体系结构风格已包含此基本情况,而不是将其视为异常情况。如果显示数据在呈现时已过时,则它不必 反映中心数据存储中的数据。相反,应用程序可以显示来自不规范数据源的数据,可能要略微滞后于“实际”数据存储。

认识到显示数据总会过时并遵循写入永远不会返回数据的 CQRS 原则,就会产生实现可伸缩性的机会。UI 不必等待数据写入,而是可以只发送异步消息并向用户返回视图。后台辅助线程选择这些消息并按其自己的节奏对这些消息进行处理。图 3 显示了 CQRS 风格体系结构的更全面视图。

图 3 CQRS 风格体系结构

每当应用程序需要更新数据时,它都会发送一个异步消息形式的命令,而这一操作很可能会通过持久队列进行。命令发送后,UI 立即向用户返回视图。后台辅助线程在单独进程中选择该命令消息,并向数据存储写入相应更改。在此操作过程中,它还引发一个事件作为另一个异步消息。其他消息处理程序可以订阅这种事件,并相应地更新数据存储的不规范视图。

虽然视图数据会滞后于“实际”数据,但此事件传播的速度通常非常快,以致于用户不会注意到它。不过,即使系统由于负载过大而速度减慢,视图数据也最终会变得一致。

这种体系结构可以在许多不同系统上实现,但由于它在辅助角色和队列方面具有明确的概念,Windows Azure 十分适合该体系结构。但是,Windows Azure 在 CQRS 方面也带来了一些独特挑战;本文其余部分会通过一个示例应用程序来探讨机会和挑战。

一个预订应用程序

要介绍如何在 Windows Azure 上实现 CQRS,一个简单的预订应用程序是很好的示例。假设该应用程序处理某个餐厅的预订请求。用户看到的第一页是日期选取器,如图 2 所示。请再次注意,某些日期已禁用,表示这些日期已经订满。

当用户单击某个可用日期时,会显示一个预订表单和随后的收据,如图 4 所示。

图 4 预订 UI 流

请注意,收据页会通知用户此时并不保证可完成预订。最终决定将通过电子邮件来通知。

在 CQRS 中,由于处理是在后台进行的,因此 UI 将在设置预期值时发挥重要作用。但是在正常加载过程中,收据页会在用户往下进行之前获得足够时间,请求已得到处理。

我现在会演示有关实现示例预订应用程序的要点。因为即使是这种简单应用程序也会有许多移动部分,所以我在此处会重点介绍最有意义的代码段;本文附带的下载中提供了完整代码。

提交命令

Web 角色将以 ASP.NET MVC 2 应用程序的形式实现。当用户提交图 4 中所示的表单时,会调用相应控制器操作:

[HttpPost] 
public ViewResult NewBooking(BookingViewModel model) 
{ 
  this.channel.Send(model.MakeNewReservation()); 
  return this.View("BookingReceipt", model); 
}

通道字段是此简单 IChannel 接口的已注入实例:

public interface IChannel 
{ 
  void Send(object message); 
}

NewBooking 方法通过通道发送的命令只是封装在数据传输对象中的 HTML 表单数据。 MakeNewReservation 方法只是将发布的数据转换为 MakeReservationCommand 实例,如下所示:

public MakeReservationCommand MakeNewReservation() 
{ 
  return new MakeReservationCommand(this.Date, 
    this.Name, this.Email, this.Quantity); 
}

因为 Send 方法返回 void,UI 会在命令成功发送后立即向用户返回一个 HTML 页。 在队列顶部实现 IChannel 接口可确保 Send 方法尽快返回。

在 Windows Azure 上,我们可以在属于 Windows Azure 存储的内置队列的顶部实现 IChannel 接口。 若要将消息置于这种持久队列中,该实现必须对消息进行序列化。 可通过许多不同方式做到这一点,不过为简单起见,我选择使用内置到 .NET Framework 中的二进制序列化程序。 但是在生产应用程序中,您应认真考虑替代方法,因为使用二进制序列化程序会难以处理版本控制问题。 例如,当新版本代码尝试对由旧版本序列化的 blob 进行反序列化时,会发生什么情况? 可能的替代方法包括 XML、JSON 或协议缓冲区。

借助于此技术堆栈,可十分简单地实现 IChannel.Send:

public void Send(object command) 
{ 
  var formatter = new BinaryFormatter(); 
  using (var s = new MemoryStream()) 
  { 
    formatter.Serialize(s, command); 
    var msg = new CloudQueueMessage(s.ToArray()); 
    this.queue.AddMessage(msg); 
  } 
}

Send 方法将对 Command 进行序列化,并从生成的字节数组创建新的 CloudQueueMessage。队列字段是来自于 Windows Azure SDK 的 CloudQueue 类的注入实例。使用正确地址信息和凭据进行初始化后,AddMessage 方法将消息添加到相应队列。这一过程通常以惊人的速度发生,因此当该方法返回时,调用方可立即执行其他工作。同时,该消息现在处于队列中,正在等待后台处理器选择它。

处理命令

Web 角色会恰当地显示 HTML 并接受其可以通过 IChannel 接口发送的数据,而辅助角色会按自己的节奏从队列接收和处理消息。这些后台辅助线程是无状态的自治组件,因此如果它们无法跟上传入的消息,则您可以添加更多实例。基于消息传送的体系结构的巨大可伸缩性正是来源于此。

如前所示,通过 Windows Azure 队列发送消息十分简单。而以安全且一致的方式使用这些消息会有些棘手。每个命令都具有一个更改应用程序状态的意图,因此后台辅助线程必须确保不丢失任何消息,并且基础数据以一致的方式改变。

对于支持分布式事务的队列技术(如 Microsoft 消息队列),确保这一点可能相当简单。Windows Azure 队列不是事务性的,但是它们提供了自己的一组保证。消息在读取时不会丢失,而是在一段时间内不可见。客户端应从队列提取消息,执行相应操作,然后删除该消息以作为该过程的最后一步。这便是示例预订应用程序的通用辅助角色所执行的任务;该辅助角色以无限循环的方式执行图 5 中显示的 PollForMessage 方法。

图 5 PollForMessage 方法

public void PollForMessage(CloudQueue queue) 
{ 
  var message = queue.GetMessage(); 
  if (message == null) 
  { 
    Thread.Sleep(500); 
    return; 
  } 
  
  try 
  { 
    this.Handle(message); 
    queue.DeleteMessage(message); 
  } 
  catch (Exception e) 
  { 
    if (e.IsUnsafeToSuppress()) 
    { 
      throw; 
    } 
    Trace.TraceError(e.ToString()); 
  } 
}

如果队列中目前没有任何消息,则 GetMessage 方法可能返回 null。这种情况下,该方法只是等待 500 毫秒,然后返回,此时将通过无限外部循环再次立即调用该方法。当收到消息时,该方法会通过调用 Handle 方法来处理该消息。这是应该完成所有实际工作的位置,因此如果该方法在没有引发异常的情况下返回,便可安全地删除该消息。

另一方面,如果在处理消息期间发生异常,则抑制该异常十分重要;未处理的异常会使整个辅助角色实例崩溃,并会停止从队列中提取消息。

可立即投入生产运行的实现需要处理起来更加复杂的所谓中毒消息,但是我决定在示例代码中不涉及这种消息,以使代码更加简单。

如果在处理消息期间引发了异常,则不会删除该消息。超时之后,该消息可供再次处理。这是 Windows Azure 队列所提供的保证:可对一个消息至少 处理一次。因此,可以将消息重播多次。这样,所有后台辅助线程必须能够处理消息重播。所有持久写入操作都必须是幂等的。

使写入操作变为幂等

每个处理消息的方法都必须能够处理重播,而不会影响应用程序的状态。处理 MakeReservationCommand 就是一个很好的示例。图 6 提供了消息流概览。

图 6 处理预订命令的工作流

应用程序必须执行的第一个操作是检查餐厅对于请求的日期是否具有足够的容量;所有桌位都可能已为指定日期保留,或者可能仅剩下几个桌位。为了回答有关可用容量的问题,应用程序会跟踪持久存储中的当前容量。可通过多种方法执行此操作。跟踪 SQL Azure 数据库中的所有预订数据是一种可能方法,但是因为 SQL Azure 数据库大小方面存在限制,所以更具伸缩性的方法是使用 Windows Azure blob 或表存储。

预订示例应用程序使用 blob 存储来存储序列化的幂等值对象。此 Capacity 类会跟踪接受的预订,以便可以检测消息重播。为了回答有关剩余容量的问题,应用程序可以加载相应日期的 Capacity 实例,并使用正确的预订 ID 调用 CanReserve 方法:

public bool CanReserve(int quantity, Guid id) 
{ 
  if (this.IsReplay(id)) 
  { 
    return true; 
  } 
  return this.remaining >= quantity; 
} 
  
private bool IsReplay(Guid id) 
{ 
  return this.acceptedReservations.Contains(id); 
}

每个 MakeReservationCommand 都有一个关联 ID。为确保幂等行为,Capacity 类会保存每个接受的预订 ID,以便其可以检测重播。仅当方法调用不是重播时,它才会调用实际业务逻辑,从而将请求的数量与剩余容量进行比较。

应用程序为每个日期序列化并存储一个 Capacity 实例,因此,为了回答餐厅是否具有剩余容量的问题,它会下载 blob 并调用 CanReserve 方法:

public bool HasCapacity(MakeReservationCommand reservation) 
{ 
  return this.GetCapacityBlob(reservation) 
    .DownloadItem() 
    .CanReserve(reservation.Quantity, reservation.Id); 
}

如果结果为“true”,则应用程序会调用与该结果关联的一组操作,如图 6 所示。 第一步是将容量递减,这涉及调用图 7 中显示的 Capacity.Reserve 方法。

图 7 Capacity.Reserve 方法

public Capacity Reserve(int quantity, Guid id) 
{ 
  if (!this.CanReserve(quantity, id)) 
  { 
    throw new ArgumentOutOfRangeException(); 
  } 
  
  if (this.IsReplay(id)) 
  { 
    return this; 
  } 
  
  return new Capacity(this.Remaining - quantity,  
    this.acceptedReservations 
      .Concat(new[] { id }).ToArray()); 
}

这是另一个幂等操作,该操作先调用 CanReserve 和 IsReplay 方法以作为保护。 如果该方法调用表示预订一些容量的真正新的请求,则会返回一个新的 Capacity 实例并将容量递减,并会将 ID 添加到已接受 ID 的列表中。

Capacity 类只是一个值对象,因此必须在操作完成之前将其提交回 Windows Azure blob 存储。 图 8 显示了最初如何从 Windows Azure blob 存储下载原始 blob。

图 8 递减容量并提交给存储

public void Consume(MakeReservationCommand message) 
{ 
  var blob = this.GetCapacityBlob(message); 
  var originalCapacity = blob.DownloadItem(); 
  
  var newCapacity = originalCapacity.Reserve( 
    message.Quantity, message.Id); 
  
  if (!
newCapacity.Equals(originalCapacity)) 
  { 
    blob.Upload(newCapacity); 
    if (newCapacity.Remaining <= 0) 
    { 
      var e = new SoldOutEvent(message.Date); 
      this.channel.Send(e); 
    } 
  } 
}

这是序列化的 Capacity 实例,它对应于所请求的预订的日期。 如果容量发生改变(即不是重播),则会将新的 Capacity 上载回 blob 存储。

如果在此过程中引发了异常,则会发生什么情况? 可能发生这种情况的一种方式是 Capacity 实例自调用 CanReserve 方法以来已更改。 这在大容量方案中并非是不可能发生的,因为在种方案中,会同时处理许多竞争请求。 在这种情况下,由于没有足够剩余容量,Reserve 方法可能会引发异常。 这很正常;这仅意味着此特定预订请求在并发争用中失败。 异常将由图 5 中的异常处理程序捕获,但是因为从不会删除消息,所以该消息稍后会重新出现以供再次处理。 发生这种情况时,CanReserve 方法会立即返回 false,并且可以温和地拒绝该请求。

但是,存在着另一种潜在的并发冲突,如图 8 所示。 当两个后台辅助线程同时更新同一个日期的容量时,会发生什么情况?

使用开放式并发

图 8 中的 Consume 方法从 blob 存储下载 Capacity blob,并在其改变时上载新值。 可能会有许多后台辅助线程同时执行此操作,因此应用程序必须确保一个值不会覆盖另一个值。

Windows Azure 存储基于 REST,因此建议使用 ETag 来处理这种并发问题。 应用程序首次为指定日期创建 Capacity 实例时,ETag 将为 null,但是从存储下载现有 blob 后,该 blob 会具有可通过 CloudBlob.Properties.ETag 提供的 ETag 值。 当应用程序上载 Capacity 实例时,它必须对 BlobRequestOptions 实例正确设置正确的 AccessCondition:

options.AccessCondition = etag == null ?
AccessCondition.IfNoneMatch("*") :  
  AccessCondition.IfMatch(etag);

当应用程序创建 Capacity 的新实例时,ETag 为 null,且 AccessCondition 必须设置为 IfNoneMatch(“*”)。 这可确保在 blob 已存在时会引发异常。 另一方面,如果当前写入操作表示一个更新,则 AccessCondition 必须设置为 IfMatch,这可确保在 blob 存储中的 ETag 与提供的 ETag 不匹配时引发异常。

基于 ETag 的开放式并发是您工具箱中的重要工具,但是必须通过提供相应的 BlobRequestOptions 来显式启用它。

如果在递减容量时未引发异常,则应用程序可以前进到下一步,如图 6 所示:将预订写入表存储。 这会遵循与递减容量基本相同的原理,因此我在此处将其跳过。 附带的下载中提供了代码,但是要点在于,写入操作同样应是幂等的。

该工作流中的最后一步是引发一个表示预订已接受的事件。 这是通过使用 Windows Azure 队列发送另一个异步消息实现的。 关注这个域事件的任何其他后台辅助线程都可以选择该事件并进行处理。 相关操作是向用户发送确认电子邮件,但是应用程序还需要通过更新视图数据存储来结束 UI 循环。

更新视图数据

在处理命令过程中发生的事件将通过 IChannel 接口以异步消息的形式发送。 例如,图 8 中的 Consume 方法在容量减少到零时会引发新的 SoldOutEvent。 其他消息处理程序可以订阅这类事件以正确更新视图数据,如下所示:

public void Consume(SoldOutEvent message) 
{ 
  this.writer.Disable(message.Date); 
}

注入的编写器通过在 blob 存储中更新相应月份的已禁用日期数组来实现 Disable 方法:

public void Disable(DateTime date) 
{ 
  var viewBlob = this.GetViewBlob(date); 
  DateTime[] disabledDates = viewBlob.DownloadItem(); 
  viewBlob.Upload(disabledDates 
    .Union(new[] { date }).ToArray()); 
}

此实现只是从 blob 存储下载已禁用 DateTime 实例的数组,将新日期附加到该数组,然后将该数组再次上载。 因为使用了 Union 方法,所以操作是幂等的,并且 Upload 方法会再次封装基于 ETag 的开放式并发。

查询视图数据

UI 现在可以直接查询视图数据。 这是一种高效操作,因为数据是静态的,无需进行计算。 例如,为了用已禁用的日期更新图 2 中的数据选取器,该数据选取器向控制器发送 AJAX 请求以获取数组。

控制器可以简单地处理该请求如下:

public JsonResult DisabledDays(int year, int month) 
{ 
  var data = this.monthReader.Read(year, month); 
  return this.Json(data, JsonRequestBehavior.AllowGet); 
}

注入的读取器通过读取 SoldOutEvent 处理程序写入的 blob 来实现 Read 方法:

public IEnumerable<string> Read(int year, int month) 
{ 
  DateTime[] disabledDates =  
    this.GetViewBlob(year, month).DownloadItem(); 
  return (from d in disabledDates 
    select d.ToString("yyyy.MM.dd")); 
}

这样,循环结束。用户基于当前视图数据浏览站点,并填写表单以提交通过异步消息传送处理的数据。最后,将基于在工作流过程中引发的域事件对视图数据进行更新。

反规范化数据

综上所述,大多数应用程序读取的数据要远多于写入的数据,因此,可以通过对读取端进行优化来实现可伸缩性,尤其是在可以从静态资源(如 blob)读取数据时。屏幕上呈现的数据总是会断开,这意味着这些数据从呈现时起已经过时。CQRS 通过断开数据的读取和写入来实现这种过时。读取的数据不必直接来自与写入的数据相同的数据源。可以将数据从其写入到的存储异步传输到视图专用数据存储,在此情况下,将仅支付计划和操作数据的费用一次。

借助于这种内置队列和可伸缩的不规范数据存储,Windows Azure 非常适合这种体系结构。虽然不支持分布式事务,但队列可保证绝不会丢失消息,并且将对消息至少处理一次。为了处理潜在重播,所有异步写入操作都必须是幂等的。对于不规范数据(如 blob 和表存储),必须使用 ETag 来实现开放式并发。使用这些简单技术,可以确保最终一致性。

本文只涉及了 CQRS 的一些皮毛。如果您要了解有关 CQRS 的更多信息,可在 Internet 上的众多资源当中涉猎这一知识体系;但是,您不妨先从 Rinat Abdullin 的“CQRS 入门页”(abdullin.com/cqrs) 开始。

Mark Seemann是 Commentor A/S(一家位于哥本哈根的丹麦咨询公司)的 Windows Azure 技术负责人。他还是《Dependency Injection in .NET》(Manning Publications, 2011) 一书的作者,并且是开源项目 AutoFixture 的创建者。其博客网址为 blog.ploeh.dk

衷心感谢以下技术专家对本文的审阅:Rinat AbdullinKarsten Strøbæk