您现在访问的是微软AZURE全球版技术文档网站,若需要访问由世纪互联运营的MICROSOFT AZURE中国区技术文档网站,请访问 https://docs.azure.cn.

Azure 服务总线主题和订阅入门

本快速入门介绍如何使用 Azure.Messaging.ServiceBus .NET 库向服务总线主题发送消息和接收有关该主题订阅的消息。

在本快速入门中,你将执行以下步骤:

  1. 使用 Azure 门户创建服务总线命名空间。
  2. 使用 Azure 门户创建服务总线主题。
  3. 使用 Azure 门户创建该主题的服务总线订阅。
  4. 编写 .NET Core 控制台应用程序,向主题发送一组消息。
  5. 编写 .NET Core 控制台应用程序,从订阅接收这些消息。

备注

本快速入门分步说明将一批消息发送到某个服务总线主题并从该主题的订阅接收这些消息这一简单场景。 可以在 GitHub 上用于 .NET 的 Azure SDK 存储库中找到预生成的 Azure 服务总线的 .NET 示例。

先决条件

如果你是首次使用该服务,请在使用本快速入门之前先参阅服务总线概述

  • Azure 订阅。 若要使用 Azure 服务(包括 Azure 服务总线),需要一个订阅。 如果没有现有的 Azure 帐户,可以注册免费试用帐户,或者在创建帐户时使用 MSDN 订阅者权益。
  • Microsoft Visual Studio 2019。 Azure 服务总线客户端库利用 C# 8.0 中引入的新功能。 你仍可使用以前的 C# 语言版本的库,但新语法将不可用。 若要使用完整语法,建议使用 .NET Core SDK 3.0 或更高版本进行编译,并将语言版本设置为 latest。 如果使用 Visual Studio,Visual Studio 2019 以前的版本与生成 C# 8.0 项目时所需的工具将不兼容。 可在此处下载 Visual Studio 2019(包括免费的 Community Edition)。

在 Azure 门户中创建命名空间

若要开始在 Azure 中使用服务总线消息实体,必须先使用在 Azure 中唯一的名称创建一个命名空间。 命名空间提供了用于对应用程序中的 Service Bus 资源进行寻址的范围容器。

创建命名空间:

  1. 登录到 Azure 门户

  2. 在门户的左侧导航窗格中,依次选择“+ 创建资源”、“集成”、“服务总线” 。

    图像显示“创建资源”、“集成”以及菜单中的“服务总线”选择。

  3. 在“创建命名空间”页的“基本信息”标记中,执行以下步骤 :

    1. 对于“订阅”,请选择要在其中创建命名空间的 Azure 订阅。

    2. 对于“资源组”,请选择该命名空间驻留到的现有资源组,或创建一个新资源组。

    3. 输入 命名空间的名称。 系统会立即检查该名称是否可用。 若需用于对命名空间进行命名的规则的列表,请参阅创建命名空间 REST API

    4. 对于“位置”,请选择托管该命名空间的区域。

    5. 对于“定价层”,请选择命名空间的定价层(“基本”、“标准”或“高级”)。 对于本快速入门,请选择“标准”。

      若要使用主题和订阅,请选择“标准”或“高级”。 基本定价层不支持主题/订阅。

      如果选择了“高级”定价层,请指定“消息传送单元”数 。 高级层在 CPU 和内存级别提供资源隔离,使每个工作负荷在隔离的环境中运行。 此资源容器称为消息传送单元。 高级命名空间至少具有一个消息传送单元。 可为每个服务总线高级命名空间选择 1、2 或 4 个消息传送单元。 有关详细信息,请参阅服务总线高级消息传送

    6. 选择“查看 + 创建” 。 系统现已创建命名空间并已将其启用。 可能需要等待几分钟,因为系统将为帐户配置资源。

      图像显示“创建命名空间”页

    7. 在“查看 + 创建”页上,查看设置,然后选择“创建” 。

  4. 在“部署”页上选择“转到资源”。

    图像显示“部署成功”页,其中包括“转到资源”链接。

  5. 将会看到服务总线命名空间的主页。

    图像显示已创建的服务总线命名空间的主页。

获取连接字符串

创建新的命名空间会自动生成一个初始共享访问签名 (SAS) 策略,该策略包含主密钥和辅助密钥以及主要连接字符串和辅助连接字符串,每个连接字符串都授予对命名空间所有方面的完全控制权。 请参阅服务总线身份验证和授权,了解如何创建规则来对普通发送者和接收者的权限进行更多限制。

若要复制命名空间的主要连接字符串,请执行以下步骤:

  1. 在“服务总线命名空间”页中的左侧菜单上,选择“共享访问策略” 。

  2. 在“共享访问策略”页,选择“RootManageSharedAccessKey” 。

    屏幕截图显示“共享访问策略”窗口,其中突出显示了策略。

  3. 在“策略: RootManageSharedAccessKey”窗口中,单击“主连接字符串”旁边的复制按钮,将连接字符串复制到剪贴板供稍后使用 。 将此值粘贴到记事本或其他某个临时位置。

    屏幕截图显示了名为 RootManageSharedAccessKey 的 S A S 策略,其中包含密钥和连接字符串。

    可使用此页面复制主密钥、辅助密钥和辅助连接字符串。

使用 Azure 门户创建主题

  1. 在“服务总线命名空间”页面上,选择左侧菜单中的“主题”。

  2. 在工具栏中选择“+ 主题”。

  3. 输入主题名称。 将其他选项保留默认值。

  4. 选择“创建”。

    图像显示了“创建主题”页。

创建主题的订阅

  1. 选择在上一部分创建的主题。

    图像显示主题列表中选择的主题。

  2. 在“服务总线主题”页面上,选择工具栏上的“+ 订阅” 。

    图像显示“添加订阅”按钮。

  3. 在“创建订阅”页上执行以下步骤:

    1. 对于订阅名称,输入“S1” 。

    2. 对于“最大交付数”,输入“3” 。

    3. 然后,选择“创建”以创建订阅。

      图像显示了“创建订阅”页。

重要

记录到命名空间的连接字符串、主题名称和订阅名称。 本教程后面会用到它们。

将消息发送到主题

本部分介绍如何创建一个向服务总线主题发送消息的 .NET Core 控制台应用程序。

创建控制台应用程序

  1. 启动 Visual Studio 2019。
  2. 选择“创建新项目”。
  3. 在“创建新项目”对话框中执行以下步骤:如果看不到此对话框,请在菜单中选择“文件”,然后依次选择“新建”、“项目”。
    1. 选择“C#”作为编程语言。

    2. 选择“控制台”作为应用程序类型。

    3. 从结果列表中选择“控制台应用程序”。

    4. 然后,选择“下一步” 。

      显示使用 C# 和所选的控制台创建新项目对话框的图像

  4. 输入 TopicSender 作为项目名称,输入 ServiceBusTopicQuickStart 作为解决方案名称,然后选择“下一步”。
  5. 在“其他信息”页面,选择“创建”来创建解决方案和项目。

添加服务总线 NuGet 包

  1. 在菜单中选择“工具” > “NuGet 包管理器” > “包管理器控制台”。

  2. 运行以下命令安装 Azure.Messaging.ServiceBus NuGet 包:

    Install-Package Azure.Messaging.ServiceBus
    

添加将消息发送到主题的代码

  1. 在 Program.cs 中将以下 using 语句添加到命名空间定义顶部,位于类声明之前。

    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;
    
  2. Program 类中,在 Main 方法前声明以下属性。 将 <NAMESPACE CONNECTION STRING> 替换为服务总线命名空间的连接字符串。 此外,将 <TOPIC NAME> 替换为你的服务总线主题的名称。

    // connection string to your Service Bus namespace
    static string connectionString = "<NAMESPACE CONNECTION STRING>";
    
    // name of your Service Bus topic
    static string topicName = "<TOPIC NAME>";
    
    // the client that owns the connection and can be used to create senders and receivers
    static ServiceBusClient client;
    
    // the sender used to publish messages to the topic
    static ServiceBusSender sender;
    
    // number of messages to be sent to the topic
    private const int numOfMessages = 3;
    
  3. 将 Program.cs 中的代码替换为以下代码。 下面是代码中的重要步骤。

    1. 使用命名空间的连接字符串创建 ServiceBusClient 对象。
    2. ServiceBusClient 对象调用 CreateSender 方法,从而为特定的“服务总线”主题创建 ServiceBusSender 对象。
    3. 使用 ServiceBusSender.CreateMessageBatchAsync 创建 ServiceBusMessageBatch 对象。
    4. 使用 ServiceBusMessageBatch.TryAddMessage 将消息添加到该批次。
    5. 使用 ServiceBusSender.SendMessagesAsync 方法将批量消息发送到“服务总线”主题。
    static async Task Main()
    {
        // The Service Bus client types are safe to cache and use as a singleton for the lifetime
        // of the application, which is best practice when messages are being published or read
        // regularly.
        //
        // Create the clients that we'll use for sending and processing messages.
        client = new ServiceBusClient(connectionString);
        sender = client.CreateSender(topicName);
    
        // create a batch 
        using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();
    
        for (int i = 1; i <= numOfMessages; i++)
        {
            // try adding a message to the batch
            if (!messageBatch.TryAddMessage(new ServiceBusMessage($"Message {i}")))
            {
                // if it is too large for the batch
                throw new Exception($"The message {i} is too large to fit in the batch.");
            }
        }
    
        try
        {
            // Use the producer client to send the batch of messages to the Service Bus topic
            await sender.SendMessagesAsync(messageBatch);
            Console.WriteLine($"A batch of {numOfMessages} messages has been published to the topic.");
        }
        finally
        {
            // Calling DisposeAsync on client types is required to ensure that network
            // resources and other unmanaged objects are properly cleaned up.
            await sender.DisposeAsync();
            await client.DisposeAsync();
        }
    
        Console.WriteLine("Press any key to end the application");
        Console.ReadKey();
    }
    
  4. Program.cs 文件的内容如下所示:

    有关更多信息,请参阅代码注释。

    using System;
    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;
    
    namespace TopicSender
    {
        class Program
        {
            // connection string to your Service Bus namespace
            static string connectionString = "<NAMESPACE CONNECTION STRING>";
    
            // name of your Service Bus topic
            static string topicName = "<TOPIC NAME>";
    
            // the client that owns the connection and can be used to create senders and receivers
            static ServiceBusClient client;
    
            // the sender used to publish messages to the topic
            static ServiceBusSender sender;
    
            // number of messages to be sent to the topic
            private const int numOfMessages = 3;
    
            static async Task Main()
            {
                // The Service Bus client types are safe to cache and use as a singleton for the lifetime
                // of the application, which is best practice when messages are being published or read
                // regularly.
                //
                // Create the clients that we'll use for sending and processing messages.
                client = new ServiceBusClient(connectionString);
                sender = client.CreateSender(topicName);
    
                // create a batch 
                using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();
    
                for (int i = 1; i <= numOfMessages; i++)
                {
                    // try adding a message to the batch
                    if (!messageBatch.TryAddMessage(new ServiceBusMessage($"Message {i}")))
                    {
                        // if it is too large for the batch
                        throw new Exception($"The message {i} is too large to fit in the batch.");
                    }
                }
    
                try
                {
                    // Use the producer client to send the batch of messages to the Service Bus topic
                    await sender.SendMessagesAsync(messageBatch);
                    Console.WriteLine($"A batch of {numOfMessages} messages has been published to the topic.");
                }
                finally
                {
                    // Calling DisposeAsync on client types is required to ensure that network
                    // resources and other unmanaged objects are properly cleaned up.
                    await sender.DisposeAsync();
                    await client.DisposeAsync();
                }
    
                Console.WriteLine("Press any key to end the application");
                Console.ReadKey();
            }
        }
    }    
    
  5. <NAMESPACE CONNECTION STRING> 替换为服务总线命名空间的连接字符串。 此外,将 <TOPIC NAME> 替换为你的服务总线主题的名称。

  6. 生成项目并确保没有错误。

  7. 运行程序并等待出现确认消息。

    A batch of 3 messages has been published to the topic
    
  8. 在 Azure 门户中按照以下步骤操作:

    1. 导航到服务总线命名空间。

    2. 在“概述”页底部居中位置的窗格中,切换到“主题”选项卡,然后选择“服务总线”主题 。 下面的示例中采用的是 mytopic

      选择主题

    3. 在“服务总线主题”页的“消息”图表中的底部“指标”部分中,可以看到主题有三条传入的消息。 如果未看到该值,请等待几分钟,然后刷新页面以查看更新后的图表。

      发送到主题的消息

    4. 在底部窗格中选择订阅。 下面的示例中采用的是 S1。 在“服务总线订阅”页面上,你会看到“活动消息计数”为 3。 订阅已接收你发送到主题的三条消息,但接收方尚未选择它们。

      订阅中接收的消息

从订阅接收消息

在本节中,你将会创建一个 .NET Core 控制台应用程序,用于接收来自服务总线主题订阅的消息。

为接收器创建项目

  1. 在“解决方案资源管理器”窗口中,右键单击“ServiceBusTopicQuickStart”解决方案,指向“添加”,然后选择“新建项目”。
  2. 选择“控制台应用程序”,然后选择“下一步”。
  3. 输入 SubscriptionReceiver 作为“项目名称”,然后选择“下一步”。
  4. 在“其他信息”页上,选择“创建”。
  5. 在“解决方案资源管理器”窗口中,右键单击“SubscriptionReceiver”,然后选择“设为启动项目”。

添加服务总线 NuGet 包

  1. 在菜单中选择“工具” > “NuGet 包管理器” > “包管理器控制台”。

  2. 在“包管理器控制台”窗口中,确认“SubscriptionReceiver”已选定为“默认项目”。 如果不是,请使用下拉列表选择“SubscriptionReceiver”。

    显示在“包管理器控制台”窗口中选择“SubscriptionReceiver”项目的图像。

  3. 运行以下命令安装 Azure.Messaging.ServiceBus NuGet 包:

    Install-Package Azure.Messaging.ServiceBus
    

添加代码以从订阅接收消息

  1. 在 Program.cs 中将以下 using 语句添加到命名空间定义顶部,位于类声明之前。

    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;
    
  2. Program 类中,在 Main 方法前声明以下属性。 将占位符替换为正确的值:

    • <NAMESPACE CONNECTION STRING> 替换为服务总线命名空间的连接字符串
    • <TOPIC NAME> 替换为你的服务总线主题的名称
    • <SERVICE BUS - TOPIC SUBSCRIPTION NAME> 替换为主题的订阅名称。
    // connection string to your Service Bus namespace
    static string connectionString = "<NAMESPACE CONNECTION STRING>";
    
    // name of the Service Bus topic
    static string topicName = "<SERVICE BUS TOPIC NAME>";
    
    // name of the subscription to the topic
    static string subscriptionName = "<SERVICE BUS - TOPIC SUBSCRIPTION NAME>";
    
    // the client that owns the connection and can be used to create senders and receivers
    static ServiceBusClient client;
    
    // the processor that reads and processes messages from the subscription
    static ServiceBusProcessor processor;    
    
  3. 将以下方法添加到 Program 类中以处理接收到的消息和任何错误。

    // handle received messages
    static async Task MessageHandler(ProcessMessageEventArgs args)
    {
        string body = args.Message.Body.ToString();
        Console.WriteLine($"Received: {body} from subscription: {subscriptionName}");
    
        // complete the message. messages is deleted from the subscription. 
        await args.CompleteMessageAsync(args.Message);
    }
    
    // handle any errors when receiving messages
    static Task ErrorHandler(ProcessErrorEventArgs args)
    {
        Console.WriteLine(args.Exception.ToString());
        return Task.CompletedTask;
    }
    
  4. 将 Program.cs 中的代码替换为以下代码。 下面是代码中的重要步骤:

    1. 使用命名空间的连接字符串创建 ServiceBusClient 对象。
    2. ServiceBusClient 对象调用 CreateProcessor 方法,从而为指定的“服务总线”队列创建 ServiceBusProcessor 对象。
    3. ServiceBusProcessor 对象的 ProcessMessageAsyncProcessErrorAsync 事件指定处理程序。
    4. 通过对 ServiceBusProcessor 对象调用 StartProcessingAsync 以开始处理消息。
    5. 当用户按下某个键结束处理时,将对 ServiceBusProcessor 对象调用 StopProcessingAsync

    有关更多信息,请参阅代码注释。

    static async Task Main()
    {
        // The Service Bus client types are safe to cache and use as a singleton for the lifetime
        // of the application, which is best practice when messages are being published or read
        // regularly.
        //
        // Create the clients that we'll use for sending and processing messages.
        client = new ServiceBusClient(connectionString);
    
        // create a processor that we can use to process the messages
        processor = client.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions());
    
        try
        {
            // add handler to process messages
            processor.ProcessMessageAsync += MessageHandler;
    
            // add handler to process any errors
            processor.ProcessErrorAsync += ErrorHandler;
    
            // start processing 
            await processor.StartProcessingAsync();
    
            Console.WriteLine("Wait for a minute and then press any key to end the processing");
            Console.ReadKey();
    
            // stop processing 
            Console.WriteLine("\nStopping the receiver...");
            await processor.StopProcessingAsync();
            Console.WriteLine("Stopped receiving messages");
        }
        finally
        {
            // Calling DisposeAsync on client types is required to ensure that network
            // resources and other unmanaged objects are properly cleaned up.
            await processor.DisposeAsync();
            await client.DisposeAsync();
        }
    }    
    
  5. Program.cs 应该如下所示:

    using System;
    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;
    
    namespace SubscriptionReceiver
    {
        class Program
        {
            // connection string to your Service Bus namespace
            static string connectionString = "<NAMESPACE CONNECTION STRING>";
    
            // name of the Service Bus topic
            static string topicName = "<SERVICE BUS TOPIC NAME>";
    
            // name of the subscription to the topic
            static string subscriptionName = "<SERVICE BUS - TOPIC SUBSCRIPTION NAME>";
    
            // the client that owns the connection and can be used to create senders and receivers
            static ServiceBusClient client;
    
            // the processor that reads and processes messages from the subscription
            static ServiceBusProcessor processor;
    
            // handle received messages
            static async Task MessageHandler(ProcessMessageEventArgs args)
            {
                string body = args.Message.Body.ToString();
                Console.WriteLine($"Received: {body} from subscription: {subscriptionName}");
    
                // complete the message. messages is deleted from the subscription. 
                await args.CompleteMessageAsync(args.Message);
            }
    
            // handle any errors when receiving messages
            static Task ErrorHandler(ProcessErrorEventArgs args)
            {
                Console.WriteLine(args.Exception.ToString());
                return Task.CompletedTask;
            }
    
            static async Task Main()
            {
                // The Service Bus client types are safe to cache and use as a singleton for the lifetime
                // of the application, which is best practice when messages are being published or read
                // regularly.
                //
                // Create the clients that we'll use for sending and processing messages.
                client = new ServiceBusClient(connectionString);
    
                // create a processor that we can use to process the messages
                processor = client.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions());
    
                try
                {
                    // add handler to process messages
                    processor.ProcessMessageAsync += MessageHandler;
    
                    // add handler to process any errors
                    processor.ProcessErrorAsync += ErrorHandler;
    
                    // start processing 
                    await processor.StartProcessingAsync();
    
                    Console.WriteLine("Wait for a minute and then press any key to end the processing");
                    Console.ReadKey();
    
                    // stop processing 
                    Console.WriteLine("\nStopping the receiver...");
                    await processor.StopProcessingAsync();
                    Console.WriteLine("Stopped receiving messages");
                }
                finally
                {
                    // Calling DisposeAsync on client types is required to ensure that network
                    // resources and other unmanaged objects are properly cleaned up.
                    await processor.DisposeAsync();
                    await client.DisposeAsync();
                }
            }
        }
    }
    
  6. 将占位符替换为正确的值:

    • <NAMESPACE CONNECTION STRING> 替换为服务总线命名空间的连接字符串
    • <TOPIC NAME> 替换为你的服务总线主题的名称
    • <SERVICE BUS - TOPIC SUBSCRIPTION NAME> 替换为主题的订阅名称。
  7. 生成项目并确保没有错误。

  8. 运行接收器应用程序。 你应该会看到接收的消息。 按任意键来停止使用接收器和应用程序。

    Wait for a minute and then press any key to end the processing
    Received: Message 1 from subscription: S1
    Received: Message 2 from subscription: S1
    Received: Message 3 from subscription: S1
    
    Stopping the receiver...
    Stopped receiving messages
    
  9. 再次检查门户。

    • 在“服务总线主题”页上的“消息”图表中,你会看到三条传入消息和三条传出消息。 如果未看到这些值,请等待几分钟,然后刷新页面以查看更新后的图表。

      发送和接收的消息

    • 在“服务总线订阅”页面上,你会看到“活动消息计数”为零 。 这是因为接收方已接收并已完成来自此订阅的消息。

      末尾订阅中的活动消息计数

后续步骤

请参阅以下文档和示例: