你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用服务总线消息传递改进性能的最佳实践

本文介绍如何使用 Azure 服务总线在交换中转消息时优化性能。 本文的第一部分介绍提高性能的不同机制。 第二部分指导用户针对给定的场景以能够提供最佳性能的方式使用服务总线。

在本文中,术语“客户端”是指任何访问服务总线的实体。 客户端可以充当发送方或接收方的角色。 术语“发送方”用于向服务总线队列或主题发送消息的服务总线队列客户端或主题客户端。 术语“接收方”是指从服务总线队列或订阅接收消息的服务总线队列客户端或订阅客户端。

资源计划和注意事项

与任何技术资源一样,谨慎的计划是确保 Azure 服务总线提供应用程序所需性能的关键所在。 服务总线命名空间的正确配置或拓扑取决于涉及应用程序体系结构的各种因素,以及如何使用每种服务总线功能。

定价层

服务总线提供各种定价层。 建议根据应用程序要求选取相应的层。

  • 标准层 - 适用于开发人员/测试环境或低吞吐量场景,其中应用程序对限制不敏感。

  • 高级层 - 适用于具有各种吞吐量要求的生产环境,其中需要可预测的延迟和吞吐量。 此外,服务总线高级命名空间可以自动缩放,也可以启用,以适应吞吐量高峰。

注意

如果未选择相应的层,则服务总线命名空间可能会产生巨大的风险,从而导致限制

限制不会导致数据丢失。 利用服务总线 SDK 的应用程序可以利用默认的重试策略,以确保服务总线最终接受数据。

计算高级版的吞吐量

发送到服务总线的数据串行化为二进制,然后在接收方收到时反序列化。 因此,当应用程序将消息视为原子工作单元时,服务总线会以字节(或兆字节)为单位来度量吞吐量。

计算吞吐量要求时,请考虑发送到服务总线的数据(流入量)以及从服务总线接收的数据(流出量)。

与预期一样,可一起批处理的较小消息有效负载的吞吐量会更高。

基准

下面是一个 GitHub 示例,可以运行该示例来查看 SB 命名空间接收的预期吞吐量。 在基准测试中,我们观察到每个消息传送单元 (MU) 的流入量和流出量大约为 4 MB/秒。

基准测试示例不使用任何高级功能,因此,应用程序观察到的吞吐量会因场景而有所不同。

计算注意事项

使用某些服务总线功能可能需要计算利用率,这可能会降低预期吞吐量。 其中一些功能是:

  1. 会话。
  2. 在单个主题上展开多个订阅。
  3. 在单个订阅上运行多个筛选器。
  4. 计划的消息。
  5. 延迟消息。
  6. 事务。
  7. 重复数据删除和回溯时间范围。
  8. 转发到(从一个实体转发到另一个)。

如果应用程序利用了上述任何功能,但没有收到预期的吞吐量,可以查看 CPU 使用量指标,并考虑纵向扩展服务总线高级命名空间。

还可以利用 Azure Monitor 自动缩放服务总线命名空间

跨命名空间分片

虽然纵向扩展分配到命名空间的计算(消息传送单元)是更简单的解决方案,但这可能不会使吞吐量线性增长。 这是因为服务总线内部(存储、网络等)可能会限制吞吐量。

在这种情况下,清理解决方案是在不同服务总线高级命名空间中分片实体(队列和主题)。 还可以考虑在不同 Azure 区域中跨不同命名空间分片。

协议

服务总线支持客户端通过以下三种协议之一发送和接收消息:

  1. 高级消息队列协议 (AMQP)
  2. 服务总线邮件协议 (SBMP)
  3. 超文本传输协议 (HTTP)

AMQP 最有效,因为它可以保持与服务总线的连接。 它还实现批处理预提取。 除非明确提到,本文中的所有内容都假定使用 AMQP 或 SBMP。

重要

SBMP 仅适用于 .NET Framework。 AMQP 是 .NET Standard 的默认设置。

选择适当的服务总线 .NET SDK

Azure.Messaging.ServiceBus 包是自 2020 年 11 月起可用的最新 Azure 服务总线 .NET SDK。 有两个较旧的 .NET SDK 将继续收到严重的 bug 修补程序,但我们强烈建议改用最新的 SDK。 若要详细了解如何从较旧的 SDK 迁移,请阅读迁移指南

NuGet 包 主命名空间 平台最低版本 协议
Azure.Messaging.ServiceBus(最新) Azure.Messaging.ServiceBus
Azure.Messaging.ServiceBus.Administration
.NET Core 2.0
.NET Framework 4.6.1
Mono 5.4
Xamarin.iOS 10.14
Xamarin.Mac 3.8
Xamarin.Android 8.0
通用 Windows 平台 10.0.16299
AMQP
HTTP
Microsoft.Azure.ServiceBus Microsoft.Azure.ServiceBus
Microsoft.Azure.ServiceBus.Management
.NET Core 2.0
.NET Framework 4.6.1
Mono 5.4
Xamarin.iOS 10.14
Xamarin.Mac 3.8
Xamarin.Android 8.0
通用 Windows 平台 10.0.16299
AMQP
HTTP
WindowsAzure.ServiceBus(旧版) Microsoft.ServiceBus
Microsoft.ServiceBus.Messaging
.NET Framework 4.6.1 AMQP
SBMP
HTTP

若要详细了解最低的 .NET Standard 平台支持,请参阅 .NET 实现支持

重用工厂和客户端

与服务交互的服务总线对象(例如 ServiceBusClientServiceBusSenderServiceBusReceiverServiceBusProcessor)应作为单一实例为依赖项注入而注册(或实例化一次并共享)。 可以通过 ServiceBusClientBuilderExtensions 为依赖项注入而注册 ServiceBusClient。

建议你在发送或接收每条消息后不要关闭或释放这些对象。 关闭或释放特定于实体的对象 (ServiceBusSender/Receiver/Processor) 会导致指向服务总线服务的链接断开。 释放 ServiceBusClient 会导致与服务总线服务的连接断开。

以下注意事项适用于所有 SDK:

注意

建立连接是一项成本高昂的操作,可通过针对多个操作重复使用相同的工厂或客户端对象来避免这一操作。 这些客户端对象可安全地用于并发异步操作及从多个线程安全地使用。

并发操作

发送、接收、删除等操作需要一段时间。 这一时间包括服务总线服务处理该操作的时间,外加延迟处理请求和响应的时间。 若要增加每次操作的数目,操作必须同时执行。

客户端通过执行异步操作来计划并发操作。 前一个请求完成之前便启动下一个请求。 以下代码片段是异步发送操作的示例:

var messageOne = new ServiceBusMessage(body);
var messageTwo = new ServiceBusMessage(body);

var sendFirstMessageTask =
    sender.SendMessageAsync(messageOne).ContinueWith(_ =>
    {
        Console.WriteLine("Sent message #1");
    });
var sendSecondMessageTask =
    sender.SendMessageAsync(messageTwo).ContinueWith(_ =>
    {
        Console.WriteLine("Sent message #2");
    });

await Task.WhenAll(sendFirstMessageTask, sendSecondMessageTask);
Console.WriteLine("All messages sent");

以下代码是异步接收操作的示例。

var client = new ServiceBusClient(connectionString);
var options = new ServiceBusProcessorOptions 
{

      AutoCompleteMessages = false,
      MaxConcurrentCalls = 20
};
await using ServiceBusProcessor processor = client.CreateProcessor(queueName,options);
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;

static Task ErrorHandler(ProcessErrorEventArgs args)
{
    Console.WriteLine(args.Exception);
    return Task.CompletedTask;
};

static async Task MessageHandler(ProcessMessageEventArgs args)
{
    Console.WriteLine("Handle message");
    await args.CompleteMessageAsync(args.Message);
}

await processor.StartProcessingAsync();

接收模式

在创建队列或订阅客户端时,可以指定接收模式:扫视-锁定接收和删除。 默认接收模式是 PeekLock。 在默认模式下操作时,客户端会发送从服务总线接收消息的请求。 客户端收到消息后,将发送完成消息的请求。

将接收模式设置为 ReceiveAndDelete 时,这两个步骤会合并到单个请求中。 这些步骤减少了操作的总体数目,并可以提高总消息吞吐量。 性能提高的同时也出现丢失消息的风险。

服务总线不支持“接收并删除”操作的事务。 此外,在客户端想要延迟消息或将其放入死信队列的情况下,需要使用扫视-锁定语义。

客户端批处理

客户端批处理允许队列或主题客户端延迟一段时间发送消息。 如果客户端在这段时间内发送其他消息,则会将这些消息以单个批次传送。 客户端批处理还会导致队列或订阅客户端将多个完成请求批处理为单个请求。 批处理仅适用于异步发送完成操作。 同步操作会立即发送到服务总线服务。 不会针对扫视或接收操作执行批处理,也不会跨客户端执行批处理。

.NET Standard SDK 的批处理功能尚未公开可供操作的属性。

批处理存储访问

为了增加队列、主题或订阅的吞吐量,服务总线在写入其内部存储时会对多条消息进行批处理。

  • 如果对队列启用了批处理,则将消息写入存储以及从存储中删除消息都将批量进行。
  • 如果对主题启用了批处理,则会将消息批量写入存储。
  • 如果对订阅启用了批处理,则会从存储区批量删除消息。
  • 如果对实体启用了批量存储访问,服务总线会将此实体的存储写入操作延迟多达 20 毫秒的时间。

注意

使用批处理不存在丢失消息的风险,即使在 20 毫秒的批处理间隔结束时出现服务总线故障,也是如此。

在此间隔期间发生的其他存储操作会被添加到此批中。 批量存储访问仅影响发送和完成操作;接收操作不会受到影响 。 批量存储访问是实体上的一个属性。 将跨所有启用了批量存储访问的实体实施批处理。

在创建新队列、主题或订阅时,默认情况下启用批量存储访问。

若要禁用批量存储访问,需要 ServiceBusAdministrationClient 的实例。 根据队列说明创建 CreateQueueOptions,以便将 EnableBatchedOperations 属性设置为 false

var options = new CreateQueueOptions(path)
{
    EnableBatchedOperations = false
};
var queue = await administrationClient.CreateQueueAsync(options);

批量存储访问不影响可计费的消息传送操作的数目。 它是队列、主题或订阅的一个属性。 它不依赖于接收模式以及客户端和服务总线服务之间所使用的协议。

预提取

预提取允许队列或订阅客户端在接收消息时从服务加载其他消息。 客户端将这些消息存储在本地缓存中。 缓存的大小取决于 QueueClient.PrefetchCountSubscriptionClient.PrefetchCount 属性。 启用预提取的每个客户端维护其自己的缓存。 客户端之间不共享缓存。 如果客户端启动接收操作,而其缓存是空的,则服务会传输一批消息。 批的大小等于缓存的大小或 256 KB,以二者中较小者为准。 如果客户端启动接收操作,并且缓存中包含一条消息,则从缓存中提取该消息。

预提取一条消息后,服务将锁定此预提取的消息。 通过此锁定操作,其他接收方则无法接收到此预提取的消息。 如果接收方在锁定过期之前无法完成此消息,则该消息便对其他接收方可用。 预提取的消息的副本则保留在缓存中。 使用过期的缓存副本的接收方会在尝试完成该消息时接收到一个异常。 默认情况下,消息锁定在 60 秒后过期。 这一值可延长到 5 分钟。 若要防止使用过期消息,请将缓存大小设置为小于客户端可在锁超时间隔内使用的消息数。

使用 60 秒的默认锁定时限时,PrefetchCount 的合理值是工厂所有接收方最大处理速率的 20 倍。 例如,某个工厂创建了 3 个接收方,并且每个接收方每秒可以处理最多 10 个消息。 预提取计数不应超过 20 X 3 X 10 = 600。 默认情况下,PrefetchCount 设置为 0,这表示不会从服务中提取额外消息。

预提取消息会增加队列或订阅的总体吞吐量,因为它减少了消息操作或往返行程的总数。 但是,提取第一条消息会耗用更长的时间(消息大小增加所致)。 由于预提取的消息已由客户端下载,因此从缓存接收这些消息的速度将变快。

服务器会在向客户端发送消息时检查消息的“生存时间 (TTL)”属性。 收到消息时,客户端不检查消息的 TTL 属性。 即使消息由客户端缓存时该消息的 TTL 已结束,仍可接收该消息。

预提取不会影响可计费的消息传送操作的数目,且仅适用于服务总线客户端协议。 HTTP 协议不支持预提取。 预提取可用于同步和异步接收操作。

预提取和 ReceiveBatch

尽管同时预提取多个消息的概念与成批处理消息 (ReceiveBatch) 的语义类似,但在将这些方法结合使用时,必须注意一些细微的差异。

预提取是客户端(QueueClientSubscriptionClient)上的配置(或模式),ReceiveBatch 是一个操作(具有请求-响应语义)。

将这些方法结合使用时,请考虑以下情况:

  • 预提取的消息数应大于或等于预期从 ReceiveBatch 接收的消息数。
  • 预提取的消息数最多可以是每秒处理的消息数的 n/3 倍,其中 n 为默认的锁定持续时间。

使用贪婪方法(即让预提取计数保持在高水平)时存在一些挑战,因为这意味着消息被锁定到特定接收方。 建议在上面提到的阈值之间尝试预提取值,凭经验识别适合的值。

多个队列

如果单个队列或主题无法满足预期,则使用多个消息实体。 在使用多个实体时,为每个实体创建专用客户端,而不是针对所有实体使用同一个客户端。

开发和测试功能

注意

此部分仅适用于 WindowsAzure.ServiceBus SDK,因为 Microsoft.Azure.ServiceBus 和 Azure.Messaging.ServiceBus 不公开此功能。

服务总线有一项专用于开发但永远不应在生产配置中使用的功能:

将新的规则或筛选器添加到主题时,可通过 TopicDescription.EnableFilteringMessagesBeforePublishing 验证新的筛选器表达式是否可以按预期使用。

方案

以下各节介绍典型的消息传递方案,并概述首选服务总线设置。 吞吐速率分为小(小于 1 条消息/秒)、中等(1 条消息/秒或更大,但不超过 100 条消息/秒)和高(100 条消息/秒或更大)。 客户端数分为小(5 个或更少)、中等(5 个以上但小于或等于 20 个)和大(超过 20 个)。

高吞吐量队列

目标:将单个队列的吞吐量最大化。 发送方和接收方的数目较小。

  • 如要增加面向队列的总发送速率,则使用多个消息工厂来创建发送方。 为每个发送方使用异步操作或多个线程。
  • 如要增加从队列接收的总体接收速率,则使用多个消息工厂来创建接收方。
  • 使用异步操作可利用客户端批处理。
  • 将批处理间隔时间设置为 50 毫秒以减少服务总线客户端协议传输的数量。 如果使用多个发送方,则将批处理间隔时间增加到 100 毫秒。
  • 将批量存储访问保留为启用状态。 该访问会增加可将消息写入队列的总速率。
  • 将预提取计数设置为工厂所有接收方最大处理速率的 20 倍。 此计数会减少服务总线客户端协议传输的数量。

多个高吞吐量队列

目标:将多个队列的整体吞吐量最大化。 单个队列的吞吐量中等或高。

要在多个队列之间获得最大的吞吐量,则使用所述设置将单个队列的吞吐量最大化。 此外,使用不同工厂创建向不同的队列发送或从其接收的客户端。

低延迟队列

目标:将队列或主题的延迟时间最小化。 发送方和接收方的数目较小。 队列的吞吐量较小或为中等。

  • 禁用客户端批处理。 客户端会立即发送一条消息。
  • 禁用批量存储访问。 该服务会立即将消息写入存储。
  • 如果使用单个客户端,将预提取计数设置为接收方处理速率的 20 倍。 如果多条消息同时到达队列,服务总线客户端协议会将这些消息全部同时传输。 当客户端收到下一条消息时,该消息便已存在于本地缓存中。 缓存应较小。
  • 如果使用多个客户端,则将预提取计数设置为 0。 通过设置此计数,在第一个客户端仍在处理第一条消息时,第二个客户端可以接收第二条消息。

包含大量发送方的队列

目标:使包含大量发件人的队列或主题的吞吐量最大化。 每个发送方均以中等速率发送消息。 接收方的数目较小。

服务总线允许最多 1000 个与消息传送实体之间的并发连接。 该限制在命名空间级别强制实施,队列数、主题数或订阅数的上限受到单个命名空间的并发连接数的限制。 就队列而言,此数值在发送方和接收方之间共享。 如果发件人需要所有 1000 个连接,则将队列替换为主题和单个订阅。 主题接受来自发送方的最多 1000 个并发连接。 订阅接受来自接收方的其他 1000 个并发连接。 如果需要超过 1000 个并发发送方,则发送方应通过 HTTP 向服务总线协议发送消息。

若要使吞吐量最大化,请执行以下步骤:

  • 如果每个发送方处于不同进程中,则每个进程仅使用单个工厂。
  • 使用异步操作可利用客户端批处理。
  • 使用 20 毫秒的默认批处理间隔时间以减少服务总线客户端协议传输的数量。
  • 将批量存储访问保留为启用状态。 此访问会增加可将消息写入队列或主题的总速率。
  • 将预提取计数设置为工厂所有接收方最大处理速率的 20 倍。 此计数会减少服务总线客户端协议传输的数量。

包含大量接收方的队列

目标:使包含大量接收方的队列或订阅的接收速率最大化。 每个接收方以中等接收速率接收消息。 发送方的数目较小。

服务总线允许最多 1000 个与实体之间的并发连接。 如果队列需要超过 1000 个接收方,则将队列替换为主题和多个订阅。 每个订阅可支持最多 1000 个并发连接。 或者,接收方可通过 HTTP 协议访问队列。

若要使吞吐量最大化,请遵循以下准则:

  • 如果每个接收方处于不同进程中,每个进程仅使用单个工厂。
  • 接收方可使用同步或异步操作。 如果独立接收方的接收速率给定为中等级别,客户端对“完成”请求的批处理不会影响接收方吞吐量。
  • 将批量存储访问保留为启用状态。 此访问会减少实体的总负载。 这还将降低可将消息写入队列或主题的总速率。
  • 将预提取计数设置为较小值(例如,PrefetchCount = 10)。 此计数可防止接收方在其他接收方已缓存大量消息时处于闲置状态。

带有多个订阅的主题

目标:将带有多个订阅的主题的吞吐量最大化。 消息由多个订阅接收,这意味着对所有订阅的组合接收速率比发送速率要大得多。 发送方的数目较小。 每个订阅的接收方的数目较小。

若要使吞吐量最大化,请遵循以下准则:

  • 如要增加面向主题的总发送速率,则使用多个消息工厂来创建发送方。 为每个发送方使用异步操作或多个线程。
  • 如要增加从订阅接收的总体接收速率,则使用多个消息工厂来创建接收方。 为每个接收方使用异步操作或多个线程。
  • 使用异步操作可利用客户端批处理。
  • 使用 20 毫秒的默认批处理间隔时间以减少服务总线客户端协议传输的数量。
  • 将批量存储访问保留为启用状态。 此访问会增加可将消息写入主题的总写入速率。
  • 将预提取计数设置为工厂所有接收方最大处理速率的 20 倍。 此计数会减少服务总线客户端协议传输的数量。

包含大量订阅的主题

目标:使包含大量订阅的主题的吞吐量最大化。 消息由多个订阅接收,这意味着对所有订阅的组合接收速率比发送速率要大得多。 发送方的数目较小。 每个订阅的接收方的数目较小。

如果所有消息都路由到所有订阅,具有大量订阅的主题则通常会公开低的总吞吐量。 这是因为每个消息均被接收了许多次,且主题及其全部订阅中的所有消息均存储在相同的存储内。 此处的假设是,每个订阅的发送方和接收方的数目都较小。 服务总线支持每个主题最多 2,000 个订阅。

若要使吞吐量最大化,则尝试执行以下步骤:

  • 使用异步操作可利用客户端批处理。
  • 使用 20 毫秒的默认批处理间隔时间以减少服务总线客户端协议传输的数量。
  • 将批量存储访问保留为启用状态。 此访问会增加可将消息写入主题的总写入速率。
  • 将预提取计数设置为预期接收速率的 20 倍(以秒为单位)。 此计数会减少服务总线客户端协议传输的数量。