ASP.NET Core SignalR でストリーミングを使う

作成者: Brennan Conroy

ASP.NET Core SignalR は、クライアントからサーバーへ、およびサーバーからクライアントへのストリーミングをサポートしています。 これは、時間の経過と共にデータのフラグメントを受信するシナリオに便利です。 ストリーミング時には、すべてのデータが使用可能になるまで待つのではなく、各フラグメントが使用可能になるとすぐにクライアントまたはサーバーに送信されます。

サンプル コードを表示またはダウンロードします (ダウンロード方法)。

ストリーミング用のハブを設定する

IAsyncEnumerable<T>ChannelReader<T>Task<IAsyncEnumerable<T>>、または Task<ChannelReader<T>> が返されると、ハブ メソッドは自動的にストリーミング ハブ メソッドになります。

サーバーからクライアントへのストリーミング

ストリーミング ハブ メソッドからは、ChannelReader<T> に加えて IAsyncEnumerable<T> を返すことができます。 IAsyncEnumerable<T> を返す最もシンプルな方法は、次のサンプルに示すように、ハブ メソッドを非同期反復子メソッドにすることです。 ハブ非同期反復子メソッドは、クライアントがストリームからサブスクライブを解除したときにトリガーされる CancellationToken パラメーターを受け取ることができます。 非同期反復子メソッドを使うと、ChannelReader が適切な速さで返されない、ChannelWriter<T> を完了せずにメソッドが終了するなど、チャネルでよく発生する問題を回避できます。

Note

次の例では、C# 8.0 以降が必要です。

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

次のサンプルは、チャネルを使ってクライアントにデータをストリーミングする基本的な方法を示しています。 ChannelWriter<T> にオブジェクトが書き込まれるたびに、そのオブジェクトは直ちにクライアントに送信されます。 最後に ChannelWriter が完了し、ストリームが閉じられたことがクライアントに通知されます。

Note

バックグラウンド スレッドで ChannelWriter<T> に書き込み、できるだけ早く ChannelReader を返します。 ChannelReader が返されるまで、他のハブの呼び出しはブロックされます。

ロジックを try ... catch ステートメントでラップします。 finally ブロックChannel を完了します。 エラーをフローしたい場合は、catch ブロック内でキャプチャし、finally ブロック内で書き込みます。

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}

サーバーからクライアントへのストリーミング ハブ メソッドは、クライアントがストリームからサブスクライブを解除したときにトリガーされる CancellationToken パラメーターを受け取ることができます。 クライアントがストリームの終了前に切断した場合は、このトークンを使ってサーバーの操作を停止し、リソースを解放します。

クライアントからサーバーへのストリーミング

ChannelReader<T> または IAsyncEnumerable<T> の 1 つ以上のオブジェクトを受け取ると、ハブ メソッドは自動的にクライアントからサーバーへのストリーミング ハブ メソッドになります。 次のサンプルは、クライアントから送信されたストリーミング データを読み取る基本を示しています。 クライアントから ChannelWriter<T> に書き込まれるたびに、ハブ メソッドから読み取られるサーバーの ChannelReader にデータが書き込まれます。

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

IAsyncEnumerable<T> バージョンのメソッドが続きます。

Note

次の例では、C# 8.0 以降が必要です。

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

.NET クライアント

サーバーからクライアントへのストリーミング

HubConnectionStreamAsyncStreamAsChannelAsync のメソッドは、サーバーからクライアントへのストリーミング メソッドを呼び出すために使われます。 ハブ メソッド名と、ハブ メソッドに定義されている引数を StreamAsync または StreamAsChannelAsync に渡します。 StreamAsync<T>StreamAsChannelAsync<T> のジェネリック パラメーターには、ストリーミング メソッドから返されるオブジェクトの型を指定します。 型 IAsyncEnumerable<T> または ChannelReader<T> のオブジェクトは、ストリームの呼び出しから返され、クライアント上のストリームを表します。

IAsyncEnumerable<int> を返す StreamAsync の例:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

ChannelReader<int> を返す、対応する StreamAsChannelAsync の例:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

上のコードでは、次のようになります。

  • HubConnection に対する StreamAsChannelAsync メソッドは、サーバーからクライアントへのストリーミング メソッドを呼び出すために使われます。 ハブ メソッド名と、ハブ メソッドに定義されている引数を StreamAsChannelAsync に渡します。
  • StreamAsChannelAsync<T> のジェネリック パラメーターには、ストリーミング メソッドから返されるオブジェクトの型を指定します。
  • ChannelReader<T> は、ストリームの呼び出しから返され、クライアント上のストリームを表します。

クライアントからサーバーへのストリーミング

クライアントからサーバーへのストリーミング ハブ メソッドを .NET クライアントから呼び出す方法は 2 つあります。 呼び出されたハブ メソッドに応じて、SendAsyncInvokeAsyncStreamAsChannelAsync の引数として IAsyncEnumerable<T> または ChannelReader を渡すことができます。

IAsyncEnumerable または ChannelWriter オブジェクトにデータが書き込まれるたびに、サーバー上のハブ メソッドは、クライアントからのデータを含む新しい項目を受け取ります。

IAsyncEnumerable オブジェクトを使っている場合、ストリーム項目を返すメソッドが終了すると、ストリームは終了します。

Note

次の例では、C# 8.0 以降が必要です。

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

また、ChannelWriter を使っている場合は、channel.Writer.Complete() を使ってチャネルを完了します。

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

JavaScript クライアント

サーバーからクライアントへのストリーミング

JavaScript クライアントの場合、connection.stream を使ってハブ上でサーバーからクライアントへのストリーミング メソッドを呼び出します。 stream メソッドは、2 つの引数を受け取ります。

  • ハブ メソッドの名前。 次の例では、ハブ メソッドの名前は Counter です。
  • ハブ メソッドで定義されている引数。 次の例の引数は、受信するストリーム項目の数とストリーム項目間の延期期間のカウントです。

connection.stream から IStreamResult が返されます。これには subscribe メソッドが含まれています。 subscribeIStreamSubscriber を渡し、stream の呼び出しからの通知を受け取る nexterrorcomplete のコールバックを設定します。

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

クライアントからストリームを終了するには、subscribe メソッドから返された ISubscription に対して dispose メソッドを呼び出します。 このメソッドを呼び出すと、ハブ メソッドの CancellationToken パラメーターを指定していた場合にそれが取り消されます。

クライアントからサーバーへのストリーミング

JavaScript クライアントからは、呼び出されたハブ メソッドに応じて sendinvoke、または stream の引数として Subject を渡すことで、ハブに対するクライアントからサーバーへのストリーミング メソッドが呼び出されます。 Subject は、Subject のようなクラスです。 たとえば、RxJS では、そのライブラリの Subject クラスを使うことができます。

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

項目を指定して subject.next(item) を呼び出すと、ストリームに項目が書き込まれ、ハブ メソッドはサーバー上の項目を受け取ります。

ストリームを終了するには、subject.complete() を呼び出します。

Java クライアント

サーバーからクライアントへのストリーミング

SignalR Java クライアントには、ストリーミング メソッドを呼び出すために stream メソッドが使われます。 stream は 3 つ以上の引数を受け取ります。

  • ストリーム項目の想定される型。
  • ハブ メソッドの名前。
  • ハブ メソッドで定義されている引数。
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

HubConnection に対する stream メソッドからは、ストリーム項目型の Observable が返されます。 Observable 型の subscribe メソッドには、onNextonErroronCompleted のハンドラーが定義されています。

クライアントからサーバーへのストリーミング

SignalR Java クライアントからは、呼び出されたハブ メソッドに応じて sendinvoke、または stream の引数として Observable を渡すことで、ハブに対するクライアントからサーバーへのストリーミング メソッドが呼び出されます。

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

項目を指定して stream.onNext(item) を呼び出すと、ストリームに項目が書き込まれ、ハブ メソッドはサーバー上の項目を受け取ります。

ストリームを終了するには、stream.onComplete() を呼び出します。

その他のリソース