编写使用主题来发送和接收消息的代码

已完成

在分布式应用程序中,某些消息需要发送到单个接收方组件。 其他消息需要发送到多个目标。

想想当用户取消自行车订单时会发生什么情况。 取消订单不同于下初始订单。 当下订单时,工作流将等待,直到订单完成付款处理过程,然后再将订单发送到本地店面。 对于取消操作,你将同时通知店面和付款处理器。 这种方法可以尽量减少浪费派送司机的时间。

为了允许多个组件接收同一消息,你将使用 Azure 服务总线主题。 接下来,你将了解有关编写代码的过程和注意事项。

使用主题编码与使用队列编码

若要将每条发送的消息传送到所有订阅组件,请使用主题。 替换队列的一种方式是编写使用主题的代码。 你将使用相同的 Azure.Messaging.ServiceBus NuGet 包,配置连接字符串,并使用异步编程模式。

你还要使用相同的 ServiceBusClient 类和 ServiceBusSender 类来发送消息,并使用 ServiceBusProcessor 类来接收消息。

针对订阅设置筛选器

如果希望将发送到主题的特定消息传递到特定订阅,可在主题中的相应订阅上设置一个或多个筛选器。 例如,在自行车应用程序中,你的店面运行的是通用 Windows 平台 (UWP) 应用程序。 每个店面都可以订阅 OrderCancellation 主题并筛选自己的 StoreId。 你可以节省 Internet 带宽,因为不用向多个店铺发送不必要的消息。 同时,付款处理组件会订阅所有的 OrderCancellation 消息。

筛选器可以是以下三种类型之一:

  • 布尔筛选器:TrueFilter 确保发送到主题的所有消息都会传送到当前订阅。 FalseFilter 确保没有消息传送到当前订阅。 (这会有效地阻止或关闭订阅。)
  • SQL 筛选器:SQL 筛选器通过使用与 SQL 查询中的 WHERE 子句相同的语法来指定条件。 只有针对此筛选器求值时返回 True 的消息才会传送给订阅者。
  • 关联筛选器:关联筛选器包含一组与每个消息的属性进行匹配的条件。 如果筛选器中的属性与消息中的属性具有相同值,则视为匹配。

对于 StoreId 筛选器,可以使用 SQL 筛选器。 尽管 SQL 筛选器最为灵活,但它们的计算承恩也是最高的,并且该筛选器可能会降低服务总线的吞吐量。 在这种情况下,选择关联筛选器。

向主题发送消息

若要将消息发送到主题,需要完成以下步骤。

在任何发送或接收组件中,将以下 using 语句添加到调用服务总线主题的任何代码文件。

using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

若要发送消息,请先创建一个新的 ServiceBusClient 对象,并向其传递连接字符串和主题的名称。

await using var client = new ServiceBusClient(connectionString);

然后,通过对 ServiceBusClient 对象调用 CreateSender 方法并指定主题名称来创建 ServiceBusSender 对象。

ServiceBusSender sender = client.CreateSender(topicName);

可以通过调用 ServiceBusSender.SendMessageAsync() 方法并传递 ServiceBusMessage 将消息发送到主题。 像在队列中一样,消息必须采用 UTF-8 编码字符串的形式。

string message = "Cancel! I have changed my mind!";
var message = new ServiceBusMessage(message);

// Send the message to the topic.
await sender.SendMessageAsync(message);

从订阅接收消息

若要从订阅接收消息,必须创建一个 ServiceBusProcessor 对象,并向其传递主题名称和订阅名称。

processor = client.CreateProcessor(topicName, subscriptionName, options);

然后,注册一个消息处理程序和错误处理程序。

// Specify the handler method for messages.
processor.ProcessMessageAsync += MessageHandler;

// Specify the handler method for errors.
processor.ProcessErrorAsync += ErrorHandler;

在消息处理程序中执行处理工作,然后调用 ProcessMessageEventArgs.CompleteMessageAsync() 方法来从订阅中删除消息。

// Complete the message. The message is deleted from the subscription. 
await args.CompleteMessageAsync(args.Message);