клиентская библиотека Служебная шина Azure для .NET версии 7.13.1

Служебная шина Azure позволяет создавать приложения, использующие асинхронные шаблоны обмена сообщениями, используя высоконадежную службу для обмена сообщениями между производителями и потребителями. Служебная шина Azure обеспечивает гибкий обмен сообщениями между клиентом и сервером через посредника, а также структурированный обмен сообщениями по первому входу, первому выходу (FIFO), а также возможности публикации и подписки со сложной маршрутизацией. Если вы хотите узнать больше о Служебная шина Azure, вы можете ознакомиться с разделом Что такое Служебная шина Azure?

Используйте клиентную библиотеку для Служебная шина Azure, чтобы:

  • Передача бизнес-данных: используйте обмен сообщениями для долгосрочного обмена информацией, такой как заказы на продажу или покупку, журналы или перемещение запасов.

  • Разделение приложений: повышение надежности и масштабируемости приложений и служб, избавляя отправителей и получателей от необходимости одновременного подключения к сети.

  • Управление обработкой сообщений: поддержка традиционных конкурирующих потребителей для сообщений с помощью очередей или предоставление каждому потребителю собственного экземпляра сообщения с помощью разделов и подписок.

  • Реализация сложных рабочих процессов: сеансы сообщений поддерживают сценарии, требующие упорядочивания сообщений или отсрочки сообщений.

Исходный код | Пакет (NuGet) | Справочная документация по | API Документация по продукту | Руководство по | миграции Руководство по устранению неполадок

Начало работы

Предварительные требования

  • Подписка Microsoft Azure: Чтобы использовать службы Azure, включая Служебная шина Azure, вам потребуется подписка. Если у вас нет учетной записи Azure, вы можете зарегистрироваться для получения бесплатной пробной версии или использовать преимущества подписчика MSDN при создании учетной записи.

  • Пространство имен служебной шины: Для взаимодействия с Служебная шина Azure вам также потребуется доступное пространство имен. Если вы не знакомы с созданием ресурсов Azure, вам может потребоваться выполнить пошаговые инструкции по созданию пространства имен служебной шины с помощью портал Azure. Здесь также можно найти подробные инструкции по использованию Azure CLI, Azure PowerShell или шаблонов Azure Resource Manager (ARM) для создания сущности служебной шины.

  • C# 8.0: Клиентская библиотека Служебная шина Azure использует новые функции, появившиеся в C# 8.0. Чтобы воспользоваться преимуществами синтаксиса C# 8.0, рекомендуется выполнить компиляцию с помощью пакета SDK для .NET Core 3.0 или более поздней версии с языковой версиейlatest.

    Пользователям Visual Studio, желающим использовать все преимущества синтаксиса C# 8.0, потребуется использовать Visual Studio 2019 или более поздней версии. Скачать Visual Studio 2019, в том числе бесплатный выпуск Community, можно здесь. Пользователи Visual Studio 2017 могут воспользоваться синтаксисом C# 8, используя пакет NuGet Microsoft.Net.Compilers и задав языковую версию, хотя возможности редактирования могут быть не идеальными.

    Вы по-прежнему можете использовать библиотеку с предыдущими версиями языка C#, но потребуется управлять асинхронными перечисляемыми и асинхронными удаляемыми элементами вручную, а не использовать преимущества нового синтаксиса. Вы по-прежнему можете использовать любую версию платформы, поддерживаемую пакетом SDK для .NET Core, включая более ранние версии .NET Core или .NET Framework. Дополнительные сведения см. в статье Как указать целевые платформы.

    Важное примечание. Чтобы создать или запустить примеры и примеры без изменений, использование C# 8.0 является обязательным. Вы по-прежнему можете запускать примеры, если решите настроить их для других языковых версий.

Чтобы быстро создать необходимые ресурсы служебной шины в Azure и получить для них строку подключения, можно развернуть наш пример шаблона, щелкнув:

Развертывание в Azure

Установка пакета

Установите клиентную библиотеку Служебная шина Azure для .NET с помощью NuGet:

dotnet add package Azure.Messaging.ServiceBus

Аутентификация клиента

Чтобы клиентская библиотека служебной шины взаимодействовала с очередью или разделом, ей необходимо понимать, как подключиться к ней и выполнить авторизацию. Самый простой способ сделать это — использовать строку подключения, которая создается автоматически при создании пространства имен служебной шины. Если вы не знакомы с политиками общего доступа в Azure, вам может потребоваться получить строку подключения служебной шины из этого пошагового руководства.

Получив строку подключения, вы можете проверить подлинность клиента с помощью нее.

// Create a ServiceBusClient that will authenticate using a connection string
string connectionString = "<connection_string>";
await using var client = new ServiceBusClient(connectionString);

Чтобы узнать, как выполнить проверку подлинности с помощью Azure.Identity, ознакомьтесь с этим примером.

Чтобы узнать, как инициировать подключение к пользовательской конечной точке, просмотрите этот пример.

ASP.NET Core

Чтобы внедрить ServiceBusClient как зависимость в приложение ASP.NET Core, установите интеграцию клиентской библиотеки Azure для пакета ASP.NET Core.

dotnet add package Microsoft.Extensions.Azure

Затем зарегистрируйте клиент в методе Startup.ConfigureServices :

public void ConfigureServices(IServiceCollection services)
{
    services.AddAzureClients(builder =>
    {
        builder.AddServiceBusClient(Configuration.GetConnectionString("ServiceBus"));
    });
  
    services.AddControllers();
}

Чтобы использовать приведенный выше код, добавьте его в конфигурацию:

{
  "ConnectionStrings": {
    "ServiceBus": "<connection_string>"
  }
}

Дополнительные сведения см. в статье Внедрение зависимостей с помощью пакета Azure SDK для .NET.

Основные понятия

После инициализации можно взаимодействовать с основными типами ресурсов в пространстве имен служебной ServiceBusClientшины, из которых может существовать несколько и где происходит фактическая передача сообщений, пространство имен часто выступает в качестве контейнера приложения:

  • Очередь: позволяет отправлять и получать сообщения. Часто используется для обмена данными типа "точка — точка".

  • Тема. В отличие от очередей, темы лучше подходят для сценариев публикации и подписки. Раздел можно отправить, но для использования из требуется подписка, из которой может быть несколько параллельных.

  • Подписка: механизм использования из раздела. Каждая подписка является независимой и получает копию каждого сообщения, отправленного в раздел. Правила и фильтры можно использовать для настройки сообщений, получаемых определенной подпиской.

Дополнительные сведения об этих ресурсах см. в статье Что такое Служебная шина Azure?.

Для взаимодействия с этими ресурсами необходимо ознакомиться со следующими понятиями пакета SDK:

  • Клиент служебной шины — это основной интерфейс для разработчиков, взаимодействующих с клиентской библиотекой служебной шины. Он служит шлюзом, из которого будет происходить все взаимодействие с библиотекой.

  • Отправитель служебной шины ограничен определенной очередью или разделом и создается с помощью клиента служебной шины. Отправитель позволяет отправлять сообщения в очередь или раздел. Он также позволяет планировать доставку сообщений на указанную дату.

  • Получатель служебной шины ограничен определенной очередью или подпиской и создается с помощью клиента служебной шины. Получатель позволяет получать сообщения из очереди или подписки. Он также позволяет урегулировать сообщения после их получения. Существует четыре способа урегулировать сообщения:

    • Complete — приводит к удалению сообщения из очереди или раздела.
    • Отказаться — снимает блокировку получателя сообщения, позволяя получать сообщение другими получателями.
    • Отложить — откладывает получение сообщения обычными средствами. Чтобы получать отложенные сообщения, необходимо сохранить порядковый номер сообщения.
    • DeadLetter — перемещает сообщение в очередь недоставленных сообщений. Это предотвратит повторное получение сообщения. Для получения сообщений из очереди недоставленных сообщений требуется получатель, ограниченный очередью недоставленных сообщений.
  • Приемник сеанса служебной шины ограничен определенной очередью или подпиской с поддержкой сеанса и создается с помощью клиента служебной шины. Приемник сеанса почти идентичен стандартному приемнику, с отличием от того, что доступны операции управления сеансами, которые применяются только к сущностям с поддержкой сеанса. Эти операции включают получение и настройку состояния сеанса, а также продление блокировок сеанса.

  • Обработчик служебной шины ограничен определенной очередью или подпиской и создается с помощью клиента служебной шины. Можно ServiceBusProcessor рассматривать как абстракцию вокруг набора приемников. Она использует модель обратного вызова, чтобы разрешить указывать код при получении сообщения и при возникновении исключения. Он обеспечивает автоматическое завершение обработанных сообщений, автоматическое продление блокировки сообщений и параллельное выполнение обработчиков событий, указанных пользователем. Из-за набора функций он должен быть средством перехода к написанию приложений, которые получают от сущностей служебной шины. ServiceBusReceiver рекомендуется использовать для более сложных сценариев, в которых процессор не может обеспечить точное управление, которое можно ожидать при непосредственном использовании ServiceBusReceiver.

  • Обработчик сеансов служебной шины ограничен определенной очередью или подпиской с поддержкой сеанса и создается с помощью клиента служебной шины. Обработчик сеансов почти идентичен стандартному процессору, с отличием от того, что доступны операции управления сеансами, которые применяются только к сущностям с поддержкой сеанса.

Дополнительные понятия и более подробное обсуждение см. в статье Дополнительные функции служебной шины.

Время существования клиента

Отправители ServiceBusClient, получатели и обработчики безопасно кэшируются и используются в качестве отдельного элемента в течение всего времени существования приложения, что рекомендуется при регулярной отправке или получении сообщений. Они отвечают за эффективное управление сетью, ЦП и использованием памяти, работая для поддержания низкого уровня использования в периоды бездействия.

Эти типы являются удаляемыми и требуют DisposeAsync вызова или CloseAsync , чтобы обеспечить правильную очистку сетевых ресурсов и других неуправляемых объектов. Важно отметить, что при ServiceBusClient удалении экземпляра он автоматически закрывает и очищает всех отправителей, получателей и процессоров, созданных с его помощью.

Потокобезопасность

Мы гарантируем, что все методы экземпляра клиента являются потокобезопасны и независимы друг от друга (руководство). Это гарантирует, что рекомендация по повторному использованию экземпляров клиента всегда будет безопасной, даже в разных потоках.

Дополнительные понятия

Параметры | клиента Диагностики | Насмешливый

Примеры

Отправка и получение сообщения

Отправка сообщения выполняется с помощью ServiceBusSender. Получение выполняется с помощью ServiceBusReceiver.

string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);

// create the sender
ServiceBusSender sender = client.CreateSender(queueName);

// create a message that we can send. UTF-8 encoding is used when providing a string.
ServiceBusMessage message = new ServiceBusMessage("Hello world!");

// send the message
await sender.SendMessageAsync(message);

// create a receiver that we can use to receive the message
ServiceBusReceiver receiver = client.CreateReceiver(queueName);

// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// get the message body as a string
string body = receivedMessage.Body.ToString();
Console.WriteLine(body);

Отправка пакета сообщений

Существует два способа отправки нескольких сообщений одновременно. Первый способ этого — безопасная пакетная обработка. С помощью безопасной ServiceBusMessageBatch пакетной обработки можно создать объект , который позволит пытаться добавлять сообщения по одному в пакет с помощью TryAdd метода . Если сообщение не может поместиться в пакет, TryAdd возвращает значение false.

// add the messages that we plan to send to a local queue
Queue<ServiceBusMessage> messages = new Queue<ServiceBusMessage>();
messages.Enqueue(new ServiceBusMessage("First message"));
messages.Enqueue(new ServiceBusMessage("Second message"));
messages.Enqueue(new ServiceBusMessage("Third message"));

// create a message batch that we can send
// total number of messages to be sent to the Service Bus queue
int messageCount = messages.Count;

// while all messages are not sent to the Service Bus queue
while (messages.Count > 0)
{
    // start a new batch
    using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();

    // add the first message to the batch
    if (messageBatch.TryAddMessage(messages.Peek()))
    {
        // dequeue the message from the .NET queue once the message is added to the batch
        messages.Dequeue();
    }
    else
    {
        // if the first message can't fit, then it is too large for the batch
        throw new Exception($"Message {messageCount - messages.Count} is too large and cannot be sent.");
    }

    // add as many messages as possible to the current batch
    while (messages.Count > 0 && messageBatch.TryAddMessage(messages.Peek()))
    {
        // dequeue the message from the .NET queue as it has been added to the batch
        messages.Dequeue();
    }

    // now, send the batch
    await sender.SendMessagesAsync(messageBatch);

    // if there are any remaining messages in the .NET queue, the while loop repeats
}

Второй способ использует перегрузкуSendMessagesAsync, которая принимает IEnumerable .ServiceBusMessage С помощью этого метода мы попытаемся разместить все предоставленные сообщения в одном пакете сообщений, который мы будем отправлять в службу. Если сообщения слишком велики, чтобы поместиться в один пакет, операция вызовет исключение.

IList<ServiceBusMessage> messages = new List<ServiceBusMessage>();
messages.Add(new ServiceBusMessage("First"));
messages.Add(new ServiceBusMessage("Second"));
// send the messages
await sender.SendMessagesAsync(messages);

Получение пакета сообщений

// create a receiver that we can use to receive the messages
ServiceBusReceiver receiver = client.CreateReceiver(queueName);

// the received message is a different type as it contains some service set properties
// a batch of messages (maximum of 2 in this case) are received
IReadOnlyList<ServiceBusReceivedMessage> receivedMessages = await receiver.ReceiveMessagesAsync(maxMessages: 2);

// go through each of the messages received
foreach (ServiceBusReceivedMessage receivedMessage in receivedMessages)
{
    // get the message body as a string
    string body = receivedMessage.Body.ToString();
}

Завершение сообщения

Чтобы удалить сообщение из очереди или подписки, можно вызвать CompleteAsync метод .

string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);

// create the sender
ServiceBusSender sender = client.CreateSender(queueName);

// create a message that we can send
ServiceBusMessage message = new ServiceBusMessage("Hello world!");

// send the message
await sender.SendMessageAsync(message);

// create a receiver that we can use to receive and settle the message
ServiceBusReceiver receiver = client.CreateReceiver(queueName);

// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// complete the message, thereby deleting it from the service
await receiver.CompleteMessageAsync(receivedMessage);

Отказ от сообщения

Отказ от сообщения снимает блокировку получателя, которая позволяет получать сообщение этим или другими получателями.

ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// abandon the message, thereby releasing the lock and allowing it to be received again by this or other receivers
await receiver.AbandonMessageAsync(receivedMessage);

Откладывание сообщения

Откладывание сообщения предотвратит его повторное получение с помощью ReceiveMessageAsync методов или ReceiveMessagesAsync . Вместо этого существуют отдельные методы ReceiveDeferredMessageAsync и ReceiveDeferredMessagesAsync для получения отложенных сообщений.

ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// defer the message, thereby preventing the message from being received again without using
// the received deferred message API.
await receiver.DeferMessageAsync(receivedMessage);

// receive the deferred message by specifying the service set sequence number of the original
// received message
ServiceBusReceivedMessage deferredMessage = await receiver.ReceiveDeferredMessageAsync(receivedMessage.SequenceNumber);

Недоставленные сообщения

Отправка недоставленных сообщений аналогична задержке с одним main разница заключается в том, что служба автоматически получает недоставленные сообщения после их получения определенного количества раз. Приложения могут вручную отправлять недоставленные сообщения в зависимости от своих требований. Если сообщение является недоставленным, оно фактически перемещается во вложенную очередь исходной очереди. Обратите внимание, что ServiceBusReceiver используется для получения сообщений из подзапроса недоставленных сообщений независимо от того, включена ли очередь main сеанса.

ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// Dead-letter the message, thereby preventing the message from being received again without receiving from the dead letter queue.
// We can optionally pass a dead letter reason and dead letter description to further describe the reason for dead-lettering the message.
await receiver.DeadLetterMessageAsync(receivedMessage, "sample reason", "sample description");

// receive the dead lettered message with receiver scoped to the dead letter queue.
ServiceBusReceiver dlqReceiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions
{
    SubQueue = SubQueue.DeadLetter
});
ServiceBusReceivedMessage dlqMessage = await dlqReceiver.ReceiveMessageAsync();

// The reason and the description that we specified when dead-lettering the message will be available in the received dead letter message.
string reason = dlqMessage.DeadLetterReason;
string description = dlqMessage.DeadLetterErrorDescription;

Дополнительные сведения см. в обзоре очередей недоставленных сообщений Служебной шины.

Использование процессора

Можно ServiceBusProcessor рассматривать как абстракцию вокруг набора приемников. Она использует модель обратного вызова, чтобы разрешить указывать код при получении сообщения и при возникновении исключения. Он обеспечивает автоматическое завершение обработанных сообщений, автоматическое продление блокировки сообщений и параллельное выполнение обработчиков событий, указанных пользователем. Из-за набора функций он должен быть средством перехода к написанию приложений, которые получают от сущностей служебной шины. ServiceBusReceiver рекомендуется использовать для более сложных сценариев, в которых процессор не может обеспечить точное управление, которое можно ожидать при непосредственном использовании ServiceBusReceiver.

string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);

// create the sender
ServiceBusSender sender = client.CreateSender(queueName);

// create a set of messages that we can send
ServiceBusMessage[] messages = new ServiceBusMessage[]
{
    new ServiceBusMessage("First"),
    new ServiceBusMessage("Second")
};

// send the message batch
await sender.SendMessagesAsync(messages);

// create the options to use for configuring the processor
var options = new ServiceBusProcessorOptions
{
    // By default or when AutoCompleteMessages is set to true, the processor will complete the message after executing the message handler
    // Set AutoCompleteMessages to false to [settle messages](/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock) on your own.
    // In both cases, if the message handler throws an exception without settling the message, the processor will abandon the message.
    AutoCompleteMessages = false,

    // I can also allow for multi-threading
    MaxConcurrentCalls = 2
};

// create a processor that we can use to process the messages
await using ServiceBusProcessor processor = client.CreateProcessor(queueName, options);

// configure the message and error handler to use
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;

async Task MessageHandler(ProcessMessageEventArgs args)
{
    string body = args.Message.Body.ToString();
    Console.WriteLine(body);

    // we can evaluate application logic and use that to determine how to settle the message.
    await args.CompleteMessageAsync(args.Message);
}

Task ErrorHandler(ProcessErrorEventArgs args)
{
    // the error source tells me at what point in the processing an error occurred
    Console.WriteLine(args.ErrorSource);
    // the fully qualified namespace is available
    Console.WriteLine(args.FullyQualifiedNamespace);
    // as well as the entity path
    Console.WriteLine(args.EntityPath);
    Console.WriteLine(args.Exception.ToString());
    return Task.CompletedTask;
}

// start processing
await processor.StartProcessingAsync();

// since the processing happens in the background, we add a Console.ReadKey to allow the processing to continue until a key is pressed.
Console.ReadKey();

Проверка подлинности с помощью Azure.Identity

Библиотека удостоверений Azure обеспечивает простую поддержку azure Active Directory для проверки подлинности.

// Create a ServiceBusClient that will authenticate through Active Directory
string fullyQualifiedNamespace = "yournamespace.servicebus.windows.net";
await using var client = new ServiceBusClient(fullyQualifiedNamespace, new DefaultAzureCredential());

Работа с сеансами

Сеансы предоставляют механизм для группировки связанных сообщений. Чтобы использовать сеансы, необходимо работать с сущностью с поддержкой сеанса.

Устранение неполадок

См. руководство по устранению неполадок служебной шины.

Дальнейшие действия

Помимо вводных сценариев, клиентская библиотека Служебная шина Azure предлагает поддержку дополнительных сценариев, которые помогут воспользоваться полным набором функций службы Служебная шина Azure. Чтобы помочь изучить некоторые из этих сценариев, клиентская библиотека служебной шины предлагает проект примеров, которые будут служить иллюстрацией для распространенных сценариев. Дополнительные сведения см. в примерах сведений .

Участие

На этом проекте приветствуются публикации и предложения. Для участия в большинстве процессов по разработке документации необходимо принять лицензионное соглашение участника (CLA), в котором указывается, что вы предоставляете нам права на использование ваших публикаций. Для получения подробных сведений посетите веб-страницу https://cla.microsoft.com.

При отправке запроса на включение внесенных изменений CLA-бот автоматически определит необходимость предоставления соглашения CLA и соответствующего оформления запроса на включение внесенных изменений (например, добавление метки, комментария). Просто следуйте инструкциям бота. Будет достаточно выполнить их один раз для всех репозиториев, поддерживающих соглашение CLA.

В рамках этого проекта действуют правила поведения в отношении продуктов с открытым исходным кодом Майкрософт. Дополнительные сведения см. в разделе часто задаваемых вопросов о правилах поведения или обратитесь к opencode@microsoft.com с любыми дополнительными вопросами или комментариями.

Дополнительные сведения см. в нашем руководстве по участию .

Просмотры