Tutorial: Generate and consume async streams using C# 8.0 and .NET Core 3.0

C# 8.0 introduces async streams, which model a streaming source of data. Data streams often retrieve or generate elements asynchronously. Async streams rely on new interfaces introduced in .NET Standard 2.1, and these interfaces are supported in .NET Core 3.0 and later. They provide a natural programming model for asynchronous streaming data sources.

In this tutorial, you'll learn how to:

  • Create a data source that generates a sequence of data elements asynchronously.
  • Consume that data source asynchronously.
  • Support cancellation and captured contexts for asynchronous streams.
  • Recognize when the new interfaces and data source are preferable to earlier synchronous data sequences.

Prerequisites

You'll need to set up your machine to run .NET Core, including the C# 8.0 compiler. The C# 8 compiler is available starting with Visual Studio 2019 version 16.3 or the .NET Core 3.0 SDK.

You'll need to create a GitHub access token so that you can access the GitHub GraphQL endpoint. Select the following permissions for your GitHub access token:

  • repo:status
  • public_repo

Save the access token in a safe place so you can use it to gain access to the GitHub API endpoint.

Warning

Keep your personal access token secure. Any software with your personal access token could make GitHub API calls using your access rights.

This tutorial assumes you're familiar with C# and .NET, including either Visual Studio or the .NET Core CLI.

Run the starter application

You can get the code for the starter application used in this tutorial from the dotnet/docs repository in the csharp/tutorials/AsyncStreams folder.

The starter application is a console application that uses the GitHub GraphQL interface to retrieve recent issues written in the dotnet/docs repository. Start by looking at the following code for the starter app's Main method:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await runPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

You can either set a GitHubKey environment variable to your personal access token, or replace the last argument in the call to GetEnvVariable with your personal access token. Don't put your access token in source code if you'll be sharing the source with others. Never upload access tokens to a shared source repository.
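The GetEnvVariable helper isn't shown in this section. The following is a minimal sketch of what such a helper might look like. It assumes the three arguments are the variable name, an error message, and a fallback value; that signature is inferred from the call in Main. Throwing an exception when no value is available is this sketch's own choice.

```csharp
using System;

public static class EnvConfig
{
    // Hypothetical helper matching the GetEnvVariable(name, error, fallback) call in Main.
    // Returns the environment variable if it's set, otherwise the fallback; if both are
    // empty, it throws with the supplied message instead of continuing with no token.
    public static string GetEnvVariable(string name, string error, string fallback)
    {
        string value = Environment.GetEnvironmentVariable(name);
        if (!string.IsNullOrWhiteSpace(value))
            return value;
        if (!string.IsNullOrWhiteSpace(fallback))
            return fallback;
        throw new InvalidOperationException(error);
    }

    public static void Main()
    {
        try
        {
            string key = GetEnvVariable("GitHubKey",
                "You must store your GitHub key in the 'GitHubKey' environment variable", "");
            // Never print the token itself; only confirm that one was found.
            Console.WriteLine($"Token found ({key.Length} characters)");
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```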

After creating the GitHub client, the code in Main creates a progress reporting object and a cancellation token. Once those objects are created, Main calls runPagedQueryAsync to retrieve the 250 most recently created issues. After that task has finished, the results are displayed.
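The progressStatus type used in Main isn't shown either. Assuming it's a thin IProgress<int> wrapper around the lambda passed to its constructor, a sketch could look like this:

```csharp
using System;

// Hypothetical sketch of progressStatus: an IProgress<int> that forwards each
// report straight to the supplied delegate, on the thread that calls Report.
public class progressStatus : IProgress<int>
{
    private readonly Action<int> action;

    public progressStatus(Action<int> progressAction) =>
        action = progressAction ?? throw new ArgumentNullException(nameof(progressAction));

    public void Report(int value) => action(value);
}

public static class ProgressDemo
{
    public static void Main()
    {
        var reporter = new progressStatus(num => Console.WriteLine($"Received {num} issues in total"));
        // Simulate three pages of 25 issues each.
        for (int total = 25; total <= 75; total += 25)
            reporter.Report(total);
    }
}
```

The framework's System.Progress<T> is an alternative. It invokes the handler through the SynchronizationContext captured when it's constructed (or the thread pool when there is none), so reports arrive asynchronously. This sketch calls the delegate inline instead.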

When you run the starter application, you can make some important observations about how it runs. You'll see progress reported for each page returned from GitHub. You can observe a noticeable pause before GitHub returns each new page of issues. Finally, the issues are displayed only after all 10 pages have been retrieved from GitHub.

Examine the implementation

The implementation reveals why you observed the behavior discussed in the previous section. Examine the code for runPagedQueryAsync:

private static async Task<JArray> runPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString());

        int totalCount = (int)issues(results)["totalCount"];
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"];
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"].ToString();
        issuesReturned += issues(results)["nodes"].Count();
        finalResults.Merge(issues(results)["nodes"]);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]["repository"]["issues"];
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"];
}

Let's concentrate on the paging algorithm and async structure of the preceding code. (You can consult the GitHub GraphQL documentation for details on the GitHub GraphQL API.) The runPagedQueryAsync method enumerates the issues from most recent to oldest. It requests 25 issues per page and examines the pageInfo structure of the response to continue with the previous page. That follows GraphQL's standard paging support for multi-page responses. The response includes a pageInfo object that contains a hasPreviousPage value and a startCursor value used to request the previous page. The issues are in the nodes array. The runPagedQueryAsync method appends these nodes to an array that contains all the results from all pages.
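You can try the paging loop without a GitHub token. In the following sketch, a hypothetical in-memory FakeIssueApi replaces the GraphQL endpoint. Its pages expose the same hasPreviousPage/startCursor contract, and issue numbers stand in for the JSON nodes. Like the starter code, the sketch buffers every page before returning anything:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the GraphQL endpoint. Issues are numbered 1..total; each call
// returns the page that ends just before `beforeCursor`, walking backward from the newest.
public sealed class FakeIssueApi
{
    private readonly int total;
    public FakeIssueApi(int total) => this.total = total;

    public async Task<(List<int> Nodes, bool HasPreviousPage, int StartCursor)> GetPageAsync(
        int? beforeCursor, int pageSize)
    {
        await Task.Delay(10);                      // simulated network latency
        int end = beforeCursor ?? total + 1;       // exclusive upper bound
        int start = Math.Max(1, end - pageSize);   // inclusive lower bound
        var nodes = Enumerable.Range(start, end - start).ToList();
        return (nodes, start > 1, start);
    }
}

public static class StarterStyle
{
    // Mirrors the starter runPagedQueryAsync: every page is merged into one list,
    // so the caller sees nothing until the whole loop has finished.
    public static async Task<List<int>> GetAllIssuesAsync(FakeIssueApi api, int pageSize, int maxPages)
    {
        var finalResults = new List<int>();
        int? cursor = null;
        bool hasMorePages = true;
        int pagesReturned = 0;
        while (hasMorePages && pagesReturned++ < maxPages)
        {
            var page = await api.GetPageAsync(cursor, pageSize);
            hasMorePages = page.HasPreviousPage;   // like pageInfo.hasPreviousPage
            cursor = page.StartCursor;             // like pageInfo.startCursor
            finalResults.AddRange(page.Nodes);
        }
        return finalResults;
    }

    public static async Task Main()
    {
        var issues = await GetAllIssuesAsync(new FakeIssueApi(100), pageSize: 25, maxPages: 10);
        Console.WriteLine($"{issues.Count} issues; first few: {string.Join(",", issues.Take(3))}");
    }
}
```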

After retrieving a page and merging its nodes into the results, runPagedQueryAsync reports progress and checks for cancellation. If cancellation has been requested, runPagedQueryAsync throws an OperationCanceledException.

There are several elements in this code that can be improved. Most importantly, runPagedQueryAsync must allocate storage for all the issues returned. This sample stops at 250 issues because retrieving all open issues would require much more memory to store all the retrieved issues. The protocols for supporting progress reports and cancellation make the algorithm harder to understand on first reading. More types and APIs are involved. You must trace the communications through the CancellationTokenSource and its associated CancellationToken to understand where cancellation is requested and where it's granted.
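The cancellation and progress protocol of the starter code reduces to the following sketch. The FetchPagesAsync method and the InlineProgress class are hypothetical, and Task.Delay stands in for each HTTP request. The caller requests cancellation from its progress callback, and the worker notices at its next ThrowIfCancellationRequested check:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical IProgress<int> that runs its handler synchronously on the reporting thread.
public sealed class InlineProgress : IProgress<int>
{
    private readonly Action<int> handler;
    public InlineProgress(Action<int> handler) => this.handler = handler;
    public void Report(int value) => handler(value);
}

public static class CancellationProtocolDemo
{
    // Hypothetical worker in the starter style: after each simulated page it reports
    // progress and then checks the token, in the same order as the starter loop.
    public static async Task<int> FetchPagesAsync(int pages, CancellationToken cancel, IProgress<int> progress)
    {
        int issuesReturned = 0;
        for (int page = 0; page < pages; page++)
        {
            await Task.Delay(10);                // stands in for the HTTP round trip
            issuesReturned += 25;
            progress?.Report(issuesReturned);
            cancel.ThrowIfCancellationRequested();
        }
        return issuesReturned;
    }

    public static async Task Main()
    {
        using var source = new CancellationTokenSource();
        // The caller decides to stop once 50 issues have arrived.
        var progress = new InlineProgress(num =>
        {
            Console.WriteLine($"Received {num} issues in total");
            if (num >= 50) source.Cancel();
        });

        try
        {
            int total = await FetchPagesAsync(10, source.Token, progress);
            Console.WriteLine($"Finished with {total} issues");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Work has been cancelled");
        }
    }
}
```

Running Main prints the reports for 25 and 50 issues and then "Work has been cancelled". Note how the request (source.Cancel) and the grant (ThrowIfCancellationRequested) live in different places, which is exactly the tracing burden described above.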

Async streams provide a better way

Async streams and the associated language support address all those concerns. The code that generates the sequence can now use yield return to return elements in a method that was declared with the async modifier. You can consume an async stream using an await foreach loop just as you consume any sequence using a foreach loop.
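Independent of the GitHub sample, a minimal self-contained async iterator and its await foreach consumer look like this. CountSlowlyAsync is a hypothetical name chosen for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncIteratorBasics
{
    // An async iterator: the async modifier plus an IAsyncEnumerable<T> return type,
    // with both await and yield return in the same body.
    public static async IAsyncEnumerable<int> CountSlowlyAsync(int count, int delayMs)
    {
        for (int i = 1; i <= count; i++)
        {
            await Task.Delay(delayMs);   // asynchronous work between elements
            yield return i;              // hand one element to the consumer
        }
    }

    public static async Task Main()
    {
        // Each element is printed as soon as it's produced, not after all are ready.
        await foreach (int n in CountSlowlyAsync(3, 100))
            Console.WriteLine($"Got {n}");
    }
}
```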

These new language features depend on three new interfaces added to .NET Standard 2.1 and implemented in .NET Core 3.0:

  • System.Collections.Generic.IAsyncEnumerable<T>
  • System.Collections.Generic.IAsyncEnumerator<T>
  • System.IAsyncDisposable

These three interfaces should be familiar to most C# developers. They behave in a manner similar to their synchronous counterparts:

  • System.Collections.Generic.IEnumerable<T>
  • System.Collections.Generic.IEnumerator<T>
  • System.IDisposable

One type that may be unfamiliar is System.Threading.Tasks.ValueTask. The ValueTask struct provides a similar API to the System.Threading.Tasks.Task class. ValueTask is used in these interfaces for performance reasons: it can represent an operation that completes synchronously, or one backed by a reusable object, without allocating a new Task.
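To see how the three interfaces and ValueTask fit together, here's a hand-written async sequence that implements them directly. RangeAsync is a hypothetical type for illustration only. For async iterators, the compiler generates comparable plumbing for you:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Illustrative only: implements IAsyncEnumerable<int> by hand instead of using an async iterator.
public sealed class RangeAsync : IAsyncEnumerable<int>
{
    private readonly int start;
    private readonly int count;

    public RangeAsync(int start, int count)
    {
        this.start = start;
        this.count = count;
    }

    // Counterpart of IEnumerable<T>.GetEnumerator, plus an optional cancellation token.
    public IAsyncEnumerator<int> GetAsyncEnumerator(CancellationToken cancellationToken = default) =>
        new Enumerator(start, count, cancellationToken);

    // IAsyncEnumerator<T> also inherits IAsyncDisposable.
    private sealed class Enumerator : IAsyncEnumerator<int>
    {
        private readonly int start;
        private readonly int count;
        private readonly CancellationToken token;
        private int index = -1;

        public Enumerator(int start, int count, CancellationToken token)
        {
            this.start = start;
            this.count = count;
            this.token = token;
        }

        public int Current => start + index;

        // Every element is available immediately, so the ValueTask<bool> wraps the
        // bool directly and no Task object is allocated per element.
        public ValueTask<bool> MoveNextAsync()
        {
            if (token.IsCancellationRequested)
                return new ValueTask<bool>(Task.FromCanceled<bool>(token));
            index++;
            return new ValueTask<bool>(index < count);
        }

        // Nothing to release; default(ValueTask) is an already-completed operation.
        public ValueTask DisposeAsync() => default;
    }
}

public static class RangeAsyncDemo
{
    public static async Task Main()
    {
        await foreach (int n in new RangeAsync(10, 3))
            Console.WriteLine(n);
    }
}
```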

Convert to async streams

Next, convert the runPagedQueryAsync method to generate an async stream. First, change the signature of runPagedQueryAsync to return an IAsyncEnumerable<JToken>, and remove the cancellation token and progress objects from the parameter list as shown in the following code:

private static async IAsyncEnumerable<JToken> runPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

The starter code processes each page as the page is retrieved, as shown in the following code:

finalResults.Merge(issues(results)["nodes"]);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Replace those three lines with the following code:

foreach (JObject issue in issues(results)["nodes"])
    yield return issue;

You can also remove the declaration of finalResults earlier in this method and the return statement that follows the loop you modified.
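You can exercise the shape of the converted method offline. In this sketch, a hypothetical FetchPageAsync stands in for the GraphQL request and issue numbers stand in for the JSON nodes. The loop mirrors the converted runPagedQueryAsync: it has no result list and no return statement, and each node is yielded as soon as its page arrives:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class StreamingPager
{
    // Hypothetical stand-in for one GraphQL round trip: returns the issue numbers on the
    // page that ends before `beforeCursor`, plus hasPreviousPage and startCursor equivalents.
    private static async Task<(List<int> Nodes, bool HasPreviousPage, int StartCursor)> FetchPageAsync(
        int total, int? beforeCursor, int pageSize)
    {
        await Task.Delay(10);
        int end = beforeCursor ?? total + 1;
        int start = Math.Max(1, end - pageSize);
        return (Enumerable.Range(start, end - start).ToList(), start > 1, start);
    }

    // Same loop shape as the converted runPagedQueryAsync.
    public static async IAsyncEnumerable<int> RunPagedQueryAsync(int total, int pageSize, int maxPages)
    {
        int? cursor = null;
        bool hasMorePages = true;
        int pagesReturned = 0;
        while (hasMorePages && pagesReturned++ < maxPages)
        {
            var page = await FetchPageAsync(total, cursor, pageSize);
            hasMorePages = page.HasPreviousPage;
            cursor = page.StartCursor;
            foreach (int issue in page.Nodes)
                yield return issue;   // the consumer sees this page before the next request
        }
    }

    public static async Task Main()
    {
        int num = 0;
        await foreach (int issue in RunPagedQueryAsync(total: 100, pageSize: 25, maxPages: 10))
            Console.WriteLine($"Issue {issue} (received {++num} in total)");
    }
}
```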

You've finished the changes to generate an async stream. The finished method should resemble the following code:

private static async IAsyncEnumerable<JToken> runPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString());

        int totalCount = (int)issues(results)["totalCount"];
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"];
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"].ToString();
        issuesReturned += issues(results)["nodes"].Count();

        foreach (JObject issue in issues(results)["nodes"])
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]["repository"]["issues"];
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"];
}

Next, change the code that consumes the collection so that it consumes the async stream. Find the following code in Main that processes the collection of issues:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await runPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Replace that code with the following await foreach loop:

int num = 0;
await foreach (var issue in runPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

The new interface IAsyncEnumerator<T> derives from IAsyncDisposable. That means the preceding loop asynchronously disposes the stream when the loop finishes. You can think of the loop as roughly equivalent to the following code:

int num = 0;
var enumerator = runPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}
finally
{
    // IAsyncEnumerator<T> derives from IAsyncDisposable, so disposal is awaited.
    if (enumerator != null)
        await enumerator.DisposeAsync();
}
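The disposal guarantee is easy to observe. In the following sketch, NumbersAsync and Log are hypothetical names. The consumer leaves the loop after two elements, and the iterator's finally block still runs because await foreach calls DisposeAsync:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DisposalDemo
{
    public static readonly List<string> Log = new List<string>();

    // The finally block runs when the enumerator is disposed through DisposeAsync,
    // which await foreach does even when the consumer exits the loop early.
    public static async IAsyncEnumerable<int> NumbersAsync()
    {
        try
        {
            for (int i = 1; i <= 5; i++)
            {
                await Task.Yield();
                yield return i;
            }
        }
        finally
        {
            Log.Add("disposed");
        }
    }

    public static async Task Main()
    {
        await foreach (int n in NumbersAsync())
        {
            Log.Add($"item {n}");
            if (n == 2) break;   // stop early; DisposeAsync still runs
        }
        Console.WriteLine(string.Join(", ", Log));   // item 1, item 2, disposed
    }
}
```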

By default, stream elements are processed in the captured context. If you want to disable capturing of the context, use the TaskAsyncEnumerableExtensions.ConfigureAwait extension method. For more information about synchronization contexts and capturing the current context, see the article on consuming the Task-based asynchronous pattern.
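The following sketch shows how to disable context capture. SourceAsync and SumAsync are hypothetical names. The consumer chains WithCancellation and ConfigureAwait(false) onto the stream. ConfigureAwait(false) applies to the awaits on MoveNextAsync and DisposeAsync that await foreach performs. It doesn't change the awaits inside the iterator itself:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ConfigureAwaitDemo
{
    public static async IAsyncEnumerable<int> SourceAsync()
    {
        for (int i = 0; i < 3; i++)
        {
            await Task.Delay(10);
            yield return i;
        }
    }

    // Library-style consumer: continuations after each MoveNextAsync (and the final
    // DisposeAsync) don't need to return to the caller's SynchronizationContext.
    public static async Task<int> SumAsync(IAsyncEnumerable<int> source, CancellationToken token = default)
    {
        int sum = 0;
        await foreach (int value in source.WithCancellation(token).ConfigureAwait(false))
            sum += value;
        return sum;
    }

    public static async Task Main() => Console.WriteLine(await SumAsync(SourceAsync()));
}
```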

Async streams support cancellation using the same protocol as other async methods. To support cancellation, modify the signature of the async iterator method as follows:

private static async IAsyncEnumerable<JToken> runPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString());

        int totalCount = (int)issues(results)["totalCount"];
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"];
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"].ToString();
        issuesReturned += issues(results)["nodes"].Count();

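        foreach (JObject issue in issues(results)["nodes"])
            yield return issue;

        // Stop requesting further pages once the consumer has asked to cancel.
        cancellationToken.ThrowIfCancellationRequested();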
    }

    JObject issues(JObject result) => (JObject)result["data"]["repository"]["issues"];
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"];
}

The EnumeratorCancellationAttribute attribute (declared in the System.Runtime.CompilerServices namespace) causes the compiler to generate code for the IAsyncEnumerator<T> that makes the token passed to GetAsyncEnumerator visible to the body of the async iterator through the parameter marked with the attribute. Inside runPagedQueryAsync, you can examine the state of the token and cancel further work if requested.
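The following self-contained sketch uses a hypothetical PagesAsync iterator. It shows a token supplied through WithCancellation reaching the parameter marked [EnumeratorCancellation]. The consumer requests cancellation after the third page, and the next MoveNextAsync call ends with an OperationCanceledException:

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;   // EnumeratorCancellationAttribute
using System.Threading;
using System.Threading.Tasks;

public static class EnumeratorCancellationDemo
{
    // [EnumeratorCancellation] routes the token given to GetAsyncEnumerator
    // (for example, through WithCancellation) into this parameter.
    public static async IAsyncEnumerable<int> PagesAsync(int pageCount,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (int page = 1; page <= pageCount; page++)
        {
            cancellationToken.ThrowIfCancellationRequested();   // examine the token before each page
            await Task.Delay(10, cancellationToken);            // pass it to awaited work too
            yield return page;
        }
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();
        try
        {
            await foreach (int page in PagesAsync(10).WithCancellation(cts.Token))
            {
                Console.WriteLine($"Page {page}");
                if (page == 3) cts.Cancel();   // request cancellation from the consumer side
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Enumeration cancelled");
        }
    }
}
```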

You use another extension method, WithCancellation, to pass the cancellation token to the async stream. Modify the loop that enumerates the issues as follows:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in runPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

You can get the code for the finished tutorial from the dotnet/docs repository in the csharp/tutorials/AsyncStreams folder.

Run the finished application

Run the application again. Contrast its behavior with the behavior of the starter application. The first page of results is enumerated as soon as it's available. There's an observable pause as each new page is requested and retrieved, then the next page's results are quickly enumerated. A caller that simply wants to stop can exit the await foreach loop, so no try / catch block is needed for that case. Progress is clearly reported because the async stream generates results as each page is downloaded. The status for each issue returned is seamlessly included in the await foreach loop. You don't need a callback object to track progress.

You can see improvements in memory use by examining the code. You no longer need to allocate a collection to store all the results before they're enumerated. The caller can determine how to consume the results and whether a storage collection is needed.
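For example, the following sketch uses a hypothetical IssuesAsync source. One caller summarizes the stream in constant memory, and another chooses to store everything. CollectAsync is a hypothetical helper, not a framework API:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ConsumerChoices
{
    // Hypothetical source standing in for runPagedQueryAsync: yields issue numbers 1..count.
    public static async IAsyncEnumerable<int> IssuesAsync(int count)
    {
        for (int i = 1; i <= count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Option 1: keep only what you need (a count and the largest number seen),
    // so memory use stays constant no matter how long the stream is.
    public static async Task<(int Count, int Max)> SummarizeAsync(IAsyncEnumerable<int> source)
    {
        int count = 0, max = int.MinValue;
        await foreach (int issue in source)
        {
            count++;
            max = Math.Max(max, issue);
        }
        return (count, max);
    }

    // Option 2: materialize a list only when the caller really needs one.
    public static async Task<List<T>> CollectAsync<T>(IAsyncEnumerable<T> source)
    {
        var list = new List<T>();
        await foreach (T item in source)
            list.Add(item);
        return list;
    }

    public static async Task Main()
    {
        var summary = await SummarizeAsync(IssuesAsync(250));
        Console.WriteLine($"{summary.Count} issues, highest number {summary.Max}");
        List<int> all = await CollectAsync(IssuesAsync(250));
        Console.WriteLine($"Stored {all.Count} issues");
    }
}
```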

Run both the starter and finished applications to observe the differences between the implementations for yourself. After you've finished, you can delete the GitHub access token you created when you started this tutorial. If an attacker gained access to that token, they could access GitHub APIs using your credentials.