在 ASP.NET Core SignalR 中使用串流

作者:Brennan Conroy

ASP.NET Core SignalR 支援從用戶端串流到伺服器,以及從伺服器串流到用戶端。 這對於資料片段會隨時間推進而送達的案例非常有用。 進行串流時,無需等待所有資料變得可用,當各個片段在可用時即會送往用戶端或伺服器。

檢視或下載範例程式碼 \(英文\) (如何下載)

設定串流中樞

當中樞方法傳回 IAsyncEnumerable<T>ChannelReader<T>Task<IAsyncEnumerable<T>>Task<ChannelReader<T>>時,中樞方法即自動成為串流中樞方法。

伺服器對用戶端串流

除了 ChannelReader<T> 之外,串流中樞方法也可以傳回 IAsyncEnumerable<T>。 傳回 IAsyncEnumerable<T> 的最簡單方法是將中樞方法設定為非同步迭代器方法,如下例所示。 中樞非同步迭代器方法可以接受 CancellationToken 參數,並於用戶端取消訂閱資料流程時觸發。 非同步迭代器方法可避免通道常見的問題,例如,沒有儘早傳回 ChannelReader,或在未完成 ChannelWriter<T> 的情況下結束方法。

注意

下列範例需要 C# 8.0 或更新版本。

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

下列範例顯示使用通道將資料串流至用戶端的基本概念。 每當物件寫入 ChannelWriter<T> 時,就會將物件立即傳送至用戶端。 最後,ChannelWriter 已完成,並告知用戶端資料流程已關閉。

注意

寫入背景執行緒上的 ChannelWriter<T>,並儘快傳回 ChannelReader。 未傳回 ChannelReader 之前,會封鎖其他中樞叫用。

將邏輯包裝在 try ... catch 陳述式中。 在 finally 區塊中完成 Channel。 如果您想要傳送錯誤,請在 catch 區塊內擷取該錯誤並將其寫入 finally 區塊中。

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}

伺服器對用戶端串流中樞方法可以接受 CancellationToken 參數,並於用戶端取消訂閱資料流時加以觸發。 如果用戶端在資料流結束之前中斷連線,請使用此權杖停止伺服器作業並釋放任何資源。

用戶端對伺服器串流

當中樞方法接受一或多個類型為 ChannelReader<T>IAsyncEnumerable<T> 的物件時,中樞方法即自動成為用戶端對伺服器串流中樞方法。 下列範例顯示讀取用戶端所傳送串流資料的基本概念。 每當用戶端寫入 ChannelWriter<T> 時,就會將資料寫入中樞方法所讀取之伺服器上的 ChannelReader

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

該方法的 IAsyncEnumerable<T> 版本如下。

注意

下列範例需要 C# 8.0 或更新版本。

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

.NET 用戶端

伺服器對用戶端串流

HubConnection 上的 StreamAsyncStreamAsChannelAsync 方法用於叫用伺服器對用戶端串流方法。 將中樞方法中定義的中樞方法名稱和引數傳遞至 StreamAsyncStreamAsChannelAsyncStreamAsync<T>StreamAsChannelAsync<T> 上的泛型參數會指定串流方法所傳回的物件類型。 從資料流叫用傳回類型為 IAsyncEnumerable<T>ChannelReader<T> 的物件,並用以代表用戶端上的資料流。

傳回 IAsyncEnumerable<int>StreamAsync 範例:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

傳回 ChannelReader<int> 的對應 StreamAsChannelAsync 範例:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

在前述程式碼中:

  • HubConnection 上的 StreamAsChannelAsync 方法用於叫用伺服器對用戶端串流方法。 將中樞方法中定義的中樞方法名稱和引數傳遞至 StreamAsChannelAsync
  • StreamAsChannelAsync<T> 上的泛型參數會指定串流方法所傳回的物件類型。
  • 從資料流叫用傳回 ChannelReader<T>,並用以代表用戶端上的資料流。

用戶端對伺服器串流

有兩種方法可從 .NET 用戶端叫用用戶端對伺服器串流中樞方法。 您可以根據叫用的中樞方法,將 IAsyncEnumerable<T>ChannelReader 當做引數傳入SendAsyncInvokeAsyncStreamAsChannelAsync

每當資料寫入 IAsyncEnumerableChannelWriter 物件時,伺服器上的中樞方法就會從用戶端接收具有該資料的新項目。

如果是使用 IAsyncEnumerable 物件,資料流會在傳回資料流項目的方法結束之後終止。

注意

下列範例需要 C# 8.0 或更新版本。

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

或者,如果您是使用 ChannelWriter,則可使用 channel.Writer.Complete() 完成通道:

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

JavaScript 用戶端

伺服器對用戶端串流

JavaScript 用戶端會使用 connection.stream呼叫中樞上的伺服器對用戶端串流方法。 stream 方法接受兩種引數:

  • 中樞方法的名稱。 在下列範例中,中樞方法名稱為 Counter
  • 在中樞方法中定義的引數。 在下列範例中,引數包括要接收的資料流項目數和資料流項目之間的延遲。

connection.stream 會傳回 IStreamResult,其中包含 subscribe 方法。 傳遞 IStreamSubscribersubscribe 並設定 nexterrorcomplete 回呼,以接收來自 stream 叫用的通知。

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

若要結束用戶端的資料流,請在 subscribe 方法傳回的 ISubscription 上呼叫 dispose 方法。 呼叫此方法會導致取消 Hub 方法的 CancellationToken 參數 (如已提供該參數)。

用戶端對伺服器串流

JavaScript 用戶端可以透過將 Subject 作為引數傳遞給 sendinvokestream (視叫用的中樞方法而定),在中樞上呼叫用戶端對伺服器串流方法。 Subject 是一個類似 Subject 的類別。 例如,在 RxJS中,您可以使用該程式庫的 Subject 類別。

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

使用項目來呼叫 subject.next(item) 會將項目寫入資料流,而中樞方法會接收伺服器上的項目。

若要結束資料流,請呼叫 subject.complete()

Java 用戶端

伺服器對用戶端串流

SignalR JAVA 用戶端會使用 stream 方法來叫用串流方法。 stream 接受三種 (含) 以上的引數:

  • 預期類型的資料流項目。
  • 中樞方法的名稱。
  • 在中樞方法中定義的引數。
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

HubConnection 上的 stream 方法會傳回 Observable 的資料流項目類型。 Observable 類型的 subscribe 方法是定義 onNextonErroronCompleted 等處理常式之處。

用戶端對伺服器串流

SignalR JAVA 用戶端可以透過將 Observable 作為引數傳遞給 sendinvokestream (視叫用的中樞方法而定),在中樞上呼叫用戶端對伺服器串流方法。

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

使用項目來呼叫 stream.onNext(item) 會將項目寫入資料流中,而中樞方法會接收伺服器上的項目。

若要結束資料流,請呼叫 stream.onComplete()

其他資源