Verwenden des Streamings in ASP.NET Core SignalR

Von Brennan Conroy

ASP.NET Core SignalR unterstützt das Streaming vom Client zum Server und vom Server zum Client. Dies ist nützlich für Szenarien, in denen Datenfragmente im Laufe der Zeit eintreffen. Beim Streaming wird jedes Fragment an den Client oder Server gesendet, sobald es verfügbar ist, anstatt darauf zu warten, dass die gesamten Daten verfügbar sind.

Anzeigen oder Herunterladen von Beispielcode (Vorgehensweise zum Herunterladen)

Einrichten eines Hubs für das Streaming

Eine Hubmethode wird automatisch zu einer Hubmethode für das Streaming, wenn sie IAsyncEnumerable<T>, ChannelReader<T>, Task<IAsyncEnumerable<T>> oder Task<ChannelReader<T>> zurückgibt.

Streaming vom Server zum Client

Hubmethoden für das Streaming können zusätzlich zu ChannelReader<T> auch IAsyncEnumerable<T> zurückgeben. Die einfachste Möglichkeit, IAsyncEnumerable<T> zurückzugeben, besteht darin, die Hubmethode zu einer asynchronen Iteratormethode zu machen, wie das folgende Beispiel zeigt. Die asynchronen Iteratormethoden des Hubs können einen CancellationToken-Parameter akzeptieren, der ausgelöst wird, wenn der Client das Abonnement des Datenstroms kündigt. Asynchrone Iteratormethoden vermeiden Probleme, die bei Kanälen häufig auftreten, z. B. dass der ChannelReader nicht früh genug zurückgegeben oder die Methode verlassen wird, ohne den ChannelWriter<T> abzuschließen.

Hinweis

Das folgende Beispiel erfordert C# 8.0 oder höher.

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

Das folgende Beispiel zeigt die Grundlagen des Streaming von Daten zum Client mithilfe von Kanälen. Immer wenn ein Objekt in den ChannelWriter<T> geschrieben wird, wird das Objekt sofort an den Client gesendet. Am Ende wird der ChannelWriter abgeschlossen, um dem Client mitzuteilen, dass der Datenstrom geschlossen ist.

Hinweis

Schreiben Sie in einem Hintergrundthread in den ChannelWriter<T>, und geben Sie den ChannelReader so schnell wie möglich zurück. Andere Hubaufrufe werden blockiert, bis ein ChannelReader zurückgegeben wird.

Umschließen Sie die Logik mit einer try ... catch Anweisung. Vervollständigen Sie den Channel in einem finally-Block. Wenn Sie den Ablauf eines Fehlers verfolgen möchten, erfassen Sie ihn innerhalb des catch-Blocks und schreiben ihn in den finally-Block.

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}

Hubmethoden für das Streaming vom Server zum Client können einen CancellationToken-Parameter akzeptieren, der ausgelöst wird, wenn der Client das Abonnement des Datenstroms kündigt. Verwenden Sie dieses Token, um den Serverbetrieb anzuhalten und alle Ressourcen freizugeben, wenn der Client die Verbindung vor dem Ende des Datenstroms trennt.

Streaming vom Client zum Server

Eine Hubmethode wird automatisch zu einer Hubmethode für das Streaming vom Client zum Server, wenn sie ein oder mehrere Objekte des Typs ChannelReader<T> oder IAsyncEnumerable<T> akzeptiert. Das folgende Beispiel zeigt die Grundlagen des Lesens von Streamingdaten, die vom Client gesendet werden. Immer wenn der Client in den ChannelWriter<T> schreibt, werden die Daten in den ChannelReader auf dem Server geschrieben, von dem die Hubmethode gerade liest.

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

Eine IAsyncEnumerable<T>-Version der Methode folgt.

Hinweis

Das folgende Beispiel erfordert C# 8.0 oder höher.

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

.NET-Client

Streaming vom Server zum Client

Die Methoden StreamAsync und StreamAsChannelAsync für HubConnection werden verwendet, um Methoden für das Streaming vom Server zum Client aufzurufen. Übergeben Sie den Namen der Hubmethode und die in der Hubmethode definierten Argumente an StreamAsync oder StreamAsChannelAsync. Der generische Parameter für StreamAsync<T> und StreamAsChannelAsync<T> gibt den Typ der Objekte an, die von der Streamingmethode zurückgegeben werden. Ein Objekt vom Typ IAsyncEnumerable<T> oder ChannelReader<T> wird vom Datenstromaufruf zurückgegeben und repräsentiert den Datenstrom auf dem Client.

Ein StreamAsync-Beispiel, das IAsyncEnumerable<int> zurückgibt:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

Ein entsprechendes StreamAsChannelAsync-Beispiel, das ChannelReader<int> zurückgibt:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

Im vorherigen Code:

  • Die Methode StreamAsChannelAsync für HubConnection wird verwendet, um eine Streamingmethode vom Server zum Client aufzurufen. Übergeben Sie den Namen der Hubmethode und die in der Hubmethode definierten Argumente an StreamAsChannelAsync.
  • Der generische Parameter für StreamAsChannelAsync<T> gibt den Typ der Objekte an, die von der Streamingmethode zurückgegeben werden.
  • Ein ChannelReader<T> wird vom Datenstromaufruf zurückgegeben und repräsentiert den Datenstrom auf dem Client.

Streaming vom Client zum Server

Es gibt zwei Möglichkeiten, eine Hubmethode für das Streaming vom Client zum Server vom .NET-Client aus aufzurufen. Sie können entweder ein IAsyncEnumerable<T> oder ein ChannelReader als Argument an SendAsync, InvokeAsync oder StreamAsChannelAsync übergeben, je nachdem, welche Hubmethode aufgerufen wird.

Immer wenn Daten in das IAsyncEnumerable- oder ChannelWriter-Objekt geschrieben werden, erhält die Hubmethode auf dem Server ein neues Element mit den Daten vom Client.

Wenn Sie ein IAsyncEnumerable-Objekt verwenden, endet der Datenstrom nach Beendigung der Methode, die Datenstromelemente zurückgibt.

Hinweis

Das folgende Beispiel erfordert C# 8.0 oder höher.

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

Oder wenn Sie einen ChannelWriter verwenden, vervollständigen Sie den Kanal mit channel.Writer.Complete():

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

JavaScript-Client

Streaming vom Server zum Client

JavaScript-Clients rufen Methoden zum Streaming vom Server zum Client auf Hubs mit connection.stream auf. Die Methode stream akzeptiert zwei Argumente:

  • Den Namen der Hubmethode. Im folgenden Beispiel lautet der Name der Hubmethode Counter.
  • In der Hubmethode definierte Argumente. Im folgenden Beispiel sind die Argumente ein Zähler für die Anzahl der zu empfangenden Datenstromelemente und die Verzögerung zwischen den Datenstromelementen.

connection.stream gibt ein IStreamResult zurück, das eine subscribe-Methode enthält. Übergeben Sie einen IStreamSubscriber an subscribe und legen Sie die next-, error- und complete-Rückrufe fest, um Benachrichtigungen vom stream-Aufruf zu erhalten.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Um den Datenstrom vom Client zu beenden, rufen Sie die Methode dispose für das ISubscription auf, das von der Methode subscribe zurückgegeben wird. Der Aufruf dieser Methode führt zum Abbruch des Parameters CancellationToken der Hubmethode, falls Sie einen solchen angegeben haben.

Streaming vom Client zum Server

JavaScript-Clients rufen Methoden zum Streaming vom Client zum Server auf Hubs auf, indem sie ein Subject als Argument an send, invoke oder stream übergeben, je nachdem, welche Hubmethode aufgerufen wird. Das Subject ist eine Klasse, die wie ein Subject aussieht. In RxJS können Sie z. B. die Subject-Klasse aus dieser Bibliothek verwenden.

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

Der Aufruf von subject.next(item) mit einem Element schreibt das Element in den Datenstrom, und die Hubmethode empfängt das Element auf dem Server.

Um den Datenstrom zu beenden, rufen Sie subject.complete() auf.

Java-Client

Streaming vom Server zum Client

Der SignalR Java-Client verwendet die stream-Methode zum Aufrufen von Streamingmethoden. stream akzeptiert drei oder mehr Argumente:

  • Der erwartete Typ der Datenstromelemente.
  • Den Namen der Hubmethode.
  • In der Hubmethode definierte Argumente.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

Die Methode stream für HubConnection gibt ein „Observable“ des Datenstromelementtyps zurück. In der Methode subscribe des Observable-Typs werden die Handler onNext, onError und onCompleted definiert.

Streaming vom Client zum Server

Der SignalR Java-Client kann Methoden für das Streaming vom Client zum Server auf Hubs aufrufen, indem er ein Observable als Argument an send, invoke oder stream übergibt, je nachdem, welche Hubmethode aufgerufen wird.

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

Der Aufruf von stream.onNext(item) mit einem Element schreibt das Element in den Datenstrom, und die Hubmethode empfängt das Element auf dem Server.

Um den Datenstrom zu beenden, rufen Sie stream.onComplete() auf.

Zusätzliche Ressourcen