ASP.NET Core SignalR에서 스트리밍 사용

작성자: Brennan Conroy

ASP.NET Core SignalR은 클라이언트에서 서버로, 서버에서 클라이언트로의 스트리밍을 지원합니다. 이는 시간이 지남에 따라 데이터 조각이 도착하는 시나리오에 유용합니다. 스트리밍할 때 각 조각은 모든 데이터를 사용할 수 있을 때까지 기다리지 않고 사용할 수 있게 되는 즉시 클라이언트 또는 서버로 전송됩니다.

샘플 코드 보기 및 다운로드(다운로드 방법)

스트리밍을 위한 허브 설정

허브 메서드는 IAsyncEnumerable<T>, ChannelReader<T>, Task<IAsyncEnumerable<T>> 또는 Task<ChannelReader<T>>를 반환할 때 자동으로 스트리밍 허브 메서드가 됩니다.

서버-클라이언트 스트리밍

스트리밍 허브 메서드는 ChannelReader<T> 외에도 IAsyncEnumerable<T>을 반환할 수 있습니다. IAsyncEnumerable<T>을 반환하는 가장 간단한 방법은 다음 샘플과 같이 허브 메서드를 비동기 반복기 메서드로 만드는 것입니다. 허브 비동기 반복기 메서드는 클라이언트가 스트림에서 구독을 취소할 때 트리거되는 CancellationToken 매개 변수를 수락할 수 있습니다. 비동기 반복기 메서드는 ChannelWriter<T>를 완료하지 않고 조기에 ChannelReader를 반환하거나 메서드를 종료하지 않는 등 채널과 관련된 문제를 방지합니다.

참고 항목

다음 샘플에는 C# 8.0 이상이 필요합니다.

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

다음 샘플에서는 채널을 사용하여 클라이언트로 데이터를 스트리밍하는 기본 사항을 보여 줍니다. 개체가 ChannelWriter<T>에 기록될 때마다 개체는 클라이언트에 즉시 전송됩니다. 마지막에 ChannelWriter가 완료되어 클라이언트에 스트림이 닫혔음을 알립니다.

참고 항목

백그라운드 스레드에서 ChannelWriter<T>에 쓰고 최대한 빨리 ChannelReader를 반환합니다. 다른 허브 호출은 ChannelReader가 반환될 때까지 차단됩니다.

try ... catch에 논리를 래핑합니다. finally 블록에서 Channel을 완료합니다. 오류를 흐르게 하려면 catch 블록 내에서 오류를 캡처하고 finally 블록에 씁니다.

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}

서버-클라이언트 스트리밍 허브 메서드는 클라이언트가 스트림에서 구독을 취소할 때 트리거되는 CancellationToken 매개 변수를 수락할 수 있습니다. 이 토큰을 사용하여 서버 작업을 중지하고 스트림이 끝나기 전에 클라이언트의 연결이 끊어지면 리소스를 해제합니다.

클라이언트-서버 스트리밍

허브 메서드는 ChannelReader<T> 또는 IAsyncEnumerable<T> 형식의 개체를 하나 이상 허용하면 자동으로 클라이언트-서버 스트리밍 허브 메서드가 됩니다. 다음 샘플에서는 클라이언트에서 보낸 스트리밍 데이터를 읽는 기본 사항을 보여 줍니다. 클라이언트가 ChannelWriter<T>에 쓸 때마다 허브 메서드가 읽는 서버의 ChannelReader에 데이터가 기록됩니다.

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

메서드의 IAsyncEnumerable<T> 버전은 다음과 같습니다.

참고 항목

다음 샘플에는 C# 8.0 이상이 필요합니다.

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

.NET 클라이언트

서버-클라이언트 스트리밍

HubConnectionStreamAsyncStreamAsChannelAsync 메서드는 서버-클라이언트 스트리밍 메서드를 호출하는 데 사용됩니다. 허브 메서드에 정의된 허브 메서드 이름 및 인수를 StreamAsync 또는 StreamAsChannelAsync에 전달합니다. StreamAsync<T>StreamAsChannelAsync<T>의 제네릭 매개 변수는 스트리밍 메서드에서 반환하는 개체의 형식을 지정합니다. IAsyncEnumerable<T> 또는 ChannelReader<T> 형식의 개체가 스트림 호출에서 반환되고 클라이언트의 스트림을 나타냅니다.

IAsyncEnumerable<int>을 반환하는 StreamAsync 예제입니다.

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

ChannelReader<int>를 반환하는 해당 StreamAsChannelAsync 예제입니다.

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

위의 코드에서:

  • HubConnectionStreamAsChannelAsync 메서드는 서버-클라이언트 스트리밍 메서드를 호출하는 데 사용됩니다. 허브 메서드에 정의된 허브 메서드 이름 및 인수를 StreamAsChannelAsync에 전달합니다.
  • StreamAsChannelAsync<T>의 제네릭 매개 변수는 스트리밍 메서드에서 반환하는 개체의 형식을 지정합니다.
  • ChannelReader<T>는 스트림 호출에서 반환되고 클라이언트의 스트림을 나타냅니다.

클라이언트-서버 스트리밍

.NET 클라이언트에서 클라이언트-서버 스트리밍 허브 메서드를 호출하는 방법에는 두 가지가 있습니다. IAsyncEnumerable<T> 또는 ChannelReader를 호출된 허브 메서드에 따라 SendAsync, InvokeAsync 또는 StreamAsChannelAsync에 인수로 전달할 수 있습니다.

IAsyncEnumerable 또는 ChannelWriter 개체에 데이터를 쓸 때마다 서버의 허브 메서드는 클라이언트의 데이터가 있는 새 항목을 받습니다.

IAsyncEnumerable 개체를 사용하는 경우 스트림 항목을 반환하는 메서드가 종료된 후 스트림이 종료됩니다.

참고 항목

다음 샘플에는 C# 8.0 이상이 필요합니다.

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

또는 ChannelWriter를 사용하는 경우 channel.Writer.Complete()를 사용하여 채널을 완료합니다.

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

JavaScript 클라이언트

서버-클라이언트 스트리밍

JavaScript 클라이언트는 connection.stream을 사용하여 허브에서 서버-클라이언트 스트리밍 메서드를 호출합니다. stream 메서드는 두 개의 인수를 허용합니다.

  • 허브 메서드의 이름. 다음 예제에서 허브 메서드 이름은 Counter입니다.
  • 허브 메서드에 정의된 인수입니다. 다음 예제에서 인수는 수신할 스트림 항목 수와 스트림 항목 간의 지연 횟수입니다.

connection.streamsubscribe 메서드를 포함하는 IStreamResult를 반환합니다. IStreamSubscribersubscribe에 전달하고 next, errorcomplete 콜백을 설정하여 stream 호출에서 알림을 받습니다.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

클라이언트에서 스트림을 종료하려면 subscribe 메서드에서 반환된 ISubscription에서 dispose 메서드를 호출합니다. 이 메서드를 호출하면 허브 메서드의 CancellationToken 매개 변수가 취소됩니다(제공된 경우).

클라이언트-서버 스트리밍

JavaScript 클라이언트는 호출된 허브 메서드에 따라 Subjectsend, invoke 또는 stream에 인수로 전달하여 허브에서 클라이언트-서버 스트리밍 메서드를 호출합니다. SubjectSubject와 같은 클래스입니다. 예를 들어 RxJS에서 해당 라이브러리의 Subject 클래스를 사용할 수 있습니다.

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

항목을 사용하여 subject.next(item)를 호출하면 스트림에 항목이 기록되고 허브 메서드가 서버의 항목을 받습니다.

스트림을 종료하려면 subject.complete()를 호출합니다.

Java 클라이언트

서버-클라이언트 스트리밍

SignalR Java 클라이언트는 stream 메서드를 사용하여 스트리밍 메서드를 호출합니다. stream은 세 개 이상의 인수를 허용합니다.

  • 스트림 항목의 예상 형식입니다.
  • 허브 메서드의 이름.
  • 허브 메서드에 정의된 인수입니다.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

HubConnectionstream 메서드는 스트림 항목 형식의 Observable을 반환합니다. Observable 형식의 subscribe 메서드는 onNext, onErroronCompleted 처리기가 정의된 위치입니다.

클라이언트-서버 스트리밍

SignalR Java 클라이언트는 호출된 허브 메서드에 따라 Observablesend, invoke 또는 stream에 인수로 전달하여 허브에서 클라이언트-서버 스트리밍 메서드를 호출할 수 있습니다.

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

항목을 사용하여 stream.onNext(item)를 호출하면 스트림에 항목이 기록되고 허브 메서드가 서버의 항목을 받습니다.

스트림을 종료하려면 stream.onComplete()를 호출합니다.

추가 리소스