Używanie przesyłania strumieniowego w ASP.NET Core SignalR

Autor: Brennan Conroy

ASP.NET Core SignalR obsługuje przesyłanie strumieniowe z klienta do serwera i z serwera na klienta. Jest to przydatne w scenariuszach, w których fragmenty danych docierają z upływem czasu. Podczas przesyłania strumieniowego każdy fragment jest wysyłany do klienta lub serwera, gdy tylko stanie się dostępny, zamiast czekać na udostępnienie wszystkich danych.

Wyświetl lub pobierz przykładowy kod (jak pobrać)

Konfigurowanie koncentratora na potrzeby przesyłania strumieniowego

Metoda koncentratora automatycznie staje się metodą centrum przesyłania strumieniowego, gdy zwraca wartość IAsyncEnumerable<T>, ChannelReader<T>, Task<IAsyncEnumerable<T>>lub Task<ChannelReader<T>>.

Przesyłanie strumieniowe serwer-klient

Metody centrum przesyłania strumieniowego mogą zwracać IAsyncEnumerable<T> się oprócz ChannelReader<T>metody . Najprostszym sposobem powrotu IAsyncEnumerable<T> jest utworzenie metody centrum jako metody iteratora asynchronicznego, jak pokazano w poniższym przykładzie. Metody iteratora asynchronicznego centrum mogą akceptować CancellationToken parametr wyzwalany, gdy klient anuluje subskrypcję strumienia. Metody iteracyjne asynchroniczne unikają typowych problemów z kanałami, takich jak brak wystarczająco wczesnego ChannelReader zwracania lub zamykania metody bez ukończenia ChannelWriter<T>metody .

Uwaga

Poniższy przykład wymaga języka C# 8.0 lub nowszego.

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

Poniższy przykład przedstawia podstawy danych przesyłanych strumieniowo do klienta przy użyciu kanałów. Za każdym razem, gdy obiekt jest zapisywany w ChannelWriter<T>obiekcie , jest natychmiast wysyłany do klienta. Na końcu element zostanie ukończony, aby poinformować klienta, ChannelWriter że strumień jest zamknięty.

Uwaga

Zapisz w wątku ChannelWriter<T> w tle i zwróć je ChannelReader tak szybko, jak to możliwe. Inne wywołania koncentratora są blokowane do momentu zwrócenia.ChannelReader

Zawijanie logiki w instrukcjitry ... catch. Ukończ Channel blok w finally bloku. Jeśli chcesz przepływać błąd, przechwyć go wewnątrz catch bloku i zapisać go w finally bloku.

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}

Metody centrum przesyłania strumieniowego serwer-klient mogą akceptować CancellationToken parametr wyzwalany, gdy klient anuluje subskrypcję strumienia. Użyj tego tokenu, aby zatrzymać operację serwera i zwolnić wszystkie zasoby, jeśli klient rozłączy się przed końcem strumienia.

Przesyłanie strumieniowe klient-serwer

Metoda koncentratora automatycznie staje się metodą centrum przesyłania strumieniowego klient-serwer, gdy akceptuje co najmniej jeden obiekt typu ChannelReader<T> lub IAsyncEnumerable<T>. Poniższy przykład przedstawia podstawy odczytywania danych przesyłanych strumieniowo z klienta. Za każdym razem, gdy klient zapisuje dane ChannelWriter<T>w obiekcie , są zapisywane na ChannelReader serwerze, z którego jest odczytywana metoda centrum.

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

Poniżej IAsyncEnumerable<T> przedstawiono wersję metody .

Uwaga

Poniższy przykład wymaga języka C# 8.0 lub nowszego.

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

Klient .NET

Przesyłanie strumieniowe serwer-klient

Metody StreamAsync i StreamAsChannelAsyncHubConnection używane do wywoływania metod przesyłania strumieniowego serwer-klient. Przekaż nazwę i argumenty metody centrum zdefiniowane w metodzie hub do metody StreamAsync lub StreamAsChannelAsync. Ogólny parametr on StreamAsync<T> i StreamAsChannelAsync<T> określa typ obiektów zwracanych przez metodę przesyłania strumieniowego. Obiekt typu IAsyncEnumerable<T> lub ChannelReader<T> jest zwracany z wywołania strumienia i reprezentuje strumień na kliencie.

Przykład StreamAsync , który zwraca wartość IAsyncEnumerable<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

Odpowiedni StreamAsChannelAsync przykład, który zwraca wartość ChannelReader<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

W poprzednim kodzie:

  • Metoda StreamAsChannelAsync on HubConnection służy do wywoływania metody przesyłania strumieniowego serwer-klient. Przekaż nazwę metody centrum i argumenty zdefiniowane w metodzie centrum do StreamAsChannelAsyncmetody .
  • Ogólny parametr on StreamAsChannelAsync<T> określa typ obiektów zwracanych przez metodę przesyłania strumieniowego.
  • Obiekt ChannelReader<T> jest zwracany z wywołania strumienia i reprezentuje strumień na kliencie.

Przesyłanie strumieniowe klient-serwer

Istnieją dwa sposoby wywoływania metody centrum przesyłania strumieniowego klient-serwer z klienta platformy .NET. Możesz przekazać IAsyncEnumerable<T> argument lub ChannelReader jako argument do SendAsync, InvokeAsynclub , w StreamAsChannelAsynczależności od wywoływanej metody centrum.

Za każdym razem, gdy dane są zapisywane w IAsyncEnumerable obiekcie lub ChannelWriter , metoda centrum na serwerze odbiera nowy element z danymi od klienta.

Jeśli używasz IAsyncEnumerable obiektu, strumień kończy się po zakończeniu metody zwracania elementów strumienia.

Uwaga

Poniższy przykład wymaga języka C# 8.0 lub nowszego.

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

Lub jeśli używasz elementu ChannelWriter, ukończ kanał za pomocą channel.Writer.Complete()polecenia :

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

Klient środowiska JavaScript

Przesyłanie strumieniowe serwer-klient

Klienci języka JavaScript nazywają metody przesyłania strumieniowego serwer-klient w centrach za pomocą polecenia connection.stream. Metoda stream akceptuje dwa argumenty:

  • Nazwa metody centrum. W poniższym przykładzie nazwa metody centrum to Counter.
  • Argumenty zdefiniowane w metodzie piasty. W poniższym przykładzie argumenty są liczbą elementów strumienia do odbierania i opóźnienia między elementami strumienia.

connection.stream Zwraca element IStreamResult, który zawiera metodę subscribe . Przekaż element IStreamSubscriber i subscribe ustaw nextwywołania zwrotne , errori complete w celu odbierania powiadomień z stream wywołania.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Aby zakończyć strumień z klienta, wywołaj metodę dispose w ISubscription metodzie zwróconej subscribe z metody . Wywołanie tej metody powoduje anulowanie CancellationToken parametru metody Hub, jeśli został podany.

Przesyłanie strumieniowe klient-serwer

Klienci języka JavaScript wywołują metody przesyłania strumieniowego klient-serwer w centrach, przekazując Subject jako argument do sendmetody , invokelub stream, w zależności od wywoływanej metody centrum. Jest Subject to klasa, która wygląda jak Subject. Na przykład w języku RxJS można użyć klasy Subject z tej biblioteki.

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

Wywołanie subject.next(item) za pomocą elementu zapisuje element w strumieniu, a metoda centrum odbiera element na serwerze.

Aby zakończyć strumień, wywołaj metodę subject.complete().

Klienta środowiska Java

Przesyłanie strumieniowe serwer-klient

Klient SignalR java używa stream metody do wywoływania metod przesyłania strumieniowego. stream akceptuje trzy lub więcej argumentów:

  • Oczekiwany typ elementów strumienia.
  • Nazwa metody centrum.
  • Argumenty zdefiniowane w metodzie piasty.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

Metoda stream on HubConnection zwraca obserwowalny typ elementu strumienia. Metoda obserwowanego subscribe typu to miejsce, w którym onNextzdefiniowano programy obsługi i onCompleted . onError

Przesyłanie strumieniowe klient-serwer

Klient SignalR Java może wywoływać metody przesyłania strumieniowego klient-serwer w centrach, przekazując element Obserwowalny jako argument do sendmetody , invokelub stream, w zależności od wywoływanej metody centrum.

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

Wywołanie stream.onNext(item) za pomocą elementu zapisuje element w strumieniu, a metoda centrum odbiera element na serwerze.

Aby zakończyć strumień, wywołaj metodę stream.onComplete().

Dodatkowe zasoby