Thoughts on Queue Centric Workflow Design

 

Where did this come from?

I spend a portion of my job working with large partners to help them architect their applications to run in Windows Azure.  Usually they are coming from an on premises application that was not built for scale out.  One of the most helpful patterns I use is the Queue Centric Workflow pattern.  I have collected email snippets that usually occur after the first design session.  I thought I would collect them in one place to share with partners, and to hopefully help others.

A key aspect of building scale systems is to ensure your application state is kept outside the individual application components.  Windows Azure provides a very robust, scalable queue system built on top of redundant Windows Azure Storage.  This becomes a key to almost every application design I work with today.  Taking advantage of storage services is not covered in this post, perhaps I will go into that in a later post.

What is this pattern?

The Queue Centric Workflow pattern is an essential pattern for loose coupling of cloud based applications. This pattern is a subset of the Command Query Responsibility Separation (CQRS). Reliable queues are built into Windows Azure.

To use this pattern an object should generate a queue entry for work to be completed asynchronously from the primary application. The following challenges are overcome by using this pattern:

· Application decoupling across tiers, but all tiers collaborate using external Azure Queue / Tables

· Application needs to guarantee at least once processing for messages

· Ability to scale processing nodes deterministically based upon amount of outstanding work

There are a lot of other patterns which can be used in conjunction with this one, but I am primarily focused on using queues in this paper.

Azure Storage Queues are Reliable

The Azure Storage Queue is a reliable queue due to the underlying Azure Storage service. The queue data is always persisted across three nodes, and allows for hundreds of calls per second.

The queue itself being reliable does not inherently make your application more durable, your application design still must take into account failures. These failures may come from dependent services, or your own software.

Receiver Asynchronous Processing

The Queue Centric Workflow pattern may be used in many situations where loosely coupled asynchronous processing is desirable. Commands are sent as messages via a queue, which are turn processed on receiver nodes. Receivers may be located on separate physical services, or may be running within the main application. A key tenant is that the queue messages are outside your application and will survive the failure of your app.

The queue has a few very simple operations, remember that the service is just a series of REST calls. The queue is then polled periodically by receiver which will then process the messages. The queue itself is a FIFO queue, but the messages are not processed in order. A message that was returned to the queue from a failed node may be processed out of order from other messages. Do not use the queue as an ordered communication device!

Key operations on messages include:

· Enqueue (put) – Add a message to the queue

· Dequeue (get) – Get the next message from the queue without deleting it

· Peek – Look at the next message without processing it

· Update – Messages may be updated to change their invisibility timeout, or the message

Invisibility Window

Each message, upon being pulled from the queue, is invisible to other receivers for a period of time. This is to ensure that the message is successfully processed. In the event a receiver fails/crashes while processing, the pulled message will become visible again after some time and can be picked up for processing again.

Your receiver application should follow a process similar to this while operating:

· Get next message from the queue

· Verify the message has not been dequeued too many times (see poison message handling)

· Periodically update the queue message during processing to extend the invisibility window

· Delete the message from the queue if it was successfully processed

At-least-once processing

By following the above pattern for processing messages you are ensured each message is processed at least one time. That may seem strange at first, how can messages be processed more than one time? If the machine processing the request is rebooted, crashes, or otherwise fails to complete the processing of the message it will be returned to the queue and another receiver will process it.

Idempotent Processing of Repeated Messages

Since a message may be processed more than once, your application will need to handle that scenario. Idempotence is a mathematical property that says an operation must have the same visible result despite having an operator applied to it twice.

For example if an operation must generate a thumbnail of an image it doesn’t matter if the operation is run twice as long as the same thumbnail is output. There are operational wastes in running the process twice, but the end user will never know it was run twice.

This is true of many messages in decoupled systems. If the message was processed, or partially processed, once it may still have been put back on the queue because the original receiver crashed after they completed the task (and had not yet deleted the message).

If an operation is critical to being processed exactly once additional business logic will need to be included in the application to handle this scenario. Most application logic assumes running for the first time, and additional code may be needed to handle validating stages of processing in a larger task. If this is required the original message may be updated at each stage with a checkpoint marker (“step3” for example) to let a subsequent caller know how far the original processor got with the message.

Poison Message Handling

In some cases the message may be causing the receiver application to crash either due to a code bug, or malicious intent. In either case the application must decide how many times to retry the processing before flagging the request as bad.

This topic ties in directly with Idempotence Processing. An application must ensure that processing of a message twice would leave the system in the same state as if it had only been run once. If this cannot be ensured than any message being dequeued twice would need to be flagged as bad.

A typical implementation would flow something like the following:

· The receiver application pulls a message from the queue and checks the dequeue count

· Check if the message has been attempted more than 3 times, or if the receiver finds a previous output with a crash dump. In either case the message is removed from the queue and placed into a bad message queue.

Poison messages are typically deleted from the primary queue and inserted into a bad message queue (this is sometimes called a dead letter queue because it can’t be delivered successfully).

Scaling Processing Nodes

The size of the queue, and the amount of time required to process each message are useful properties to use for scaling receiver nodes. The receiver application can calculate the time required to process a message and place the result in a shared table. Some application patterns do not lend themselves to a deterministic processing time and may not be able to use this as a metric. Many applications are able to use the queue size to estimate the processing required and determine when to scale up more receiver nodes.

Microsoft provides the Microsoft Enterprise Library Autoscaling Application Block (WASABi) to assist developers in scaling their applications. The application has the following key benefits:

· Allows the graphical Enterprise Library configuration tool to manage configuration settings

· Allows configuration of application blocks though storage locations and logging mechanisms

· Allows application block customization by adding custom autoscaling rules and actions

Use of the application block is not required, however. Developers are free to choose another solution for scaling their application. Some form of auto scale is desirable in many situations where the load can vary, simply to save the company money on consuming resources that are not needed.

Watch out for Money Leaks

Most cloud applications need to balance the need for quick processing (performance), with the real world demands of running resources (costs). Most applications will have to strike a balance between these two. Some ideas to look for are deterministic time periods of load (maybe first thing Monday morning), or periods of inactivity that are predictable and can be used to scale back an application.

As of this writing 1 million storage transactions cost $0.10 (see Azure calculator). Each transaction for a queue operation counts towards that storage transaction (remember that queues and tables are both part of Azure Storage).

Attempting to dequeue messages when there is no work to be completed will still cost you money. Consider adding some form of step off after an unsuccessful dequeue operation to ensure you are not leaking money within your application.

Exponential Backoff is a commonly used pattern to wait longer and longer after each unsuccessful attempt. Applications that do not implement some form of delayed retry handling will waste resources on Azure, and cost the user more money than is needed for operations.

Summary

I hope this paper has sparked some thoughts in your mind about your own applications. There are many papers and books on the topic of Cloud Architecture and scaling. I have a few broad recommendations for topics you may want to research further.

Decoupling of application tiers – This is very important for cloud applications to ensure a backlog or outage in one tier does not bring the entire application down. It also allows for bursting of the layers that need additional resources, rather than requiring all tiers be scaled.

Guaranteed Processing – All message based systems need a good way to ensure all messages are processed at least once. Windows Azure has really good reliable queues for this purpose. You may want to research how other distributed systems handle messages.

 

 

Example Application Design

 

Example Processing System 

The above diagram is an example of using an Azure Queue to control Worker Roles during processing of input.  A full write up of this architecture is provided in the blog post Proof of Concept to test scalability of stream data processing in Azure.