Uso de streaming en ASP.NET CoreSignalR

Por Conroy

SignalRASP.NET Core admite el streaming de cliente a servidor y de servidor a cliente. Esto es útil para escenarios en los que llegan fragmentos de datos con el tiempo. Cuando se transmite por secuencias, cada fragmento se envía al cliente o servidor en cuanto está disponible, en lugar de esperar a que todos los datos estén disponibles.

SignalRASP.NET Core admite los valores devueltos de streaming de los métodos de servidor. Esto es útil para escenarios en los que llegan fragmentos de datos con el tiempo. Cuando se transmite un valor devuelto al cliente, cada fragmento se envía al cliente en cuanto está disponible, en lugar de esperar a que todos los datos estén disponibles.

Vea o descargue el código de ejemplo (cómo descargarlo)

Configuración de un centro para streaming

Un método de concentrador se convierte automáticamente en un método de centro de streaming cuando devuelve IAsyncEnumerable<T> ChannelReader<T> , , o Task<IAsyncEnumerable<T>> Task<ChannelReader<T>> .

Un método de concentrador se convierte automáticamente en un método de centro de streaming cuando devuelve ChannelReader<T> o Task<ChannelReader<T>> .

Streaming de servidor a cliente

Los métodos del centro de streaming IAsyncEnumerable<T> pueden devolver además de ChannelReader<T> . La manera más sencilla de devolver es convertir el método hub en un método de iterador asincrónico, como se muestra IAsyncEnumerable<T> en el ejemplo siguiente. Los métodos de iterador asincrónico de concentrador pueden aceptar un parámetro que se desencadena CancellationToken cuando el cliente cancela la suscripción a la secuencia. Los métodos de iterador asincrónicos evitan problemas comunes con los canales, como no devolver lo suficientemente pronto o salir del método ChannelReader sin completar ChannelWriter<T> .

Nota

El ejemplo siguiente requiere C# 8.0 o posterior.

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

En el ejemplo siguiente se muestran los conceptos básicos de la transmisión de datos al cliente mediante canales. Cada vez que se escribe un objeto en ChannelWriter<T> , el objeto se envía inmediatamente al cliente. Al final, se ChannelWriter completa para decir al cliente que la secuencia está cerrada.

Nota

Escriba en en ChannelWriter<T> un subproceso en segundo plano y devuelva lo antes ChannelReader posible. Otras invocaciones de concentrador se bloquean hasta que ChannelReader se devuelve .

Ajuste la lógica en una try ... catch instrucción. Complete en Channel un finally bloque. Si desea que fluya un error, capturelo dentro del catch bloque y escríbalo en el finally bloque .

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}
public class StreamHub : Hub
{
    public ChannelReader<int> Counter(
        int count,
        int delay,
        CancellationToken cancellationToken)
    {
        var channel = Channel.CreateUnbounded<int>();

        // We don't want to await WriteItemsAsync, otherwise we'd end up waiting
        // for all the items to be written before returning the channel back to
        // the client.
        _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

        return channel.Reader;
    }

    private async Task WriteItemsAsync(
        ChannelWriter<int> writer,
        int count,
        int delay,
        CancellationToken cancellationToken)
    {
        Exception localException = null;
        try
        {
            for (var i = 0; i < count; i++)
            {
                // Check the cancellation token regularly so that the server will stop
                // producing items if the client disconnects.
                cancellationToken.ThrowIfCancellationRequested();
                await writer.WriteAsync(i);

                // Use the cancellationToken in other APIs that accept cancellation
                // tokens so the cancellation can flow down to them.
                await Task.Delay(delay, cancellationToken);
            }
        }
        catch (Exception ex)
        {
            localException = ex;
        }
        finally
        {
            writer.Complete(localException);
        }
    }
}
public class StreamHub : Hub
{
    public ChannelReader<int> Counter(int count, int delay)
    {
        var channel = Channel.CreateUnbounded<int>();

        // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
        // for all the items to be written before returning the channel back to
        // the client.
        _ = WriteItemsAsync(channel.Writer, count, delay);

        return channel.Reader;
    }

    private async Task WriteItemsAsync(
        ChannelWriter<int> writer,
        int count,
        int delay)
    {
        Exception localException = null;
        try
        {
            for (var i = 0; i < count; i++)
            {
                await writer.WriteAsync(i);
                await Task.Delay(delay);
            }
        }
        catch (Exception ex)
        {
            localException = ex;
        }
        finally
        {
            writer.Complete(localException);
        }

    }
}

Los métodos del centro de streaming de servidor a cliente pueden aceptar un parámetro que se desencadena CancellationToken cuando el cliente cancela la suscripción a la secuencia. Use este token para detener la operación del servidor y liberar los recursos si el cliente se desconecta antes del final de la secuencia.

Streaming de cliente a servidor

Un método de concentrador se convierte automáticamente en un método de centro de streaming de cliente a servidor cuando acepta uno o varios objetos de ChannelReader<T> tipo o IAsyncEnumerable<T> . En el ejemplo siguiente se muestran los conceptos básicos de la lectura de los datos de streaming enviados desde el cliente. Cada vez que el cliente escribe en , los datos se escriben en en el servidor desde el que se lee ChannelWriter<T> ChannelReader el método central.

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

A IAsyncEnumerable<T> continuación se ofrece una versión del método .

Nota

El ejemplo siguiente requiere C# 8.0 o posterior.

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

Cliente .NET

Streaming de servidor a cliente

Los StreamAsync StreamAsChannelAsync métodos y de se usan para invocar métodos de streaming de servidor HubConnection a cliente. Pase el nombre del método central y los argumentos definidos en el método de concentrador a StreamAsync o StreamAsChannelAsync . El parámetro genérico en StreamAsync<T> y especifica el tipo de objetos StreamAsChannelAsync<T> devueltos por el método de streaming. Se devuelve un objeto IAsyncEnumerable<T> de tipo o de la ChannelReader<T> invocación de secuencia y representa la secuencia en el cliente.

Ejemplo StreamAsync que devuelve IAsyncEnumerable<int> :

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

Ejemplo correspondiente StreamAsChannelAsync que devuelve ChannelReader<int> :

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

El método en se usa para invocar un método de streaming de servidor StreamAsChannelAsync HubConnection a cliente. Pase el nombre del método central y los argumentos definidos en el método de concentrador a StreamAsChannelAsync . El parámetro genérico en StreamAsChannelAsync<T> especifica el tipo de objetos devueltos por el método de streaming. Se ChannelReader<T> devuelve de la invocación de secuencia y representa la secuencia en el cliente.

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

El método en se usa para invocar un método de streaming de servidor StreamAsChannelAsync HubConnection a cliente. Pase el nombre del método central y los argumentos definidos en el método de concentrador a StreamAsChannelAsync . El parámetro genérico en StreamAsChannelAsync<T> especifica el tipo de objetos devueltos por el método de streaming. Se ChannelReader<T> devuelve de la invocación de secuencia y representa la secuencia en el cliente.

var channel = await hubConnection
    .StreamAsChannelAsync<int>("Counter", 10, 500, CancellationToken.None);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

Streaming de cliente a servidor

Hay dos maneras de invocar un método de centro de streaming de cliente a servidor desde el cliente de .NET. Puede pasar un elemento o IAsyncEnumerable<T> como argumento a , o , ChannelReader SendAsync InvokeAsync StreamAsChannelAsync dependiendo del método de concentrador invocado.

Cada vez que se escriben datos en el objeto o , el método hub en el servidor recibe IAsyncEnumerable un nuevo elemento con los datos del ChannelWriter cliente.

Si se usa un objeto , la secuencia finaliza después de que se cierre el IAsyncEnumerable método que devuelve elementos de secuencia.

Nota

El ejemplo siguiente requiere C# 8.0 o posterior.

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

O bien, si usa ChannelWriter un , completa el canal con channel.Writer.Complete() :

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

Cliente de JavaScript

Streaming de servidor a cliente

Los clientes de JavaScript llaman a métodos de streaming de servidor a cliente en centros con connection.stream . El stream método acepta dos argumentos:

  • Nombre del método central. En el ejemplo siguiente, el nombre del método central es Counter .
  • Argumentos definidos en el método hub. En el ejemplo siguiente, los argumentos son un recuento para el número de elementos de secuencia que se van a recibir y el retraso entre los elementos de secuencia.

connection.stream devuelve , IStreamResult que contiene un método subscribe . Pase a IStreamSubscriber y establezca las devoluciones de llamada , y para recibir notificaciones de la subscribe next error complete stream invocación.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Para finalizar la secuencia desde el cliente, llame al método en el que dispose se devuelve desde el método ISubscription subscribe . Llamar a este método provoca la CancellationToken cancelación del parámetro del método hub, si proporcionó uno.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Para finalizar la secuencia desde el cliente, llame al método en el que dispose se devuelve desde el método ISubscription subscribe .

Streaming de cliente a servidor

Los clientes de JavaScript llaman a métodos de streaming de cliente a servidor en concentradores pasando como argumento a , o , en función del Subject send método de concentrador invoke stream invocado. es Subject una clase que tiene el aspecto de Subject . Por ejemplo, en RxJS, puede usar la clase Subject de esa biblioteca.

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

Al llamar a con un elemento, se escribe el elemento en la secuencia y el método subject.next(item) de concentrador recibe el elemento en el servidor.

Para finalizar la secuencia, llame a subject.complete() .

Cliente de Java

Streaming de servidor a cliente

El SignalR cliente de Java usa el método para invocar stream métodos de streaming. stream acepta tres o más argumentos:

  • Tipo esperado de los elementos de secuencia.
  • Nombre del método central.
  • Argumentos definidos en el método hub.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

El stream método en devuelve un HubConnection observable del tipo de elemento de secuencia. El método del tipo Observable es donde se definen los controladores subscribe onNext y onError onCompleted .

Streaming de cliente a servidor

El cliente de Java puede llamar a métodos de streaming de cliente a servidor en concentradores pasando un observable como argumento a , o , dependiendo del método de concentrador SignalR send invoke stream invocado.

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

Al llamar a con un elemento, se escribe el elemento en la secuencia y el método stream.onNext(item) de concentrador recibe el elemento en el servidor.

Para finalizar la secuencia, llame a stream.onComplete() .

Recursos adicionales