Usare lo streaming in ASP.NET Core SignalR

Di Brennan Conroy

ASP.NET Core SignalR supporta lo streaming dal client al server e dal server al client. Ciò è utile per gli scenari in cui arrivano frammenti di dati nel tempo. Quando si esegue lo streaming, ogni frammento viene inviato al client o al server non appena diventa disponibile, anziché attendere che tutti i dati diventino disponibili.

Visualizzare o scaricare il codice di esempio (procedura per il download)

Configurare un hub per lo streaming

Un metodo hub diventa automaticamente un metodo hub di streaming quando restituisce IAsyncEnumerable<T>, Task<IAsyncEnumerable<T>>ChannelReader<T>, o Task<ChannelReader<T>>.

Streaming da server a client

I metodi dell'hub di streaming possono restituire IAsyncEnumerable<T> oltre a ChannelReader<T>. Il modo più semplice per restituire IAsyncEnumerable<T> consiste nel rendere il metodo hub un metodo iteratore asincrono come illustrato nell'esempio seguente. I metodi iteratore asincrono dell'hub possono accettare un CancellationToken parametro attivato quando il client annulla la sottoscrizione al flusso. I metodi iteratori asincroni evitano problemi comuni con i canali, ad esempio non restituendo il ChannelReader metodo abbastanza presto o chiudendo il metodo senza completare .ChannelWriter<T>

Nota

L'esempio seguente richiede C# 8.0 o versione successiva.

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

L'esempio seguente illustra le nozioni di base dei dati di streaming al client usando i canali. Ogni volta che un oggetto viene scritto in ChannelWriter<T>, l'oggetto viene inviato immediatamente al client. Al termine, l'oggetto ChannelWriter viene completato per indicare al client che il flusso è chiuso.

Nota

Scrivere in ChannelWriter<T> su un thread in background e restituire il ChannelReader prima possibile. Altre chiamate dell'hub vengono bloccate fino a quando non viene restituito un oggetto ChannelReader .

Eseguire il wrapping della logica in un'istruzione try ... catch. Completare l'oggetto Channel in un finally blocco. Se si vuole eseguire il flusso di un errore, acquisiscerlo all'interno del catch blocco e scriverlo nel finally blocco.

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}

I metodi dell'hub di streaming da server a client possono accettare un CancellationToken parametro attivato quando il client annulla la sottoscrizione al flusso. Usare questo token per arrestare l'operazione del server e rilasciare eventuali risorse se il client si disconnette prima della fine del flusso.

Streaming da client a server

Un metodo hub diventa automaticamente un metodo hub di streaming da client a server quando accetta uno o più oggetti di tipo ChannelReader<T> o IAsyncEnumerable<T>. L'esempio seguente illustra le nozioni di base sulla lettura dei dati di streaming inviati dal client. Ogni volta che il client scrive in ChannelWriter<T>, i dati vengono scritti nel ChannelReader nel server da cui viene letto il metodo hub.

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

Segue una IAsyncEnumerable<T> versione del metodo .

Nota

L'esempio seguente richiede C# 8.0 o versione successiva.

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

Client .NET

Streaming da server a client

I StreamAsync metodi e StreamAsChannelAsync in HubConnection vengono usati per richiamare i metodi di streaming da server a client. Passare il nome del metodo hub e gli argomenti definiti nel metodo hub a StreamAsync o StreamAsChannelAsync. Il parametro generico su StreamAsync<T> e StreamAsChannelAsync<T> specifica il tipo di oggetti restituiti dal metodo di streaming. Un oggetto di tipo IAsyncEnumerable<T> o ChannelReader<T> viene restituito dalla chiamata al flusso e rappresenta il flusso nel client.

Esempio che restituisce StreamAsyncIAsyncEnumerable<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

Esempio corrispondente StreamAsChannelAsync che restituisce ChannelReader<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

Nel codice precedente:

  • Il StreamAsChannelAsync metodo su HubConnection viene usato per richiamare un metodo di streaming da server a client. Passare il nome del metodo hub e gli argomenti definiti nel metodo hub a StreamAsChannelAsync.
  • Il parametro generico in StreamAsChannelAsync<T> specifica il tipo di oggetti restituiti dal metodo di streaming.
  • Un ChannelReader<T> oggetto viene restituito dalla chiamata al flusso e rappresenta il flusso nel client.

Streaming da client a server

Esistono due modi per richiamare un metodo hub di streaming da client a server dal client .NET. È possibile passare un IAsyncEnumerable<T> oggetto o ChannelReader come argomento a SendAsync, InvokeAsynco StreamAsChannelAsync, a seconda del metodo hub richiamato.

Ogni volta che i dati vengono scritti nell'oggetto IAsyncEnumerable o ChannelWriter , il metodo hub nel server riceve un nuovo elemento con i dati dal client.

Se si usa un IAsyncEnumerable oggetto , il flusso termina dopo l'uscita del metodo che restituisce gli elementi del flusso.

Nota

L'esempio seguente richiede C# 8.0 o versione successiva.

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

In alternativa, se si usa un ChannelWriteroggetto , completare il canale con channel.Writer.Complete():

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

Client JavaScript

Streaming da server a client

I client JavaScript chiamano metodi di streaming da server a client in hub con connection.stream. Il stream metodo accetta due argomenti:

  • Nome del metodo hub. Nell'esempio seguente il nome del metodo hub è Counter.
  • Argomenti definiti nel metodo hub. Nell'esempio seguente gli argomenti sono un conteggio per il numero di elementi del flusso da ricevere e il ritardo tra gli elementi del flusso.

connection.stream restituisce un IStreamResultoggetto , che contiene un subscribe metodo . Passare un IStreamSubscriber oggetto a subscribe e impostare i nextcallback , errore complete per ricevere notifiche dalla stream chiamata.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Per terminare il flusso dal client, chiamare il dispose metodo sull'oggetto ISubscription restituito dal subscribe metodo . La chiamata a questo metodo causa l'annullamento CancellationToken del parametro del metodo Hub, se ne è stato specificato uno.

Streaming da client a server

I client JavaScript chiamano metodi di streaming da client a server negli hub passando un Subject oggetto come argomento a send, invokeo stream, a seconda del metodo hub richiamato. Subject è una classe che ha un aspetto simile a .Subject Ad esempio, in RxJS, è possibile usare la classe Subject da tale libreria.

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

La chiamata subject.next(item) con un elemento scrive l'elemento nel flusso e il metodo hub riceve l'elemento nel server.

Per terminare il flusso, chiamare subject.complete().

Client Java

Streaming da server a client

Il SignalR client Java usa il stream metodo per richiamare i metodi di streaming. stream accetta tre o più argomenti:

  • Tipo previsto degli elementi del flusso.
  • Nome del metodo hub.
  • Argomenti definiti nel metodo hub.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

Il stream metodo su HubConnection restituisce un oggetto Observable del tipo di elemento del flusso. Il metodo del tipo Observable è il punto in subscribe cui onNextonError sono definiti i gestori e onCompleted .

Streaming da client a server

Il SignalR client Java può chiamare metodi di streaming da client a server negli hub passando un observable come argomento a send, invokeo stream, a seconda del metodo hub richiamato.

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

La chiamata stream.onNext(item) con un elemento scrive l'elemento nel flusso e il metodo hub riceve l'elemento nel server.

Per terminare il flusso, chiamare stream.onComplete().

Risorse aggiuntive