Použití streamování v ASP.NET CoreSignalR
SignalRASP.NET Core podporuje streamování z klienta na server a ze serveru na klienta. To je užitečné ve scénářích, kdy se v průběhu času dorazí fragmenty dat. Při streamování se každý fragment odesílá klientovi nebo serveru, jakmile jsou k dispozici, a nečeká na to, až budou všechna data k dispozici.
SignalRASP.NET Core podporuje návratové hodnoty streamování serverových metod. To je užitečné ve scénářích, kdy se v průběhu času dorazí fragmenty dat. Když se do klienta streamuje návratová hodnota, každý fragment se odesílaje klientovi, jakmile jsou k dispozici, a nečeká na to, až budou všechna data k dispozici.
Zobrazení nebo stažení ukázkového kódu (stažení)
Nastavení centra pro streamování
Metoda centra se automaticky stane metodou centra streamování, když vrátí IAsyncEnumerable<T> , ChannelReader<T> , nebo Task<IAsyncEnumerable<T>> Task<ChannelReader<T>> .
Metoda centra se automaticky stane metodou centra streamování, když vrátí ChannelReader<T> nebo Task<ChannelReader<T>> .
Streamování mezi serverem a klientem
Metody centra streamování se IAsyncEnumerable<T> vracejí navíc k ChannelReader<T> . Nejjednodušší způsob, jak vrátit , je udělat metodu centra asynchronní metodu iterátoru, jak IAsyncEnumerable<T> ukazuje následující ukázka. Asynchronní metody iterátoru centra mohou přijímat parametr, který se aktivuje, když se klient CancellationToken odhlásí ze streamu. Asynchronní metody iterátoru se vyhnout problémům běžným s kanály, například nevrací dostatečně brzy nebo ukončí metodu bez ChannelReader dokončení ChannelWriter<T> .
Poznámka
Následující ukázka vyžaduje C# 8,0 nebo novější.
public class AsyncEnumerableHub : Hub
{
public async IAsyncEnumerable<int> Counter(
int count,
int delay,
[EnumeratorCancellation]
CancellationToken cancellationToken)
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
yield return i;
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
}
Následující příklad ukazuje základy streamování dat do klienta pomocí kanálů. Pokaždé, když je objekt zapsán ChannelWriter<T> do objektu , je objekt okamžitě odeslán klientovi. Na konci se dokončí , ChannelWriter aby se klientovi řeklo, že je datový proud zavřený.
Poznámka
Zapište do ChannelWriter<T> pro vlákno na pozadí a vraťte co nejdříve ChannelReader . Ostatní vyvolání centra se zablokují, dokud se ChannelReader nevrátila hodnota .
Zabalte logiku do try ... catch příkazu. Dokončete Channel v finally bloku. Pokud chcete tok chyby, zachyťte ji uvnitř bloku a catch zapište ji do finally bloku.
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i, cancellationToken);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
public class StreamHub : Hub
{
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
await writer.WriteAsync(i);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
}
public class StreamHub : Hub
{
public ChannelReader<int> Counter(int count, int delay)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i);
await Task.Delay(delay);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
}
Metody centra streamování mezi serverem a klientem mohou přijmout parametr, který se aktivuje, když se klient CancellationToken odhlásí ze streamu. Pomocí tohoto tokenu zastavte operaci serveru a uvolněte všechny prostředky, pokud se klient odpojí před koncem streamu.
Streamování mezi klientem a serverem
Metoda centra se automaticky stane metodou centra streamování typu klient-server, když přijme jeden nebo více objektů typu ChannelReader<T> nebo IAsyncEnumerable<T> . Následující příklad ukazuje základy čtení streamovaných dat odesílaných z klienta. Pokaždé, když klient zapíše do , data se zapíšou do na serveru, ze kterého čte ChannelWriter<T> ChannelReader metoda centra.
public async Task UploadStream(ChannelReader<string> stream)
{
while (await stream.WaitToReadAsync())
{
while (stream.TryRead(out var item))
{
// do something with the stream item
Console.WriteLine(item);
}
}
}
Následuje IAsyncEnumerable<T> verze metody .
Poznámka
Následující ukázka vyžaduje C# 8,0 nebo novější.
public async Task UploadStream(IAsyncEnumerable<string> stream)
{
await foreach (var item in stream)
{
Console.WriteLine(item);
}
}
Klient .NET
Streamování mezi serverem a klientem
Metody a v se používají k vyvolání metod streamování mezi StreamAsync StreamAsChannelAsync HubConnection klienty. Předejte název metody centra a argumenty definované v metodě centra do StreamAsync nebo StreamAsChannelAsync . Obecný parametr pro a StreamAsync<T> StreamAsChannelAsync<T> určuje typ objektů vrácených metodou streamování. Objekt typu nebo je vrácen z vyvolání datového proudu a IAsyncEnumerable<T> představuje datový proud na ChannelReader<T> klientovi.
Příklad, StreamAsync který vrací IAsyncEnumerable<int> :
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
await foreach (var count in stream)
{
Console.WriteLine($"{count}");
}
Console.WriteLine("Streaming completed");
Odpovídající StreamAsChannelAsync příklad, který vrátí ChannelReader<int> :
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
Metoda StreamAsChannelAsync v se používá k vyvolání metody HubConnection streamování mezi serverem a klientem. Předejte metodě hub název a argumenty definované v metodě centra StreamAsChannelAsync do . Obecný parametr v StreamAsChannelAsync<T> parametru určuje typ objektů vrácených metodou streamování. Objekt se vrátí z volání datového proudu ChannelReader<T> a představuje datový proud na klientovi.
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
Metoda StreamAsChannelAsync v se používá k vyvolání metody HubConnection streamování mezi serverem a klientem. Předejte metodě hub název a argumenty definované v metodě centra StreamAsChannelAsync do . Obecný parametr v StreamAsChannelAsync<T> parametru určuje typ objektů vrácených metodou streamování. Objekt se vrátí z volání datového proudu ChannelReader<T> a představuje datový proud na klientovi.
var channel = await hubConnection
.StreamAsChannelAsync<int>("Counter", 10, 500, CancellationToken.None);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
Streamování mezi klientem a serverem
Existují dva způsoby, jak vyvolat metodu centra streamování klient-server z klienta .NET. V závislosti na vyvolané metodě centra můžete metodě centra předat jako argument nebo IAsyncEnumerable<T> ChannelReader SendAsync InvokeAsync StreamAsChannelAsync .
Při každém zápisu dat do objektu nebo přijme metoda centra na serveru novou položku s IAsyncEnumerable ChannelWriter daty z klienta.
Pokud používáte objekt IAsyncEnumerable , datový proud skončí po ukončení metody, která vrací položky datového proudu.
Poznámka
Následující ukázka vyžaduje C# 8,0 nebo novější.
async IAsyncEnumerable<string> clientStreamData()
{
for (var i = 0; i < 5; i++)
{
var data = await FetchSomeData();
yield return data;
}
//After the for loop has completed and the local function exits the stream completion will be sent.
}
await connection.SendAsync("UploadStream", clientStreamData());
Pokud používáte , můžete kanál dokončit ChannelWriter pomocí channel.Writer.Complete() příkazu :
var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();
Klient JavaScriptu
Streamování mezi serverem a klientem
Klienti JavaScriptu volají metody streamování mezi klienty v centrech pomocí connection.stream . Metoda stream přijímá dva argumenty:
- Název metody centra. V následujícím příkladu má metoda centra název
Counter. - Argumenty definované v metodě centra. V následujícím příkladu jsou argumenty počet položek datového proudu, které se mají přijmout, a zpoždění mezi položkami datového proudu.
connection.stream vrátí IStreamResult , který obsahuje subscribe metodu . Předejte do a nastavte zpětná volání , a pro příjem oznámení z IStreamSubscriber subscribe next error complete stream volání.
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
Pokud chcete ukončit stream z klienta, zavolejte metodu pro , dispose ISubscription která je vrácena z metody subscribe . Volání této metody způsobí zrušení CancellationToken parametru metody Hub, pokud jste ho poskytli.
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
Pokud chcete ukončit stream z klienta, zavolejte metodu pro , dispose ISubscription která je vrácena z metody subscribe .
Streamování mezi klientem a serverem
Klienti JavaScriptu volají metody streamování typu klient-server v centrech předáním jako argumentu metodě , nebo v závislosti na Subject send invoke vyvolané stream metodě centra. Subjectje třída, která vypadá jako Subject . Například v RxJS můžete použít třídu Subject z této knihovny.
const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
iteration++;
subject.next(iteration.toString());
if (iteration === 10) {
clearInterval(intervalHandle);
subject.complete();
}
}, 500);
Volání subject.next(item) s položkou zapíše položku do datového proudu a metoda centra obdrží položku na serveru.
Pokud chcete datový proud ukončit, zavolejte subject.complete() .
Klient Java
Streamování mezi serverem a klientem
Klient SignalR Java používá k vyvolání metod stream streamování metodu . stream přijímá tři nebo více argumentů:
- Očekávaný typ položek datového proudu
- Název metody centra.
- Argumenty definované v metodě centra.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
.subscribe(
(item) -> {/* Define your onNext handler here. */ },
(error) -> {/* Define your onError handler here. */},
() -> {/* Define your onCompleted handler here. */});
Metoda stream u HubConnection vrátí Pozorovatelné typu položky datového proudu. Metoda pozorovatelného typu je subscribe místo, kde onNext jsou onError definovány onCompleted obslužné rutiny a .
Streamování mezi klientem a serverem
Klient Javy může volat metody streamování klient-server v centrech předáním pozorovatelného parametru jako argumentu metodě , nebo v závislosti na vyvolané SignalR send invoke stream metodě centra.
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();
Volání stream.onNext(item) s položkou zapíše položku do datového proudu a metoda centra obdrží položku na serveru.
Pokud chcete datový proud ukončit, zavolejte stream.onComplete() .