Sdílet prostřednictvím


Pozorovatelé

Existují situace, kdy jednoduchý vzor zpráv a odpovědí nestačí a klient potřebuje přijímat asynchronní oznámení. Například uživatel může chtít být upozorněn, když přítel publikoval novou rychlou zprávu.

Pozorovatelé klientů jsou mechanismus, který umožňuje asynchronně upozorňovat klienty. Rozhraní pozorovatele musí dědit z IGrainObservera všechny metody musí vracet buď void, Task, Task<TResult>, , ValueTasknebo ValueTask<TResult>. Návratový void typ se nedoporučuje, protože může povzbuzovat použití async void implementace, což je nebezpečný vzor, protože může způsobit chybové ukončení aplikace v případě, že je z metody vyvolán výjimka. Místo toho zvažte OneWayAttribute použití metody rozhraní pozorovatele pro scénáře oznámení s nejlepším úsilím. To způsobí, že příjemce neodešle odpověď pro vyvolání metody a způsobí, že se metoda vrátí okamžitě v lokalitě volání, aniž by čekala na odpověď od pozorovatele. Zrno volá metodu na pozorovatele tím, že ji vyvolá jako jakoukoli metodu rozhraní odstupňovaného rozhraní. Modul Orleans runtime zajistí doručení požadavků a odpovědí. Běžným případem použití pozorovatelů je zařazení klienta pro příjem oznámení v případě, že dojde k události v Orleans aplikaci. Podrobné informace, které publikují taková oznámení, by měly poskytnout rozhraní API pro přidání nebo odebrání pozorovatelů. Kromě toho je obvykle vhodné zveřejnit metodu, která umožňuje zrušení stávajícího předplatného.

Vývojáři zrna mohou používat třídu nástrojů, jako ObserverManager<TObserver> je například zjednodušení vývoje pozorovaných typů zrn. Na rozdíl od zrn, která se po selhání automaticky znovu aktivují podle potřeby, klienti nejsou odolní proti chybám: klient, který selže, se nemusí nikdy obnovit. Z tohoto důvodu ObserverManager<T> nástroj odebere předplatná po nakonfigurované době. Klienti, kteří jsou aktivní, by se měli znovu připsat časovači, aby jejich předplatné zůstali aktivní.

Aby se klient přihlásil k odběru oznámení, musí nejprve vytvořit místní objekt, který implementuje rozhraní pozorovatele. Potom volá metodu v objektu pro pozorovatele, CreateObjectReference', aby se objekt přeměnil na odkaz agregace, který pak může být předán metodě odběru v oznamovacím agregačním intervalu.

Tento model lze také použít jinými zrnky k příjmu asynchronních oznámení. Zrna mohou také implementovat IGrainObserver rozhraní. Na rozdíl od případu předplatného klienta se odpisovací interval jednoduše implementuje rozhraní pozorovatele a předává odkaz na sebe sama (např. this.AsReference<IMyGrainObserverInterface>()). Není potřeba, CreateObjectReference() protože zrnka jsou již adresovatelná.

Příklad kódu

Předpokládejme, že máme podrobné informace, které pravidelně odesílají zprávy klientům. Pro zjednodušení bude zpráva v našem příkladu řetězec. Nejprve definujeme rozhraní na klientovi, který obdrží zprávu.

Rozhraní bude vypadat takto:

public interface IChat : IGrainObserver
{
    Task ReceiveMessage(string message);
}

Jediná zvláštní věc je, že rozhraní by mělo dědit z IGrainObserver. Nyní každý klient, který chce sledovat tyto zprávy, by měl implementovat třídu, která implementuje IChat.

Nejjednodušším případem by bylo něco takového:

public class Chat : IChat
{
    public Task ReceiveMessage(string message)
    {
        Console.WriteLine(message);
        return Task.CompletedTask;
    }
}

Na serveru bychom měli mít další informace, které tyto chatové zprávy odesílají klientům. Grain by také měl mít mechanismus pro přihlášení klientů k odběru a odhlášení odběru oznámení. U předplatných může agregační interval použít instanci třídy ObserverManager<TObserver>utility .

Poznámka:

ObserverManager<TObserver> je součástí Orleans verze 7.0. U starších verzí je možné zkopírovat následující implementaci .

class HelloGrain : Grain, IHello
{
    private readonly ObserverManager<IChat> _subsManager;

    public HelloGrain(ILogger<HelloGrain> logger)
    {
        _subsManager =
            new ObserverManager<IChat>(
                TimeSpan.FromMinutes(5), logger);
    }

    // Clients call this to subscribe.
    public Task Subscribe(IChat observer)
    {
        _subsManager.Subscribe(observer, observer);

        return Task.CompletedTask;
    }

    //Clients use this to unsubscribe and no longer receive messages.
    public Task UnSubscribe(IChat observer)
    {
        _subsManager.Unsubscribe(observer);

        return Task.CompletedTask;
    }
}

K odeslání zprávy klientům Notify lze použít metodu ObserverManager<IChat> instance. Metoda přebírá metodu Action<T> nebo výraz lambda (kde T je zde typ IChat ). Můžete volat libovolnou metodu v rozhraní, která ji odešle klientům. V našem případě máme pouze jednu metodu ReceiveMessagea náš odesílající kód na serveru by vypadal takto:

public Task SendUpdateMessage(string message)
{
    _subsManager.Notify(s => s.ReceiveMessage(message));

    return Task.CompletedTask;
}

Náš server teď má metodu odesílání zpráv klientům pozorovatele, dvě metody pro přihlášení k odběru nebo zrušení odběru a klient implementoval třídu, která dokáže sledovat odstupňované zprávy. Posledním krokem je vytvoření odkazu pozorovatele na klienta pomocí dříve implementované Chat třídy a nechat ji přijímat zprávy po přihlášení k odběru.

Kód by vypadal takto:

//First create the grain reference
var friend = _grainFactory.GetGrain<IHello>(0);
Chat c = new Chat();

//Create a reference for chat, usable for subscribing to the observable grain.
var obj = _grainFactory.CreateObjectReference<IChat>(c);

//Subscribe the instance to receive messages.
await friend.Subscribe(obj);

Vždy, když náš agregační interval na serveru volá metodu SendUpdateMessage , obdrží zprávu všichni přihlášení klienti. V našem klientském Chat kódu obdrží instance v proměnné c zprávu a vypíše ji do konzoly.

Důležité

Objekty předané CreateObjectReference se uchovávají prostřednictvím objektu WeakReference<T> a budou proto uvolněny z paměti, pokud neexistují žádné jiné odkazy.

Uživatelé by měli udržovat referenční informace pro každého pozorovatele, který nechtějí shromažďovat.

Poznámka:

Pozorovatelé jsou ze své podstaty nespolehlivé, protože klient, který je hostitelem pozorovatele, může selhat a pozorovatelé vytvořené po obnovení mají různé (randomizované) identity. ObserverManager<TObserver> spoléhá na pravidelné opětovné indexování pozorovateli, jak je popsáno výše, aby neaktivní pozorovatelé mohli být odebráni.

Model spouštění

Implementace jsou registrovány IGrainObserver prostřednictvím volání IGrainFactory.CreateObjectReference a každé volání této metody vytvoří nový odkaz, který odkazuje na danou implementaci. Orleans provede požadavky odeslané na každý z těchto odkazů po jednom k dokončení. Pozorovatelé nejsou znovu prováděni, a proto souběžné požadavky pozorovatele nebudou Orleansprokládáním . Pokud existuje více pozorovatelů, kteří přijímají žádosti souběžně, můžou se tyto požadavky spouštět paralelně. Provádění metod pozorovatele nejsou ovlivněny atributy, jako AlwaysInterleaveAttribute jsou například: ReentrantAttributeModel spouštění nelze přizpůsobit vývojářem.