자습서: C# 및 .NET을 사용하여 비동기 스트림 생성 및 사용

비동기 스트림은 데이터의 스트리밍 원본을 모델링합니다. 데이터 스트림은 종종 요소를 비동기적으로 검색하거나 생성합니다. 비동기 스트리밍 데이터 소스의 자연스러운 프로그래밍 모델을 제공합니다.

이 자습서에서는 다음과 같은 작업을 수행하는 방법을 알아봅니다.

  • 데이터 요소 시퀀스를 비동기적으로 생성하는 데이터 소스를 만듭니다.
  • 데이터 소스를 비동기적으로 사용합니다.
  • 비동기 스트림의 취소 및 캡처된 컨텍스트를 지원합니다.
  • 새 인터페이스 및 데이터 소스가 이전 동기 데이터 시퀀스로 기본 설정되는 경우를 인식합니다.

사전 요구 사항

C# 컴파일러를 포함하여 .NET을 실행하도록 머신을 설정해야 합니다. C# 컴파일러는 Visual Studio 2022 또는 .NET SDK에서 사용할 수 있습니다.

GitHub GraphQL 엔드포인트에 액세스할 수 있도록 GitHub 액세스 토큰을 만들어야 합니다. GitHub 액세스 토큰에 사용할 다음 권한을 선택합니다.

  • repo:status
  • public_repo

GitHub API 엔드포인트의 액세스 권한을 부여하는 데 사용할 수 있도록 액세스 토큰을 안전한 장소에 보관합니다.

경고

개인용 액세스 토큰을 안전하게 보관합니다. 개인용 액세스 토큰이 있는 소프트웨어는 액세스 권한을 사용하여 GitHub API 호출을 수행할 수 있습니다.

이 자습서에서는 여러분이 Visual Studio 또는 .NET CLI를 비롯한 C# 및 .NET에 익숙하다고 가정합니다.

시작 애플리케이션 실행

asynchronous-programming/snippets 폴더의 dotnet/docs 리포지토리에서 이 자습서에 사용된 시작 애플리케이션의 코드를 가져올 수 있습니다.

시작 애플리케이션은 GitHub GraphQL 인터페이스를 사용하여 dotnet/docs 리포지토리에 기록된 최근 문제를 검색하는 콘솔 애플리케이션입니다. 먼저 시작 앱 Main 메서드의 다음 코드를 살펴봅니다.

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

GitHubKey 환경 변수를 개인용 액세스 토큰으로 설정하거나, GetEnvVariable 호출의 마지막 인수를 개인용 액세스 토큰으로 바꿀 수 있습니다. 소스를 다른 사용자와 공유하는 경우 소스 코드에 액세스 코드를 넣지 마세요. 공유된 소스 리포지토리에 액세스 코드를 업로드하지 마세요.

GitHub 클라이언트를 만든 후 Main의 코드는 진행 보고 개체 및 취소 토큰을 만듭니다. 해당 개체가 만들어지면 MainRunPagedQueryAsync를 호출하여 250개의 가장 최근 생성된 문제를 검색합니다. 작업이 완료되면 결과가 표시됩니다.

시작 애플리케이션을 실행할 때 이 애플리케이션의 실행 방식에 대한 몇 가지 중요한 사항을 관찰할 수 있습니다. GitHub에서 반환된 각 페이지에 대해 보고된 진행 상황이 표시됩니다. GitHub가 각 새로운 문제 페이지를 반환하기 전에 분명한 일시 정지를 관찰할 수 있습니다. 마지막으로 이 문제는 10페이지가 GitHub에서 모두 검색된 후에만 표시됩니다.

구현 살펴보기

구현은 이전 섹션에서 설명한 동작을 관찰한 이유를 드러냅니다. RunPagedQueryAsync에 대한 코드를 검사합니다.

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

이 메서드가 가장 먼저 수행하는 작업은 GraphQLRequest 클래스를 사용하여 POST 개체를 만드는 것입니다.

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

POST 개체 본문을 구성하고 \(백슬래시) 이스케이프 문자로 표시된 요청 본문에서 줄 바꿈 문자를 모두 제거하는 ToJsonText 메서드를 사용하여 단일 문자열로 표시된 JSON으로 올바르게 변환하는 데 도움이 됩니다.

앞 코드의 페이징 알고리즘 및 비동기 구조를 중점적으로 살펴보겠습니다. (GitHub GraphQL API에 대한 자세한 내용은 GitHub GraphQL 설명서를 참조하세요.) RunPagedQueryAsync 메서드는 가장 최근에서 가장 오래된 순서로 문제를 열거합니다. 이 메서드는 페이지당 25개 문제를 요청하고 응답의 pageInfo 구조체를 검사하여 이전 페이지를 계속 진행합니다. 다중 페이지 응답에 대한 GraphQL의 표준 페이징 지원을 따릅니다. 응답에는 이전 페이지를 요청하는 데 사용되는 hasPreviousPages 값과 startCursor 값을 포함하는 pageInfo 개체가 포함됩니다. 문제는 nodes 배열에 있습니다. RunPagedQueryAsync 메서드는 모든 페이지의 모든 결과를 포함하는 배열에 해당 노드를 추가합니다.

결과 페이지를 검색 및 복원한 후에 RunPagedQueryAsync가 진행 상황을 보고하고 취소를 확인합니다. 취소가 요청된 경우 RunPagedQueryAsyncOperationCanceledException을 throw합니다.

이 코드에서 여러 가지 요소를 개선할 수 있습니다. 가장 중요한 것은 RunPagedQueryAsync가 반환된 모든 문제에 대해 스토리지를 할당해야 한다는 것입니다. 모든 미해결 문제를 검색하면 모든 검색된 문제를 저장하는 데 훨씬 더 많은 메모리가 필요하므로 이 샘플은 250개 문제만 검색합니다. 진행률 보고서 및 취소 지원 프로토콜로 인해 알고리즘을 처음 읽을 때 이해하기가 더 어려워집니다. 추가 형식 및 API가 포함됩니다. 취소 요청 위치 및 제공 위치를 파악하려면 CancellationTokenSource 및 연결된 CancellationToken을 통해 통신을 추적해야 합니다.

더 나은 방법을 제공하는 비동기 스트림

비동기 스트림 및 연결된 언어 지원으로 해당 문제가 모두 해결됩니다. 시퀀스를 생성하는 코드에서 이제 yield return을 사용하여 async 한정자로 선언된 메서드에서 요소를 반환할 수 있습니다. foreach 루프를 통해 시퀀스를 사용하는 것처럼 await foreach 루프를 통해 비동기 스트림을 사용할 수 있습니다.

이 새로운 언어 기능은 .NET Standard 2.1에 추가되고 .NET Core 3.0에 구현된 세 가지 새 인터페이스를 사용합니다.

이 세 가지 인터페이스는 대부분의 C# 개발자에게 익숙합니다. 이 인터페이스는 동기 인터페이스와 유사한 방식으로 작동합니다.

익숙하지 않을 수도 있는 하나의 형식은 System.Threading.Tasks.ValueTask입니다. ValueTask 구조체는 System.Threading.Tasks.Task 클래스에 유사한 API를 제공합니다. ValueTask는 성능상의 이유로 해당 인터페이스에서 사용됩니다.

비동기 스트림으로 변환

그런 다음, RunPagedQueryAsync 메서드를 변환하여 비동기 스트림을 생성합니다. 먼저 RunPagedQueryAsync의 시그니처를 변경하여 IAsyncEnumerable<JToken>을 반환하고 다음 코드에 표시된 대로 매개 변수 목록에서 취소 토큰 및 진행 개체를 제거합니다.

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

시작 코드는 다음 코드에 표시된 대로 페이지가 검색될 때 각 페이지를 처리합니다.

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

해당 세 줄을 다음 코드로 바꿉니다.

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

이 메서드의 앞부분에 있는 finalResults 선언 및 수정한 루프 뒤에 있는 return 문을 제거할 수도 있습니다.

비동기 스트림을 생성하기 위한 변경을 완료했습니다. 완료된 메서드는 다음 코드와 유사합니다.

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

그런 다음, 컬렉션을 사용하는 코드를 변경하여 비동기 스트림을 사용합니다. 문제 컬렉션을 처리하는 Main에서 다음 코드를 찾습니다.

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

해당 코드를 다음 await foreach 루프로 바꿉니다.

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

새 인터페이스 IAsyncEnumerator<T>IAsyncDisposable에서 파생됩니다. 즉, 루프가 완료되면 이전 루프가 스트림을 비동기적으로 삭제합니다. 루프는 다음 코드와 같이 표시될 수 있습니다.

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

기본적으로 스트림 요소는 캡처된 컨텍스트에서 처리됩니다. 컨텍스트 캡처를 사용하지 않도록 설정하려면 TaskAsyncEnumerableExtensions.ConfigureAwait 확장 메서드를 사용합니다. 동기화 컨텍스트 및 현재 컨텍스트 캡처에 대한 자세한 내용은 작업 기반 비동기 패턴 사용에 대한 문서를 참조하세요.

비동기 스트림은 다른 async 메서드와 동일한 프로토콜을 사용하여 취소를 지원합니다. 취소를 지원하기 위해 다음과 같이 비동기 반복기 메서드의 시그니처를 수정합니다.

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

System.Runtime.CompilerServices.EnumeratorCancellationAttribute 특성을 사용하면 컴파일러는 비동기 반복기 본문에 표시되는 GetAsyncEnumerator에 전달된 토큰을 해당 인수로 만드는 IAsyncEnumerator<T>에 관한 코드를 생성합니다. runQueryAsync 내에서 토큰 상태를 검사하고 요청 시 추가 작업을 취소할 수 있습니다.

다른 확장 메서드인 WithCancellation을 사용하여 취소 토큰을 비동기 스트림에 전달합니다. 다음과 같이 문제를 열거하는 루프를 수정합니다.

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

asynchronous-programming/snippets 폴더의 dotnet/docs 리포지토리에서 완성된 자습서의 코드를 가져올 수 있습니다.

완료된 애플리케이션 실행

애플리케이션을 다시 실행합니다. 해당 동작을 시작 애플리케이션의 동작과 대조합니다. 결과의 첫 번째 페이지는 사용 가능한 즉시 열거됩니다. 새로운 각 페이지가 요청 및 검색될 때 확인 가능한 일시 정지가 있고 난 뒤 다음 페이지의 결과가 빠르게 열거됩니다. 취소를 처리하는 데는 try / catch 블록이 필요하지 않습니다. 호출자가 컬렉션의 열거를 중지할 수 있습니다. 각 페이지가 다운로드될 때 비동기 스트림이 결과를 생성하므로 진행이 명확하게 보고됩니다. 반환된 각 문제에 대한 상태는 await foreach 루프에 포함됩니다. 진행 상황을 추적하는 데 콜백 개체가 필요하지 않습니다.

코드를 검사하여 향상된 메모리 사용을 확인할 수 있습니다. 열거되기 전에 모든 결과를 저장하기 위해 더 이상 컬렉션을 할당할 필요가 없습니다. 호출자는 결과를 사용하는 방법 및 스토리지 컬렉션이 필요한지 여부를 결정할 수 있습니다.

시작 및 완료된 애플리케이션을 둘 다 실행하고 직접 구현 간 차이를 관찰할 수 있습니다. 완료한 후 이 자습서를 시작할 때 만든 GitHub 액세스 토큰을 삭제할 수 있습니다. 공격자가 해당 토큰의 액세스 권한을 얻으면 사용자의 자격 증명을 사용하여 GitHub API에 액세스할 수 있습니다.

이 자습서에서는 비동기 스트림을 사용하여 데이터 페이지를 반환하는 네트워크 API에서 개별 항목을 읽었습니다. 비동기 스트림은 주식 시세 또는 센서 디바이스와 같은 "종료되지 않는 스트림"에서 읽을 수도 있습니다. MoveNextAsync에 대한 호출은 사용 가능한 즉시 다음 항목을 반환합니다.