云计算

在 Windows Azure 中同步多个节点

Josh Twist

下载代码示例

云是一项重大的技术变革,业界很多专家预测如此重要的变革大约每 12 年才会发生一次。想想云带来的种种优势,这样的兴奋也就不足为奇了:大幅减少运行成本,高度的可用性以及几近无限的可扩展性,等等。

当然,这样的变革也让业界面临很多挑战,而且面对挑战的不仅仅是如今的开发人员。例如,我们该如何有针对性地构建系统以充分利用云的独特优势?

幸运的是,Microsoft 在二月份推出了 Windows Azure Platform,其中包含很多规模适中的组件,可用于创建既支持海量用户又具备高度可用性的应用程序。但是,要使部署到云中的应用程序完全发挥其潜能,系统的开发人员必须利用可称为云的最大特色的“弹性”。

弹性是云平台的一项属性,可以实现按需配置更多资源(计算能力、存储等等),只需几分钟而不是几个月,就能向 Web 场中添加更多服务器。同样重要的是,移除这些资源也同样迅速。

云计算的一个重要信条就是“即付即用”的业务模型,即您只需为您所用的部分支付费用。使用 Windows Azure,您只需为节点(在虚拟机中运行的 Web 或工作者角色)部署之后的时间段付费,因此可以在不需要时或业务低谷期减少节点数量,从而直接带来成本节约。

因此,开发人员创建富有弹性的系统就变得至关重要,这样的系统可以自动适应配置的更多硬件,而只需系统管理员提供最少的输入或配置。

情景 1:创建订单号

最近,我有幸从事一项概念验证工作,即使用 Windows Azure 将一个现有的 Web 应用程序基础结构转移到云中。

考虑到该应用程序数据的分散性,Windows Azure 表存储是最佳的候选工具。这一简单而高效的存储机制(支持几近无限的可扩展性)是理想的选择,只有一个明显缺点是关于唯一标识符的。

目标应用程序允许客户下订单,以及检索其订单号。使用 SQL Server 或 SQL Azure,很容易生成一个简单的数字格式的唯一标识符,但 Windows Azure 表存储不提供自动递增的主键。相反,使用 Windows Azure 表存储的开发人员需要创建 GUID,并使用它作为表中的“键”:

505EAB78-6976-4721-97E4-314C76A8E47E

使用 GUID 的问题在于很难进行人工处理。想象一下通过电话告诉操作员您的 GUID 订单号,或者在日志中记录这个订单号,该有多么麻烦。当然,GUID 在所有情况下都必须是唯一的,因此它们过于复杂。另一方面,订单号只需要在“订单”表中是唯一的。

在 Windows Azure 中创建简单、唯一的 ID

我考虑了几种相对简单的方法来解决 GUID 问题:

  1. 使用 SQL Azure 生成唯一 ID: 出于多种原因,这项概念验证已经将 SQL Azure 排除在外,不会用于 Windows Azure 表存储,主要是由于需要将系统扩展到很多节点,每个节点上都有很多线程来执行数据操作。
  2. 使用 Blob 存储来管理递增的值: 在 Windows Azure Blob 存储中存储一个集中的计数器。只要提供一个简单、连续的订单号生成机制,供多个节点使用,节点就能读取和更新订单号。但是,这种方法的争议之处在于繁忙的系统每秒都需要很多新的订单号,可能会对系统的可扩展性造成不利影响。
  3. 跨每个节点分隔唯一 ID:创建轻型的内存中计数器,生成唯一的订单号。为了确保在所有节点上的唯一性,每个节点将被分配一个订单号范围,如图 1 所示。

图 1 为每个节点分配订单号范围以确保唯一 ID

节点 Range
0-1,000,000
1,000,001-2,000,000

但是,这种方法伴随着很多问题。当一个节点用完范围内的值时会出现什么情况?当一次向系统中添加数百个节点时会出现什么情况?如果一个节点崩溃并被 Windows Azure 运行时使用新节点替换,将会怎样?管理员需要密切监控这些范围,小心确保配置是正确的,否则就得面对数据损坏。

事实上,我们需要更加完善的方法:不需要在每个节点上进行配置,争用很少,而且始终能保证唯一性。为此,我综合了上述的第二和第三种方法。

我使用的概念相对简单:在 Blob 存储中使用一个小文本文件来存储最后一个订单号。当需要新的订单号时,节点可以访问此 Blob,递增值,然后写回到存储中。当然,在这个“读取-递增-写回”的过程中,很有可能有其他节点也要访问 Blob,进行相同的操作。如果不执行某种并发管理,订单号就不可能是唯一的,数据也会损坏。传统的解决之道是考虑创建一个锁定机制,阻止多个节点同时处理 Blob。但是,锁定的代价高昂,而且如果操作的主要着眼点是吞吐量和高扩展性,就应该避免使用锁定。

相反,能够利用“乐观并发”的方法是最佳的。利用乐观并发,可以实现多个参与者与资源的交互。当资源被一个参与者检索时,参与者会发出一个标记,指示资源的版本。发生更新时,该标记会包含在内,以说明资源的哪个版本正在被修改。如果资源已被另一个参与者修改,则更新将失败,最初的参与者可以检索到最近的版本并尝试再次更新。如果更新很少发生争用,乐观并发就非常有效。这样就可以避免锁定的成本和复杂性,资源也不会被损坏。

让我们假设在高峰时段,系统每秒发出大约 100 个新的订单号。这意味着每秒有 100 次更新 Blob 的请求,发生争用的几率极大,也就意味着要重试很多次,从而使整个情况更加恶化。因此,为了减少这种现象发生的可能性,我决定为订单号分配范围。

我创建了名为 UniqueIdGenerator 的类来封装此行为。这个类通过在可配置的区块中递增值来从 Blob 存储中移除一个范围内的订单号。如果每个 UniqueIdGenerator 每次都预订 1,000 个订单号,则 Blob 可能平均 10 秒更新一次,这样就大大降低了发生争用的几率。每个 UniqueIdGenerator 都可以自由地发出其预订的 1,000 个订单号,确信这个类指向同一 Blob 资源的其他实例不会发出相同的订单号。

为了使这个新的组件可以进行测试,我指定了名为 IOptimisticSyncStore 的接口,使 UniqueIdGenerator 从特定的存储机制分离。这样做有一项额外的好处:在以后,该组件可以在需要时使用不同类型的存储。这就是该接口:

public interface IOptimisticSyncStore
{
  string GetData();
  bool TryOptimisticWrite(string data);
}

如您所见,这是一个相当简单的接口,只有两个方法:一个检索数据,另一个更新数据。后者返回一个布尔值,其中 False 表示乐观并发失败,应该重试过程。

代码下载中包含了使用 Blob 存储的 IOptimisticSyncStore 的实现(详细信息请参见本文结尾)。该实现的大部分内容都很简单,但我们有必要深入研究 TryOptimisticWrite 方法,以了解乐观并发是如何实现的。

借助 Precondition 和实体标记 (ETag),在 Windows Azure Blob 存储中更新资源时,很容易使用乐观并发。Precondition 是开发人员断言为 True 的语句,用以指示 HTTP 请求成功。如果 Web 服务器计算该语句的结果为 False,它应该回应 HTTP 状态代码 412:“Precondition failed”(预分区失败)。ETag 也是 HTTP 规范的一部分,用于标识资源(例如 Blob)的特定版本。如果 Blob 更改,ETag 也应该更改,如下所示:

try
{?
  _blobReference.UploadText(?
    data,?
    Encoding.Default,?
    new BlobRequestOptions { ?
    AccessCondition = AccessCondition.IfMatch(
    _blobReference.Properties.ETag) });?
}

为了在代码中指定 Precondition,我们使用 BlobRequestOptions 类型,并设置 AccessCondition 属性。 如果这一访问条件未得到满足(例如,另一个节点在检索之后的很短时间内更新了 Blob),ETag 将不匹配,并引发 StorageClientException:

catch (StorageClientException exc)
{
  if (exc.StatusCode == HttpStatusCode.PreconditionFailed)
  {
    return false;
  }
  else
  {
    throw;
  }
}
return true;

在本实例中,该实现将检查 PreconditionFailed 状态代码的异常,并返回 False。 其他类型的异常是严重的失败,会被重新引发以进行后续的处理和记录。 没有异常则表示更新成功,该方法会返回 True。 UniqueIdGenerator 类的完整内容如图 2 所示。

图 2 完整的 UniqueIdGenerator 类

public class UniqueIdGenerator
{ 
    private readonly object _padLock = new object();
    private Int64 _lastId;
    private Int64 _upperLimit;
    private readonly int _rangeSize;
    private readonly int _maxRetries;
    private readonly IOptimisticSyncStore _optimisticSyncStore;
    public UniqueIdGenerator(
      IOptimisticSyncStore optimisticSyncStore,
      int rangeSize = 1000,
      int maxRetries = 25)
    {
      _rangeSize = rangeSize;
      _maxRetries = maxRetries;
      _optimisticSyncStore = optimisticSyncStore;?
      UpdateFromSyncStore();
    }
    public Int64 NextId()
    {
      lock (_padLock)
      {
        if (_lastId == _upperLimit)
        {
          UpdateFromSyncStore();
        }
        return _lastId++;
      }
    }
    private void UpdateFromSyncStore()
    {
      int retryCount = 0;
      // maxRetries + 1 because the first run isn't a 're'try.
      while (retryCount < _maxRetries + 1)
      {
        string data = _optimisticSyncStore.GetData();
        if (!Int64.TryParse(data, out _lastId))
        {
          throw new Exception(string.Format(
            "Data '{0}' in storage was corrupt and " +
            "could not be parsed as an Int64", data));
        }
        _upperLimit = _lastId + _rangeSize;
        if (_optimisticSyncStore.TryOptimisticWrite(
          _upperLimit.ToString()))
        {
          return;
        }
        retryCount++;
        // update failed, go back around the loop
      }
      throw new Exception(string.Format(
        "Failed to update the OptimisticSyncStore after {0} attempts",
        retryCount));
    }
}

构造函数使用三个参数。第一个参数是 IOptimisticSyncStore 的实现,例如我们前文讨论的 BlobOptimisticSyncStore。第二个参数是 rangeSize,一个整数值,表示从 Blob 分配的订单号的范围应该有多大。范围越大,发生争用的几率越小。但是,如果此节点崩溃,丢失的订单号也越多。最后一个参数是 maxRetries,一个整数值,表示如果发生乐观并行失败,生成器应该尝试更新 Blob 多少次。超过这个值后,将引发异常。

NextId 方法是 UniqueIdGenerator 类的唯一一个公共成员,用于获取下一个唯一订单号。该方法的主体将进行同步,以确保类的所有实例都是线程安全的,而且在所有运行 Web 应用程序的线程之间共享。一个 if 语句将检查生成器是否达到所分配范围的上限值,如果达到将调用 UpdateFromSyncStore 以从 Blob 存储获取新的范围。

UpdateFromSyncStore 方法是类的最后一部分内容,也是最有趣的内容。IOptimisticSyncStore 的实现用于获取前面发出的分配的上限值。该值按生成器的范围大小递增,并写回到存储。一个简单的“while”循环结束主体,以确保当 TryOptimisticWrite 返回 False 时进行适当次数的重试。

以下代码段显示了所构造的 UniqueIdGenerator,在名为“uniqueids”的容器(注意:Blob 存储中的容器必须使用小写的名称)中使用 BlobOptimisticSyncStore 以及名为“ordernumber.dat”的文件:

IOptimisticSyncStore storage = new BlobOptimisticSyncStore(
  CloudStorageAccount.DevelopmentStorageAccount, 
  "uniqueids", 
  "ordernumber.dat");?UniqueIdGenerator
  generator = new UniqueIdGenerator(storage, 1000, 10);

此实例从集中管理的范围中删除 1,000 个 ID,如果乐观并行失败,在引发异常之前将重试 10 次。

使用 UniqueIdGenerator 甚至更加简单。 无论在哪里,您需要新的唯一订单号,只需调用 NextId:

Int64 orderId = generator.NextId();

示例代码显示了 Windows Azure 工作者角色,它使用多个线程来快速分配唯一订单号,并将订单号写入 SQL 数据库。在此实例中使用 SQL 只是为了证明每个订单号都是唯一的,因为如果不是唯一的,将造成主键冲突,并引发异常。

这种方法(不是创建 Blob 并在应用程序生命周期的开始时将其值设置为 0)的优势在于,系统管理员不需要执行任何工作。UniqueIdGenerator 根据您的设置仔细管理 ID 的分配,失败时能够顺利地恢复,即使在最有弹性的环境中也可以轻松省力地扩展。

情景 2:“放出猎犬!”

该应用程序提出的另一项有趣要求是需要在发生指定事件(在大约已知的时间发生)之后快速处理大量数据。由于处理的性质,必须等到此事件之后才能开始对数据进行处理。

在这种情景中,工作者角色是明显的选择,可以只要求 Windows Azure 配置必要数量的工作者角色,以响应前面提到的事件。但是,配置新角色可能需要长达 30 分钟的时间,而在此情景中,速度是至关重要的。因此,我考虑将角色提前准备好,但处于暂停状态,直到管理员解除暂停(我称之为“放出猎犬!”)。有两种可能的实现方式,我将依次介绍。

应该引起注意的是,因为 Windows Azure 工作者角色会基于其部署的时间(而不是它们如何有效使用 CPU)计算负载,所以此方式与简单地创建响应事件的工作者角色相比,成本要更高。但是,客户会意识到这项投资是值得的,因为它可以确保处理尽快开始。

 方式 I:轮询

第一种方式(如图 3 所示)让每个节点按一定的时间间隔轮询一个集中的状态标志(还是存储在 Windows Azure Blob 中)以确定工作是否能够开始。

图 3 节点轮询集中的状态标志

若要取消节点的暂停状态,客户端应用程序只需将此标志设置为 True,从而使得在随后的轮询中,每个节点都被释放。此方式最主要的缺点是存在延迟,延迟最大可能等于轮询的时间间隔。而另一方面,这又是一种非常简单、可靠的机制,很容易实现。

这种设计可通过示例代码中的 PollingRelease 类演示。为了支持可测试性,标志存储机制抽象在一个接口之后,与 UniqueIdGenerator 类相似。图 4 显示了接口 IGlobalFlag 和相应的 Blob 存储实现。

图 4 IGlobalFlag 接口和 Blob 存储实现

public interface IGlobalFlag
{
  bool GetFlag();
  void SetFlag(bool status);
}
public class BlobGlobalFlag : IGlobalFlag
{
  private readonly string _token = "Set";
  private readonly CloudBlob _blobReference;
  public BlobGlobalFlag(CloudStorageAccount account, string container,    
    string address)
  {
    var blobClient = account.CreateCloudBlobClient();
    var blobContainer =   
      blobClient.GetContainerReference(container.ToLower());
    _blobReference = blobContainer.GetBlobReference(address);
  }
  public void SetFlag(bool status)
  {
    if (status)
   {
      _blobReference.UploadText(_token);
    }
    else
    {
      _blobReference.DeleteIfExists();
    }
  }
  public bool GetFlag()?  {
    try
    {
      _blobReference.DownloadText();
      return true;
    }
    catch (StorageClientException exc)
    {
      if (exc.StatusCode == System.Net.HttpStatusCode.NotFound)
      {
        return false;
      }
      throw;
    }
  }
}

请注意,在本示例中,Bob 存储中存在的文件无论内容是什么都指示 True。

PollingRelease 类本身很简单,如图 5 所示,其中只有一个公共方法,名为 Wait。

图 5 PollingRelease 类

public class PollingRelease 
{
  private readonly IGlobalFlag _globalFlag;
  private readonly int _intervalMilliseconds;
  public PollingRelease(IGlobalFlag globalFlag, 
    int intervalMilliseconds)
  {
    _globalFlag = globalFlag;
    _intervalMilliseconds = intervalMilliseconds;
  }
  public void Wait()
  {
    while (!_globalFlag.GetFlag())?    {
      Thread.Sleep(_intervalMilliseconds);
    }
  }
}

只要 IGlobalFlag 实现指示其状态为 False,此方法就会阻止任何调用者。 以下代码段显示了 PollingRelease 类的使用:

BlobGlobalFlag globalFlag = new BlobGlobalFlag(
  CloudStorageAccount.DevelopmentStorageAccount,
  "globalflags",
  "start-order-processing.dat");
PollingRelease pollingRelease = new PollingRelease(globalFlag, 2500);
pollingRelease.Wait();

创建一个 BlobGlobalFlag 实例,指向名为“globalflags”的容器。PollingRelease 类将每隔 2.5 秒轮询一次,查看是否存在名为“start-order-processing.dat”的文件。在此文件存在之前,所有对 Wait 方法的调用都将被阻止。

 方式 II:侦听

第二种方式使用 Windows Azure AppFabric 服务总线同时与所有工作者角色直接通信并释放它们(请参见图 6)。 

图 6 使用 Windows Azure AppFabric 服务总线同时与所有工作者角色通信

服务总线是大型消息传送和连接服务,也构建在 Windows Azure 之上。它可以实现分布式应用程序的不同组件之间的安全通信。如果两个应用程序位于网络地址转换 (NAT) 边界之后或者经常更改 IP 地址,则它们之间将很难相互通信,而服务总线就是一种理想的连接方式。本文不会详细探讨 Windows Azure AppFabric 服务总线,但您可以从 MSDN 的 msdn.microsoft.com/library/ee706736 获得一份优秀的教程。

为了演示这种方式,我创建了一个名为 ListeningRelease 的类,它与 PollingRelease 类似,也有一个名为 Wait 的公共方法。此方法连接到服务总线,使用 ManualResetEvent 来阻止线程,直到其收到信号:

public void Wait()
{
  using (ConnectToServiceBus())
  {
    _manualResetEvent.WaitOne();
  }
}

图 7 列出了完整的 ConnectToServiceBus 方法。它使用来自 System.ServiceModel 和 Microsoft.ServiceBus 程序集的类型,通过 Windows Azure AppFabric 服务总线向云提供一个名为 UnleashService 的类,如图 8 所示。

图 7 ConnectToServiceBus 方法

private IDisposable ConnectToServiceBus()
{
  Uri address = ServiceBusEnvironment.CreateServiceUri("sb",  
    _serviceNamespace, _servicePath);
  TransportClientEndpointBehavior sharedSecretServiceBusCredential =  
    new TransportClientEndpointBehavior();
  sharedSecretServiceBusCredential.CredentialType =  
    TransportClientCredentialType.SharedSecret;
  sharedSecretServiceBusCredential.Credentials.SharedSecret.
    IssuerName = _issuerName;
  sharedSecretServiceBusCredential.Credentials.SharedSecret.
    IssuerSecret = _issuerSecret;
  // Create the single instance service, which raises an event
  // when the signal is received.
  UnleashService unleashService = new UnleashService();
  unleashService.Unleashed += new  
    EventHandler(unleashService_Unleashed);
  // Create the service host reading the configuration.
  ServiceHost host = new ServiceHost(unleashService, address);
  IEndpointBehavior serviceRegistrySettings = 
    new ServiceRegistrySettings(DiscoveryType.Public);
  foreach (ServiceEndpoint endpoint in host.Description.Endpoints)
  {
    endpoint.Behaviors.Add(serviceRegistrySettings);
    endpoint.Behaviors.Add(sharedSecretServiceBusCredential);
  }
  host.Open();
  return host;
}

图 8 UnleashService 类

[ServiceBehavior(InstanceContextMode= InstanceContextMode.Single)]
public class UnleashService : IUnleashContract
{
  public void Unleash()
  {
    OnUnleashed();
  }
  protected virtual void OnUnleashed()
  {
    EventHandler temp = Unleashed;
    if (temp != null)
    {
      temp(this, EventArgs.Empty);
    }
  }
  public event EventHandler Unleashed;
}

UnleashService 由 Windows Communication Foundation (WCF) 作为单个实例托管,实现 IUnleashService 约定,只有一个方法:Unleash。 ListeningRelease 通过前面所示的 Unleashed 事件侦听此方法的调用。 当 ListeningRelease 类观察到此事件时,将设置目前阻止对 Wait 所有调用的 ManualResetEvent,并且所有被阻止的线程都将释放。

在该服务的配置中,我使用了 NetEventRelayBinding,它支持通过服务总线的多播,允许任意数量的发布者和订阅者通过单个端点通信。 这种性质的广播通信要求所有操作都是单向的,如 IUnleashContract 接口所演示的:

[ServiceContract]
public interface IUnleashContract
{
  [OperationContract(IsOneWay=true)]
  void Unleash();
}

端点使用“共享机密”(用户名和复杂密码)进行保护。有了这些细节信息,任何能够访问 Internet 的客户端都可以调用 Unleash 方法,包括示例中提供的管理员控制台(如图 9 所示)。

图 9 管理员控制台

尽管 ListeningRelease 方式避免了 PollingRelease 类中固有的延迟问题,但还是会存在某种延迟。但是,侦听方式的主要弊端在于它的无状态性质,任何在释放信号传递之后配置的节点都将看不到这个事件,因而保持暂停状态。当然,显而易见的解决方法就是综合利用服务总线和 Blob 存储中的全局标志,但我把这个作为练习留给读者。

示例代码

本文随附的示例解决方案可从 code.msdn.microsoft.com/mag201011Sync 获得,它包括一个 ReadMe 文件,其中列出了各种先决条件以及设置和配置说明。该示例在一个工作者角色中使用了 ListeningRelease、PollingRelease 和 UniqueIdGenerator。

Josh Twist  是英国开发技术支持服务团队的首席应用程序开发经理,可以在 thejoyofcode.com 上找到他撰写的博客文章。

衷心感谢以下技术专家对本文的审阅:David Goon、Morgan SkinnerWade Wegner