Создание бизнес-приложений на основе сообщений с помощью NServiceBus и Служебной шины Azure

NServiceBus — это коммерческая платформа обмена сообщениями, предоставляемая Particular Software. Созданное на основе Служебной шины Azure, это решение помогает разработчикам сосредоточиться на бизнес-логике, абстрагируя вопросы, связанные с инфраструктурой. В этом руководстве показано, как создать решение, которое выполняет обмен сообщениями между двумя службами. Вы также узнаете, как автоматически повторять отправку недоставленных сообщений и рассмотрите варианты размещения этих служб в Azure.

Примечание

Код для этого руководства доступен на веб-сайте Particular Software Docs.

Предварительные требования

В примере предполагается, что вы создали пространство имен Служебной шины Azure.

Важно!

Для NServiceBus требуется по меньшей мере уровень "Стандартный". Уровень "Базовый" не подойдет.

Скачивание и подготовка решения

  1. Скачайте код с веб-сайта Particular Software Docs. Решение SendReceiveWithNservicebus.sln состоит из трех проектов:

    • Sender — консольное приложение, которое отправляет сообщения.
    • Receiver — консольное приложение, которое получает сообщения от отправителя и отвечает на них.
    • Shared — библиотека классов, содержащая контракты сообщений, совместно используемые отправителем и получателем.

    На следующей схеме, созданной ServiceInsight, средством визуализации и отладки от Particular Software, показан поток сообщений:

    Изображение: схема последовательностей

  2. Откройте SendReceiveWithNservicebus.sln в удобном для вас редакторе кода, например Visual Studio 2019.

  3. Откройте appsettings.json в проектах Receiver и Sender и задайте для AzureServiceBusConnectionString строку подключения для пространства имен Служебной шины Azure.

Определение общих контрактов сообщений

Библиотека классов Shared позволяет определить контракты, используемые для отправки сообщений. Она включает ссылку на пакет NuGet NServiceBus с интерфейсами, которые можно использовать для определения сообщений. Интерфейсы не являются обязательными, но они предоставляют дополнительную проверку от NServiceBus и обеспечивают самодокументируемость кода.

Сначала рассмотрим класс Ping.cs.

public class Ping : NServiceBus.ICommand
{
    public int Round { get; set; }
}

Класс Ping определяет сообщение, которое Sender отправляет Receiver. Это простой класс C#, реализующий интерфейс NServiceBus.ICommand из пакета NServiceBus. Это сообщение информирует читателя и NServiceBus, что это команда, хотя существуют и другие способы определения сообщений без использования интерфейсов.

Другой класс сообщений в проектах Shared — Pong.cs:

public class Pong : NServiceBus.IMessage
{
    public string Acknowledgement { get; set; }
}

Pong также является простым объектом C#, хотя реализует NServiceBus.IMessage. Интерфейс IMessage представляет общее сообщение, которое не является ни командой, ни событием, и часто используется для ответов. В нашем примере это ответ, который Receiver отправляет обратно Sender, чтобы указать, что сообщение было получено.

Вы будете использовать типы сообщения Ping и Pong. Следующим шагом будет настройка Sender для использования Служебной шины Azure и отправки сообщения Ping.

Настройка отправителя

Sender — это конечная точка, которая отправляет сообщение Ping. Вам нужно настроить Sender для использования Служебной шины Azure в качестве механизма транспортировки, а затем создать и отправить экземпляр Ping.

В методе Main файла Program.cs нужно настроить конечную точку Sender:

var host = Host.CreateDefaultBuilder(args)
    // Configure a host for the endpoint
    .ConfigureLogging((context, logging) =>
    {
        logging.AddConfiguration(context.Configuration.GetSection("Logging"));

        logging.AddConsole();
    })
    .UseConsoleLifetime()
    .UseNServiceBus(context =>
    {
        // Configure the NServiceBus endpoint
        var endpointConfiguration = new EndpointConfiguration("Sender");

        var transport = endpointConfiguration.UseTransport<AzureServiceBusTransport>();
        var connectionString = context.Configuration.GetConnectionString("AzureServiceBusConnectionString");
        transport.ConnectionString(connectionString);

        transport.Routing().RouteToEndpoint(typeof(Ping), "Receiver");

        endpointConfiguration.EnableInstallers();
        endpointConfiguration.AuditProcessedMessagesTo("audit");

        return endpointConfiguration;
    })
    .ConfigureServices(services => services.AddHostedService<SenderWorker>())
    .Build();

await host.RunAsync();

Этот код достаточно объемный, поэтому мы рассмотрим его по частям.

Настройка узла для конечной точки

Размещение и ведение журнала настраиваются с помощью стандартных параметров универсального узла Майкрософт. Сейчас конечная точка настроена для запуска в качестве консольного приложения, но в нее можно внести минимальные изменения для запуска в Функциях Azure, что будет обсуждаться далее в этой статье.

Настройка конечной точки NServiceBus

Далее нужно указать узлу использовать NServiceBus с помощью метода расширения .UseNServiceBus(…). Метод принимает функцию обратного вызова для получения конечной точки, которая будет запускаться при запуске узла.

В конфигурации конечной точки определите AzureServiceBus для транспортировки, предоставив строку подключения из appsettings.json. Затем настройте маршрутизацию так, чтобы сообщения типа Ping отправлялись в конечную точку с именем Receiver. Это позволяет NServiceBus автоматизировать процесс отправки сообщения в назначение, не требуя адреса получателя.

При вызове функции EnableInstallers будет настроена топология в пространстве имен Служебной шины Azure при запуске конечной точки. При необходимости будут созданы нужные очереди. В рабочих средах еще одним способом создания топологии является операционное написание скриптов.

Настройка фоновой службы для отправки сообщений

Оставшаяся часть отправителя — это SenderWorker, фоновая служба, которая настроена для отправки сообщения Ping каждую секунду.

public class SenderWorker : BackgroundService
{
    private readonly IMessageSession messageSession;
    private readonly ILogger<SenderWorker> logger;

    public SenderWorker(IMessageSession messageSession, ILogger<SenderWorker> logger)
    {
        this.messageSession = messageSession;
        this.logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            var round = 0;
            while (!stoppingToken.IsCancellationRequested)
            {
                await messageSession.Send(new Ping { Round = round++ })
                    .ConfigureAwait(false);

                logger.LogInformation($"Message #{round}");

                await Task.Delay(1_000, stoppingToken)
                    .ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // graceful shutdown
        }
    }
}

IMessageSession в ExecuteAsync вставляется в SenderWorker и позволяет отправлять сообщения с помощью NServiceBus за пределами обработчика сообщений. Маршрутизация, настроенная в Sender, определяет назначение сообщений Ping. При этом топология системы (какие сообщения направляются на какие адреса) хранится отдельно от бизнес-кода.

Приложение Sender также содержит PongHandler. Мы вернемся к нему после того, как рассмотрим Receiver, что мы сделаем дальше.

Настройка получателя

Receiver — это конечная точка, которая ожидает сообщения Ping, регистрирует время получения сообщения и отправляет ответ обратно отправителю. В этом разделе мы быстро рассмотрим конфигурацию конечной точки, аналогичную Sender, а затем перейдем к обработчику сообщений.

Как и в случае с отправителем, настройте получателя в качестве консольного приложения с использованием универсального узла Майкрософт. Получатель использует ту же конфигурацию ведения журнала и конечных точек (служебная шина Azure используется для транспортировки сообщений), но с другим именем, чтобы отличать его от отправителя:

var endpointConfiguration = new EndpointConfiguration("Receiver");

Так как эта конечная точка отвечает только отправителю и не запускает новые беседы, настройка маршрутизации не требуется. Кроме того, в отличие от отправителя, фоновый рабочий процесс не требуется, так как получатель отвечает только при получении сообщения.

Обработчик сообщений Ping

Проект Receiver содержит обработчик сообщений с именем PingHandler:

public class PingHandler : NServiceBus.IHandleMessages<Ping>
{
    private readonly ILogger<PingHandler> logger;

    public PingHandler(ILogger<PingHandler> logger)
    {
        this.logger = logger;
    }

    public async Task Handle(Ping message, IMessageHandlerContext context)
    {
        logger.LogInformation($"Processing Ping message #{message.Round}");

        // throw new Exception("BOOM");

        var reply = new Pong { Acknowledgement = $"Ping #{message.Round} processed at {DateTimeOffset.UtcNow:s}" };

        await context.Reply(reply);
    }
}

Не будем сейчас рассматривать закомментированный код. Мы вернемся к нему позже, когда будем обсуждать вопрос восстановления после сбоя.

Класс реализует IHandleMessages<Ping> с определением одного метода: Handle. Этот интерфейс сообщает NServiceBus, что, когда конечная точка получает сообщение типа Ping, она должна обрабатываться методом Handle в этом обработчике. Метод Handle принимает само сообщение в качестве параметра и объект IMessageHandlerContext, который позволяет выполнять дальнейшие операции обмена сообщениями, такие как ответ, отправка команд и публикация событий.

PingHandler действует просто — при получении сообщения Ping регистрирует сведения о нем и отвечает отправителю в новом сообщении Pong.

Примечание

В конфигурации Sender указано, что сообщения Ping должны направляться Receiver. NServiceBus добавляет метаданные в сообщения, указывающие, помимо прочего, происхождение сообщения. Именно поэтому вам не нужно указывать данные маршрутизации для ответного сообщения Pong. Оно автоматически перенаправляется обратно к своему источнику — Sender.

Теперь, когда Sender и Receiver настроены правильно, можно запустить решение.

Запуск решения

Чтобы запустить решение, необходимо запустить Sender и Receiver. Если вы используете Visual Studio Code, запустите конфигурацию "Отладить все". Если вы используете Visual Studio, настройте решение для запуска проектов Sender и Receiver:

  1. В Обозревателе решений щелкните решение правой кнопкой мыши.
  2. Выберите "Назначить запускаемые проекты".
  3. Выберите Несколько запускаемых проектов.
  4. В раскрывающемся списке выберите "Запуск" для Sender и Receiver.

Запустите решение. Отобразятся два консольных приложения: для Sender и Receiver.

В Sender обратите внимание, что сообщение Ping отправляется каждую секунду благодаря фоновому заданию SenderWorker. Receiver отображает сведения о каждом полученном сообщении Ping, а Sender регистрирует сведения о каждом сообщении Pong, которое приходят в ответ.

Теперь, когда все работает, давайте попробуем что-то сломать.

Устойчивость в работе

Ошибки в программных системах — это неизбежно. Код может завершиться ошибкой по различным причинам, например из-за сбоя сети, блокировки базы данных, изменения в сторонних API и просто из-за ошибки в коде.

NServiceBus обладает надежными функциями восстановления для обработки сбоев. При сбое обработчика сообщений сообщения автоматически повторяются на основе предопределенной политики. Есть два типа политики повторов: немедленные и отложенные попытки повтора. Лучший способ понять их работу — увидеть их в действии. Добавим политику повтора в конечную точку Receiver:

  1. Откройте Program.cs в проекте Sender.
  2. После строки .EnableInstallers добавьте следующие код:
endpointConfiguration.SendFailedMessagesTo("error");
var recoverability = endpointConfiguration.Recoverability();
recoverability.Immediate(
    immediate =>
    {
        immediate.NumberOfRetries(3);
    });
recoverability.Delayed(
    delayed =>
    {
        delayed.NumberOfRetries(2);
        delayed.TimeIncrease(TimeSpan.FromSeconds(5));
    });

Прежде чем обсуждать, как работает эта политика, давайте посмотрим на нее в действии. Перед тестированием политики восстановления необходимо симитировать ошибку. Откройте код PingHandler в проекте Receiver и раскомментируйте следующую строку:

throw new Exception("BOOM");

Теперь при обработке сообщения Ping в работе Receiver будет происходить сбой. Запустите решение еще раз и давайте посмотрим, что происходит в Receiver.

При менее надежном обработчике PingHandler при получении любых сообщений будет происходить сбой. Вы можете увидеть, как для этих сообщений запускается политика повтора. При первом сбое сообщения немедленно выполняются максимум три повторные попытки:

Изображение: политика незамедлительных повторных попыток, которая повторяет обработку сообщений до трех раз.

Разумеется, повторы будут завершаться сбоем, поэтому после выполнения трех немедленных попыток повтора включается политика отложенных повторов, и обработка сообщения задерживается на пять секунд:

Изображение: политика отложенных повторов, которая задерживает сообщения с шагом в пять секунд перед попыткой выполнения очередного цикла незамедлительных повторов.

По истечении пяти секунд выполняется еще три попытки обработать сообщение (т. е. еще одна итерация политики незамедлительного повтора). Это также приведет к сбою, и NServiceBus снова задержит сообщение, на этот раз на 10 секунд, прежде чем повторить попытку.

Если PingHandler не сможет обработать сообщение после выполнения полной политики повтора, оно будет помещено в централизованную очередь ошибок с именем error, согласно определению вызова к SendFailedMessagesTo.

Изображение: сообщение, которое не удалось обработать.

Концепция централизованной очереди ошибок отличается от механизма очереди недоставленных сообщений в Служебной шине Azure с очередью недоставленных сообщений для каждой очереди обработки. С помощью NServiceBus очереди недоставленных сообщений в Служебной шине Azure выступают как истинные очереди сообщений о сбое, тогда как сообщения, которые попадают в централизованную очередь ошибок, могут быть повторно обработаны позже, если это необходимо.

Политика повтора помогает устранить несколько типов ошибок, которые часто бывают временными по своей сути. Такие ошибки часто исчезают, если сообщение просто повторно обрабатывается после небольшой задержки. К примерам относятся сбои сети, блокировки баз данных и сбои сторонних API.

Как только сообщение появится в очереди ошибок, можно просмотреть сведения о нем в выбранном инструменте, а затем решить, что с ним делать. Например, с помощью ServicePulse, средства мониторинга от Particular Software, можно просмотреть сведения о сообщении и причину сбоя:

Изображение: ServicePulse от Particular Software

После изучения сведений можно отправить сообщение обратно в исходную очередь для обработки. Кроме того, перед этим сообщение можно изменить. Если в очереди ошибок есть несколько сообщений, которые по одной причине завершились сбоем, их все можно отправить обратно в исходные назначения как пакет.

Теперь пора выяснить, где развернуть наше решение в Azure.

Размещение служб в Azure

В этом примере конечные точки Sender и Receiver настраиваются для запуска в качестве консольных приложений. Они также могут размещаться в различных службах Azure, включая Функции Azure, Службы приложений Azure, Экземпляры контейнеров Azure, Службы Azure Kubernetes и виртуальные машины Azure. Например, вот как можно настроить конечную точку Sender для запуска в качестве функции Azure:

[assembly: FunctionsStartup(typeof(Startup))]
[assembly: NServiceBusEndpointName("Sender")]

public class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {
        builder.UseNServiceBus(() =>
        {
            var configuration = new ServiceBusTriggeredEndpointConfiguration("Sender");
            var transport = configuration.AdvancedConfiguration.Transport;
            transport.Routing().RouteToEndpoint(typeof(Ping), "Receiver");

            return configuration;
        });
    }
}

Дополнительные сведения об использовании NServiceBus с Функциями см. в статье Использование Функций Azure со Служебной шиной Azure в документации по NServiceBus.

Следующие шаги

Дополнительные сведения об использовании NServiceBus в службах Azure см. в следующих статьях: