Share via


Orleans Streams implementáció részletei

Ez a szakasz magas szintű áttekintést nyújt az Orleans Stream megvalósításáról. Az alkalmazás szintjén nem látható fogalmakat és részleteket ismerteti. Ha csak streameket szeretne használni, nem kell elolvasnia ezt a szakaszt.

Terminológia:

A "queue" szóra hivatkozunk minden olyan tartós tárolási technológiára, amely képes streames eseményeket beolvasni, és lehetővé teszi események lekérését, vagy leküldéses alapú mechanizmust biztosít az események felhasználásához. A méretezhetőség biztosítása érdekében ezek a technológiák általában horizontálisan particionált/particionált üzenetsorokat biztosítanak. Az Azure Queues például lehetővé teszi, hogy több üzenetsort hozzon létre, az Event Hubs pedig több központot is.

Állandó streamek

Minden Orleans állandó stream szolgáltatók közös megvalósítása PersistentStreamProvider. Ezeket az általános streamszolgáltatókat technológiaspecifikus IQueueAdapterFactorykonfigurálással kell konfigurálni.

Tesztelési célokból például vannak olyan üzenetsor-adapterek, amelyek a tesztelési adatokat generálják ahelyett, hogy beolvasnánk az adatokat egy üzenetsorból. Az alábbi kód bemutatja, hogyan konfigurálunk egy állandó streamszolgáltatót az egyéni (generátoros) üzenetsor-adapter használatára. Ezt úgy teszi, hogy konfigurálja az állandó streamszolgáltatót az adapter létrehozásához használt gyári függvénnyel.

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

Amikor egy streamgyártó létrehoz egy új streamelemet, és meghívja stream.OnNext(), az Orleans streamelési futtatókörnyezete meghívja a megfelelő metódust a IQueueAdapter streamszolgáltatón, amely közvetlenül a megfelelő üzenetsorba köti az elemet.

Ügynökök lekérése

Az állandó streamszolgáltató középpontjában a lekérési ügynökök állnak. Az ügynökök lekérése a tartós üzenetsorokból lekéri az eseményeket, és azokat az alkalmazás kódjába továbbítja azokat felemésztő szemcsékben. A lekérési ügynökökre úgy gondolhatunk, mint egy elosztott "mikroszolgáltatásra" – egy particionált, magas rendelkezésre állású és rugalmas elosztott összetevőre. A lekérési ügynökök ugyanabban a silóban futnak, amelyek az alkalmazás szemcséit üzemeltetik, és teljes mértékben az Orleans Streaming Runtime felügyeli.

StreamQueueMapper és StreamQueueBalancer

A lekérési ügynökök paraméterezve vannak a következővel IStreamQueueMapper : és IStreamQueueBalancer. Ez IStreamQueueMapper az összes üzenetsor listáját tartalmazza, és a streamek várólistákhoz való leképezéséért is felelős. Így az állandó streamszolgáltató gyártói oldala tudja, hogy melyik üzenetsorba szeretné beküldni az üzenetet.

Ez IStreamQueueBalancer azt fejezi ki, hogy az üzenetsorok hogyan oszlanak el orleansi silókban és ügynökökben. A cél az üzenetsorok kiegyensúlyozott hozzárendelése az ügynökökhöz a szűk keresztmetszetek elkerülése és a rugalmasság támogatása érdekében. Amikor új silót ad hozzá az Orleans-fürthöz, az üzenetsorok automatikusan újraegyensúlyozódnak a régi és az új silók között. Ez StreamQueueBalancer lehetővé teszi a folyamat testreszabását. Az Orleans számos beépített StreamQueueBalancer-et használ a különböző egyensúlyozási forgatókönyvek (nagy és kis számú üzenetsor) és különböző környezetek (Azure, helyszíni, statikus) támogatásához.

A fenti tesztgenerátor példáját használva az alábbi kód bemutatja, hogyan konfigurálható az üzenetsor-leképező és a várólista-kiegyensúlyozó.

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

A fenti kód egy nyolc üzenetsort tartalmazó üzenetsor-leképező használatára konfigurálja GeneratorAdapterFactory , és a fürtben lévő üzenetsorokat a DynamicClusterConfigDeploymentBalancerhasználatával egyensúlyba állítja.

Lekérési protokoll

Minden siló lekérési ügynököket futtat, minden ügynök egy üzenetsorból. Maguk a lekéréses ügynökök egy belső futtatókörnyezeti összetevő, a SystemTarget implementálódnak. A SystemTargets lényegében futásidejű szemcsék, egyszálas egyidejűségnek vannak kitéve, normál szemcsés üzenetkezelést használhat, és ugyanolyan könnyűek, mint a szemek. A szemcsékkel ellentétben a SystemTarget-ek nem virtuálisak: explicit módon jönnek létre (a futtatókörnyezet által), és nem átláthatók a helyük. Az ügynökök SystemTargets-ként való implementálásával az Orleans Streaming Runtime beépített Orleans-funkciókra támaszkodhat, és nagyon sok üzenetsorra méretezhető, mivel egy új lekérési ügynök létrehozása ugyanolyan olcsó, mint egy új gabona létrehozása.

Minden lekérési ügynök egy rendszeres időzítőt futtat, amely a metódus meghívásával lekéri az IQueueAdapterReceiver.GetQueueMessagesAsync üzenetsorból. A visszaadott üzenetek az ügynökenkénti belső adatstruktúrába kerülnek.IQueueCache A rendszer minden üzenetet megvizsgál, hogy megtudja a célstreamet. Az ügynök a Pub-Sub használja az erre a streamre előfizetett streamfogyasztók listájának megkeresésére. A fogyasztói lista lekérése után az ügynök helyileg tárolja azt (a pub-sub cache-ben), így nem kell minden üzenetben konzultálnia Pub-Sub. Az ügynök feliratkozik a pub-sub szolgáltatásra is, hogy értesítést kapjon azokról az új fogyasztókról, amelyek feliratkoznak az adott streamre. Ez az ügynök és a pub-sub közötti kézfogás erős streamelési előfizetési szemantikát garantál: miután a fogyasztó feliratkozott a streamre, látni fogja az összes eseményt, amely az előfizetés után lett létrehozva. Emellett a korábbi StreamSequenceToken előfizetést is lehetővé teszi.

Üzenetsor-gyorsítótár

IQueueCache Az egy ügynökre jutó belső adatstruktúra, amely lehetővé teszi az új események leválasztását az üzenetsorból, és eljuttatja őket a fogyasztóknak. Emellett lehetővé teszi a különböző streamek és különböző fogyasztók számára történő kézbesítés szétválasztását is.

Imagine olyan helyzet, amikor egy stream 3 streamfogyasztóval rendelkezik, és az egyik lassú. Ha nem gondoskodik róla, ez a lassú fogyasztó befolyásolhatja az ügynök előrehaladását, lelassíthatja az adott stream többi felhasználójának fogyasztását, és akár lelassíthatja az események lekérését és kézbesítését más streamek esetében. Ennek elkerülése és a maximális párhuzamosság engedélyezése az ügynökben.IQueueCache

IQueueCache puffereli az eseményeket, és lehetővé teszi, hogy az ügynök a saját tempójában kézbesítse az eseményeket az egyes fogyasztóknak. A fogyasztónkénti kézbesítést az úgynevezett IQueueCacheCursorbelső összetevő valósítja meg, amely nyomon követi a fogyasztónkénti előrehaladást. Így minden fogyasztó a saját tempójában kapja meg az eseményeket: a gyors fogyasztók olyan gyorsan kapják meg az eseményeket, amilyen gyorsan le vannak távolítva az üzenetsorból, míg a lassú fogyasztók később kapják meg őket. Miután az üzenet minden felhasználónak el lett küldve, törölhető a gyorsítótárból.

Visszanyomás

Az Orleans Streaming Runtime-ban a visszanyomás két helyen érvényes: streames események továbbítása az üzenetsorból az ügynökhöz , és az események továbbítása az ügynöktől a streamfogyasztók számára.

Az utóbbit a beépített Orleans üzenetkézbesítési mechanizmus biztosítja. Minden streames eseményt az ügynök küld a fogyasztóknak a szabványos Orleans grain üzenetküldésen keresztül, egyenként. Ez azt jelent, hogy az ügynökök egy eseményt (vagy korlátozott méretű eseményköteget) küldenek az egyes streamfelhasználóknak, és várják ezt a hívást. A következő esemény nem indul el, amíg az előző eseményhez tartozó feladat fel nem oldódott vagy megszakadt. Így természetesen egyszerre egy üzenetre korlátozzuk a fogyasztónkénti kézbesítési arányt.

Amikor streames eseményeket hoz létre az üzenetsorból az ügynökhöz, az Orleans Streaming egy új speciális Backpressure mechanizmust biztosít. Mivel az ügynök leválasztja az eseményeket az üzenetsorról, és elküldi őket a fogyasztóknak, egyetlen lassú fogyasztó annyira lemaradhat, hogy a rendszer kitölti őket IQueueCache . A határozatlan ideig történő növekedés megakadályozása IQueueCache érdekében korlátozzuk a méretét (a méretkorlát konfigurálható). Az ügynök azonban soha nem dob el nem kézbesített eseményeket.

Ehelyett, amikor a gyorsítótár elkezd töltődni, az ügynökök lelassítják az események lekérésének sebességét az üzenetsorból. Ily módon a lassú kézbesítési időszakokat "kihozhatjuk" a várakozási sorból (backpressure) felhasznált mennyiség módosításával, és később visszatérhetünk a gyors fogyasztási arányokhoz. A "lassú kézbesítés" völgyeinek észleléséhez a IQueueCache gyorsítótár-gyűjtők belső adatstruktúráját használja, amely nyomon követi az eseményeknek az egyes streamfogyasztók számára történő kézbesítésének előrehaladását. Ez egy nagyon rugalmas és önbeállító rendszert eredményez.