ASP.NET Core'da akış kullanmaSignalR

Con conjo tarafından yazıldı

SignalRASP.NET Core istemciden sunucuya ve sunucudan istemciye akışı destekler. Bu, veri parçalarının zaman içinde varma senaryolarında kullanışlıdır. Akışta her parça, tüm verilerin kullanılabilir olması için beklemek yerine, kullanılabilir hale geldiğinde istemciye veya sunucuya gönderilir.

SignalRASP.NET Core , sunucu yöntemlerinin akış dönüş değerlerini destekler. Bu, veri parçalarının zaman içinde varma senaryolarında kullanışlıdır. İstemciye bir dönüş değeri akışı olduğunda, her parça tüm verilerin kullanılabilir duruma dönmesini beklemek yerine, kullanılabilir hale geldiğinde istemciye gönderilir.

Örnek kodu görüntüleme veya indirme ( nasılindir)

Akış için hub ayarlama

Hub yöntemi , , veya döndürerek otomatik olarak bir akış IAsyncEnumerable<T> ChannelReader<T> hub'ı yöntemi Task<IAsyncEnumerable<T>> Task<ChannelReader<T>> olur.

Hub yöntemi, veya döndüren otomatik olarak bir akış hub'ı yöntemi ChannelReader<T> Task<ChannelReader<T>> olur.

Sunucudan istemciye akış

Akış hub'ı yöntemleri ek IAsyncEnumerable<T> olarak geri ChannelReader<T> dönebilirsiniz. Geri dönmenin en basit yolu, aşağıdaki örnekte de olduğu gibi hub yöntemini zaman uyumsuz bir IAsyncEnumerable<T> tekrarlayıcı yöntemi yapmaktır. Hub zaman uyumsuz iterator yöntemleri, istemcinin akış aboneliğini CancellationToken kaldıran bir parametreyi kabul eder. Zaman uyumsuz tekrarlayıcı yöntemler, Kanallarda yaygın olarak karşılaşılan sorunları (örneğin, yeterince erken döndürerek veya tamamlamadan ChannelReader yöntemden çıkmama gibi) ChannelWriter<T> önler.

Not

Aşağıdaki örnek C# 8.0 veya sonraki bir güncelleştirmesi gerektirir.

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

Aşağıdaki örnek, Kanallar kullanarak istemciye veri akışıyla ilgili temel bilgileri gösterir. nesnesine her yazıldığı ChannelWriter<T> zaman, nesnesi istemciye hemen gönderilir. Sonunda, ChannelWriter istemciye akışın kapalı olduğunu söylemek için tamamlanır.

Not

Bir arka ChannelWriter<T> plan iş parçacığında üzerine yazın ve en ChannelReader kısa zamanda geri yazın. Diğer hub çağrıları, döndürülene kadar ChannelReader engellenir.

Bir deyiminde mantığı try ... catch sarmala. bir Channel bloğunda 'i finally tamamlar. Hata akışı yapmak için bloğun içinde yakalayıp catch bloğuna finally yazın.

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}
public class StreamHub : Hub
{
    public ChannelReader<int> Counter(
        int count,
        int delay,
        CancellationToken cancellationToken)
    {
        var channel = Channel.CreateUnbounded<int>();

        // We don't want to await WriteItemsAsync, otherwise we'd end up waiting
        // for all the items to be written before returning the channel back to
        // the client.
        _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

        return channel.Reader;
    }

    private async Task WriteItemsAsync(
        ChannelWriter<int> writer,
        int count,
        int delay,
        CancellationToken cancellationToken)
    {
        Exception localException = null;
        try
        {
            for (var i = 0; i < count; i++)
            {
                // Check the cancellation token regularly so that the server will stop
                // producing items if the client disconnects.
                cancellationToken.ThrowIfCancellationRequested();
                await writer.WriteAsync(i);

                // Use the cancellationToken in other APIs that accept cancellation
                // tokens so the cancellation can flow down to them.
                await Task.Delay(delay, cancellationToken);
            }
        }
        catch (Exception ex)
        {
            localException = ex;
        }
        finally
        {
            writer.Complete(localException);
        }
    }
}
public class StreamHub : Hub
{
    public ChannelReader<int> Counter(int count, int delay)
    {
        var channel = Channel.CreateUnbounded<int>();

        // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
        // for all the items to be written before returning the channel back to
        // the client.
        _ = WriteItemsAsync(channel.Writer, count, delay);

        return channel.Reader;
    }

    private async Task WriteItemsAsync(
        ChannelWriter<int> writer,
        int count,
        int delay)
    {
        Exception localException = null;
        try
        {
            for (var i = 0; i < count; i++)
            {
                await writer.WriteAsync(i);
                await Task.Delay(delay);
            }
        }
        catch (Exception ex)
        {
            localException = ex;
        }
        finally
        {
            writer.Complete(localException);
        }

    }
}

Sunucudan istemciye akış hub'ı yöntemleri, istemcinin akış aboneliğini CancellationToken kaldıran bir parametreyi kabul eder. Bu belirteci kullanarak sunucu işlemi durdurun ve istemcinin bağlantısı akışın sonundan önce kesse tüm kaynakları serbest bırakın.

İstemciden sunucuya akış

Hub yöntemi, veya türünde bir veya daha fazla nesne kabul ederse otomatik olarak istemciden sunucuya akış hub'ı yöntemi ChannelReader<T> IAsyncEnumerable<T> olur. Aşağıdaki örnek, istemciden gönderilen akış verilerini okumanın temellerini gösterir. İstemci ' ye ChannelWriter<T> yazdığında, veriler ChannelReader hub yönteminin okuduğu sunucuya yazılır.

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

Yönteminin IAsyncEnumerable<T> bir sürümü aşağıdaki gibidir.

Not

Aşağıdaki örnek C# 8.0 veya sonraki bir güncelleştirmesi gerektirir.

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

.NET istemcisi

Sunucudan istemciye akış

üzerinde StreamAsync ve StreamAsChannelAsync HubConnection yöntemleri, sunucudan istemciye akış yöntemlerini çağırmak için kullanılır. Hub yönteminde tanımlanan hub yöntemi adını ve bağımsız değişkenlerini veya yöntemine StreamAsync StreamAsChannelAsync iletir. ve üzerinde genel StreamAsync<T> StreamAsChannelAsync<T> parametresi, akış yöntemi tarafından döndürülen nesnelerin türünü belirtir. veya türünde bir IAsyncEnumerable<T> ChannelReader<T> nesne, akış çağırmadan döndürülür ve akışı istemcide temsil eder.

döndüren StreamAsync bir IAsyncEnumerable<int> örnek:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

döndüren StreamAsChannelAsync karşılık gelen bir ChannelReader<int> örnek:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

üzerinde StreamAsChannelAsync HubConnection yöntemi, bir sunucudan istemciye akış yöntemini çağırmak için kullanılır. Hub yönteminde tanımlanan hub yöntemi adını ve bağımsız değişkenlerini yöntemine StreamAsChannelAsync iletir. üzerinde genel StreamAsChannelAsync<T> parametresi, akış yöntemi tarafından döndürülen nesnelerin türünü belirtir. Akış ChannelReader<T> çağırmadan döndürülür ve istemcide akışı temsil eder.

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

üzerinde StreamAsChannelAsync HubConnection yöntemi, bir sunucudan istemciye akış yöntemini çağırmak için kullanılır. Hub yönteminde tanımlanan hub yöntemi adını ve bağımsız değişkenlerini yöntemine StreamAsChannelAsync iletir. üzerinde genel StreamAsChannelAsync<T> parametresi, akış yöntemi tarafından döndürülen nesnelerin türünü belirtir. Akış ChannelReader<T> çağırmadan döndürülür ve istemcide akışı temsil eder.

var channel = await hubConnection
    .StreamAsChannelAsync<int>("Counter", 10, 500, CancellationToken.None);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

İstemciden sunucuya akış

.NET istemciden istemciden sunucuya akış hub'ı yöntemini çağırmanın iki yolu vardır. Çağrılan hub yöntemine bağlı olarak , veya için bağımsız değişken olarak bir veya IAsyncEnumerable<T> ChannelReader SendAsync InvokeAsync StreamAsChannelAsync geçebilirsiniz.

veya nesnesine her veri yazıldığı zaman, sunucu üzerinde hub yöntemi IAsyncEnumerable ChannelWriter istemciden verilerle yeni bir öğe alır.

Nesne IAsyncEnumerable kullanıyorsanız, akış öğeleri döndüren yöntem çıkış sonrasında akış sona erer.

Not

Aşağıdaki örnek C# 8.0 veya sonraki bir güncelleştirmesi gerektirir.

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

Veya kullanıyorsanız, ChannelWriter kanalı ile tamamlarsanız: channel.Writer.Complete()

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

JavaScript istemcisi

Sunucudan istemciye akış

JavaScript istemcileri ile hub'larda sunucudan istemciye akış yöntemlerini connection.stream çağırıyor. yöntemi stream iki bağımsız değişken kabul eder:

  • Hub yönteminin adı. Aşağıdaki örnekte hub yöntemi adı Counter olur.
  • Hub yönteminde tanımlanan bağımsız değişkenler. Aşağıdaki örnekte bağımsız değişkenler, alacak akış öğelerinin sayısı ve akış öğeleri arasındaki gecikme sayısıdır.

connection.stream , bir IStreamResult yöntemi içeren bir subscribe döndürür. çağrısından bildirim almak için , ve geri çağırmalarını 'a iletir ve IStreamSubscriber subscribe next error complete stream ayarlayın.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

akışı istemciden sona erer, dispose yönteminden ISubscription döndürülen üzerinde yöntemini subscribe çağırma. Bu yöntemin çağrılsı, CancellationToken sağladıysanız Hub yönteminin parametresinin iptal edilmesine neden olur.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

akışı istemciden sona erer, dispose yönteminden ISubscription döndürülen üzerinde yöntemini subscribe çağırma.

İstemciden sunucuya akış

JavaScript istemcileri, çağrılan hub yöntemine bağlı olarak , veya için bağımsız değişken olarak bir geçirmeyi kullanarak hub'larda istemciden sunucuya Subject send akış yöntemlerini invoke stream çağırır. Subject, gibi görünen bir sınıftır. Subject Örneğin, RxJS'de bu kitaplıktan Konu sınıfını kullanabilirsiniz.

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

Bir subject.next(item) öğeyle çağrısı yapmak öğeyi akışa yazar ve hub yöntemi öğeyi sunucudan alır.

Akışı sona erdir etmek için çağrısında subject.complete() bulundu.

Java istemcisi

Sunucudan istemciye akış

SignalRJava istemcisi, akış stream yöntemlerini çağırmak için yöntemini kullanır. stream üç veya daha fazla bağımsız değişken kabul eder:

  • Akış öğelerinin beklenen türü.
  • Hub yönteminin adı.
  • Hub yönteminde tanımlanan bağımsız değişkenler.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

üzerinde stream HubConnection yöntemi, akış öğesi türünün Observable öğesini döndürür. Gözlemlenebilir türünün subscribe yöntemi, ve onNext onError onCompleted işleyicilerin tanımlandığı yöntemdir.

İstemciden sunucuya akış

Java istemcisi, çağrılan hub yöntemine bağlı olarak , veya bağımsız değişkeni olarak Gözlemlenebilir'i geçerek hub'larda istemciden sunucuya akış SignalR send yöntemlerini invoke stream çağırabilir.

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

Bir stream.onNext(item) öğeyle çağrısı yapmak öğeyi akışa yazar ve hub yöntemi öğeyi sunucudan alır.

Akışı sona erdir etmek için çağrısında stream.onComplete() bulundu.

Ek kaynaklar