April 2011

Volume 26 Number 04

Azure Development - CQRS on Microsoft Azure

By Mark Seemann | April 2011

Microsoft Azure offers unique opportunities and challenges. There are opportunities in the form of elastic scalability, cost reduction and deployment flexibility, but also challenges because the Azure environment is different from the standard Windows servers that host most Microsoft .NET Framework services and applications today.

One of the most compelling arguments for putting applications and services in the cloud is elastic scalability: You can turn up the power on your service when you need it, and you can turn it down again when demand trails off. On Azure, the least disruptive way to adjust power is to scale out instead of up—adding more servers instead of making existing servers more powerful. To fit into that model of scalability, an application must be dynamically scalable. This article describes an effective approach to building scalable services and demonstrates how to implement it on Azure.

Command Query Responsibility Segregation (CQRS) is a new approach to building scalable applications. It may look different from the kind of .NET architecture you’re accustomed to, but it builds on tried and true principles and solutions for achieving scalability. There’s a vast body of knowledge available that describes how to build scalable systems, but applying it requires a shift of mindset.

Taken literally, CQRS is nothing more than a statement about separation of concerns, but in the context of software architecture, it often signifies a set of related patterns. In other words, the term CQRS can take on two meanings: as a pattern and as an architectural style. In this article, I’ll briefly outline both views, as well as provide examples based on a Web application running on Azure.

Understanding the CQRS Pattern

The underlying terminology for CQRS originates in object-oriented pattern language. A Command is an operation that changes the state of something, whereas a Query is an operation that retrieves information about state. More informally, Commands are writes and Queries are reads.

The CQRS pattern simply states that reads and writes must be explicitly modeled as segregated responsibilities. Writing data is one responsibility, and reading data is another. Most applications need to do both, but as Figure 1 shows, each responsibility must be treated separately.


Figure 1 Segregating Reads from Writes

The application writes to a different conceptual system than it reads from.

Obviously, the data that the application writes should eventually end up being available for reading. The CQRS pattern says nothing about how this could happen, but in the simplest possible implementation, the read and write systems could use the same underlying data store.

In this world view, reads and writes are strictly segregated; writes never return data. That seemingly innocuous statement opens up a rich set of opportunities for creating massively scalable applications.
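To make the pattern concrete, the two responsibilities can be expressed as separate interfaces: one whose methods return void (writes) and one whose methods only return data (reads). The following sketch is illustrative only; the interface names and the in-memory implementation are invented for this example and aren’t part of the sample application.

```csharp
using System;
using System.Collections.Generic;

// Writes: a Command changes state and returns nothing.
public interface IReservationWriter
{
    void MakeReservation(DateTime date, int quantity);
}

// Reads: a Query returns data and changes nothing.
public interface IReservationReader
{
    int GetReservedQuantity(DateTime date);
}

// One class may implement both responsibilities, but callers see
// only one side at a time through the segregated interfaces.
public class InMemoryReservations : IReservationWriter, IReservationReader
{
    private readonly Dictionary<DateTime, int> reserved =
        new Dictionary<DateTime, int>();

    public void MakeReservation(DateTime date, int quantity)
    {
        int current;
        this.reserved.TryGetValue(date, out current);
        this.reserved[date] = current + quantity;
    }

    public int GetReservedQuantity(DateTime date)
    {
        int current;
        this.reserved.TryGetValue(date, out current);
        return current;
    }
}
```

In the simplest implementation both interfaces sit on top of the same store, as here; the pattern only demands that the responsibilities stay segregated.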

CQRS Architecture Style

In addition to the CQRS pattern, the foundation of the CQRS architectural style is a simple but profound realization about displaying data. Consider Figure 2, which shows the UI for a booking app—imagine that it’s a restaurant reservation system.


Figure 2 Display Data Is Stale at the Moment It’s Rendered

The calendar shows dates in a given month, but some dates are disabled because they’re already fully booked.

How up-to-date is the data in such a UI? In the time it takes to render the data, transport it over the network, and for the user to interpret it and react to it, it may already have changed in the underlying data store. The longer the user looks at it before reacting, the staler it becomes. The user may be interrupted by a phone call or similar distraction before continuing, so think times (the time spent by a user perusing a Web page) may be measured in minutes.

A common way to address this issue is by using optimistic concurrency to handle the cases where conflicts occur. Application developers must write the code to handle such situations, but instead of treating them as exceptional cases, the CQRS architectural style embraces this basic condition. When display data is stale the moment it’s rendered, it doesn’t have to reflect the data in the central data store. Instead, an application can display data from a denormalized data source that may lag a bit behind the “real” data store.

The realization that display data is always stale, coupled with the CQRS principle that writes never return data, results in scalability opportunities. UIs don’t have to wait for data to be written, but can instead just send an asynchronous message and return a view to the user. Background workers pick up the messages and process them at their own pace. Figure 3 shows a more comprehensive view of the CQRS-style architecture.


Figure 3 CQRS-Style Architecture

Whenever the application needs to update data, it sends a Command as an asynchronous message—most likely via a durable queue. As soon as the Command is sent, the UI is free to return a view to the user. A background worker picks up the Command message in a separate process and writes any appropriate changes to the data store. As part of this operation, it also raises an event as another asynchronous message. Other message handlers can subscribe to such events and update a denormalized view of the data store accordingly.

Although the view data will lag behind the “real” data, this event propagation often happens so fast that users never notice it. But even if the system slows down due to excessive load, the view data will eventually be consistent.

This sort of architecture can be implemented on many different systems, but with its explicit concepts of Worker Roles and queues, Azure is quite well-suited for it. However, Azure also presents some unique challenges in relation to CQRS; the remainder of this article explores both opportunities and challenges through a sample application.

A Reservation Booking Application

A simple booking application serves as an excellent example of how to implement CQRS on Azure. Imagine that the application takes reservation requests for a restaurant. The first page that meets the user is a date picker, as shown in Figure 2—notice again that some dates are already disabled to indicate that these dates are sold out.

When the user clicks an available date, a reservation form and, subsequently, a receipt page are displayed, as shown in Figure 4.


Figure 4 Reservation UI Flow

Notice that the receipt page makes an effort to inform the user that the reservation isn’t guaranteed at this point. The final decision will be communicated via e-mail.

In CQRS, the UI plays an important part in setting expectations because processing happens in the background. However, during normal load, a receipt page buys enough time that when the user moves on, the request has already been handled.

I’ll now demonstrate key points about implementing the sample booking app. As there are many moving parts in even this simple application, I’ll focus on the most interesting code snippets here; the full code is available in the download accompanying this article.

Submitting Commands

The Web Role is implemented as an ASP.NET MVC 2 application. When the user submits the form shown in Figure 4, the appropriate Controller Action is invoked:

[HttpPost] 
public ViewResult NewBooking(BookingViewModel model) 
{ 
  this.channel.Send(model.MakeNewReservation()); 
  return this.View("BookingReceipt", model); 
}

The channel field is an injected instance of this simple IChannel interface:

public interface IChannel 
{ 
  void Send(object message); 
}

The command that the NewBooking method sends through the channel is just the HTML form’s data encapsulated in a Data Transfer Object. The MakeNewReservation method simply transforms the posted data into a MakeReservationCommand instance, as shown here:

public MakeReservationCommand MakeNewReservation() 
{ 
  return new MakeReservationCommand(this.Date, 
    this.Name, this.Email, this.Quantity); 
}

Because the Send method returns void, the UI is free to return an HTML page to the user as soon as the Command is successfully sent. Implementing the IChannel interface on top of a queue ensures that the Send method returns as quickly as possible.

On Azure, we can implement the IChannel interface on top of the built-in queues that are part of Azure Storage. To put messages on such a durable queue, the implementation must serialize the messages. There are many different ways to do this, but to keep things simple I’ve chosen to use the binary serializer built in to the .NET Framework. However, in a production application, you should seriously consider alternatives, as the binary serializer makes it difficult to handle versioning issues. For example, what happens when a new version of your code attempts to deserialize a blob serialized by an old version? Possible alternatives include XML, JSON or Protocol Buffers.
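As a sketch of one such alternative, the Command could be serialized to JSON with DataContractJsonSerializer, where explicitly named DataMember attributes give the wire format some tolerance for renamed properties. The command class below is a simplified stand-in for the article’s MakeReservationCommand, not the actual type from the download.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Json;

// Simplified stand-in for the article's command. The explicit member
// names decouple the JSON format from the C# property names, so a
// rename in code doesn't break messages already sitting in the queue.
[DataContract]
public class ReservationMessage
{
    [DataMember(Name = "name")] public string Name { get; set; }
    [DataMember(Name = "quantity")] public int Quantity { get; set; }
}

public static class JsonMessageSerializer
{
    public static byte[] Serialize<T>(T message)
    {
        var serializer = new DataContractJsonSerializer(typeof(T));
        using (var s = new MemoryStream())
        {
            serializer.WriteObject(s, message);
            return s.ToArray();
        }
    }

    public static T Deserialize<T>(byte[] bytes)
    {
        var serializer = new DataContractJsonSerializer(typeof(T));
        using (var s = new MemoryStream(bytes))
        {
            return (T)serializer.ReadObject(s);
        }
    }
}
```

The resulting byte array can be handed to CloudQueueMessage exactly as the binary-serialized payload is in the listing that follows.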

With this technology stack, the implementation of IChannel.Send is simple:

public void Send(object command) 
{ 
  var formatter = new BinaryFormatter(); 
  using (var s = new MemoryStream()) 
  { 
    formatter.Serialize(s, command); 
    var msg = new CloudQueueMessage(s.ToArray()); 
    this.queue.AddMessage(msg); 
  } 
}

The Send method serializes the Command and creates a new CloudQueueMessage out of the resulting byte array. The queue field is an injected instance of the CloudQueue class from the Azure SDK, initialized with the correct address information and credentials; its AddMessage method adds the message to the appropriate queue. This usually happens surprisingly fast, so when the method returns, the caller is free to perform other work. At the same time, the message is now in the queue, waiting for a background processor to pick it up.

Processing Commands

While the Web Roles are happily displaying HTML and accepting data that they can send via the IChannel interface, Worker Roles receive and process messages from the queue at their own pace. These background workers are stateless, autonomous components, so if they can’t keep up with the incoming messages you can add more instances. This is what provides the massive scalability of messaging-based architecture.

As was previously demonstrated, sending messages over Azure queues is easy. Consuming them in a safe and consistent manner is a bit trickier. Each Command encapsulates an intention to change the state of the application, so the background worker must make sure that no message is lost and that underlying data is changed in a consistent way.

This might be fairly easy to ensure for a queuing technology that supports distributed transactions (such as Microsoft Message Queuing). Azure queues aren’t transactional, but they do come with their own set of guarantees. Messages aren’t lost when read, but rather made invisible for a period of time. Clients should pull a message off the queue, perform the appropriate operations and delete the message as the last step in the process. This is what the sample booking application’s general-purpose Worker Role does; it executes the PollForMessage method shown in Figure 5 in an endless loop.

Figure 5 PollForMessage Method

public void PollForMessage(CloudQueue queue) 
{ 
  var message = queue.GetMessage(); 
  if (message == null) 
  { 
    Thread.Sleep(500); 
    return; 
  } 
  
  try 
  { 
    this.Handle(message); 
    queue.DeleteMessage(message); 
  } 
  catch (Exception e) 
  { 
    if (e.IsUnsafeToSuppress()) 
    { 
      throw; 
    } 
    Trace.TraceError(e.ToString()); 
  } 
}

The GetMessage method may return null if no message is currently in the queue. In that case, the method simply waits 500 milliseconds and returns, and the endless outer loop immediately invokes it again. When a message is received, the method handles the message by invoking the Handle method. This is where all the real work is supposed to happen, so if that method returns without throwing an exception, it’s safe to delete the message.

On the other hand, if an exception happens while handling the message, it’s important to suppress the exception; an unhandled exception will crash the entire worker instance and it will stop pulling messages off the queue.

A production-ready implementation needs to be more sophisticated to handle so-called poison messages, but I decided to leave this out of the sample code to keep it simpler.
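The IsUnsafeToSuppress extension method used in Figure 5 is defined in the download rather than shown in the article. A minimal version might look like the following sketch, which treats only exceptions that signal a corrupted or dying process as unsafe to swallow; the exact list is an assumption on my part.

```csharp
using System;
using System.Threading;

public static class ExceptionExtensions
{
    // Returns true for exceptions that indicate the process itself is in
    // trouble. Swallowing these in a catch-all handler would hide fatal
    // conditions, so Figure 5 rethrows them. The exact set of types is a
    // guess at the helper's intent; the download may differ.
    public static bool IsUnsafeToSuppress(this Exception e)
    {
        return e is OutOfMemoryException
            || e is StackOverflowException
            || e is ThreadAbortException
            || e is AccessViolationException;
    }
}
```

Ordinary business exceptions (a failed precondition, a concurrency conflict) return false and are merely traced, so the worker loop keeps running.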

If an exception is thrown while a message is being processed, it won’t be deleted. After a timeout, it will become available for processing once again. This is the guarantee that Azure queues provide: A message is processed at least once. As a corollary, it may be replayed several times. Thus, all background workers must be able to handle message replays. It’s essential that all durable write operations are idempotent.
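The visibility behavior described above can be modeled in a few lines of code. This in-memory sketch mimics the GetMessage/DeleteMessage semantics of CloudQueue in a simplified form; it’s a teaching model, not the real client.

```csharp
using System;
using System.Collections.Generic;

// In-memory model of Azure queue visibility semantics: reading a
// message hides it for a while instead of removing it; only an
// explicit delete removes it for good. Time is passed in explicitly
// so the behavior can be exercised deterministically.
public class VisibilityQueue<T>
{
    private class Entry { public T Value; public DateTime VisibleAt; }

    private readonly List<Entry> entries = new List<Entry>();
    private readonly TimeSpan visibilityTimeout;

    public VisibilityQueue(TimeSpan visibilityTimeout)
    {
        this.visibilityTimeout = visibilityTimeout;
    }

    public void Add(T value)
    {
        this.entries.Add(new Entry { Value = value, VisibleAt = DateTime.MinValue });
    }

    // Returns a handle to the next visible entry and hides it until the
    // timeout elapses; returns null when nothing is visible. If the
    // consumer crashes before deleting, the entry reappears: at-least-once.
    public object Get(DateTime now)
    {
        foreach (var e in this.entries)
        {
            if (e.VisibleAt <= now)
            {
                e.VisibleAt = now + this.visibilityTimeout;
                return e;
            }
        }
        return null;
    }

    public void Delete(object handle)
    {
        this.entries.Remove((Entry)handle);
    }

    public T Peek(object handle)
    {
        return ((Entry)handle).Value;
    }
}
```

A consumer that reads a message but never deletes it sees the same message again after the timeout, which is exactly why the handlers in the next section must be idempotent.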

Making Write Operations Idempotent

Every method that handles a message must be able to deal with replays without compromising the state of the application. Handling a MakeReservationCommand is a good example. Figure 6 provides an overview of the message flow.


Figure 6 Workflow for Handling the Make Reservation Command

The first thing the application must do is check if the restaurant has enough capacity for the requested date; all tables may already be reserved for the given date, or there may only be a few places left. To answer the question about available capacity, the application tracks the current capacity in durable storage. There are several options for doing this. Tracking all reservation data in a SQL Azure database is one possibility, but as there are limits to the size of SQL Azure databases, a more scalable option is to use either Azure blob or table storage.

The booking sample application uses blob storage to store a serialized Value Object whose operations are idempotent. This Capacity class keeps track of accepted reservations so that it can detect message replays. To answer the question about remaining capacity, the application can load a Capacity instance for the appropriate day and invoke the CanReserve method with the correct reservation ID:

public bool CanReserve(int quantity, Guid id) 
{ 
  if (this.IsReplay(id)) 
  { 
    return true; 
  } 
  return this.remaining >= quantity; 
} 
  
private bool IsReplay(Guid id) 
{ 
  return this.acceptedReservations.Contains(id); 
}

Each MakeReservationCommand has an associated ID. To ensure idempotent behavior, the Capacity class saves each accepted reservation ID so that it can detect replays. Only if the method call isn’t a replay does it invoke the actual business logic, comparing the requested quantity against the remaining capacity.

The application serializes and stores a Capacity instance for each date, so to answer the question of whether the restaurant has remaining capacity, it downloads the blob and invokes the CanReserve method:

public bool HasCapacity(MakeReservationCommand reservation) 
{ 
  return this.GetCapacityBlob(reservation) 
    .DownloadItem() 
    .CanReserve(reservation.Quantity, reservation.Id); 
}

If the answer is “true,” the application invokes the set of operations associated with that outcome, as shown in Figure 6. The first step is to decrement the capacity, which involves invoking the Capacity.Reserve method shown in Figure 7.

Figure 7 The Capacity.Reserve Method

public Capacity Reserve(int quantity, Guid id) 
{ 
  if (!this.CanReserve(quantity, id)) 
  { 
    throw new ArgumentOutOfRangeException(); 
  } 
  
  if (this.IsReplay(id)) 
  { 
    return this; 
  } 
  
  return new Capacity(this.Remaining - quantity,  
    this.acceptedReservations 
      .Concat(new[] { id }).ToArray()); 
}

This is another idempotent operation that first invokes the CanReserve and IsReplay methods as guards. If the method call represents a genuinely new request to reserve some capacity, a new Capacity instance is returned with the decremented capacity and the ID is added to the list of accepted IDs.
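Filling in the members elided from the listings, a self-contained version of Capacity makes the replay behavior easy to exercise. The constructor and the Remaining property are reconstructed here and may differ in detail from the accompanying download.

```csharp
using System;
using System.Linq;

public class Capacity
{
    private readonly Guid[] acceptedReservations;

    public Capacity(int remaining, params Guid[] acceptedReservations)
    {
        this.Remaining = remaining;
        this.acceptedReservations = acceptedReservations;
    }

    public int Remaining { get; private set; }

    public bool CanReserve(int quantity, Guid id)
    {
        if (this.IsReplay(id))
        {
            return true;
        }
        return this.Remaining >= quantity;
    }

    public Capacity Reserve(int quantity, Guid id)
    {
        if (!this.CanReserve(quantity, id))
        {
            throw new ArgumentOutOfRangeException();
        }

        if (this.IsReplay(id))
        {
            return this; // a replay changes nothing
        }

        return new Capacity(this.Remaining - quantity,
            this.acceptedReservations.Concat(new[] { id }).ToArray());
    }

    private bool IsReplay(Guid id)
    {
        return this.acceptedReservations.Contains(id);
    }
}
```

Reserving twice with the same ID returns the same instance the second time, so a replayed Command decrements the capacity exactly once.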

The Capacity class is just a Value Object, so it must be committed back to the Azure blob storage before the operation is complete. Figure 8 shows how the original blob is initially downloaded from Azure blob storage.

Figure 8 Decrementing Capacity and Committing to Storage

public void Consume(MakeReservationCommand message) 
{ 
  var blob = this.GetCapacityBlob(message); 
  var originalCapacity = blob.DownloadItem(); 
  
  var newCapacity = originalCapacity.Reserve( 
    message.Quantity, message.Id); 
  
  if (!newCapacity.Equals(originalCapacity)) 
  { 
    blob.Upload(newCapacity); 
    if (newCapacity.Remaining <= 0) 
    { 
      var e = new SoldOutEvent(message.Date); 
      this.channel.Send(e); 
    } 
  } 
}

This is the serialized Capacity instance that corresponds to the date of the requested reservation. If the capacity changed (that is, it wasn’t a replay), the new Capacity is uploaded back to blob storage.

What happens if exceptions are thrown along the way? One way this could happen would be if the Capacity instance has changed since the CanReserve method was invoked. This is not unlikely in high-volume scenarios where many competing requests are being handled concurrently. In such cases, the Reserve method might throw an exception because there’s not enough remaining capacity. That’s OK; this simply means that this particular reservation request has lost a concurrent race. The exception will be caught by the exception handler in Figure 5, but because the message was never deleted, it will later reappear to be handled once more. When this happens, the CanReserve method will immediately return false and the request can be politely rejected.

However, another potential concurrency conflict lurks in Figure 8. What happens when two background workers update the capacity for the same date at the same time?

Using Optimistic Concurrency

The Consume method in Figure 8 downloads the Capacity blob from blob storage and uploads a new value if it changed. Many background workers may be doing this concurrently, so the application must make sure that one value doesn’t overwrite another.

Because Azure Storage is REST-based, the recommended way to deal with such concurrency issues is to use ETags. The first time the application creates a Capacity instance for a given date, the ETag will be null, but when an existing blob is downloaded from storage, it will have an ETag value available via CloudBlob.Properties.ETag. When the application uploads the Capacity instance, it must set the correct AccessCondition on a BlobRequestOptions instance:

options.AccessCondition = etag == null ?  
  AccessCondition.IfNoneMatch("*") :  
  AccessCondition.IfMatch(etag);

When the application creates a new instance of Capacity, the ETag is null and the AccessCondition must be set to IfNoneMatch("*"). This ensures that an exception will be thrown if the blob already exists. On the other hand, if the current write operation represents an update, the AccessCondition must be set to IfMatch, which ensures that an exception is thrown if the ETag in blob storage doesn’t match the supplied ETag.

Optimistic concurrency based on ETags is an important tool in your toolbox, but you must explicitly enable it by supplying the appropriate BlobRequestOptions.
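The same read-modify-write-retry cycle applies to any ETag-checked store. The following self-contained sketch simulates one in memory to show the cycle in isolation; the EtagStore class is invented for illustration and stands in for a CloudBlob plus its BlobRequestOptions handling.

```csharp
using System;

// In-memory stand-in for an ETag-checked store such as blob storage.
// A null ETag means "the value doesn't exist yet", mirroring the
// IfNoneMatch("*") case; a non-null ETag mirrors IfMatch.
public class EtagStore<T>
{
    private T value;
    private int version;

    // Returns the current value together with its ETag.
    public T Read(out string etag)
    {
        etag = this.version == 0 ? null : this.version.ToString();
        return this.value;
    }

    // Succeeds only if the caller's ETag still matches the stored one;
    // otherwise another writer got there first and this throws.
    public void Write(T newValue, string etag)
    {
        var current = this.version == 0 ? null : this.version.ToString();
        if (etag != current)
        {
            throw new InvalidOperationException("Precondition failed.");
        }
        this.value = newValue;
        this.version++;
    }
}

public static class OptimisticWriter
{
    // Read-modify-write with retry: on a concurrency conflict, re-read
    // the fresh state and reapply the change rather than overwriting
    // another writer's work blindly.
    public static void Update<T>(EtagStore<T> store, Func<T, T> change)
    {
        while (true)
        {
            string etag;
            var current = store.Read(out etag);
            try
            {
                store.Write(change(current), etag);
                return;
            }
            catch (InvalidOperationException)
            {
                // Someone else wrote first; loop and retry with fresh state.
            }
        }
    }
}
```

In the booking application the "change" delegate corresponds to Capacity.Reserve, and the precondition failure corresponds to the exception that Azure Storage raises when the ETag no longer matches.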

If no exception is thrown while decrementing the capacity, the application can move on to the next step in Figure 6: writing the reservation to table storage. This follows roughly the same principles as decrementing the capacity, so I’ll skip it here. The code is available in the accompanying download, but the main point is that, once again, the write operation should be idempotent.

The last step in the workflow is to raise an event that signifies the reservation was accepted. This is done by sending another asynchronous message via the Azure queue. Any other background workers that care about this Domain Event can pick it up and handle it. A relevant action would be to send a confirmation e-mail to the user, but the application also needs to close the loop to the UI by updating the view data store.

Updating View Data

Events that happen during processing of a Command are sent as asynchronous messages via the IChannel interface. As an example, the Consume method in Figure 8 raises a new SoldOutEvent if the capacity is reduced to zero. Other message handlers can subscribe to such events to properly update view data, as shown here:

public void Consume(SoldOutEvent message) 
{ 
  this.writer.Disable(message.Date); 
}

The injected writer implements the Disable method by updating an array of disabled dates for the appropriate month in blob storage:

public void Disable(DateTime date) 
{ 
  var viewBlob = this.GetViewBlob(date); 
  DateTime[] disabledDates = viewBlob.DownloadItem(); 
  viewBlob.Upload(disabledDates 
    .Union(new[] { date }).ToArray()); 
}

This implementation simply downloads an array of disabled DateTime instances from blob storage, appends the new date to the array and uploads it again. Because the Union method is used, the operation is idempotent, and once more the Upload method encapsulates ETag-based optimistic concurrency.
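The idempotency claim is easy to verify in isolation. Extracting the Union step into a pure function (a trivial refactoring of the Disable method above, not code from the download), applying the same date twice yields the same array.

```csharp
using System;
using System.Linq;

public static class DisabledDates
{
    // Idempotent append: Union discards duplicates, so replaying the
    // same SoldOutEvent leaves the array of disabled dates unchanged.
    public static DateTime[] Disable(DateTime[] disabledDates, DateTime date)
    {
        return disabledDates.Union(new[] { date }).ToArray();
    }
}
```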

Querying View Data

The UI can now query directly from the view data. This is an efficient operation because the data is static—no calculation is required. For example, to update the date picker in Figure 2 with disabled dates, the date picker sends an AJAX request to the controller to get the array.

The Controller can simply handle the request like this:

public JsonResult DisabledDays(int year, int month) 
{ 
  var data = this.monthReader.Read(year, month); 
  return this.Json(data, JsonRequestBehavior.AllowGet); 
}

The injected reader implements the Read method by reading the blob that the SoldOutEvent handler writes:

public IEnumerable<string> Read(int year, int month) 
{ 
  DateTime[] disabledDates =  
    this.GetViewBlob(year, month).DownloadItem(); 
  return (from d in disabledDates 
    select d.ToString("yyyy.MM.dd")); 
}

Thus the loop is closed. The user browses the site based on current view data and fills out a form to submit data that’s handled via asynchronous messaging. Finally, view data is updated based on the Domain Events raised during the workflow.

Denormalizing Data

Summing up, most applications read data a lot more than they write data, so optimizing the read side enables scalability—particularly when data can be read from static resources such as blobs. Data rendered on a screen is always disconnected, which means that it’s stale from the moment it’s rendered. CQRS embraces this staleness by disconnecting the reading and writing of data. The data being read doesn’t have to come directly from the same data source as the data being written. Instead, data can be asynchronously transported from the store to which it’s written to view-specific data stores where the cost of projecting and manipulating the data is paid only once.

With its built-in queues and scalable, denormalized data stores, Azure is a great fit for this kind of architecture. Even though distributed transactions aren’t supported, queues guarantee that messages are never lost, but are served at least once. To handle potential replays, all asynchronous write operations must be idempotent. For denormalized data such as blob and table storage, ETags must be used to implement optimistic concurrency. Using these simple techniques, Eventual Consistency can be ensured.


Mark Seemann is the Azure technical lead for Commentor A/S, a Danish consulting company based in Copenhagen. He is also the author of the book “Dependency Injection in .NET” (Manning Publications, 2011) and the creator of the open source project AutoFixture. His blog is available at blog.ploeh.dk.

Thanks to the following technical experts for reviewing this article: Rinat Abdullin and Karsten Strøbæk