Použití streamování v ASP.NET CoreSignalR

Od Předáka Conroye

SignalRASP.NET Core podporuje streamování z klienta na server a ze serveru na klienta. To je užitečné ve scénářích, kdy se v průběhu času dorazí fragmenty dat. Při streamování se každý fragment odesílá klientovi nebo serveru, jakmile jsou k dispozici, a nečeká na to, až budou všechna data k dispozici.

SignalRASP.NET Core podporuje návratové hodnoty streamování serverových metod. To je užitečné ve scénářích, kdy se v průběhu času dorazí fragmenty dat. Když se do klienta streamuje návratová hodnota, každý fragment se odesílaje klientovi, jakmile jsou k dispozici, a nečeká na to, až budou všechna data k dispozici.

Zobrazení nebo stažení ukázkového kódu (stažení)

Nastavení centra pro streamování

Metoda centra se automaticky stane metodou centra streamování, když vrátí IAsyncEnumerable<T> , ChannelReader<T> , nebo Task<IAsyncEnumerable<T>> Task<ChannelReader<T>> .

Metoda centra se automaticky stane metodou centra streamování, když vrátí ChannelReader<T> nebo Task<ChannelReader<T>> .

Streamování mezi serverem a klientem

Metody centra streamování se IAsyncEnumerable<T> vracejí navíc k ChannelReader<T> . Nejjednodušší způsob, jak vrátit , je udělat metodu centra asynchronní metodu iterátoru, jak IAsyncEnumerable<T> ukazuje následující ukázka. Asynchronní metody iterátoru centra mohou přijímat parametr, který se aktivuje, když se klient CancellationToken odhlásí ze streamu. Asynchronní metody iterátoru se vyhnout problémům běžným s kanály, například nevrací dostatečně brzy nebo ukončí metodu bez ChannelReader dokončení ChannelWriter<T> .

Poznámka

Následující ukázka vyžaduje C# 8,0 nebo novější.

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

Následující příklad ukazuje základy streamování dat do klienta pomocí kanálů. Pokaždé, když je objekt zapsán ChannelWriter<T> do objektu , je objekt okamžitě odeslán klientovi. Na konci se dokončí , ChannelWriter aby se klientovi řeklo, že je datový proud zavřený.

Poznámka

Zapište do ChannelWriter<T> pro vlákno na pozadí a vraťte co nejdříve ChannelReader . Ostatní vyvolání centra se zablokují, dokud se ChannelReader nevrátila hodnota .

Zabalte logiku do try ... catch příkazu. Dokončete Channel v finally bloku. Pokud chcete tok chyby, zachyťte ji uvnitř bloku a catch zapište ji do finally bloku.

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}
public class StreamHub : Hub
{
    public ChannelReader<int> Counter(
        int count,
        int delay,
        CancellationToken cancellationToken)
    {
        var channel = Channel.CreateUnbounded<int>();

        // We don't want to await WriteItemsAsync, otherwise we'd end up waiting
        // for all the items to be written before returning the channel back to
        // the client.
        _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

        return channel.Reader;
    }

    private async Task WriteItemsAsync(
        ChannelWriter<int> writer,
        int count,
        int delay,
        CancellationToken cancellationToken)
    {
        Exception localException = null;
        try
        {
            for (var i = 0; i < count; i++)
            {
                // Check the cancellation token regularly so that the server will stop
                // producing items if the client disconnects.
                cancellationToken.ThrowIfCancellationRequested();
                await writer.WriteAsync(i);

                // Use the cancellationToken in other APIs that accept cancellation
                // tokens so the cancellation can flow down to them.
                await Task.Delay(delay, cancellationToken);
            }
        }
        catch (Exception ex)
        {
            localException = ex;
        }
        finally
        {
            writer.Complete(localException);
        }
    }
}
public class StreamHub : Hub
{
    public ChannelReader<int> Counter(int count, int delay)
    {
        var channel = Channel.CreateUnbounded<int>();

        // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
        // for all the items to be written before returning the channel back to
        // the client.
        _ = WriteItemsAsync(channel.Writer, count, delay);

        return channel.Reader;
    }

    private async Task WriteItemsAsync(
        ChannelWriter<int> writer,
        int count,
        int delay)
    {
        Exception localException = null;
        try
        {
            for (var i = 0; i < count; i++)
            {
                await writer.WriteAsync(i);
                await Task.Delay(delay);
            }
        }
        catch (Exception ex)
        {
            localException = ex;
        }
        finally
        {
            writer.Complete(localException);
        }

    }
}

Metody centra streamování mezi serverem a klientem mohou přijmout parametr, který se aktivuje, když se klient CancellationToken odhlásí ze streamu. Pomocí tohoto tokenu zastavte operaci serveru a uvolněte všechny prostředky, pokud se klient odpojí před koncem streamu.

Streamování mezi klientem a serverem

Metoda centra se automaticky stane metodou centra streamování typu klient-server, když přijme jeden nebo více objektů typu ChannelReader<T> nebo IAsyncEnumerable<T> . Následující příklad ukazuje základy čtení streamovaných dat odesílaných z klienta. Pokaždé, když klient zapíše do , data se zapíšou do na serveru, ze kterého čte ChannelWriter<T> ChannelReader metoda centra.

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

Následuje IAsyncEnumerable<T> verze metody .

Poznámka

Následující ukázka vyžaduje C# 8,0 nebo novější.

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

Klient .NET

Streamování mezi serverem a klientem

Metody a v se používají k vyvolání metod streamování mezi StreamAsync StreamAsChannelAsync HubConnection klienty. Předejte název metody centra a argumenty definované v metodě centra do StreamAsync nebo StreamAsChannelAsync . Obecný parametr pro a StreamAsync<T> StreamAsChannelAsync<T> určuje typ objektů vrácených metodou streamování. Objekt typu nebo je vrácen z vyvolání datového proudu a IAsyncEnumerable<T> představuje datový proud na ChannelReader<T> klientovi.

Příklad, StreamAsync který vrací IAsyncEnumerable<int> :

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

Odpovídající StreamAsChannelAsync příklad, který vrátí ChannelReader<int> :

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

Metoda StreamAsChannelAsync v se používá k vyvolání metody HubConnection streamování mezi serverem a klientem. Předejte metodě hub název a argumenty definované v metodě centra StreamAsChannelAsync do . Obecný parametr v StreamAsChannelAsync<T> parametru určuje typ objektů vrácených metodou streamování. Objekt se vrátí z volání datového proudu ChannelReader<T> a představuje datový proud na klientovi.

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

Metoda StreamAsChannelAsync v se používá k vyvolání metody HubConnection streamování mezi serverem a klientem. Předejte metodě hub název a argumenty definované v metodě centra StreamAsChannelAsync do . Obecný parametr v StreamAsChannelAsync<T> parametru určuje typ objektů vrácených metodou streamování. Objekt se vrátí z volání datového proudu ChannelReader<T> a představuje datový proud na klientovi.

var channel = await hubConnection
    .StreamAsChannelAsync<int>("Counter", 10, 500, CancellationToken.None);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

Streamování mezi klientem a serverem

Existují dva způsoby, jak vyvolat metodu centra streamování klient-server z klienta .NET. V závislosti na vyvolané metodě centra můžete metodě centra předat jako argument nebo IAsyncEnumerable<T> ChannelReader SendAsync InvokeAsync StreamAsChannelAsync .

Při každém zápisu dat do objektu nebo přijme metoda centra na serveru novou položku s IAsyncEnumerable ChannelWriter daty z klienta.

Pokud používáte objekt IAsyncEnumerable , datový proud skončí po ukončení metody, která vrací položky datového proudu.

Poznámka

Následující ukázka vyžaduje C# 8,0 nebo novější.

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

Pokud používáte , můžete kanál dokončit ChannelWriter pomocí channel.Writer.Complete() příkazu :

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

Klient JavaScriptu

Streamování mezi serverem a klientem

Klienti JavaScriptu volají metody streamování mezi klienty v centrech pomocí connection.stream . Metoda stream přijímá dva argumenty:

  • Název metody centra. V následujícím příkladu má metoda centra název Counter .
  • Argumenty definované v metodě centra. V následujícím příkladu jsou argumenty počet položek datového proudu, které se mají přijmout, a zpoždění mezi položkami datového proudu.

connection.stream vrátí IStreamResult , který obsahuje subscribe metodu . Předejte do a nastavte zpětná volání , a pro příjem oznámení z IStreamSubscriber subscribe next error complete stream volání.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Pokud chcete ukončit stream z klienta, zavolejte metodu pro , dispose ISubscription která je vrácena z metody subscribe . Volání této metody způsobí zrušení CancellationToken parametru metody Hub, pokud jste ho poskytli.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Pokud chcete ukončit stream z klienta, zavolejte metodu pro , dispose ISubscription která je vrácena z metody subscribe .

Streamování mezi klientem a serverem

Klienti JavaScriptu volají metody streamování typu klient-server v centrech předáním jako argumentu metodě , nebo v závislosti na Subject send invoke vyvolané stream metodě centra. Subjectje třída, která vypadá jako Subject . Například v RxJS můžete použít třídu Subject z této knihovny.

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

Volání subject.next(item) s položkou zapíše položku do datového proudu a metoda centra obdrží položku na serveru.

Pokud chcete datový proud ukončit, zavolejte subject.complete() .

Klient Java

Streamování mezi serverem a klientem

Klient SignalR Java používá k vyvolání metod stream streamování metodu . stream přijímá tři nebo více argumentů:

  • Očekávaný typ položek datového proudu
  • Název metody centra.
  • Argumenty definované v metodě centra.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

Metoda stream u HubConnection vrátí Pozorovatelné typu položky datového proudu. Metoda pozorovatelného typu je subscribe místo, kde onNext jsou onError definovány onCompleted obslužné rutiny a .

Streamování mezi klientem a serverem

Klient Javy může volat metody streamování klient-server v centrech předáním pozorovatelného parametru jako argumentu metodě , nebo v závislosti na vyvolané SignalR send invoke stream metodě centra.

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

Volání stream.onNext(item) s položkou zapíše položku do datového proudu a metoda centra obdrží položku na serveru.

Pokud chcete datový proud ukončit, zavolejte stream.onComplete() .

Další zdroje informací