Orleans 用戶端

用戶端可讓非粒紋的程式碼與 Orleans 叢集互動。 用戶端可讓應用程式程式碼與裝載在叢集中的粒紋和資料流通訊。 根據裝載用戶端程式代碼的位置而定,有兩種方式可以取得用戶端:在與定址接收器相同的程序中,或在個別的程序中。 本文將討論這兩個選項,從建議選項開始:將用戶端程式代碼共同裝載在與粒紋程式碼相同的程序中。

共同裝載的用戶端

如果用戶端程式代碼裝載於與粒紋程式碼相同的程序中,則可以直接從主控應用程式的相依性插入容器取得用戶端。 在此情況下,用戶端會直接與其附加的定址接收器通訊,並可以利用定址接收器對於叢集的額外知識。

這提供數個優點,包括減少網路和 CPU 額外負荷,以及減少延遲和增加輸送量及可靠性。 用戶端會利用定址接收器對叢集拓撲和狀態的知識,且不需要使用不同的閘道。 這可避免網路躍點和序列化/還原序列化來回行程。 因此,這也會增加可靠性,因為用戶端與粒紋之間的所需節點數目會最小化。 如果粒紋是無狀態背景工作角色粒紋,或是在裝載用戶端的定址接收器上啟用,則完全不需要執行序列化或網路通訊,且用戶端可以獲得額外的效能和可靠性提升。 共同裝載用戶端和粒紋程式碼也可藉由不需要部署和監視兩個不同的應用程式二進位檔來簡化部署和應用程式拓撲。

這個方法也有一個缺點,主要在於粒紋程式碼不再與用戶端程序隔離。 因此,用戶端程式代碼中的問題,例如封鎖 IO 或鎖定爭用,導致執行緒耗盡可能會影響粒紋程式碼的效能。 即使沒有類似上述的程式碼瑕疵,「擾鄰」效果也只會在用戶端程式碼與粒紋程式碼位於相同的處理器上時產生,對 CPU 快取造成額外的負擔,以及一般本機資源的額外爭用。 此外,識別這些問題的來源現在比較困難,因為監視系統無法在邏輯上區分用戶端程式代碼與粒紋程式碼。

儘管具有這些缺點,但搭配粒紋程式碼的共同裝載用戶端程式代碼是熱門的選項,且是大部分應用程式都建議使用的方法。 為了詳細說明,上述缺點在實務上是輕微的,原因如下:

  • 用戶端程式代碼通常非常「精簡」,例如,將傳入的 HTTP 要求轉譯為粒紋呼叫,因此「擾鄰」效果最少,且成本與其他必要閘道相當。
  • 如果發生效能問題,開發人員的一般工作流程牽涉到 CPU 分析工具和偵錯工具等工具,即使用戶端和粒紋程式碼都在相同程序中執行,仍能快速識別問題的來源。 換句話說,計量變得更粗略,且更無法精確地識別問題的來源,但更詳細的工具仍然有效。

從主機取得用戶端

如果使用 .NET 泛型主機裝載,用戶端將會自動在主機的相依性插入容器中提供使用,並可插入 ASP.NET 控制器IHostedService 實作等服務。

或者,可以從 ISiloHost 取得 IGrainFactoryIClusterClient 之類的用戶端介面:

var client = host.Services.GetService<IClusterClient>();
await client.GetGrain<IMyGrain>(0).Ping();

外部用戶端

用戶端程式碼可以在裝載粒紋程式碼的 Orleans 叢集外部執行。 因此,外部用戶端會作為連接器或連線至叢集和應用程式的所有粒紋。 通常,前端 Web 服務器上會使用用戶端來連線至 Orleans 叢集,以作為中介層,以及執行商務邏輯的粒紋。

在一般設定中,前端 Web 服務器:

  • 接收 Web 要求。
  • 執行必要的驗證和授權驗證。
  • 決定應該處理要求的粒紋。
  • 使用 Microsoft.Orleans.Client NuGet 套件,可對粒紋發出一或多個方法呼叫。
  • 處理成功完成或失敗的粒紋呼叫和任何傳回的值。
  • 傳送回應至 Web 要求。

粒紋用戶端的初始化

在粒紋用戶端可用來呼叫裝載在 Orleans 叢集中的粒紋之前,必須先設定、初始化並連線至叢集。

組態是透過 UseOrleansClient 和數個補充選項類別提供,其中包含組態屬性階層,以程式設計方式設定用戶端。 如需詳細資訊,請參閱用戶端組態

請考慮下列用戶端組態的範例:

// Alternatively, call Host.CreateDefaultBuilder(args) if using the 
// Microsoft.Extensions.Hosting NuGet package.
using IHost host = new HostBuilder()
    .UseOrleansClient(clientBuilder =>
    {
        clientBuilder.Configure<ClusterOptions>(options =>
        {
            options.ClusterId = "my-first-cluster";
            options.ServiceId = "MyOrleansService";
        });

        clientBuilder.UseAzureStorageClustering(
            options => options.ConfigureTableServiceClient(connectionString))
    })
    .Build();

啟動 host 時,用戶端將會透過其建構的服務提供者執行個體進行設定和提供使用。

組態是透過 ClientBuilder 和數個補充選項類別提供,其中包含組態屬性階層,以程式設計方式設定用戶端。 如需詳細資訊,請參閱用戶端組態

用戶端組態的範例:

var client = new ClientBuilder()
    .Configure<ClusterOptions>(options =>
    {
        options.ClusterId = "my-first-cluster";
        options.ServiceId = "MyOrleansService";
    })
    .UseAzureStorageClustering(
        options => options.ConnectionString = connectionString)
    .ConfigureApplicationParts(
        parts => parts.AddApplicationPart(typeof(IValueGrain).Assembly))
    .Build();

最後,您必須在建構的用戶端物件上呼叫 Connect() 方法,使其連線至 Orleans 叢集。 其為傳回 Task 的非同步方法。 因此,您必須使用 await.Wait() 等候其完成。

await client.Connect();

呼叫粒紋

從用戶端對粒紋進行呼叫與從粒紋程式碼發出此類呼叫並無不同。 針對相同的 IGrainFactory.GetGrain<TGrainInterface>(Type, Guid) 方法,其中 T 是目標粒紋介面,這兩種情況都會用來取得粒紋參考。 差異在於叫用 IGrainFactory.GetGrain Factory 物件的位置。 在用戶端程式碼中,您會透過連線的用戶端物件執行此動作,如下列範例所示:

IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
Task joinGameTask = player.JoinGame(game)

await joinGameTask;

對粒紋方法的呼叫會傳回 TaskTask<TResult>,如粒紋介面規則所需。 用戶端可以使用 await 關鍵字,以非同步方式等待傳回的 Task,而不封鎖執行緒,或在某些情況下,Wait() 方法會封鎖目前執行的執行緒。

從用戶端程式代碼和從另一個粒紋內對粒紋進行呼叫之間的主要差異是單一執行緒的粒紋執行模型。 粒紋受限於 Orleans 執行階段的單一執行緒,而用戶端可能是多執行緒。 Orleans 不會在用戶端上提供任何此類保證,因此由用戶端使用適合其環境的任何同步處理建構來管理其並行,也就是鎖定、事件和 Tasks

接收通知

在某些情況下,簡單的要求-回應模式是不夠的,且用戶端需要接收非同步通知。 例如,當使用者追蹤的對象發佈新訊息時,使用者可能會想要收到通知。

使用觀察者是一種此類機制,可讓用戶端物件公開為類似粒紋的目標,以便由粒紋叫用。 對觀察者呼叫不會提供成功或失敗的任何指示,因為呼叫會以單向最佳作法訊息傳送。 因此,應用程式程式碼負責在必要時在觀察者之上建置較高層級的可靠性機制。

另一種可用來將非同步訊息傳遞至用戶端的機制是資料流。 資料流會公開個別訊息傳遞成功或失敗的指示,因此能夠可靠通訊回用戶端。

用戶端連線

叢集用戶端可能會遇到連線問題的情節有兩種:

  • 當用戶端嘗試連線至定址接收器時。
  • 對從連線叢集用戶端取得的粒紋參考發出呼叫時。

在第一種情況下,用戶端會嘗試連線至定址接收器。 如果用戶端無法連線至任何定址接收器,則會擲回例外狀況來指出發生錯誤的位置。 您可以註冊 IClientConnectionRetryFilter 來處理例外狀況,並決定是否要重試。 如果未提供重試篩選,或重試篩選傳回 false,則用戶端就會徹底放棄。

using Orleans.Runtime;

internal sealed class ClientConnectRetryFilter : IClientConnectionRetryFilter
{
    private int _retryCount = 0;
    private const int MaxRetry = 5;
    private const int Delay = 1_500;

    public async Task<bool> ShouldRetryConnectionAttempt(
        Exception exception,
        CancellationToken cancellationToken)
    {
        if (_retryCount >= MaxRetry)
        {
            return false;
        }

        if (!cancellationToken.IsCancellationRequested &&
            exception is SiloUnavailableException siloUnavailableException)
        {
            await Task.Delay(++ _retryCount * Delay, cancellationToken);
            return true;
        }

        return false;
    }
}

叢集用戶端可能會遇到連線問題的情節有兩種:

  • 一開始呼叫 IClusterClient.Connect() 方法時。
  • 對從連線叢集用戶端取得的粒紋參考發出呼叫時。

在第一種情況下,Connect 方法會擲回例外狀況,以指出發生錯誤的位置。 這通常是 (但不一定) SiloUnavailableException。 如果發生這種情況,叢集用戶端執行個體將無法使用,且應該處置。 您可以選擇性地將重試篩選函式提供給 Connect 方法,例如,在進行另一次嘗試之前,先等待指定的持續時間。 如果未提供重試篩選,或重試篩選傳回 false,則用戶端就會徹底放棄。

如果 Connect 成功傳回,則叢集用戶端會保證可供使用,直到處置叢集用戶端為止。 這表示即使用戶端遇到連線問題,仍會無限期嘗試復原。 確切的復原行為可以在 ClientBuilder 所提供的 GatewayOptions 物件上設定,例如:

var client = new ClientBuilder()
    // ...
    .Configure<GatewayOptions>(
        options =>                         // Default is 1 min.
        options.GatewayListRefreshPeriod = TimeSpan.FromMinutes(10))
    .Build();

在第二種情況下,在粒紋呼叫期間發生連線問題時,SiloUnavailableException 會在用戶端擲回。 這可以像這樣處理:

IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);

try
{
    await player.JoinGame(game);
}
catch (SiloUnavailableException)
{
    // Lost connection to the cluster...
}

在此情況下,粒紋參考不會失效;在稍後重新建立連線時,可以在相同的參考上重試呼叫。

相依性插入

在使用 .NET 泛型主機的程式中建立外部用戶端的建議方式是透過相依性插入來插入 IClusterClient 單一執行個體,然後可以接受為託管服務、ASP.NET 控制器之類的建構函式參數。

注意

在將與其連線的相同程序中共同裝載 Orleans 定址接收器時,「不」需要手動建立用戶端;Orleans 將會自動提供一個用戶端並適當地管理其存留期。

在不同的程序中連線至叢集 (在不同的電腦上) 時,常見的模式是建立如下所示的託管服務:

using Microsoft.Extensions.Hosting;

namespace Client;

public sealed class ClusterClientHostedService : IHostedService
{
    private readonly IClusterClient _client;

    public ClusterClientHostedService(IClusterClient client)
    {
        _client = client;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        // Use the _client to consume grains...

        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
        => Task.CompletedTask;
}
public class ClusterClientHostedService : IHostedService
{
    private readonly IClusterClient _client;

    public ClusterClientHostedService(IClusterClient client)
    {
        _client = client;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        // A retry filter could be provided here.
        await _client.Connect();
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await _client.Close();

        _client.Dispose();
    }
}

服務接著會註冊如下:

await Host.CreateDefaultBuilder(args)
    .UseOrleansClient(builder =>
    {
        builder.UseLocalhostClustering();
    })
    .ConfigureServices(services => 
    {
        services.AddHostedService<ClusterClientHostedService>();
    })
    .RunConsoleAsync();

範例

以下是上述用戶端應用程式所提供範例的擴充版本,其會連線至 Orleans、尋找玩家帳戶、訂閱玩家所屬遊戲工作階段的更新及觀察者,以及列印出通知,直到手動終止程式為止。

try
{
    using IHost host = Host.CreateDefaultBuilder(args)
        .UseOrleansClient((context, client) =>
        {
            client.Configure<ClusterOptions>(options =>
            {
                options.ClusterId = "my-first-cluster";
                options.ServiceId = "MyOrleansService";
            })
            .UseAzureStorageClustering(
                options => options.ConfigureTableServiceClient(
                    context.Configuration["ORLEANS_AZURE_STORAGE_CONNECTION_STRING"]));
        })
        .UseConsoleLifetime()
        .Build();

    await host.StartAsync();

    IGrainFactory client = host.Services.GetRequiredService<IGrainFactory>();

    // Hardcoded player ID
    Guid playerId = new("{2349992C-860A-4EDA-9590-000000000006}");
    IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
    IGameGrain? game = null;
    while (game is null)
    {
        Console.WriteLine(
            $"Getting current game for player {playerId}...");

        try
        {
            game = await player.GetCurrentGame();
            if (game is null) // Wait until the player joins a game
            {
                await Task.Delay(TimeSpan.FromMilliseconds(5_000));
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Exception: {ex.GetBaseException()}");
        }
    }

    Console.WriteLine(
        $"Subscribing to updates for game {game.GetPrimaryKey()}...");

    // Subscribe for updates
    var watcher = new GameObserver();
    await game.ObserveGameUpdates(
        client.CreateObjectReference<IGameObserver>(watcher));

    Console.WriteLine(
        "Subscribed successfully. Press <Enter> to stop.");
}
catch (Exception e)
{
    Console.WriteLine(
        $"Unexpected Error: {e.GetBaseException()}");
}
await RunWatcherAsync();

// Block the main thread so that the process doesn't exit.
// Updates arrive on thread pool threads.
Console.ReadLine();

static async Task RunWatcherAsync()
{
    try
    {
        var client = new ClientBuilder()
            .Configure<ClusterOptions>(options =>
            {
                options.ClusterId = "my-first-cluster";
                options.ServiceId = "MyOrleansService";
            })
            .UseAzureStorageClustering(
                options => options.ConnectionString = connectionString)
            .ConfigureApplicationParts(
                parts => parts.AddApplicationPart(typeof(IValueGrain).Assembly))
            .Build();

            // Hardcoded player ID
            Guid playerId = new("{2349992C-860A-4EDA-9590-000000000006}");
            IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
            IGameGrain game = null;
            while (game is null)
            {
                Console.WriteLine(
                    $"Getting current game for player {playerId}...");

                try
                {
                    game = await player.GetCurrentGame();
                    if (game is null) // Wait until the player joins a game
                    {
                        await Task.Delay(TimeSpan.FromMilliseconds(5_000));
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Exception: {ex.GetBaseException()}");
                }
            }

            Console.WriteLine(
                $"Subscribing to updates for game {game.GetPrimaryKey()}...");

            // Subscribe for updates
            var watcher = new GameObserver();
            await game.SubscribeForGameUpdates(
                await client.CreateObjectReference<IGameObserver>(watcher));

            Console.WriteLine(
                "Subscribed successfully. Press <Enter> to stop.");
        }
        catch (Exception e)
        {
            Console.WriteLine(
                $"Unexpected Error: {e.GetBaseException()}");
        }
    }
}

/// <summary>
/// Observer class that implements the observer interface.
/// Need to pass a grain reference to an instance of
/// this class to subscribe for updates.
/// </summary>
class GameObserver : IGameObserver
{
    public void UpdateGameScore(string score)
    {
        Console.WriteLine("New game score: {0}", score);
    }
}