Usar streaming en ASP.NET Core SignalRUse streaming in ASP.NET Core SignalR
Por Brennan ConroyBy Brennan Conroy
ASP.NET Core SignalR admite el streaming desde el cliente al servidor y desde el servidor al cliente.ASP.NET Core SignalR supports streaming from client to server and from server to client. Esto resulta útil para escenarios en los que llegan fragmentos de datos a lo largo del tiempo.This is useful for scenarios where fragments of data arrive over time. Al transmitir por secuencias, cada fragmento se envía al cliente o al servidor en cuanto está disponible, en lugar de esperar a que todos los datos estén disponibles.When streaming, each fragment is sent to the client or server as soon as it becomes available, rather than waiting for all of the data to become available.
ASP.NET Core SignalR admite el streaming de valores devueltos de métodos de servidor.ASP.NET Core SignalR supports streaming return values of server methods. Esto resulta útil para escenarios en los que llegan fragmentos de datos a lo largo del tiempo.This is useful for scenarios where fragments of data arrive over time. Cuando un valor devuelto se transmite al cliente, cada fragmento se envía al cliente en cuanto está disponible, en lugar de esperar a que todos los datos estén disponibles.When a return value is streamed to the client, each fragment is sent to the client as soon as it becomes available, rather than waiting for all the data to become available.
Vea o descargue el código de ejemplo (cómo descargarlo)View or download sample code (how to download)
Configuración de un centro para la transmisión por secuenciasSet up a hub for streaming
Un método de concentrador se convierte automáticamente en un método de concentrador de streaming cuando devuelve IAsyncEnumerable<T> , ChannelReader<T> , Task<IAsyncEnumerable<T>>
o Task<ChannelReader<T>>
.A hub method automatically becomes a streaming hub method when it returns IAsyncEnumerable<T>, ChannelReader<T>, Task<IAsyncEnumerable<T>>
, or Task<ChannelReader<T>>
.
Un método de concentrador se convierte automáticamente en un método de concentrador de streaming cuando devuelve ChannelReader<T> o Task<ChannelReader<T>>
.A hub method automatically becomes a streaming hub method when it returns a ChannelReader<T> or a Task<ChannelReader<T>>
.
Streaming de servidor a clienteServer-to-client streaming
Los métodos de streaming Hub pueden devolver además de IAsyncEnumerable<T>
ChannelReader<T>
.Streaming hub methods can return IAsyncEnumerable<T>
in addition to ChannelReader<T>
. La manera más sencilla de devolver IAsyncEnumerable<T>
es hacer que el método de concentrador sea un método de iterador asincrónico como se muestra en el ejemplo siguiente.The simplest way to return IAsyncEnumerable<T>
is by making the hub method an async iterator method as the following sample demonstrates. Los métodos de iterador Async de concentrador pueden aceptar un CancellationToken
parámetro que se desencadena cuando el cliente cancela la suscripción de la secuencia.Hub async iterator methods can accept a CancellationToken
parameter that's triggered when the client unsubscribes from the stream. Los métodos de iterador asincrónico evitan problemas comunes con los canales, como no devolver el ChannelReader
tiempo suficiente o salir del método sin completar el ChannelWriter<T> .Async iterator methods avoid problems common with Channels, such as not returning the ChannelReader
early enough or exiting the method without completing the ChannelWriter<T>.
Nota
El ejemplo siguiente requiere C# 8,0 o posterior.The following sample requires C# 8.0 or later.
public class AsyncEnumerableHub : Hub
{
public async IAsyncEnumerable<int> Counter(
int count,
int delay,
[EnumeratorCancellation]
CancellationToken cancellationToken)
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
yield return i;
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
}
En el ejemplo siguiente se muestran los aspectos básicos de la transmisión de datos al cliente mediante canales.The following sample shows the basics of streaming data to the client using Channels. Siempre que se escribe un objeto en ChannelWriter<T> , el objeto se envía inmediatamente al cliente.Whenever an object is written to the ChannelWriter<T>, the object is immediately sent to the client. Al final, ChannelWriter
se completa para indicar al cliente que la secuencia está cerrada.At the end, the ChannelWriter
is completed to tell the client the stream is closed.
Nota
Escriba en ChannelWriter<T>
en un subproceso en segundo plano y devuelva lo ChannelReader
antes posible.Write to the ChannelWriter<T>
on a background thread and return the ChannelReader
as soon as possible. Otras invocaciones del concentrador se bloquean hasta que ChannelReader
se devuelve un.Other hub invocations are blocked until a ChannelReader
is returned.
Ajuste la lógica en una try ... catch
instrucción.Wrap logic in a try ... catch
statement. Complete el Channel
en un finally
bloque.Complete the Channel
in a finally
block. Si desea fluir un error, Capture dentro del catch
bloque y escríbalo en el finally
bloque.If you want to flow an error, capture it inside the catch
block and write it in the finally
block.
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i, cancellationToken);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
public class StreamHub : Hub
{
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
await writer.WriteAsync(i);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
}
public class StreamHub : Hub
{
public ChannelReader<int> Counter(int count, int delay)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i);
await Task.Delay(delay);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
}
Los métodos del concentrador de streaming de servidor a cliente pueden aceptar un CancellationToken
parámetro que se desencadena cuando el cliente cancela la suscripción a la secuencia.Server-to-client streaming hub methods can accept a CancellationToken
parameter that's triggered when the client unsubscribes from the stream. Use este token para detener la operación del servidor y liberar los recursos si el cliente se desconecta antes del final de la secuencia.Use this token to stop the server operation and release any resources if the client disconnects before the end of the stream.
Streaming de cliente a servidorClient-to-server streaming
Un método de concentrador se convierte automáticamente en un método de concentrador de streaming de cliente a servidor cuando acepta uno o más objetos de tipo ChannelReader<T> o IAsyncEnumerable<T> .A hub method automatically becomes a client-to-server streaming hub method when it accepts one or more objects of type ChannelReader<T> or IAsyncEnumerable<T>. En el ejemplo siguiente se muestran los aspectos básicos de la lectura de datos de streaming enviados desde el cliente.The following sample shows the basics of reading streaming data sent from the client. Cada vez que el cliente escribe en ChannelWriter<T> , los datos se escriben en ChannelReader
en el servidor desde el que el método de concentrador está leyendo.Whenever the client writes to the ChannelWriter<T>, the data is written into the ChannelReader
on the server from which the hub method is reading.
public async Task UploadStream(ChannelReader<string> stream)
{
while (await stream.WaitToReadAsync())
{
while (stream.TryRead(out var item))
{
// do something with the stream item
Console.WriteLine(item);
}
}
}
IAsyncEnumerable<T>A continuación se muestra una versión del método.An IAsyncEnumerable<T> version of the method follows.
Nota
El ejemplo siguiente requiere C# 8,0 o posterior.The following sample requires C# 8.0 or later.
public async Task UploadStream(IAsyncEnumerable<string> stream)
{
await foreach (var item in stream)
{
Console.WriteLine(item);
}
}
Cliente .NET.NET client
Streaming de servidor a clienteServer-to-client streaming
Los StreamAsync
StreamAsChannelAsync
métodos y en HubConnection
se usan para invocar métodos de streaming de servidor a cliente.The StreamAsync
and StreamAsChannelAsync
methods on HubConnection
are used to invoke server-to-client streaming methods. Pase el nombre del método de concentrador y los argumentos definidos en el método de concentrador a StreamAsync
o StreamAsChannelAsync
.Pass the hub method name and arguments defined in the hub method to StreamAsync
or StreamAsChannelAsync
. El parámetro genérico en StreamAsync<T>
y StreamAsChannelAsync<T>
especifica el tipo de objetos devueltos por el método de transmisión por secuencias.The generic parameter on StreamAsync<T>
and StreamAsChannelAsync<T>
specifies the type of objects returned by the streaming method. Un objeto de tipo IAsyncEnumerable<T>
o ChannelReader<T>
se devuelve de la invocación de flujo y representa la secuencia en el cliente.An object of type IAsyncEnumerable<T>
or ChannelReader<T>
is returned from the stream invocation and represents the stream on the client.
Un StreamAsync
ejemplo que devuelve IAsyncEnumerable<int>
:A StreamAsync
example that returns IAsyncEnumerable<int>
:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = await hubConnection.StreamAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
await foreach (var count in stream)
{
Console.WriteLine($"{count}");
}
Console.WriteLine("Streaming completed");
Un StreamAsChannelAsync
ejemplo correspondiente que devuelve ChannelReader<int>
:A corresponding StreamAsChannelAsync
example that returns ChannelReader<int>
:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
El StreamAsChannelAsync
método on HubConnection
se usa para invocar un método de streaming de servidor a cliente.The StreamAsChannelAsync
method on HubConnection
is used to invoke a server-to-client streaming method. Pase el nombre del método de concentrador y los argumentos definidos en el método de concentrador a StreamAsChannelAsync
.Pass the hub method name and arguments defined in the hub method to StreamAsChannelAsync
. El parámetro Generic en StreamAsChannelAsync<T>
especifica el tipo de objetos devueltos por el método de transmisión por secuencias.The generic parameter on StreamAsChannelAsync<T>
specifies the type of objects returned by the streaming method. ChannelReader<T>
Se devuelve un objeto de la invocación de flujo y representa la secuencia en el cliente.A ChannelReader<T>
is returned from the stream invocation and represents the stream on the client.
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
El StreamAsChannelAsync
método on HubConnection
se usa para invocar un método de streaming de servidor a cliente.The StreamAsChannelAsync
method on HubConnection
is used to invoke a server-to-client streaming method. Pase el nombre del método de concentrador y los argumentos definidos en el método de concentrador a StreamAsChannelAsync
.Pass the hub method name and arguments defined in the hub method to StreamAsChannelAsync
. El parámetro Generic en StreamAsChannelAsync<T>
especifica el tipo de objetos devueltos por el método de transmisión por secuencias.The generic parameter on StreamAsChannelAsync<T>
specifies the type of objects returned by the streaming method. ChannelReader<T>
Se devuelve un objeto de la invocación de flujo y representa la secuencia en el cliente.A ChannelReader<T>
is returned from the stream invocation and represents the stream on the client.
var channel = await hubConnection
.StreamAsChannelAsync<int>("Counter", 10, 500, CancellationToken.None);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
Streaming de cliente a servidorClient-to-server streaming
Hay dos maneras de invocar un método de concentrador de streaming de cliente a servidor desde el cliente .NET.There are two ways to invoke a client-to-server streaming hub method from the .NET client. Puede pasar IAsyncEnumerable<T>
o ChannelReader
como un argumento a SendAsync
, InvokeAsync
o StreamAsChannelAsync
, dependiendo del método de concentrador invocado.You can either pass in an IAsyncEnumerable<T>
or a ChannelReader
as an argument to SendAsync
, InvokeAsync
, or StreamAsChannelAsync
, depending on the hub method invoked.
Cada vez que se escriben datos en el IAsyncEnumerable
ChannelWriter
objeto o, el método de concentrador del servidor recibe un nuevo elemento con los datos del cliente.Whenever data is written to the IAsyncEnumerable
or ChannelWriter
object, the hub method on the server receives a new item with the data from the client.
Si se usa un IAsyncEnumerable
objeto, el flujo termina después de que se cierre el método que devuelve los elementos de la secuencia.If using an IAsyncEnumerable
object, the stream ends after the method returning stream items exits.
Nota
El ejemplo siguiente requiere C# 8,0 o posterior.The following sample requires C# 8.0 or later.
async IAsyncEnumerable<string> clientStreamData()
{
for (var i = 0; i < 5; i++)
{
var data = await FetchSomeData();
yield return data;
}
//After the for loop has completed and the local function exits the stream completion will be sent.
}
await connection.SendAsync("UploadStream", clientStreamData());
O bien, si está usando un ChannelWriter
, complete el canal con channel.Writer.Complete()
:Or if you're using a ChannelWriter
, you complete the channel with channel.Writer.Complete()
:
var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();
Cliente de JavaScriptJavaScript client
Streaming de servidor a clienteServer-to-client streaming
Los clientes de JavaScript llaman a los métodos de streaming de servidor a cliente en los concentradores con connection.stream
.JavaScript clients call server-to-client streaming methods on hubs with connection.stream
. El stream
método acepta dos argumentos:The stream
method accepts two arguments:
- Nombre del método de concentrador.The name of the hub method. En el ejemplo siguiente, el nombre del método del concentrador es
Counter
.In the following example, the hub method name isCounter
. - Argumentos definidos en el método de concentrador.Arguments defined in the hub method. En el ejemplo siguiente, los argumentos son un recuento del número de elementos de secuencia que se van a recibir y el retraso entre los elementos de la secuencia.In the following example, the arguments are a count for the number of stream items to receive and the delay between stream items.
connection.stream
Devuelve un IStreamResult
, que contiene un subscribe
método.connection.stream
returns an IStreamResult
, which contains a subscribe
method. Pase IStreamSubscriber
a subscribe
y establezca las next
error
complete
devoluciones de llamada, y para recibir notificaciones de la stream
invocación.Pass an IStreamSubscriber
to subscribe
and set the next
, error
, and complete
callbacks to receive notifications from the stream
invocation.
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
Para finalizar la secuencia del cliente, llame al dispose
método en el ISubscription
que se devuelve desde el subscribe
método.To end the stream from the client, call the dispose
method on the ISubscription
that's returned from the subscribe
method. La llamada a este método provoca la cancelación del CancellationToken
parámetro del método de concentrador, si se proporcionó uno.Calling this method causes cancellation of the CancellationToken
parameter of the Hub method, if you provided one.
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
Para finalizar la secuencia del cliente, llame al dispose
método en el ISubscription
que se devuelve desde el subscribe
método.To end the stream from the client, call the dispose
method on the ISubscription
that's returned from the subscribe
method.
Streaming de cliente a servidorClient-to-server streaming
Los clientes de JavaScript llaman a métodos de streaming de cliente a servidor en los concentradores pasando un Subject
como argumento a send
, invoke
o stream
, dependiendo del método de concentrador invocado.JavaScript clients call client-to-server streaming methods on hubs by passing in a Subject
as an argument to send
, invoke
, or stream
, depending on the hub method invoked. Subject
Es una clase que tiene el aspecto de un Subject
.The Subject
is a class that looks like a Subject
. Por ejemplo, en RxJS, puede usar la clase de asunto de la biblioteca.For example in RxJS, you can use the Subject class from that library.
const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
iteration++;
subject.next(iteration.toString());
if (iteration === 10) {
clearInterval(intervalHandle);
subject.complete();
}
}, 500);
La llamada a subject.next(item)
con un elemento escribe el elemento en la secuencia y el método de concentrador recibe el elemento en el servidor.Calling subject.next(item)
with an item writes the item to the stream, and the hub method receives the item on the server.
Para finalizar el flujo, llame a subject.complete()
.To end the stream, call subject.complete()
.
Cliente de JavaJava client
Streaming de servidor a clienteServer-to-client streaming
El SignalR cliente de Java usa el stream
método para invocar métodos de streaming.The SignalR Java client uses the stream
method to invoke streaming methods. stream
acepta tres o más argumentos:stream
accepts three or more arguments:
- Tipo esperado de los elementos de la secuencia.The expected type of the stream items.
- Nombre del método de concentrador.The name of the hub method.
- Argumentos definidos en el método de concentrador.Arguments defined in the hub method.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
.subscribe(
(item) -> {/* Define your onNext handler here. */ },
(error) -> {/* Define your onError handler here. */},
() -> {/* Define your onCompleted handler here. */});
El stream
método en HubConnection
devuelve un objeto observable del tipo de elemento de secuencia.The stream
method on HubConnection
returns an Observable of the stream item type. El método del tipo observable subscribe
es donde onNext
, onError
y onCompleted
se definen los controladores.The Observable type's subscribe
method is where onNext
, onError
and onCompleted
handlers are defined.
Streaming de cliente a servidorClient-to-server streaming
El SignalR cliente de Java puede llamar a métodos de streaming de cliente a servidor en los concentradores pasando un objeto observable como argumento a send
, invoke
o stream
, dependiendo del método de concentrador invocado.The SignalR Java client can call client-to-server streaming methods on hubs by passing in an Observable as an argument to send
, invoke
, or stream
, depending on the hub method invoked.
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();
La llamada a stream.onNext(item)
con un elemento escribe el elemento en la secuencia y el método de concentrador recibe el elemento en el servidor.Calling stream.onNext(item)
with an item writes the item to the stream, and the hub method receives the item on the server.
Para finalizar el flujo, llame a stream.onComplete()
.To end the stream, call stream.onComplete()
.