Orleans-Streams: Details zur Implementierung

Dieser Abschnitt bietet eine allgemeine Übersicht über die Implementierung von Orleans-Stream. Es werden Konzepte und Details beschrieben, die auf Anwendungsebene nicht sichtbar sind. Wenn Sie Streams nur verwenden möchten, müssen Sie diesen Abschnitt nicht lesen.

Terminologie:

Wir verweisen mit dem Wort „Warteschlange“ auf jede dauerhafte Speichertechnologie, die Streamereignisse erfassen kann und entweder das Pullen von Ereignissen ermöglicht oder einen pushbasierten Mechanismus zum Nutzen von Ereignissen bereitstellt. Um Skalierbarkeit zu gewährleisten, stellen diese Technologien in der Regel Shard-/partitionierte Warteschlangen bereit. Mit Azure-Warteschlangen können Sie beispielsweise mehrere Warteschlangen erstellen, und Event Hubs verfügen über mehrere Hubs.

Persistente Streams

Alle persistenten Streamanbieter von Orleans nutzen eine gemeinsame Implementierung PersistentStreamProvider. Diese generischen Streamanbieter müssen mit einer technologiespezifischen IQueueAdapterFactory konfiguriert werden.

Beispielsweise verfügen wir zu Testzwecken über Warteschlangenadapter, die ihre Testdaten generieren, anstatt die Daten aus einer Warteschlange zu lesen. Der folgende Code zeigt, wie wir einen persistenten Streamanbieter konfigurieren, um unseren benutzerdefinierten Warteschlangenadapter (Generator) zu verwenden. Hierzu konfigurieren Sie den Anbieter für beständigen Datenstrom mit einer Factoryfunktion, die zum Erstellen des Adapters verwendet wird.

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

Wenn ein Streamproduzent ein neues Streamelement generiert und stream.OnNext() aufruft, ruft die Orleans-Streaming-Runtime die entsprechende Methode auf dem IQueueAdapter für diesen Streamanbieter auf, der das Element direkt in die entsprechende Warteschlange einbindet.

Pull-Agenten

Im Mittelpunkt des persistenten Streamanbieters stehen die Pull-Agenten. Pull-Agenten pullen Ereignisse aus einer Reihe dauerhafter Warteschlangen und übermitteln sie an den Anwendungscode in Grains, die sie nutzen. Sie können sich die Pull-Agenten als verteilten „Microservice“ vorstellen – eine partitionierte, hochverfügbare und elastisch verteilte Komponente. Die Pull-Agenten werden in denselben Silos ausgeführt, die Anwendungsgrains hosten und vollständig von der Orleans-Streaming-Runtime verwaltet werden.

StreamQueueMapper und StreamQueueBalancer

Pull-Agenten werden mit IStreamQueueMapper und IStreamQueueBalancer parametrisiert. Der IStreamQueueMapper stellt eine Liste aller Warteschlangen bereit und ist auch für das Zuordnen von Streams zu Warteschlangen verantwortlich. Auf diese Weise weiß die Producerseite des persistenten Streamanbieters, in welche Warteschlange die Nachricht eingereiht werden soll.

Der IStreamQueueBalancer drückt die Art und Weise aus, wie Warteschlangen zwischen Orleans-Silos und -Agenten ausgeglichen werden. Das Ziel besteht darin, Agenten Warteschlangen auf ausgewogene Weise zuzuweisen, um Engpässe zu vermeiden und die Elastizität zu unterstützen. Wenn dem Orleans-Cluster ein neues Silo hinzugefügt wird, werden Warteschlangen automatisch auf die alten und neuen Silos verteilt. Der StreamQueueBalancer ermöglicht das Anpassen dieses Prozesses. Orleans verfügt über mehrere integrierte StreamQueueBalancer, um verschiedene Ausgleichsszenarien (große und kleine Anzahl von Warteschlangen) und verschiedene Umgebungen (Azure, lokal, statisch) zu unterstützen.

Im folgenden Code wird das obige Testgeneratorbeispiel verwendet, um zu zeigen, wie der Warteschlangenzuordnungs- und Warteschlangenausgleich konfiguriert werden kann.

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

Mit dem obigen Code wird die GeneratorAdapterFactory so konfiguriert, dass sie Warteschlangenzuordnungsprogramm mit acht Warteschlangen verwendet und die Warteschlangen über den Cluster mithilfe von DynamicClusterConfigDeploymentBalancer ausgeglichen werden.

Pull-Protokoll

Jedes Silo führt eine Reihe von Pull-Agenten aus, wobei jeder Agent aus einer Warteschlange pullt. Pull-Agenten selbst werden von einer internen Laufzeitkomponente namens SystemTarget implementiert. SystemTargets sind im Wesentlichen Runtime-Grains, sie unterliegen Singlethread-Parallelität, können reguläres Grain-Messaging verwenden und sind so einfach wie Grains. Im Gegensatz zu Grains sind SystemTargets nicht virtuell: Sie werden explizit (von der Runtime) erstellt und sind nicht standorttransparent. Durch die Implementierung von Pull-Agenten als SystemTargets kann sich die Orleans-Streaming-Runtime auf integrierte Orleans-Features verlassen und auf eine sehr große Anzahl von Warteschlangen skaliert werden, da das Erstellen eines neuen Pull-Agenten so einfach ist wie das Erstellen eines neuen Grains.

Jeder Pull-Agent führt einen regelmäßigen Timer aus, der durch Aufrufen der IQueueAdapterReceiver.GetQueueMessagesAsync-Methode aus der Warteschlange pullt. Die zurückgegebenen Nachrichten werden in die interne Datenstruktur pro Agent namens IQueueCache eingefügt. Jede Nachricht wird überprüft, um ihren Zieldatenstrom zu ermitteln. Der Agent verwendet die Pub-Sub, um die Liste der Stream-Consumer zu ermitteln, die diesen Stream abonniert haben. Sobald die Consumerliste abgerufen wurde, speichert der Agent sie lokal (in seinem Pub-Sub-Cache), sodass Pub-Sub er nicht bei jeder Nachricht konsultiert werden muss. Der Agent abonniert den Pub-Sub auch, um Benachrichtigungen über neue Consumer zu erhalten, die diesen Stream abonnieren. Dieser Handshake zwischen dem Agent und dem Pub-Sub garantiert eine starke Streamingabonnementsemantik: Sobald der Consumer den Stream abonniert hat, werden alle Ereignisse angezeigt, die nach dem Abonnement generiert wurden. Darüber hinaus ermöglicht die Verwendung von StreamSequenceToken ein Abonnieren in der Vergangenheit.

Warteschlangencache

IQueueCache ist eine interne Datenstruktur pro Agent, mit der neue Ereignisse von der Warteschlange entkoppelt und an Consumer gesendet werden können. Es ermöglicht auch die Entkopplung der Übermittlung an verschiedene Streams und verschiedene Consumer.

Stellen Sie sich eine Situation vor, in der ein Stream 3 Stream-Consumer hat und einer davon langsam ist. Wenn dies nicht der Fall ist, kann sich dieser langsame Consumer auf den Fortschritt des Agents auswirken, den Verbrauch anderer Consumer dieses Datenstroms verlangsamen und sogar das Dequeuing und die Übermittlung von Ereignissen für andere Streams verlangsamen. Um dies zu verhindern und maximale Parallelität im Agent zuzulassen, verwenden wir IQueueCache.

IQueueCache puffert Ereignisse und bietet eine Möglichkeit für den Agenten, Ereignisse in seinem eigenen Tempo an jeden Consumer zu übermitteln. Die Übermittlung pro Consumer wird durch die interne Komponente namens IQueueCacheCursor implementiert, die den Fortschritt pro Consumer nachverfolgt. Auf diese Weise empfängt jeder Consumer Ereignisse in seinem eigenen Tempo: Schnelle Verbraucher erhalten Ereignisse so schnell, wie sie aus der Warteschlange entfernt werden, während langsame Consumer sie später empfangen. Sobald die Nachricht an alle Consumer übermittelt wurde, kann sie aus dem Cache gelöscht werden.

Rückstaus

Der Rückstau in der Orleans-Streaming-Runtime gilt an zwei Stellen: Streamereignisse aus der Warteschlange an den Agent bringen und die Ereignisse vom Agent an Stream-Consumer übermitteln.

Letzteres wird durch den integrierten Orleans-Nachrichtenübermittlungsmechanismus bereitgestellt. Jedes Streamereignis wird vom Agenten nacheinander über das Standard-Orleans-Grain-Messaging an die Verbraucher übermittelt. Das heißt, die Agenten senden ein Ereignis (oder einen Batch mit Ereignissen begrenzter Größe) an jeden Stream-Consumer und warten auf diesen Aufruf. Das nächste Ereignis wird erst übermittelt, wenn die Aufgabe für das vorherige Ereignis aufgelöst oder unterbrochen wurde. Auf diese Weise beschränken wir natürlich die Übermittlungsrate pro Verbraucher auf jeweils eine Nachricht.

Beim Übertragen von Streamereignissen aus der Warteschlange in den Agenten bietet Orleans-Streaming einen neuen speziellen Rückstaumechanismus. Da der Agent das Entfernen von Ereignissen aus der Warteschlange entkoppelt und sie an Verbraucher sendet, kann ein einzelner langsamer Consumer so stark zurückfallen, dass sich der IQueueCache füllt. Um ein unbegrenztes Wachstum von IQueueCache zu verhindern, beschränken wir die Größe (die Größenbegrenzung ist konfigurierbar). Der Agent entsorgt jedoch nie nicht zugestellte Ereignisse.

Wenn sich der Cache zu füllen beginnt, verlangsamen die Agents stattdessen die Geschwindigkeit, Ereignisse aus der Warteschlange zu entfernen. Auf diese Weise können wir die langsamen Lieferzeiten „aussitzen“, indem wir die Rate anpassen, mit der wir aus der Warteschlange verbrauchen („Rückstau“) und später wieder zu schnellen Verbrauchsraten übergehen. Um die „langsame Übermittlung“ zu erkennen, verwendet der IQueueCache eine interne Datenstruktur von Cache-Buckets, die den Fortschritt der Übermittlung von Ereignissen an einzelne Stream-Consumer nachverfolgt. Dies führt zu einem sehr reaktionsfähigen und sich selbst anpassenden System.