你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure 服务总线适用于 .NET 的客户端库 - 版本 7.16.2

Azure 服务总线允许你构建利用异步消息传送模式的应用程序,这些模式使用高度可靠的服务在生成者和使用者之间中转消息。 Azure 服务总线在客户端和服务器之间提供灵活的中转消息传送,以及结构化的先入先出 (FIFO) 消息传递,以及具有复杂路由的发布/订阅功能。 如果想要了解有关Azure 服务总线的详细信息,请查看:什么是Azure 服务总线?

使用用于Azure 服务总线的客户端库可以:

  • 传输业务数据:利用消息传送持久交换信息,例如销售或采购订单、日记帐或库存移动。

  • 分离应用程序:提高应用程序和服务的可靠性和可伸缩性,缓解发送方和接收方同时联机的需求。

  • 控制消息的处理方式:支持使用队列的消息的传统竞争使用者,或者允许每个使用者使用主题和订阅自己的消息实例。

  • 实现复杂的工作流:消息会话支持需要消息排序或消息延迟的方案。

源代码 | 包 (NuGet) | API 参考文档 | 产品文档 | 迁移指南 (Microsoft.Azure.ServiceBus) | WindowsAzure.ServiceBus) ( | 迁移指南故障排除指南

入门

先决条件

  • Microsoft Azure 订阅:若要使用 Azure 服务(包括Azure 服务总线),需要订阅。 如果没有现有的 Azure 帐户,可以在 创建帐户时注册免费试用版或使用 MSDN 订阅者权益。

  • 服务总线命名空间:若要与Azure 服务总线交互,还需要有一个可用的命名空间。 如果不熟悉如何创建 Azure 资源,建议遵循使用 Azure 门户创建服务总线命名空间的分步指南。 还可以在此处找到有关使用 Azure CLI、Azure PowerShell或 Azure 资源管理器 (ARM) 模板创建服务总线实体的详细说明。

  • C# 8.0:Azure 服务总线 客户端库使用 C# 8.0 中引入的新功能。 为了利用 C# 8.0 语法,建议使用语言版本的latest.NET Core SDK 3.0 或更高版本进行编译。

    希望充分利用 C# 8.0 语法的 Visual Studio 用户需要使用 Visual Studio 2019 或更高版本。 可在此处下载 Visual Studio 2019(包括免费的 Community Edition)。 Visual Studio 2017 的用户可以通过使用 Microsoft.Net.Compilers NuGet 包 和设置语言版本来利用 C# 8 语法,尽管编辑体验可能并不理想。

    你仍然可以将库与以前的 C# 语言版本一起使用,但需要手动管理异步可枚举和异步可释放成员,而不是受益于新语法。 仍可能面向 .NET Core SDK 支持的任何框架版本,包括早期版本的 .NET Core 或 .NET Framework。 有关详细信息,请参阅: 如何指定目标框架

    重要说明: 为了在不修改的情况下生成或运行 示例示例 ,必须使用 C# 8.0。 如果决定针对其他语言版本调整示例,仍可以运行这些示例。

若要在 Azure 中快速创建所需的服务总线资源并接收其连接字符串,可以通过单击以下操作部署示例模板:

部署到 Azure

安装包

使用 NuGet 安装适用于 .NET 的 Azure 服务总线 客户端库:

dotnet add package Azure.Messaging.ServiceBus

验证客户端

若要使服务总线客户端库与队列或主题交互,需要了解如何与其连接和授权。 执行此操作的最简单方法是使用连接字符串,该连接字符串是在创建服务总线命名空间时自动创建的。 如果不熟悉 Azure 中的共享访问策略,建议按照分步指南获取服务总线连接字符串

拥有连接字符串后,可以使用它对客户端进行身份验证。

// Create a ServiceBusClient that will authenticate using a connection string
string connectionString = "<connection_string>";
await using var client = new ServiceBusClient(connectionString);

若要查看如何使用 Azure.Identity 进行身份验证,请查看此示例。

有关如何对 ASP.NET Core应用程序进行身份验证的示例,请查看此示例。

若要查看如何启动与自定义终结点的连接,请查看 此示例

关键概念

初始化 ServiceBusClient后,可以与服务总线命名空间中的主资源类型进行交互,其中可以存在多个资源类型,并且实际消息传输发生在哪个上,命名空间通常充当应用程序容器:

  • 队列:允许发送和接收消息。 通常用于点到点通信。

  • 主题:与队列不同,主题更适合发布/订阅方案。 主题可以发送到 ,但需要订阅,其中可以有多个并行使用。

  • 订阅:要从主题使用的机制。 每个订阅都是独立的,并接收发送到主题的每条消息的副本。 规则和筛选器可用于定制特定订阅接收的消息。

有关这些资源的详细信息,请参阅什么是Azure 服务总线?

若要与这些资源交互,应熟悉以下 SDK 概念:

  • 服务总线客户端是开发人员与服务总线客户端库交互的主要接口。 它充当与库进行所有交互的网关。

  • 服务总线发送方的范围限定为特定队列或主题,并使用服务总线客户端创建。 发送方允许将消息发送到队列或主题。 它还允许将消息安排在指定日期进行传递。

  • 服务总线接收器的范围限定为特定的队列或订阅,并使用服务总线客户端创建。 接收方允许你从队列或订阅接收消息。 它还允许在收到消息后解决它们。 有四种方法可以解决消息:

    • 完成 - 导致消息从队列或主题中删除。
    • 放弃 - 释放接收方对消息的锁定,允许其他接收方接收消息。
    • 延迟 - 通过正常方式延迟接收消息。 若要接收延迟的消息,需要保留消息的序列号。
    • DeadLetter - 将消息移动到死信队列。 这将阻止再次接收消息。 若要从死信队列接收消息,需要一个作用域为死信队列的接收方。
  • 服务总线会话接收器的范围限定为启用会话的特定队列或订阅,并使用服务总线客户端创建。 会话接收器几乎与标准接收器相同,不同之处在于,公开的会话管理操作仅适用于启用会话的实体。 这些操作包括获取和设置会话状态,以及续订会话锁。

  • 服务总线处理器的范围限定为特定的队列或订阅,并使用服务总线客户端创建。 ServiceBusProcessor可以将 视为围绕一组接收器的抽象。 它使用回调模型允许在收到消息和发生异常时指定代码。 它提供已处理消息的自动完成、自动消息锁续订和用户指定的事件处理程序的并发执行。 由于其功能集,它应该是用于编写从服务总线实体接收的应用程序的工具。 对于处理器无法提供直接使用 ServiceBusReceiver 时所期望的精细控制的复杂情况,建议使用 ServiceBusReceiver。

  • 服务总线会话处理器的范围限定为启用会话的特定队列或订阅,并使用服务总线客户端创建。 会话处理器几乎与标准处理器相同,不同之处在于,公开的会话管理操作仅适用于启用会话的实体。

有关更多概念和更深入的讨论,请参阅: 服务总线高级功能

客户端生存期

ServiceBusClient、发送方、接收方和处理器可以安全地缓存,并在应用程序的生存期内用作单一实例,这是定期发送或接收消息时的最佳做法。 它们负责高效管理网络、CPU 和内存使用,努力在处于非活动状态期间保持低使用率。

这些类型是可释放的,需要调用 DisposeAsyncCloseAsync 来确保正确清理网络资源和其他非托管对象。 请务必注意,当释放实例时 ServiceBusClient ,它将自动关闭并清理使用它创建的任何发送方、接收方和处理器。

线程安全

我们保证所有客户端实例方法都是线程安全的,并且彼此独立 (准则) 。 这可确保重用客户端实例的建议始终是安全的,即使在线程之间也是如此。

其他概念

客户端选项 | 诊断 | 嘲笑

示例

发送和接收消息

消息发送是使用 ServiceBusSender执行的。 接收是使用 ServiceBusReceiver执行的。

string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);

// create the sender
ServiceBusSender sender = client.CreateSender(queueName);

// create a message that we can send. UTF-8 encoding is used when providing a string.
ServiceBusMessage message = new ServiceBusMessage("Hello world!");

// send the message
await sender.SendMessageAsync(message);

// create a receiver that we can use to receive the message
ServiceBusReceiver receiver = client.CreateReceiver(queueName);

// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// get the message body as a string
string body = receivedMessage.Body.ToString();
Console.WriteLine(body);

发送一批消息

可通过两种方法一次发送多个消息。 执行此操作的第一种方法使用安全批处理。 使用安全批处理,可以创建一个 ServiceBusMessageBatch 对象,这样就可以尝试使用 TryAdd 方法一次向批处理添加一条消息。 如果消息无法容纳在批处理中, TryAdd 将返回 false。

// add the messages that we plan to send to a local queue
Queue<ServiceBusMessage> messages = new Queue<ServiceBusMessage>();
messages.Enqueue(new ServiceBusMessage("First message"));
messages.Enqueue(new ServiceBusMessage("Second message"));
messages.Enqueue(new ServiceBusMessage("Third message"));

// create a message batch that we can send
// total number of messages to be sent to the Service Bus queue
int messageCount = messages.Count;

// while all messages are not sent to the Service Bus queue
while (messages.Count > 0)
{
    // start a new batch
    using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();

    // add the first message to the batch
    if (messageBatch.TryAddMessage(messages.Peek()))
    {
        // dequeue the message from the .NET queue once the message is added to the batch
        messages.Dequeue();
    }
    else
    {
        // if the first message can't fit, then it is too large for the batch
        throw new Exception($"Message {messageCount - messages.Count} is too large and cannot be sent.");
    }

    // add as many messages as possible to the current batch
    while (messages.Count > 0 && messageBatch.TryAddMessage(messages.Peek()))
    {
        // dequeue the message from the .NET queue as it has been added to the batch
        messages.Dequeue();
    }

    // now, send the batch
    await sender.SendMessagesAsync(messageBatch);

    // if there are any remaining messages in the .NET queue, the while loop repeats
}

第二种方法使用 SendMessagesAsync 接受 IEnumerable 的 ServiceBusMessage重载。 使用此方法,我们将尝试将所提供的所有消息放入要发送到服务的单个消息批中。 如果消息太大,无法容纳在单个批处理中,则操作将引发异常。

IList<ServiceBusMessage> messages = new List<ServiceBusMessage>();
messages.Add(new ServiceBusMessage("First"));
messages.Add(new ServiceBusMessage("Second"));
// send the messages
await sender.SendMessagesAsync(messages);

接收一批消息

// create a receiver that we can use to receive the messages
ServiceBusReceiver receiver = client.CreateReceiver(queueName);

// the received message is a different type as it contains some service set properties
// a batch of messages (maximum of 2 in this case) are received
IReadOnlyList<ServiceBusReceivedMessage> receivedMessages = await receiver.ReceiveMessagesAsync(maxMessages: 2);

// go through each of the messages received
foreach (ServiceBusReceivedMessage receivedMessage in receivedMessages)
{
    // get the message body as a string
    string body = receivedMessage.Body.ToString();
}

完成消息

为了从队列或订阅中删除消息,我们可以调用 CompleteMessageAsync 方法。

string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);

// create the sender
ServiceBusSender sender = client.CreateSender(queueName);

// create a message that we can send
ServiceBusMessage message = new ServiceBusMessage("Hello world!");

// send the message
await sender.SendMessageAsync(message);

// create a receiver that we can use to receive and settle the message
ServiceBusReceiver receiver = client.CreateReceiver(queueName);

// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// complete the message, thereby deleting it from the service
await receiver.CompleteMessageAsync(receivedMessage);

放弃邮件

放弃消息会释放接收方的锁,从而允许此接收方或其他接收方接收消息。

ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// abandon the message, thereby releasing the lock and allowing it to be received again by this or other receivers
await receiver.AbandonMessageAsync(receivedMessage);

延迟消息

延迟消息将阻止使用 ReceiveMessageAsyncReceiveMessagesAsync 方法再次接收消息。 相反,有单独的方法, ReceiveDeferredMessageAsync 用于 ReceiveDeferredMessagesAsync 接收延迟的消息。

ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// defer the message, thereby preventing the message from being received again without using
// the received deferred message API.
await receiver.DeferMessageAsync(receivedMessage);

// receive the deferred message by specifying the service set sequence number of the original
// received message
ServiceBusReceivedMessage deferredMessage = await receiver.ReceiveDeferredMessageAsync(receivedMessage.SequenceNumber);

死信消息

消息的死信类似于延迟,有一个main区别是消息在收到一定次数后,服务会自动死信。 应用程序可以根据要求选择手动死信消息。 当消息为死信时,它实际上会移动到原始队列的子队列中。 请注意,ServiceBusReceiver无论main队列是否已启用会话,都使用 从死信子队列接收消息。

ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// Dead-letter the message, thereby preventing the message from being received again without receiving from the dead letter queue.
// We can optionally pass a dead letter reason and dead letter description to further describe the reason for dead-lettering the message.
await receiver.DeadLetterMessageAsync(receivedMessage, "sample reason", "sample description");

// receive the dead lettered message with receiver scoped to the dead letter queue.
ServiceBusReceiver dlqReceiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions
{
    SubQueue = SubQueue.DeadLetter
});
ServiceBusReceivedMessage dlqMessage = await dlqReceiver.ReceiveMessageAsync();

// The reason and the description that we specified when dead-lettering the message will be available in the received dead letter message.
string reason = dlqMessage.DeadLetterReason;
string description = dlqMessage.DeadLetterErrorDescription;

有关详细信息,请参阅 ServiceBus 死信队列概述

使用处理器

ServiceBusProcessor可以被视为围绕一组接收器的抽象。 它使用回调模型允许在收到消息和发生异常时指定代码。 它提供已处理消息的自动完成、自动消息锁续订和用户指定的事件处理程序的并发执行。 由于其功能集,它应该是用于编写从服务总线实体接收的应用程序的转到工具。 对于处理器无法在直接使用 ServiceBusReceiver 时预期的精细控制的复杂情况,建议使用 ServiceBusReceiver。

string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);

// create the sender
ServiceBusSender sender = client.CreateSender(queueName);

// create a set of messages that we can send
ServiceBusMessage[] messages = new ServiceBusMessage[]
{
    new ServiceBusMessage("First"),
    new ServiceBusMessage("Second")
};

// send the message batch
await sender.SendMessagesAsync(messages);

// create the options to use for configuring the processor
var options = new ServiceBusProcessorOptions
{
    // By default or when AutoCompleteMessages is set to true, the processor will complete the message after executing the message handler
    // Set AutoCompleteMessages to false to [settle messages](/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock) on your own.
    // In both cases, if the message handler throws an exception without settling the message, the processor will abandon the message.
    AutoCompleteMessages = false,

    // I can also allow for multi-threading
    MaxConcurrentCalls = 2
};

// create a processor that we can use to process the messages
await using ServiceBusProcessor processor = client.CreateProcessor(queueName, options);

// configure the message and error handler to use
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;

async Task MessageHandler(ProcessMessageEventArgs args)
{
    string body = args.Message.Body.ToString();
    Console.WriteLine(body);

    // we can evaluate application logic and use that to determine how to settle the message.
    await args.CompleteMessageAsync(args.Message);
}

Task ErrorHandler(ProcessErrorEventArgs args)
{
    // the error source tells me at what point in the processing an error occurred
    Console.WriteLine(args.ErrorSource);
    // the fully qualified namespace is available
    Console.WriteLine(args.FullyQualifiedNamespace);
    // as well as the entity path
    Console.WriteLine(args.EntityPath);
    Console.WriteLine(args.Exception.ToString());
    return Task.CompletedTask;
}

// start processing
await processor.StartProcessingAsync();

// since the processing happens in the background, we add a Console.ReadKey to allow the processing to continue until a key is pressed.
Console.ReadKey();

使用 Azure.Identity 进行身份验证

Azure 标识库为身份验证提供简单的 Azure Active Directory 支持。

// Create a ServiceBusClient that will authenticate through Active Directory
string fullyQualifiedNamespace = "yournamespace.servicebus.windows.net";
await using var client = new ServiceBusClient(fullyQualifiedNamespace, new DefaultAzureCredential());

使用会话

会话 提供了一种用于对相关消息进行分组的机制。 若要使用会话,需要使用已启用会话的实体。

注册 ASP.NET Core依赖项注入

若要在 ASP.NET Core 应用中作为依赖项进行注入ServiceBusClient,请安装适用于 ASP.NET Core 包的 Azure 客户端库集成。

dotnet add package Microsoft.Extensions.Azure

然后注册配置了服务的客户端。 对于 ASP.NET Core应用程序,这通常直接在 或 StartupConfigureServices 方法中Program.cs出现:

public void ConfigureServices(IServiceCollection services)
{
    services.AddAzureClients(builder =>
    {
        builder.AddServiceBusClient("<< SERVICE BUS CONNECTION STRING >>");
    });

    // Register other services, controllers, and other infrastructure.
}

对于喜欢为其客户端使用共享 Azure.Identity 凭据的应用程序,注册看起来略有不同:

public void ConfigureServices(IServiceCollection services)
 {
     services.AddAzureClients(builder =>
     {
         // This will register the ServiceBusClient using an Azure Identity credential.
         builder.AddServiceBusClientWithNamespace("<< YOUR NAMESPACE >>.servicebus.windows.net");

         // By default, DefaultAzureCredential is used, which is likely desired for most
         // scenarios. If you need to restrict to a specific credential instance, you could
         // register that instance as the default credential instead.
         builder.UseCredential(new ManagedIdentityCredential());
     });

     // Register other services, controllers, and other infrastructure.
 }

还可以使用已注册ServiceBusClient的实例向 DI 注册子客户端,例如 ServiceBusSenderServiceBusReceiver 。 例如,为属于命名空间的每个队列注册发送方:

public async Task ConfigureServicesAsync(IServiceCollection services)
{
    // Query the available queues for the Service Bus namespace.
    var adminClient = new ServiceBusAdministrationClient("<< SERVICE BUS CONNECTION STRING >>");
    var queueNames = new List<string>();

    // Because the result is async, they need to be captured to a standard list to avoid async
    // calls when registering.  Failure to do so results in an error with the services collection.
    await foreach (var queue in adminClient.GetQueuesAsync())
    {
        queueNames.Add(queue.Name);
    }

    // After registering the ServiceBusClient, register a named factory for each
    // queue.  This allows them to be lazily created and managed as singleton instances.

    services.AddAzureClients(builder =>
    {
        builder.AddServiceBusClient("<< SERVICE BUS CONNECTION STRING >>");

        foreach (var queueName in queueNames)
        {
            builder.AddClient<ServiceBusSender, ServiceBusClientOptions>((_, _, provider) =>
                provider
                    .GetService<ServiceBusClient>()
                    .CreateSender(queueName)
            )
            .WithName(queueName);
        }
    });

    // Register other services, controllers, and other infrastructure.
}

由于发送方为其关联的队列命名,因此注入时,不会直接绑定到它们。 相反,你将绑定到可用于检索命名发件人的工厂:

public class ServiceBusSendingController : ControllerBase
{
    private readonly ServiceBusSender _sender;

    public ServiceBusSendingController(IAzureClientFactory<ServiceBusSender> serviceBusSenderFactory)
    {
        // Though the method is called "CreateClient", the factory will manage the sender as a
        // singleton, creating a new instance only on the first use.
        _sender = serviceBusSenderFactory.CreateClient("<< QUEUE NAME >>");
    }
}

有关更多详细信息和示例,请参阅 使用适用于 .NET 的 Azure SDK 进行依赖项注入

疑难解答

请参阅 服务总线故障排除指南

后续步骤

除了讨论的介绍性方案之外,Azure 服务总线客户端库还支持其他方案,以帮助利用Azure 服务总线服务的完整功能集。 为了帮助探索其中一些方案,服务总线客户端库提供了一个示例项目,作为常见方案的插图。 有关详细信息,请参阅 示例自述文件

贡献

本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 https://cla.microsoft.com

提交拉取请求时,CLA 机器人将自动确定你是否需要提供 CLA,并相应地修饰 PR(例如标签、注释)。 直接按机器人提供的说明操作。 只需使用 CLA 对所有存储库执行一次这样的操作。

此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。

有关详细信息,请参阅我们的 贡献指南

曝光数