Поделиться через


Сведения о реализации потоков Orleans

В этом разделе представлен общий обзор реализации потока Orleans. Здесь описываются основные понятия и сведения, которые не видны на уровне приложения. Если вы планируете использовать только потоки, не нужно читать этот раздел.

Терминология:

Мы называем слово "Queue" (очередь) для любой технологии долговременного хранения, которая может принимать потоковые события и позволять либо получать события, либо использовать механизм принудительной отправки для использования событий. Как правило, для обеспечения масштабируемости эти технологии предоставляют сегментированные и секционированные очереди. Например, очереди Azure позволяют создавать несколько очередей, а концентраторы событий имеют несколько концентраторов.

Постоянные потоки

Все поставщики постоянных потоков Orleans совместно используют общую реализацию PersistentStreamProvider . Эти универсальные поставщики потоков должны быть настроены с учетом особенностей конкретной IQueueAdapterFactory технологии.

Например, для целей тестирования у нас есть адаптеры очередей, которые создают свои тестовые данные, а не считывают данные из очереди. В приведенном ниже коде показано, как настроить поставщик постоянного потока для использования пользовательского адаптера очереди (генератора). Для этого нужно настроить поставщик постоянного потока с помощью функции фабрики, используемой для создания адаптера.

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

Когда производитель потока создает новый потоковый элемент и вызывает stream.OnNext() , среда выполнения потоковой передачи Orleans вызывает соответствующий метод для IQueueAdapter этого поставщика потока, который помещает элемент непосредственно в соответствующую очередь.

Извлечение агентов

Ядром постоянного потока являются агенты извлечения. Извлечение агентов выдает события из набора устойчивых очередей и доставляет их в код приложения в неконтролируемых гранях. Можно считать, что агенты поочередного извлечения являются распределенными "микрослужбой" — секционированным, высокодоступным и эластичным распределенным компонентом. Агенты загружаются в одни и те же приемники, которые размещают грани приложений и полностью управляются средой выполнения потоковой передачи Orleans.

StreamQueueMapper и StreamQueueBalancer

Извлечение агентов параметризовано с помощью IStreamQueueMapper и IStreamQueueBalancer . IStreamQueueMapperПредоставляет список всех очередей, а также отвечает за сопоставление потоков с очередями. Таким образом, сторона производителя устойчивого поставщика потоков знает, в какую очередь поместить сообщение в очередь.

IStreamQueueBalancerВыражает способ распределения очередей между Orleansыми приемниками и агентами. Целью является назначение очередей для агентов в сбалансированном виде, чтобы предотвратить узкие места и обеспечить эластичность. При добавлении нового приемника в кластер Orleans очереди автоматически перераспределяются по старым и новым приемникам. StreamQueueBalancerПозволяет настраивать этот процесс. Orleans имеет несколько встроенных Стреамкуеуебаланцерс для поддержки различных сценариев балансировки (большого и небольшого количества очередей) и различных сред (Azure, локального, статического).

Используя приведенный выше пример генератора тестов, в приведенном ниже коде показано, как можно настроить средство сопоставления очередей и подсистемы балансировки очереди.

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

Приведенный выше код настраивает GeneratorAdapterFactory для использования средства сопоставления очередей с восемью очередями и балансировки очередей в кластере с помощью DynamicClusterConfigDeploymentBalancer .

Протокол извлечения

Каждый объект-приемник запускает набор агентов, которые каждый агент извлекает из одной очереди. Извлечение самих агентов реализуется внутренним компонентом среды выполнения, именуемым системтаржет. Системтаржетс — это, по сути, грани времени выполнения, которые подчиняются однопотоковому параллелизму, могут использовать обычный обмен сообщениями и быть как небольшие грани. В отличие от граней, Системтаржетс не являются виртуальными: они явным образом создаются (средой выполнения) и не являются прозрачными для расположения. Реализуя получение агентов как Системтаржетс, среда выполнения потоковой передачи Orleans может полагаться на встроенные функции Orleans и может масштабироваться до очень большого количества очередей, так как создание нового агента получения данных настолько дешево, как создание нового уровня детализации.

Каждый агент извлечения запускает периодический таймер, который извлекает из очереди, вызывая IQueueAdapterReceiver.GetQueueMessagesAsync метод. Возвращаемые сообщения помещаются во внутреннюю структуру данных каждого агента с именем IQueueCache . Каждое сообщение просматривается для определения потока назначения. Агент использует Pub-Sub, чтобы узнать список потребителей потоков, подписанных на этот поток. После получения списка потребителей агент сохраняет его локально (в своем кэше pub), поэтому ему не нужно обращаться к Pub-Sub при каждом сообщении. Агент также подписывается на Pub-подписку, чтобы получить уведомления всех новых потребителей, подписанных на этот поток. Это подтверждение семантики между агентом и Pub-подписыванием гарантирует надежную функцию потоковой передачи подписок: после того, как потребитель подписался на поток, он увидит все события, созданные после подписки на него. Кроме того, использование StreamSequenceToken разрешает ИТ подписываться в прошлом.

Кэш очереди

IQueueCache — Это внутренняя структура данных на основе агентов, которая позволяет разделять новые события от очереди и предоставлять их потребителям. Она также позволяет отделить доставку к различным потокам и разным потребителям.

Imagine ситуация, когда один поток имеет 3 потребителя потоков и один из них работает слишком долго. Если не принять меры, медленный потребитель может повлиять на ход работы агента, снизить потребление других потребителей этого потока и даже снизить отправку и доставку событий для других потоков. Чтобы предотвратить это и разрешить в агенте максимальный параллелизм, мы используем IQueueCache .

IQueueCache буферизация потоковых событий и предоставляет агенту способ доставки событий каждому потребителю в собственном темпе. Доставка на потребителей реализуется внутренним компонентом с именем IQueueCacheCursor , который отслеживает ход выполнения для каждого потребителя. Таким образом, каждый потребитель получает события в своем собственном темпе: быстрые потребители получают события так быстро, как они удаляются из очереди, а медленные пользователи получают их позже. После доставки сообщения всем потребителям его можно удалить из кэша.

Обратная реакция

Обратная нагрузка в среде выполнения потоковой передачи Orleans применяется в двух местах: переводить потоковые события из очереди в агент и передавать события от агента для потокового воспроизведения потребителям.

Последнее обеспечивается встроенным механизмом доставки сообщений Orleans. Каждое событие потока доставляется от агента потребителям через стандартный обмен сообщениями Orleans, по одному за раз. То есть агенты отправляют одно событие (или пакет событий ограниченного размера) каждому потребителю потока и ожидают этот вызов. Следующее событие не будет доставлено, пока задача для предыдущего события не была разрешена или нарушена. Таким образом мы будем ограничивать скорость доставки для каждого пользователя на одно сообщение за раз.

При переносе потоковых событий из очереди в агент потоковая передача Orleans предоставляет новый Специальный механизм обратного давления. Поскольку агент отделяет очередь от постановки событий из очереди и доставляет их потребителям, одновременный потребитель может понизиться до тех IQueueCache пор, пока компонент заполнит заполнение. Чтобы предотвратить IQueueCache неограниченное увеличение размера, мы не будем ограничивать его размер (можно настроить ограничение размера). Однако агент никогда не выбрасывает недоставленные события.

Вместо этого, когда начинается заполнение кэша, агенты замедляют скорость событий вывода из очереди. Таким образом, мы можем «выставить» медленные периоды доставки, настроив скорость, с которой мы используем из очереди ("Обратная нагрузка") и вернемся к скорости быстрого потребления позже. Чтобы обнаружить «низкую поставку», IQueueCache использует внутреннюю структуру данных контейнеров кэша, которая отслеживает ход доставки событий отдельным потребителям потока. Это приводит к очень быстрому реагированию и системе самонастройки.