Реализация шины событий с помощью RabbitMQ для среды разработки или тестирования

Совет

Это содержимое является фрагментом из электронной книги, архитектуры микрослужб .NET для контейнерных приложений .NET, доступных в документации .NET или в виде бесплатного скачиваемого PDF-файла, который можно читать в автономном режиме.

Архитектура микрослужб .NET для контейнерных приложений .NET для эскиза обложки.

Начнем с того, что если вы создаете настраиваемую шину событий на основе RabbitMQ , работающей в контейнере, так как приложение eShopOnContainers делает, оно должно использоваться только для сред разработки и тестирования. Не используйте его для рабочей среды, если только вы не создаете ее в составе служебной шины, готовой к работе, как описано в разделе "Дополнительные ресурсы" ниже. Простая пользовательская шина событий может быть лишена многих критически важных для рабочей среды возможностей, которыми обладают коммерческие служебные шины.

Одна из пользовательских реализаций шины событий в eShopOnContainers по сути представляет собой библиотеку, использующую API RabbitMQ. (Существует и еще одна реализация на основе Служебной шины Azure.)

Реализация шины событий с помощью RabbitMQ позволяет микрослужбам подписываться на события, публиковать и принимать их, как показано на рисунке 6–21.

Схема, на которой показан API RabbitMQ между отправителем и получателем сообщений.

Рис. 6–21. Реализация шины событий на основе RabbitMQ

RabbitMQ выступает в роли посредника между издателем сообщения и подписчиками и обрабатывает распространение. В коде класс EventBusRabbitMQ реализует универсальный интерфейс IEventBus. Для этого применяется внедрение зависимостей, что позволяет переходить от этой версии для разработки и тестирования к рабочей версии.

public class EventBusRabbitMQ : IEventBus, IDisposable
{
    // Implementation using RabbitMQ API
    //...
}

Реализация образца шины событий для разработки и тестирования на основе RabbitMQ представляет собой стандартный код. Она должна обрабатывать подключение к серверу RabbitMQ и предоставлять код для публикации события сообщения в очередях. Кроме того, должен быть реализован словарь коллекций, содержащий обработчики событий интеграции для каждого типа событий. Для каждого из этих типов событий могут применяться разные способы создания экземпляра и подписки для каждой микрослужбы-получателя, как показано на рисунке 6–21.

Реализация простого метода публикации с помощью RabbitMQ

Следующий код является упрощенной версией реализации шины событий для RabbitMQ, демонстрирующей весь сценарий. Вам необязательно обрабатывать подключение таким образом. Чтобы увидеть полную реализацию, просмотрите фактический код в репозитории dotnet-architecture/eShopOnContainers.

public class EventBusRabbitMQ : IEventBus, IDisposable
{
    // Member objects and other methods ...
    // ...

    public void Publish(IntegrationEvent @event)
    {
        var eventName = @event.GetType().Name;
        var factory = new ConnectionFactory() { HostName = _connectionString };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: _brokerName,
                type: "direct");
            string message = JsonConvert.SerializeObject(@event);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: _brokerName,
                routingKey: eventName,
                basicProperties: null,
                body: body);
       }
    }
}

Реальный код метода Publish в приложении eShopOnContainers улучшен с помощью политики повтора Polly, которая пытается выполнить задачу повторно несколько раз, если контейнер RabbitMQ не готов. Это может произойти, если контейнеры запускаются с помощью docker-compose. Например, контейнер RabbitMQ может запускаться медленнее других.

Как было сказано ранее, в RabbitMQ возможно множество конфигураций, поэтому этот код следует использовать только в средах разработки и тестирования.

Реализация кода подписки с помощью интерфейса API RabbitMQ

Так же как и в случае с кодом публикации, приведенный ниже код представляет собой часть упрощенной реализации шины событий для RabbitMQ. Изменять его также обычно не нужно, если его не требуется улучшить.

public class EventBusRabbitMQ : IEventBus, IDisposable
{
    // Member objects and other methods ...
    // ...

    public void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        var eventName = _subsManager.GetEventKey<T>();

        var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
        if (!containsKey)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            using (var channel = _persistentConnection.CreateModel())
            {
                channel.QueueBind(queue: _queueName,
                                    exchange: BROKER_NAME,
                                    routingKey: eventName);
            }
        }

        _subsManager.AddSubscription<T, TH>();
    }
}

С каждым типом событий связан канал для получения событий из RabbitMQ. Для каждого канала и типа событий может быть столько обработчиков событий, сколько требуется.

Метод Subscribe принимает объект IIntegrationEventHandler, который похож на метод обратного вызова в текущей микрослужбе, а также связанный с ним объект IntegrationEvent. Затем добавляется обработчик событий в список обработчиков событий, которые может иметь каждый тип событий интеграции в клиентской микрослужбе. Если код клиента еще не подписался на событие, для данного типа событий создается канал, который позволяет получать события из RabbitMQ принудительным образом, когда они публикуются из любой другой службы.

Как упоминалось выше, шина событий, реализованная в eShopOnContainers, предназначена только для образовательных целей, так как она обрабатывает только основные сценарии и не готова к рабочей среде.

Сведения о рабочих сценариях просмотрите в дополнительных ресурсах о RabbitMQ и разделе Реализация связи на основе событий между микрослужбами.

Дополнительные ресурсы

Готовое к работе решение с поддержкой RabbitMQ.

  • Peregrine Подключение . Упрощение интеграции с эффективным проектированием, развертыванием и управлением приложениями, API и рабочими процессами
    https://www.peregrineconnect.com/why-peregrine/rabbitmq-integration

  • NServiceBus — полностью поддерживаемая коммерческая шина служебной шины с расширенными средствами управления и мониторинга для .NET
    https://particular.net/

  • EasyNetQ — клиент API .NET с открытым кодом для RabbitMQ
    https://easynetq.com/

  • MassTransit — бесплатная платформа распределенных приложений с открытым исходным кодом для .NET
    https://masstransit-project.com/

  • Rebus — open source .NET служебная шина
    https://github.com/rebus-org/Rebus