Руководство. Создание и использование асинхронных потоков с помощью C# и .NET

Асинхронные потоки моделит источник потоковой передачи данных. Потоки данных часто извлекают или создают элементы асинхронно. Они предоставляют естественную модель программирования для асинхронных потоковых источников данных.

Из этого руководства вы узнаете, как выполнять следующие задачи:

  • Создать источник данных, который формирует последовательность элементов данных асинхронно.
  • Использовать этот источник данных асинхронно.
  • Поддержка отмены и перехваченных контекстов для асинхронных потоков.
  • Распознавать, когда новый интерфейс и источник данных предпочтительнее для более ранних синхронных последовательностей данных.

Необходимые компоненты

Вам потребуется настроить компьютер для запуска .NET, включая компилятор C#. Компилятор C# доступен в Visual Studio 2022 или пакете SDK для .NET.

Чтобы вы могли получить доступ к конечной точке GraphQL GitHub, необходимо создать маркер доступа GitHub. Выберите следующие разрешения для маркеров доступа GitHub.

  • repo:status
  • public_repo

Храните маркер доступа в надежном месте, чтобы вы могли использовать его для получения доступа к конечной точке API GitHub.

Предупреждение

Храните свой личный маркер доступа в безопасном месте. Любое программное обеспечение с вашим личным маркером доступа может выполнять вызовы API GitHub с помощью ваших прав доступа.

В этом руководстве предполагается, что вы знакомы с C# и .NET, включая Visual Studio или .NET CLI.

Запуск начального приложения

Вы можете получить код для начального приложения, используемого в этом руководстве, из репозитория dotnet/docs в папке асинхронного программирования или фрагментов кода.

Начальное приложение представляет собой консольное приложение, которое использует интерфейс GraphQL GitHub для получения последних проблем, написанных в репозитории dotnet/docs. Начнем с просмотра следующего кода для метода Main начального приложения.

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Вы можете задать переменную среды GitHubKey личному маркеру доступа или заменить последний аргумент в вызове на GetEnvVariable с помощью личного маркера доступа. Не размещайте код доступа в исходном коде, если будете предоставлять общий доступ к источнику другим пользователям. Никогда не отправляйте коды доступа в репозиторий с общим исходным кодом.

После создания клиента GitHub код в Main создает объект отчета о ходе выполнения и маркер отмены. После создания этих объектов Main вызывает RunPagedQueryAsync, чтобы получить более 250 недавно созданных проблем. Результаты отобразятся после выполнения этой задачи.

При запуске начального приложения вы можете обнаружить некоторые важные замечания о том, как будет выполняться приложение. Вы увидите ход выполнения, передаваемый каждой странице, возвращенной с GitHub. Прежде чем GitHub вернет каждую новую страницу проблем, возникает заметная пауза. Наконец, проблемы отображаются только после того, как получены все 10 страниц с GitHub.

Изучение реализации

Реализация показывает, почему возникло поведение, обсуждавшееся в предыдущем разделе. Изучите код для RunPagedQueryAsync.

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Первое, что делает этот метод, заключается в создании объекта POST с помощью GraphQLRequest класса:

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

который помогает сформировать текст объекта POST и правильно преобразовать его в JSON, представленный в виде одной строки с методом ToJsonText , который удаляет все новые символы из текста запроса, \ помечая их с помощью escape-символа (обратная косая черта).

Давайте сконцентрируемся на алгоритме разбивки по страницам и асинхронной структуре предыдущего кода. (Вы можете проконсультироваться с Документация по GitHub GraphQL для получения сведений об API GraphQL GitHub.) Метод RunPagedQueryAsync перечисляет проблемы, возникающие от последнего до самого старого. Чтобы продолжить с предыдущей страницы, он запрашивает по 25 выпусков на страницу и проверяет структуру ответа pageInfo. Это следует за стандартной поддержкой страниц GraphQL для многостраничных ответов. Ответ включает в себя объект pageInfo, который содержит значение hasPreviousPages и startCursor, используемые для запроса предыдущей страницы. Проблемы в массиве nodes. Метод RunPagedQueryAsync добавляет эти узлы в массив, который содержит результаты со всех страниц.

После получения и восстановления страницы результатов RunPagedQueryAsync сообщает о ходе выполнения и проверяет наличие отмены. Если есть запрос на отмену, RunPagedQueryAsync выдает OperationCanceledException.

Существует несколько элементов в этом коде, которые можно улучшить. Самое главное, RunPagedQueryAsync должен выделить хранилище для всех возвращенных проблем. Этот пример останавливается после нахождения 250 проблем, так как для извлечения всех открытых проблем потребуется гораздо больше памяти на их хранение. Протоколы для поддержки отчетов о ходе выполнения и отмены делают алгоритм более сложным для понимания при первом чтении. Задействовано больше типов и API. Вы должны отслеживать передачу данных с помощью CancellationTokenSource и связанного с ним CancellationToken, чтобы понять, где запрашивается отмена и где она предоставляется.

Предоставление лучшего способа асинхронных потоков

Асинхронные потоки и связанная языковая поддержка обращаются ко всем этим вопросам. Код, который формирует последовательность, теперь может использовать yield return для возврата элементов в методе, который был объявлен с помощью модификатора async. Вы можете применить асинхронный поток, используя цикл await foreach, аналогично любой последовательности с помощью цикла foreach.

Эти новые языковые функции зависят от трех новых интерфейсов, добавленных в .NET Standard 2.1 и реализованных в .NET Core 3.0.

Большинство разработчиков C# должны знать об этих трех интерфейсах. Они ведут себя подобно своим синхронным аналогам.

Один тип, который может быть незнаком, — System.Threading.Tasks.ValueTask. Структура ValueTask предоставляет API, аналогичный классу System.Threading.Tasks.Task. ValueTask используется в этих интерфейсах по причинам производительности.

Преобразование в асинхронные потоки

Затем для создания асинхронного потока преобразуйте метод RunPagedQueryAsync. Сначала измените подпись RunPagedQueryAsync, чтобы вернуть IAsyncEnumerable<JToken>, затем удалите маркер отмены и объекты хода выполнения из списка параметров, как показано в следующем коде.

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

В следующем коде показано, как начальный код обрабатывает каждую страницу для извлечения.

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Замените эти три строки следующим кодом.

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

Вы также можете удалить объявление finalResults ранее в этом методе и оператор return, следующий за измененным циклом.

Вы завершили изменения для создания асинхронного потока. Готовый метод должен иметь вид, аналогичный приведенному ниже коду:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Затем измените код, который использует коллекцию, для асинхронного потока. Найдите следующий код в Main, который обрабатывает коллекцию проблем.

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Замените код следующим циклом await foreach.

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

Новый интерфейс IAsyncEnumerator<T> является производным от IAsyncDisposable. Это означает, что предыдущий цикл будет асинхронно удалять поток по завершении цикла. Цикл похож на следующий код:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Элементы потока по умолчанию обрабатываются в захваченном контексте. Чтобы отключить захват контекста, используйте метод расширения TaskAsyncEnumerableExtensions.ConfigureAwait. Дополнительные сведения о контекстах синхронизации и захвате текущего контекста см. в статье, посвященной использованию асинхронной модели на основе задач.

Асинхронные потоки поддерживают отмену, используя тот же протокол, что и другие методы async. Для поддержки отмены можно изменить сигнатуру для метода асинхронного итератора следующим образом:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Атрибут System.Runtime.CompilerServices.EnumeratorCancellationAttribute заставляет компилятор создать код для IAsyncEnumerator<T>, который делает токен, передаваемый GetAsyncEnumerator, видимым в тексте асинхронного итератора в виде аргумента. Внутри runQueryAsync можно проверить состояние маркера и отменить дальнейшую работу при необходимости.

Используйте другой метод расширения, WithCancellation, чтобы передать токен отмены асинхронному потоку. Измените цикл, перечисляя проблемы следующим образом:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Код готового руководства можно получить из репозитория dotnet/docs в папке асинхронного программирования и фрагментов кода.

Запуск готового приложения

Повторный запуск приложения Сравните его поведение с поведением начального приложения. Первая страница результатов перечисляется, как только она становится доступной. Поскольку каждую новую страницу запрашивают и извлекают, результаты следующей страницы быстро перечисляются, возникает пауза. Блок try / catch не требует обработки отмены. Вызывающий может прекратить перечисление коллекции. Отчет о ходе выполнения четко сформирован, так как асинхронный поток формирует результаты скачивания каждой страницы. Состояние каждой возвращенной проблемы включается в цикл await foreach. Для отслеживания хода выполнения объект обратного вызова не требуется.

Изучив код, вы увидите улучшения в использовании памяти. Вам больше не нужно выделять коллекцию для хранения всех результатов до их перечисления. Вызывающий может определить, как использовать результаты и нужен ли набор хранилищ.

Запустите начальное и готовое приложение, и вы увидите различия между реализациями самостоятельно. Вы можете удалить маркер доступа GitHub, созданный при начале работы с этим руководством, после завершения изучения. Если злоумышленник получил доступ к этому маркеру, ему удастся получить доступ к API GitHub с помощью ваших учетных данных.

В этом руководстве вы использовали асинхронные потоки для чтения отдельных элементов из сетевого API, который возвращает страницы данных. Асинхронные потоки также могут читаться из "никогда не завершающихся потоков", таких как тикер акций или устройство датчика. Вызов, возвращающий MoveNextAsync следующий элемент, как только он доступен.