Verwenden von Streaming in ASP.NET CoreSignalR

Von Zapan Conzön

SignalRASP.NET Core unterstützt das Streaming von Client zu Server und von Server zu Client. Dies ist nützlich für Szenarien, in denen Fragmente von Daten im Laufe der Zeit eingehen. Beim Streaming wird jedes Fragment an den Client oder Server gesendet, sobald es verfügbar ist, anstatt darauf zu warten, dass alle Daten verfügbar sind.

SignalRASP.NET Core unterstützt Streamingrückgabewerte von Servermethoden. Dies ist nützlich für Szenarien, in denen Fragmente von Daten im Laufe der Zeit eingehen. Wenn ein Rückgabewert an den Client gestreamt wird, wird jedes Fragment an den Client gesendet, sobald es verfügbar ist, anstatt darauf zu warten, dass alle Daten verfügbar sind.

Anzeigen oder Herunterladen von Beispielcode (Vorgehensweise zum Herunterladen)

Einrichten eines Hubs für das Streaming

Eine Hubmethode wird automatisch zu einer Streaminghubmethode, wenn sie IAsyncEnumerable<T> , , oder ChannelReader<T> zurückgibt. Task<IAsyncEnumerable<T>> Task<ChannelReader<T>>

Eine Hubmethode wird automatisch zu einer Streaminghubmethode, wenn sie oder ChannelReader<T> Task<ChannelReader<T>> zurückgibt.

Server-zu-Client-Streaming

Streaminghubmethoden können IAsyncEnumerable<T> zusätzlich zu ChannelReader<T> zurückgeben. Die einfachste Möglichkeit zur Rückgabe besteht darin, IAsyncEnumerable<T> die Hubmethode zu einer asynchronen Iteratormethode zu machen, wie im folgenden Beispiel veranschaulicht. Asynchrone Hub-Iteratormethoden können einen CancellationToken Parameter akzeptieren, der ausgelöst wird, wenn der Client das Abonnement des Streams abbestellt. Asynchrone Iteratormethoden vermeiden probleme, die bei Kanälen auftreten, z. B. das nicht frühzeitig genug zurückgeben ChannelReader oder die Methode beenden, ohne die ChannelWriter<T> abzuschließen.

Hinweis

Für das folgende Beispiel ist C# 8.0 oder höher erforderlich.

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

Das folgende Beispiel zeigt die Grundlagen des Streamings von Daten an den Client mithilfe von Kanälen. Jedes Mal, wenn ein Objekt in das geschrieben ChannelWriter<T> wird, wird das Objekt sofort an den Client gesendet. Am Ende ChannelWriter wird abgeschlossen, um dem Client mitzuteilen, dass der Stream geschlossen ist.

Hinweis

Schreiben Sie in den in ChannelWriter<T> einem Hintergrundthread, und geben Sie ChannelReader so bald wie möglich zurück. Andere Hubaufrufe werden blockiert, bis ein ChannelReader zurückgegeben wird.

Umschließen Sie die Logik in einer try ... catch -Anweisung. Schließen Sie Channel in einem finally -Blockab. Wenn Sie einen Fehler fließen möchten, erfassen Sie ihn im catch -Block, und schreiben Sie ihn in den finally -Block.

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}
public class StreamHub : Hub
{
    public ChannelReader<int> Counter(
        int count,
        int delay,
        CancellationToken cancellationToken)
    {
        var channel = Channel.CreateUnbounded<int>();

        // We don't want to await WriteItemsAsync, otherwise we'd end up waiting
        // for all the items to be written before returning the channel back to
        // the client.
        _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

        return channel.Reader;
    }

    private async Task WriteItemsAsync(
        ChannelWriter<int> writer,
        int count,
        int delay,
        CancellationToken cancellationToken)
    {
        Exception localException = null;
        try
        {
            for (var i = 0; i < count; i++)
            {
                // Check the cancellation token regularly so that the server will stop
                // producing items if the client disconnects.
                cancellationToken.ThrowIfCancellationRequested();
                await writer.WriteAsync(i);

                // Use the cancellationToken in other APIs that accept cancellation
                // tokens so the cancellation can flow down to them.
                await Task.Delay(delay, cancellationToken);
            }
        }
        catch (Exception ex)
        {
            localException = ex;
        }
        finally
        {
            writer.Complete(localException);
        }
    }
}
public class StreamHub : Hub
{
    public ChannelReader<int> Counter(int count, int delay)
    {
        var channel = Channel.CreateUnbounded<int>();

        // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
        // for all the items to be written before returning the channel back to
        // the client.
        _ = WriteItemsAsync(channel.Writer, count, delay);

        return channel.Reader;
    }

    private async Task WriteItemsAsync(
        ChannelWriter<int> writer,
        int count,
        int delay)
    {
        Exception localException = null;
        try
        {
            for (var i = 0; i < count; i++)
            {
                await writer.WriteAsync(i);
                await Task.Delay(delay);
            }
        }
        catch (Exception ex)
        {
            localException = ex;
        }
        finally
        {
            writer.Complete(localException);
        }

    }
}

Server-zu-Client-Streaminghubmethoden können einen CancellationToken Parameter akzeptieren, der ausgelöst wird, wenn der Client das Abonnement des Streams abbestellt. Verwenden Sie dieses Token, um den Servervorgang zu beenden und alle Ressourcen freizugeben, wenn der Client die Verbindung vor dem Ende des Streams trennt.

Client-zu-Server-Streaming

Eine Hubmethode wird automatisch zu einer Client-zu-Server-Streaminghubmethode, wenn sie mindestens ein Objekt vom Typ ChannelReader<T> oder IAsyncEnumerable<T> akzeptiert. Das folgende Beispiel zeigt die Grundlagen des Lesens von Streamingdaten, die vom Client gesendet werden. Wenn der Client in ChannelWriter<T> schreibt, werden die Daten in den ChannelReader auf dem Server geschrieben, von dem die Hubmethode liest.

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

IAsyncEnumerable<T>Es folgt eine Version der -Methode.

Hinweis

Für das folgende Beispiel ist C# 8.0 oder höher erforderlich.

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

.NET-Client

Server-zu-Client-Streaming

Die StreamAsync Methoden und für werden StreamAsChannelAsync HubConnection verwendet, um Server-zu-Client-Streamingmethoden aufzurufen. Übergeben Sie den Namen und die Argumente der Hubmethode, die in der Hubmethode definiert sind, an StreamAsync oder StreamAsChannelAsync . Der generische Parameter für StreamAsync<T> und gibt den Typ der von der StreamAsChannelAsync<T> Streamingmethode zurückgegebenen Objekte an. Ein Objekt vom Typ IAsyncEnumerable<T> oder ChannelReader<T> wird vom Streamaufruf zurückgegeben und stellt den Stream auf dem Client dar.

Ein StreamAsync Beispiel, das IAsyncEnumerable<int> zurückgibt:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

Ein entsprechendes StreamAsChannelAsync Beispiel, das ChannelReader<int> zurückgibt:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

Die StreamAsChannelAsync -Methode für HubConnection wird verwendet, um eine Server-zu-Client-Streamingmethode aufzurufen. Übergeben Sie den Namen und die Argumente der Hubmethode, die in der Hubmethode definiert sind, an StreamAsChannelAsync . Der generische Parameter für StreamAsChannelAsync<T> gibt den Typ der von der Streamingmethode zurückgegebenen Objekte an. Ein ChannelReader<T> wird vom Streamaufruf zurückgegeben und stellt den Stream auf dem Client dar.

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

Die StreamAsChannelAsync -Methode für HubConnection wird verwendet, um eine Server-zu-Client-Streamingmethode aufzurufen. Übergeben Sie den Namen und die Argumente der Hubmethode, die in der Hubmethode definiert sind, an StreamAsChannelAsync . Der generische Parameter für StreamAsChannelAsync<T> gibt den Typ der von der Streamingmethode zurückgegebenen Objekte an. Ein ChannelReader<T> wird vom Streamaufruf zurückgegeben und stellt den Stream auf dem Client dar.

var channel = await hubConnection
    .StreamAsChannelAsync<int>("Counter", 10, 500, CancellationToken.None);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

Client-zu-Server-Streaming

Es gibt zwei Möglichkeiten, eine Client-zu-Server-Streaminghubmethode vom .NET-Client aufzurufen. Sie können je IAsyncEnumerable<T> nach ChannelReader SendAsync InvokeAsync aufgerufener Hubmethode entweder oder als Argument an , oder StreamAsChannelAsync übergeben.

Wenn Daten in das -Objekt oder das -Objekt geschrieben IAsyncEnumerable ChannelWriter werden, empfängt die Hub-Methode auf dem Server ein neues Element mit den Daten vom Client.

Wenn Sie ein IAsyncEnumerable -Objekt verwenden, endet der Stream, nachdem die Methode, die Streamelemente zurückgibt, beendet wird.

Hinweis

Für das folgende Beispiel ist C# 8.0 oder höher erforderlich.

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

Wenn Sie ein ChannelWriter verwenden, schließen Sie den Kanal mit channel.Writer.Complete() ab:

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

JavaScript-Client

Server-zu-Client-Streaming

JavaScript-Clients rufen Server-zu-Client-Streamingmethoden auf Hubs mit connection.stream auf. Die stream -Methode akzeptiert zwei Argumente:

  • Der Name der Hubmethode. Im folgenden Beispiel lautet der Name der Hubmethode Counter .
  • In der Hubmethode definierte Argumente. Im folgenden Beispiel sind die Argumente eine Anzahl für die Anzahl der zu empfangenen Streamelemente und die Verzögerung zwischen Streamelementen.

connection.stream gibt eine IStreamResult zurück, die eine subscribe -Methode enthält. Übergeben Sie eine IStreamSubscriber an , und legen Sie die subscribe next error Rückrufe , und complete fest, um Benachrichtigungen vom Aufruf zu stream empfangen.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Um den Stream vom Client zu beenden, rufen Sie die dispose -Methode für den ISubscription auf, der von der -Methode zurückgegeben subscribe wird. Das Aufrufen dieser Methode führt zum Abbruch des CancellationToken Parameters der Hub-Methode, sofern Sie einen angegeben haben.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Um den Stream vom Client zu beenden, rufen Sie die dispose -Methode für den ISubscription auf, der von der -Methode zurückgegeben subscribe wird.

Client-zu-Server-Streaming

JavaScript-Clients rufen Client-zu-Server-Streamingmethoden auf Hubs auf, indem sie Subject je send nach invoke aufgerufener Hubmethode als Argument an , oder stream übergeben. ist Subject eine Klasse, die wie ein aussieht. Subject In RxJS können Sie beispielsweise die Subject-Klasse aus dieser Bibliothek verwenden.

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

Beim Aufrufen subject.next(item) von mit einem Element wird das Element in den Stream geschrieben, und die Hubmethode empfängt das Element auf dem Server.

Rufen Sie auf, um den Stream zu subject.complete() beenden.

Java-Client

Server-zu-Client-Streaming

Der SignalR Java-Client verwendet die stream -Methode, um Streamingmethoden aufzurufen. stream akzeptiert drei oder mehr Argumente:

  • Der erwartete Typ der Streamelemente.
  • Der Name der Hubmethode.
  • In der Hubmethode definierte Argumente.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

Die stream -Methode für HubConnection gibt einen Observable des Streamelementtyps zurück. Die Methode des Observable-Typs subscribe ist der Ort, an dem onNext die Handler , und definiert onError onCompleted werden.

Client-zu-Server-Streaming

Der SignalR Java-Client kann Client-zu-Server-Streamingmethoden auf Hubs aufrufen, indem er je nach aufgerufener Hubmethode ein Observable als Argument an , oder übergibt. send invoke stream

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

Beim Aufrufen stream.onNext(item) von mit einem Element wird das Element in den Stream geschrieben, und die Hubmethode empfängt das Element auf dem Server.

Rufen Sie auf, um den Stream zu stream.onComplete() beenden.

Zusätzliche Ressourcen