November 2010

Volume 25 Number 11

Cloud Computing - Synchronizing Multiple Nodes in Microsoft Azure

By Josh Twist | November 2010

The cloud represents a major technology shift, and many industry experts predict this change is of a scale we see only every 12 years or so. This level of excitement is hardly surprising when you consider the many benefits the cloud promises: significantly reduced running costs, high availability and almost infinite scalability, to name but a few.

Of course, such a shift also presents the industry with a number of challenges, not least those faced by today’s developers. For example, how do we build systems that are optimally positioned to take advantage of the unique features of the cloud?

Fortunately, Microsoft launched Azure in February, and it contains a number of right-sized pieces to support the creation of applications that can serve enormous numbers of users while remaining highly available. However, for any application to achieve its full potential when deployed to the cloud, the onus is on the developers of the system to take advantage of what is arguably the cloud’s greatest feature: elasticity.

Elasticity is a property of cloud platforms that allows additional resources (computing power, storage and so on) to be provisioned on-demand, providing the capability to add additional servers to your Web farm in a matter of minutes, not months. Equally important is the ability to remove these resources just as quickly.

A key tenet of cloud computing is the pay-as-you-go business model, where you only pay for what you use. With Azure, you only pay for the time a node (a Web or Worker Role running in a virtual machine) is deployed, so reducing the number of nodes when they’re no longer required, or during the quieter periods of your business, results in a direct cost saving.

Therefore, it’s critically important that developers create elastic systems that react automatically to the provision of additional hardware, with minimum input or configuration required from systems administrators.

Scenario 1: Creating Order Numbers

Recently, I was lucky enough to work on a proof of concept that looked at moving an existing Web application infrastructure into the cloud using Azure.

Given the partitioned nature of the application’s data, it was a prime candidate for Azure Table Storage. This simple but high-performance storage mechanism—with its support for almost infinite scalability—was an ideal choice, with just one notable drawback concerning unique identifiers.

The target application allowed customers to place an order and retrieve their order number. Using SQL Server or SQL Azure, it would’ve been easy to generate a simple, numeric, unique identifier, but Azure Table Storage doesn’t offer auto-incrementing primary keys. Instead, developers using Azure Table Storage might create a GUID and use this as the “key” in the table:

505EAB78-6976-4721-97E4-314C76A8E47E

The problem with using GUIDs is that they’re difficult for humans to work with. Imagine having to read your GUID order number out to an operator over the telephone—or make a note of it in your diary. Of course, GUIDs have to be unique in every context simultaneously, so they’re quite complex. The order number, on the other hand, only has to be unique in the Orders table.

Creating a Simple Unique ID in Azure

A number of relatively simple solutions to the GUID problem were considered:

  1. Use SQL Azure to Generate Unique IDs: For a number of reasons, the proof of concept had already discounted SQL Azure in favor of Azure Table Storage—primarily due to the need for the system to scale out to many nodes, each with many threads executing against the data.
  2. Use Blob Storage to Manage an Incrementing Value: Store a central counter in Azure Blob Storage. Nodes could read and update the order number, providing a simple sequential order number generation mechanism for use by multiple nodes. However, the contention at this point for a busy system requiring many new order numbers per second would likely impede the scalability of the system.
  3. Partition Unique IDs Across Each Node: Create a lightweight in-memory counter that generates unique order numbers. In order to ensure uniqueness across all nodes, each node would be allocated a range of order numbers, as shown in Figure 1.

Figure 1 Allocating a Range of Order Numbers to Each Node to Ensure Unique IDs

Node   Range
A      0-1,000,000
B      1,000,001-2,000,000

However, this approach raises a number of questions. What happens when a node exhausts a range? What happens when hundreds of nodes are added to the system at one time? What if a node crashes and is replaced by the Azure runtime with a fresh node? Administrators would need to closely monitor these ranges and be careful to ensure the configuration is correct or face data corruption.

Instead, a much more elegant approach was needed—a solution that required no per-node configuration, demonstrated little contention and guaranteed uniqueness at all times. To achieve this, I created a hybrid of the second and third options. 

The concept was relatively straightforward: use a small text file in blob storage to store the last order number. When a new order number is required, a node can access this blob, increment the value and write it back to storage. Of course, there’s a reasonable chance that another node will have accessed the blob with the same intention during this read-increment-write process. Without some kind of concurrency management, the order numbers wouldn’t be unique and the data would be corrupt. Traditionally, I might have considered creating a locking mechanism that prevents multiple nodes from working with the blob simultaneously. However, locks are expensive, and if throughput and massive scalability are guiding themes for the implementation, they’re to be avoided.

Instead, an approach using optimistic concurrency is favorable. With optimistic concurrency, we allow multiple actors to interact with a resource. When the resource is retrieved by an actor, the actor is also issued a token that indicates the version of the resource. When an update takes place, the token can be included to indicate which version of the resource is being modified. If the resource has already been modified by another actor, then the update will fail and the original actor can retrieve the latest version and try the update again. Optimistic concurrency works well provided the chances of contention on updates are low. The cost and complexity of a lock is avoided and the resource is protected from corruption.
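
The retrieve, modify and conditionally update cycle described above can be sketched in memory, independent of any storage service. The VersionedResource and OptimisticCounter types below are hypothetical names introduced purely for illustration; the version counter plays the role of the token. The short lock inside VersionedResource only stands in for the atomic version check that a storage service would perform on the server, and callers never hold a lock across the whole read-increment-write cycle.

```csharp
using System;

// Hypothetical in-memory resource guarded by a version token.
public class VersionedResource
{
    private readonly object _sync = new object();
    private string _value;
    private long _version;

    public VersionedResource(string initialValue)
    {
        _value = initialValue;
    }

    // Returns the current value together with the token identifying its version.
    public string Read(out long version)
    {
        lock (_sync)
        {
            version = _version;
            return _value;
        }
    }

    // Applies the update only if the caller's token still matches the current
    // version; returns false when another actor has modified the resource.
    public bool TryUpdate(string newValue, long expectedVersion)
    {
        lock (_sync)
        {
            if (_version != expectedVersion)
            {
                return false;
            }
            _value = newValue;
            _version++;
            return true;
        }
    }
}

public static class OptimisticCounter
{
    // Reads, increments and writes back, retrying whenever the token is stale.
    public static long Increment(VersionedResource resource, int maxAttempts)
    {
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            long version;
            long current = Int64.Parse(resource.Read(out version));
            long next = current + 1;
            if (resource.TryUpdate(next.ToString(), version))
            {
                return next;
            }
            // Another actor got there first: loop to fetch the latest version.
        }
        throw new InvalidOperationException("Too many concurrent updates.");
    }
}
```

A failed TryUpdate costs the caller one extra read, which is why the technique suits resources where conflicting updates are rare.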

Imagine that, during peak times, the system issues about 100 new order numbers per second. This would mean 100 requests to update the blob every second, causing an extremely high chance of contention, which would mean many retries, exacerbating the situation. Therefore, to reduce the possibility of this occurring, I decided to allocate order numbers in ranges.

A class called the UniqueIdGenerator was created to encapsulate this behavior. The class removes a range of order numbers from blob storage by incrementing the value in configurable chunks. If each UniqueIdGenerator were to reserve 1,000 order numbers at a time, then at 100 order numbers per second the blob would be updated only about once every 10 seconds on average, significantly reducing the chances of contention. Each UniqueIdGenerator is free to issue its reserved 1,000 order numbers at will, confident that no other instance of the class pointing to the same blob resource will issue the same order number.
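
The arithmetic behind that estimate is simply the range size divided by the system-wide rate at which order numbers are issued. The helper below is a hypothetical illustration (RangeSizing is not part of the sample code) and assumes the load is steady.

```csharp
using System;

public static class RangeSizing
{
    // Average number of seconds between blob updates across the whole system,
    // given the total rate at which IDs are issued and the reserved range size.
    public static double SecondsBetweenBlobUpdates(double idsPerSecond, int rangeSize)
    {
        if (idsPerSecond <= 0)
        {
            throw new ArgumentOutOfRangeException("idsPerSecond");
        }
        return rangeSize / idsPerSecond;
    }
}
```

With the figures used in this article, SecondsBetweenBlobUpdates(100, 1000) gives 10 seconds, compared with one update every 0.01 seconds if each order number were reserved individually.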

In order to make this new component testable, an interface called IOptimisticSyncStore was specified that de-coupled the UniqueIdGenerator from the specific storage mechanism. This has an added advantage: In the future, the component could use a different type of storage where appropriate. Here’s the interface:

public interface IOptimisticSyncStore
{
  string GetData();
  bool TryOptimisticWrite(string data);
}

As you can see, it’s quite a simple interface with just two methods: one to retrieve the data and another to update it, the latter returning a Boolean where false indicates that there was an optimistic concurrency failure and the process should be retried.
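
To illustrate the testability this interface allows, here is a minimal in-memory sketch of an implementation. InMemoryBlob and InMemoryOptimisticSyncStore are hypothetical names that don't appear in the code download; the integer version number is an assumption standing in for a blob's ETag, and each store instance plays the role of one node's reference to the shared blob.

```csharp
using System;

// Repeated from the article so that the sketch compiles on its own.
public interface IOptimisticSyncStore
{
    string GetData();
    bool TryOptimisticWrite(string data);
}

// Hypothetical stand-in for a blob: a value plus a version number acting as its ETag.
public class InMemoryBlob
{
    public readonly object Sync = new object();
    public string Data;
    public int ETag;

    public InMemoryBlob(string initialData)
    {
        Data = initialData;
    }
}

// Hypothetical test double; one instance represents one node's view of the blob.
public class InMemoryOptimisticSyncStore : IOptimisticSyncStore
{
    private readonly InMemoryBlob _blob;
    private int _lastSeenETag;

    public InMemoryOptimisticSyncStore(InMemoryBlob blob)
    {
        _blob = blob;
    }

    public string GetData()
    {
        lock (_blob.Sync)
        {
            // Remember which version was read, as a blob reference remembers its ETag.
            _lastSeenETag = _blob.ETag;
            return _blob.Data;
        }
    }

    public bool TryOptimisticWrite(string data)
    {
        lock (_blob.Sync)
        {
            if (_blob.ETag != _lastSeenETag)
            {
                // Someone else wrote since our last read: report a concurrency failure.
                return false;
            }
            _blob.Data = data;
            _blob.ETag++;
            _lastSeenETag = _blob.ETag;
            return true;
        }
    }
}
```

Two store instances created over the same InMemoryBlob behave like two nodes: if both read and then both write, only the first write succeeds and the second returns false until that node reads again.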

An implementation of IOptimisticSyncStore that uses blob storage is available in the code download (details at the end of the article). For the most part, the implementation is simple; however, it’s worth looking at the TryOptimisticWrite method in more detail to understand how optimistic concurrency has been implemented.

It’s simple to use optimistic concurrency when updating resources in Azure Blob Storage, thanks to Preconditions and Entity Tags (ETags). A Precondition is a statement a developer asserts must be true for an HTTP request to succeed. If the Web server evaluates the statement to false, it should respond with an HTTP Status Code 412: “Precondition failed.” ETags are also part of the HTTP specification and identify a particular version of a resource, such as a blob. If the blob changes, the ETag should change also, as shown here:

try
{
  _blobReference.UploadText(
    data,
    Encoding.Default,
    new BlobRequestOptions {
      AccessCondition = AccessCondition.IfMatch(
        _blobReference.Properties.ETag) });
}

To specify a Precondition in code, we use the BlobRequestOptions type and set the AccessCondition property. If this access condition isn’t satisfied (for example, if another node updated the blob in the short time since it was retrieved), the ETags wouldn’t match and a StorageClientException would be thrown:

catch (StorageClientException exc)
{
  if (exc.StatusCode == HttpStatusCode.PreconditionFailed)
  {
    return false;
  }
  else
  {
    throw;
  }
}
return true;

The implementation checks the exception for the PreconditionFailed status code and returns false in this instance. Any other type of exception is a serious failure and is rethrown for handling and logging further on. No exception means the update took place and the method returns true. The full listing for the UniqueIdGenerator class is shown in Figure 2.

Figure 2 The Full UniqueIdGenerator Class

public class UniqueIdGenerator
{ 
    private readonly object _padLock = new object();
    private Int64 _lastId;
    private Int64 _upperLimit;
    private readonly int _rangeSize;
    private readonly int _maxRetries;
    private readonly IOptimisticSyncStore _optimisticSyncStore;
    public UniqueIdGenerator(
      IOptimisticSyncStore optimisticSyncStore,
      int rangeSize = 1000,
      int maxRetries = 25)
    {
      _rangeSize = rangeSize;
      _maxRetries = maxRetries;
      _optimisticSyncStore = optimisticSyncStore;
      UpdateFromSyncStore();
    }
    public Int64 NextId()
    {
      lock (_padLock)
      {
        if (_lastId == _upperLimit)
        {
          UpdateFromSyncStore();
        }
        return _lastId++;
      }
    }
    private void UpdateFromSyncStore()
    {
      int retryCount = 0;
      // maxRetries + 1 because the first run isn't a 're'try.
      while (retryCount < _maxRetries + 1)
      {
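        string data = _optimisticSyncStore.GetData();
        Int64 lastId;
        if (!Int64.TryParse(data, out lastId))
        {
          throw new Exception(string.Format(
            "Data '{0}' in storage was corrupt and " +
            "could not be parsed as an Int64", data));
        }
        Int64 upperLimit = lastId + _rangeSize;
        if (_optimisticSyncStore.TryOptimisticWrite(
          upperLimit.ToString()))
        {
          // Commit the range only once the write has succeeded, so a
          // failed update never leaves an unreserved range in the fields.
          _lastId = lastId;
          _upperLimit = upperLimit;
          return;
        }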
        retryCount++;
        // update failed, go back around the loop
      }
      throw new Exception(string.Format(
        "Failed to update the OptimisticSyncStore after {0} attempts",
        retryCount));
    }
}

The constructor takes three parameters. The first is an implementation of IOptimisticSyncStore, such as our BlobOptimisticSyncStore discussed previously. The second parameter is rangeSize, an integer value that indicates how large the range of numbers allocated from the blob should be. The larger this range, the less chance of contention. However, more numbers would be lost if this node were to crash. The final parameter is maxRetries, an integer value that indicates how many times the generator should attempt to update the blob in the event of an optimistic concurrency failure. Beyond this point, an exception is raised.

The NextId method is the only public method of the UniqueIdGenerator class and is used to fetch the next unique number. The body of the method is synchronized to ensure that any instance of the class is thread-safe and could, for example, be shared among all the threads running your Web application. An if statement checks to see if the generator has reached the upper limit of its range allocation and, if so, calls UpdateFromSyncStore to fetch a new range from blob storage.

The UpdateFromSyncStore method is the final but most interesting part of the class. The implementation of IOptimisticSyncStore is used to fetch the upper limit of the previous allocation issued. The value is incremented by the generator’s range size, and this is written back to storage. A simple “while” loop encloses the body to ensure that the appropriate number of retries takes place if TryOptimisticWrite returns false.

The following code snippet shows a UniqueIdGenerator being constructed, using a BlobOptimisticSyncStore with a file called “ordernumber.dat” in a container called “uniqueids” (note: containers in blob storage must have lowercase names):

IOptimisticSyncStore storage = new BlobOptimisticSyncStore(
  CloudStorageAccount.DevelopmentStorageAccount,
  "uniqueids",
  "ordernumber.dat");
UniqueIdGenerator generator = new UniqueIdGenerator(storage, 1000, 10);

This instance reserves 1,000 IDs at a time from the central range and will retry up to 10 times in the event of an optimistic concurrency failure before throwing an exception.

Using the UniqueIdGenerator is even simpler. Wherever you need a new unique order number, simply call NextId:

Int64 orderId = generator.NextId();

The sample code shows an Azure Worker Role that uses multiple threads to quickly allocate unique order numbers and write them to a SQL database. The use of SQL in this instance is simply to prove that every order number is unique; any duplicate would result in a Primary Key violation and throw an exception.

The advantage of this approach is that, other than creating the blob and setting its value to 0 at the very start of the application’s lifetime, no effort is required of the systems administrator. The UniqueIdGenerator carefully manages the allocation of IDs based on your settings, recovers gracefully in the event of a failure and scales effortlessly even in the most elastic of environments.

Scenario 2: Release the Hounds!

Another interesting requirement posed by the application was the need to rapidly process large amounts of data following a specified event that would occur at an approximately known time. Due to the nature of the processing, work couldn’t commence on any of the data until after this event.

Worker Roles are an obvious choice in this scenario, and it would have been possible to simply ask Azure to provision the necessary number of Worker Roles in response to the aforementioned event. However, provisioning new roles can take as long as 30 minutes, and speed was of the essence in this scenario. Therefore, it was decided that the roles would be hydrated in advance but in a paused state until released by an administrator. I called this “Release the Hounds!” Two possible approaches were considered, and I’ll review each in turn.

It should be noted that, because Azure Worker Roles are charged based on the time they’re deployed (not on how actively they use the CPU), this approach would cost more compared to simply creating the Worker Roles in response to the event. However, the customer was clear that this investment was worthwhile to ensure that processing could begin as quickly as possible.

Approach I: Polling

The first approach, shown in Figure 3, had each node poll a central status flag (again, stored in an Azure blob) at a regular interval to determine whether work could yet commence.

Figure 3 Nodes Polling a Central Status Flag

To un-pause the nodes, a client application simply had to set this flag to true, and with the subsequent poll, each node would be released. The primary disadvantage of this approach is latency, potentially as large as the polling interval. On the other hand, this is a very simple and reliable mechanism to implement.

This design is demonstrated by the PollingRelease class available in the sample code. In order to support testability, the flag-storage mechanism was abstracted behind an interface in much the same way as for the UniqueIdGenerator class. The interface IGlobalFlag and accompanying implementation for blob storage are shown in Figure 4.

Figure 4 The IGlobalFlag Interface and Implementation for Blob Storage

public interface IGlobalFlag
{
  bool GetFlag();
  void SetFlag(bool status);
}
public class BlobGlobalFlag : IGlobalFlag
{
  private readonly string _token = "Set";
  private readonly CloudBlob _blobReference;
  public BlobGlobalFlag(CloudStorageAccount account, string container,    
    string address)
  {
    var blobClient = account.CreateCloudBlobClient();
    var blobContainer =   
      blobClient.GetContainerReference(container.ToLower());
    _blobReference = blobContainer.GetBlobReference(address);
  }
  public void SetFlag(bool status)
  {
    if (status)
    {
      _blobReference.UploadText(_token);
    }
    else
    {
      _blobReference.DeleteIfExists();
    }
  }
  public bool GetFlag()
  {
    try
    {
      _blobReference.DownloadText();
      return true;
    }
    catch (StorageClientException exc)
    {
      if (exc.StatusCode == System.Net.HttpStatusCode.NotFound)
      {
        return false;
      }
      throw;
    }
  }
}

Notice that in this example, the mere existence of a file in blob storage indicates true, no matter what the content.
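
Because the flag sits behind an interface, a test can swap blob storage for something much simpler. The following is a minimal sketch of such a stand-in; InMemoryGlobalFlag is a hypothetical name and isn't part of the sample code.

```csharp
// Repeated from Figure 4 so that the sketch compiles on its own.
public interface IGlobalFlag
{
    bool GetFlag();
    void SetFlag(bool status);
}

// Hypothetical test double: holds the flag in memory instead of blob storage.
public class InMemoryGlobalFlag : IGlobalFlag
{
    // volatile so a value set on one thread is visible to a polling thread.
    private volatile bool _status;

    public bool GetFlag()
    {
        return _status;
    }

    public void SetFlag(bool status)
    {
        _status = status;
    }
}
```

Unlike the blob-backed version, this flag is visible only within a single process, so it's useful for unit tests rather than for coordinating real nodes.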

The PollingRelease class itself is straightforward, as shown in Figure 5, with just one public method called Wait.

Figure 5 The PollingRelease Class

public class PollingRelease 
{
  private readonly IGlobalFlag _globalFlag;
  private readonly int _intervalMilliseconds;
  public PollingRelease(IGlobalFlag globalFlag, 
    int intervalMilliseconds)
  {
    _globalFlag = globalFlag;
    _intervalMilliseconds = intervalMilliseconds;
  }
  public void Wait()
  {
    while (!_globalFlag.GetFlag())
    {
      Thread.Sleep(_intervalMilliseconds);
    }
  }
}

This method blocks any caller as long as the IGlobalFlag implementation indicates that its status is false. The following code snippet shows the PollingRelease class in use:

BlobGlobalFlag globalFlag = new BlobGlobalFlag(
  CloudStorageAccount.DevelopmentStorageAccount,
  "globalflags",
  "start-order-processing.dat");
PollingRelease pollingRelease = new PollingRelease(globalFlag, 2500);
pollingRelease.Wait();

A BlobGlobalFlag instance is created pointing at a container called “globalflags.” The PollingRelease class will poll every 2.5 seconds for the presence of a file called “start-order-processing.dat”; any call to the Wait method will be blocked until this file exists.

Approach II: Listening

The second approach uses the Azure Service Bus to simultaneously communicate with all worker roles directly and release them (see Figure 6). 

Figure 6 Using the Azure Service Bus to Simultaneously Communicate with All Worker Roles

The Service Bus is a large-scale messaging and connectivity service, also built on Azure. It facilitates secure communication among different components of a distributed application. The Service Bus provides an ideal way to connect two applications that would otherwise find it difficult to communicate, due to their location behind a network address translation (NAT) boundary or a frequently changing IP address, for example. It’s beyond the scope of this article to give a detailed overview of the Azure Service Bus, but an excellent tutorial is available on MSDN at msdn.microsoft.com/library/ee706736.

To demonstrate this approach, a class called ListeningRelease was created that, like PollingRelease, has one public method called Wait. This method connects to the Service Bus and uses a ManualResetEvent to block the thread until a signal is received:

public void Wait()
{
  using (ConnectToServiceBus())
  {
    _manualResetEvent.WaitOne();
  }
}

The full ConnectToServiceBus method is listed in Figure 7. It uses types from the System.ServiceModel and Microsoft.ServiceBus assemblies to expose a class called UnleashService to the cloud via the Azure Service Bus, shown in Figure 8.

Figure 7 The ConnectToServiceBus Method

private IDisposable ConnectToServiceBus()
{
  Uri address = ServiceBusEnvironment.CreateServiceUri("sb",  
    _serviceNamespace, _servicePath);
  TransportClientEndpointBehavior sharedSecretServiceBusCredential =  
    new TransportClientEndpointBehavior();
  sharedSecretServiceBusCredential.CredentialType =  
    TransportClientCredentialType.SharedSecret;
  sharedSecretServiceBusCredential.Credentials.SharedSecret.
    IssuerName = _issuerName;
  sharedSecretServiceBusCredential.Credentials.SharedSecret.
    IssuerSecret = _issuerSecret;
  // Create the single instance service, which raises an event
  // when the signal is received.
  UnleashService unleashService = new UnleashService();
  unleashService.Unleashed += new  
    EventHandler(unleashService_Unleashed);
  // Create the service host reading the configuration.
  ServiceHost host = new ServiceHost(unleashService, address);
  IEndpointBehavior serviceRegistrySettings = 
    new ServiceRegistrySettings(DiscoveryType.Public);
  foreach (ServiceEndpoint endpoint in host.Description.Endpoints)
  {
    endpoint.Behaviors.Add(serviceRegistrySettings);
    endpoint.Behaviors.Add(sharedSecretServiceBusCredential);
  }
  host.Open();
  return host;
}

Figure 8 The UnleashService Class

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
public class UnleashService : IUnleashContract
{
  public void Unleash()
  {
    OnUnleashed();
  }
  protected virtual void OnUnleashed()
  {
    EventHandler temp = Unleashed;
    if (temp != null)
    {
      temp(this, EventArgs.Empty);
    }
  }
  public event EventHandler Unleashed;
}

The UnleashService is hosted by Windows Communication Foundation (WCF) as a single instance and implements the IUnleashContract contract, which has just one method: Unleash. ListeningRelease listens for an invocation of this method through the Unleashed event shown earlier. When the ListeningRelease class observes this event, the ManualResetEvent that’s currently blocking any calls to Wait is set and all blocked threads are released.
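
The event handler itself isn't listed in this article, so the following is only a sketch of how the event and the ManualResetEvent might be connected, with all WCF and Service Bus hosting removed. SignalGate and its members are hypothetical names.

```csharp
using System;
using System.Threading;

// Hypothetical simplification of the listening approach: an event handler
// sets a ManualResetEvent, which releases every thread blocked in Wait.
public class SignalGate
{
    private readonly ManualResetEvent _manualResetEvent =
        new ManualResetEvent(false);

    // Attach this handler to an event source such as UnleashService.Unleashed.
    public void OnUnleashed(object sender, EventArgs e)
    {
        _manualResetEvent.Set();
    }

    // Blocks until the signal has been received.
    public void Wait()
    {
        _manualResetEvent.WaitOne();
    }

    // Blocks for at most the given time; returns true if the signal arrived.
    public bool Wait(int timeoutMilliseconds)
    {
        return _manualResetEvent.WaitOne(timeoutMilliseconds);
    }
}
```

Because a ManualResetEvent stays signaled once set, any thread that calls Wait after the signal has arrived returns immediately.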

In the configuration for the service, I used the NetEventRelayBinding, which supports multicasting through the Service Bus, allowing any number of publishers and subscribers to communicate through a single endpoint. The nature of this broadcast communication requires that all operations are one-way, as demonstrated by the IUnleashContract interface:

[ServiceContract]
public interface IUnleashContract
{
  [OperationContract(IsOneWay=true)]
  void Unleash();
}

The endpoint is secured using a Shared Secret (username and complex password). With these details, any client with access to the Internet could invoke the Unleash method—including, for example, the Administrator Console provided in the sample (see Figure 9).

Figure 9 The Administrator Console

Although the ListeningRelease approach does away with the inherent latency in the PollingRelease class, there’s still some latency to deal with. However, the main disadvantage with the listening approach is its stateless nature, such that any nodes provisioned after the release signal has been transmitted wouldn’t see this event and would remain paused. Of course, an obvious solution might be to combine both the Service Bus and a global flag in blob storage, but I’ll leave that as an exercise for the reader.
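
For readers who want a starting point, here is one possible shape for that combination. It is a sketch under assumptions rather than part of the sample code: HybridRelease is a hypothetical class, the durable flag is passed in as a delegate (for example, globalFlag.GetFlag from an IGlobalFlag implementation), and Signal is assumed to be called from whatever handler receives the Service Bus broadcast.

```csharp
using System;
using System.Threading;

// Hypothetical combination of the polling and listening approaches.
public class HybridRelease
{
    private readonly Func<bool> _isReleased;
    private readonly int _intervalMilliseconds;
    private readonly ManualResetEvent _signal = new ManualResetEvent(false);

    public HybridRelease(Func<bool> isReleased, int intervalMilliseconds)
    {
        _isReleased = isReleased;
        _intervalMilliseconds = intervalMilliseconds;
    }

    // Call this when the broadcast release message is received.
    public void Signal()
    {
        _signal.Set();
    }

    public void Wait()
    {
        // Checking the durable flag first releases nodes that started
        // after the broadcast was sent and therefore never saw it.
        while (!_isReleased())
        {
            // Wake immediately if the broadcast arrives; otherwise fall
            // through after one polling interval and check the flag again.
            if (_signal.WaitOne(_intervalMilliseconds))
            {
                return;
            }
        }
    }
}
```

In this arrangement the administrator would set the flag in blob storage and then send the broadcast. Nodes that are already listening are released with little delay, and a longer polling interval can be used because polling serves only as a fallback.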

Sample Code

The accompanying sample solution is available at code.msdn.microsoft.com/mag201011Sync and includes a ReadMe file that lists the prerequisites and includes the setup and configuration instructions. The sample uses the ListeningRelease, PollingRelease and UniqueIdGenerator in a single Worker Role.


Josh Twist is a principal application development manager with the Premier Support for Developers team in the United Kingdom.

Thanks to the following technical experts for reviewing this article: David Goon, Morgan Skinner and Wade Wegner