Beobachter

Es gibt Situationen, in denen ein einfaches Nachrichten-/Antwortmuster nicht ausreicht und der Client asynchrone Benachrichtigungen empfangen muss. So möchte ein Benutzer z. B. benachrichtigt werden, wenn ein Freund eine neue Chatnachricht veröffentlicht hat.

„Clientbeobachter“ ist ein Mechanismus, der es ermöglicht, Clients asynchron zu benachrichtigen. Beobachterschnittstellen müssen von IGrainObserver erben, und alle Methoden müssen entweder void, Task, Task<TResult>, ValueTask oder ValueTask<TResult> zurückgeben. Ein Rückgabetyp von void ist nicht empfehlenswert, da er die Verwendung von async void in der Implementierung fördern kann. Dies ist ein gefährliches Muster, da es zu einem Absturz der Anwendung führen kann, wenn eine Ausnahme von der Methode ausgelöst wird. Ziehen Sie stattdessen in Erwägung, für Szenarien, in denen eine Benachrichtigung nach bestem Wissen und Gewissen erfolgen soll, das OneWayAttribute auf die Schnittstellenmethode des Beobachters anzuwenden. Dies führt dazu, dass der Empfänger keine Antwort auf den Methodenaufruf sendet und die Methode sofort zu der Aufrufsite zurückkehrt, ohne auf eine Antwort des Beobachters zu warten. Ein Grain ruft eine Methode für einen Beobachter auf, indem es sie wie jede andere Methode der Grainschnittstelle aufruft. Die Orleans-Runtime stellt die Übermittlung von Anforderungen und Antworten sicher. Ein häufig verwendeter Anwendungsfall für Beobachter ist, einen Client für den Empfang von Benachrichtigungen einzutragen, wenn ein Ereignis in der Orleans-Anwendung eintritt. Ein Grain, das solche Benachrichtigungen veröffentlicht, sollte eine API bereitstellen, um Beobachter hinzuzufügen oder zu entfernen. Darüber hinaus ist es in der Regel praktisch, eine Methode verfügbar zu machen, mit der ein vorhandenes Abonnement gekündigt werden kann.

Grainentwickler können eine Hilfsklasse wie ObserverManager<TObserver> verwenden, um die Entwicklung von beobachteten Graintypen zu vereinfachen. Im Gegensatz zu Grains, die nach einem Fehler automatisch reaktiviert werden, sind Clients nicht fehlertolerant: Ein fehlerhafter Client kann möglicherweise nie wiederhergestellt werden. Aus diesem Grund entfernt das Hilfsprogramm ObserverManager<T> die Abonnements nach einer konfigurierten Dauer. Aktive Clients sollten sich mit einem Timer neu anmelden, damit ihr Abonnement aktiv bleibt.

Um eine Benachrichtigung zu abonnieren, muss der Client zunächst ein lokales Objekt erstellen, das die Beobachterschnittstelle implementiert. Er ruft dann eine Methode in der Beobachterfactory CreateObjectReference auf, um das Objekt in einen Grainverweis zu verwandeln, der dann an die Abonnementmethode des benachrichtigenden Grains übergeben werden kann.

Dieses Modell kann auch von anderen Grains verwendet werden, um asynchrone Benachrichtigungen zu empfangen. Grains können auch IGrainObserver-Schnittstellen implementieren. Anders als beim Clientabonnement implementiert das abonnierende Grain einfach die Beobachterschnittstelle und übergibt einen Verweis auf sich selbst (z. B. this.AsReference<IMyGrainObserverInterface>()). Es besteht keine Notwendigkeit für CreateObjectReference(), da Grains bereits adressierbar sind.

Codebeispiel

Nehmen wir an, wir verfügen über ein Grain, das in regelmäßigen Abständen Nachrichten an Clients sendet. Der Einfachheit halber wird die Nachricht in unserem Beispiel eine Zeichenfolge sein. Zunächst definieren wir die Schnittstelle auf dem Client, der die Nachricht empfangen soll.

Die Schnittstelle sieht wie folgt aus:

public interface IChat : IGrainObserver
{
    Task ReceiveMessage(string message);
}

Die einzige Besonderheit ist, dass die Schnittstelle von IGrainObserver erben sollte. Jetzt sollte jeder Client, der diese Nachrichten beobachten möchte, eine Klasse implementieren, die IChat implementiert.

Der einfachste Fall wäre in etwa wie folgt:

public class Chat : IChat
{
    public Task ReceiveMessage(string message)
    {
        Console.WriteLine(message);
        return Task.CompletedTask;
    }
}

Auf dem Server sollten wir als nächstes über ein Grain verfügen, das diese Chatnachrichten an die Clients sendet. Das Grain sollte auch über einen Mechanismus verfügen, mit dem Clients Benachrichtigungen abonnieren und abbestellen können. Für Abonnements kann das Grain eine Instanz der Hilfsklasse ObserverManager<TObserver> verwenden.

Hinweis

ObserverManager<TObserver> ist seit Version 7.0 Teil von Orleans. Für ältere Versionen kann die folgende Implementierung kopiert werden.

class HelloGrain : Grain, IHello
{
    private readonly ObserverManager<IChat> _subsManager;

    public HelloGrain(ILogger<HelloGrain> logger)
    {
        _subsManager =
            new ObserverManager<IChat>(
                TimeSpan.FromMinutes(5), logger);
    }

    // Clients call this to subscribe.
    public Task Subscribe(IChat observer)
    {
        _subsManager.Subscribe(observer, observer);

        return Task.CompletedTask;
    }

    //Clients use this to unsubscribe and no longer receive messages.
    public Task UnSubscribe(IChat observer)
    {
        _subsManager.Unsubscribe(observer);

        return Task.CompletedTask;
    }
}

Zum Senden einer Nachricht an Clients kann die Methode Notify der ObserverManager<IChat>-Instanz verwendet werden. Die Methode übernimmt eine Action<T>-Methode oder einen Lambdaausdruck (wobei T hier vom Typ IChat ist). Sie können eine beliebige Methode für die Schnittstelle aufrufen, um sie an Clients zu senden. In unserem Fall haben wir nur eine Methode, ReceiveMessage, und unser Sendecode auf dem Server würde wie folgt aussehen:

public Task SendUpdateMessage(string message)
{
    _subsManager.Notify(s => s.ReceiveMessage(message));

    return Task.CompletedTask;
}

Jetzt verfügt unser Server über eine Methode zum Senden von Nachrichten an Beobachterclients, zwei Methoden zum Abonnieren/Abbestellen, und der Client hat eine Klasse implementiert, die die Grainnachrichten beobachten kann. Der letzte Schritt besteht darin, mithilfe unserer zuvor implementierten Chat-Klasse einen Beobachterverweis auf den Client zu erstellen und ihn die Nachrichten empfangen zu lassen, nachdem er sie abonniert hat.

Der Code würde wie folgt aussehen:

//First create the grain reference
var friend = _grainFactory.GetGrain<IHello>(0);
Chat c = new Chat();

//Create a reference for chat, usable for subscribing to the observable grain.
var obj = _grainFactory.CreateObjectReference<IChat>(c);

//Subscribe the instance to receive messages.
await friend.Subscribe(obj);

Wenn unser Grain auf dem Server jetzt die Methode SendUpdateMessage aufruft, erhalten alle abonnierten Clients die Nachricht. In unserem Clientcode empfängt die Instanz Chat in der Variablen c die Nachricht und gibt sie auf der Konsole aus.

Wichtig

Objekte, die an CreateObjectReference übergeben werden, werden über eine WeakReference<T> gehalten und erfahren daher eine Garbage Collection, wenn keine anderen Verweise vorhanden sind.

Benutzer sollten für jeden Beobachter, der nicht gesammelt werden soll, einen Verweis beibehalten.

Hinweis

Beobachter sind von Natur aus unzuverlässig, da ein Client, der einen Beobachter hostet, möglicherweise Fehler verursacht und nach der Wiederherstellung erstellte Beobachter unterschiedliche (zufällige) Identitäten aufweisen. ObserverManager<TObserver> erfordert das regelmäßige erneute Abonnieren durch Beobachter, wie oben erläutert, damit inaktive Beobachter entfernt werden können.

Ausführungsmodell

Implementierungen von IGrainObserver werden über einen Aufruf von IGrainFactory.CreateObjectReference registriert und jeder Aufruf dieser Methode erstellt einen neuen Verweis, der auf diese Implementierung zeigt. Orleans führt die Anforderungen, die an jeden dieser Verweise gesendet werden, nacheinander aus, bis sie abgeschlossen sind. Beobachter sind nicht eintrittsinvariant, und daher werden gleichzeitige Anforderungen an einen Beobachter nicht von Orleans überlagert. Wenn es mehrere Beobachter gibt, die gleichzeitig Anforderungen erhalten, können diese Anforderungen parallel ausgeführt werden. Die Ausführung von Beobachtermethoden wird durch Attribute wie AlwaysInterleaveAttribute oder ReentrantAttribute nicht beeinflusst: Das Ausführungsmodell kann von einem Entwickler nicht angepasst werden.