Didacticiel : générer et utiliser des flux asynchrones à l’aide C# de 8,0 et .net Core 3,0Tutorial: Generate and consume async streams using C# 8.0 and .NET Core 3.0

C# 8.0 introduit des flux asynchrones, ce qui permet de modéliser une source de données en diffusion en continu lorsque les éléments du flux de données peuvent être récupérés ou générés de façon asynchrone.C# 8.0 introduces async streams, which model a streaming source of data when the elements in the data stream may be retrieved or generated asynchronously. Les flux asynchrones sont basés sur de nouvelles interfaces introduites dans .NET Standard 2.1 et implémentées dans .NET Core 3.0 afin de fournir un modèle de programmation naturel pour les sources de données asynchrones en diffusion en continu.Async streams rely on new interfaces introduced in .NET Standard 2.1 and implemented in .NET Core 3.0 to provide a natural programming model for asynchronous streaming data sources.

Dans ce tutoriel, vous allez apprendre à :In this tutorial, you'll learn how to:

  • créer une source de données qui génère une séquence d’éléments de données de façon asynchrone ;Create a data source that generates a sequence of data elements asynchronously.
  • consommer cette source de données de façon asynchrone ;Consume that data source asynchronously.
  • reconnaître quand l’interface et la source de données nouvelles sont préférables aux séquences de données synchrones précédentes.Recognize when the new interface and data source are preferred to earlier synchronous data sequences.

Configuration requisePrerequisites

Vous devez configurer votre ordinateur pour exécuter .NET Core, y compris le C# compilateur 8,0.You’ll need to set up your machine to run .NET Core, including the C# 8.0 compiler. Le C# compilateur 8 est disponible à partir de Visual Studio 2019 version 16,3 ou du Kit de développement logiciel (SDK) .net Core 3,0.The C# 8 compiler is available starting with Visual Studio 2019 version 16.3 or .NET Core 3.0 SDK.

Vous devrez créer un jeton d’accès GitHub afin de pouvoir accéder au point de terminaison GitHub GraphQL.You'll need to create a GitHub access token so that you can access the GitHub GraphQL endpoint. Sélectionnez les autorisations suivantes pour votre jeton d’accès GitHub :Select the following permissions for your GitHub Access Token:

  • repo:statusrepo:status
  • public_repopublic_repo

Enregistrez le jeton d’accès à un endroit sûr afin de pouvoir l’utiliser pour accéder au point de terminaison de l’API GitHub.Save the access token in a safe place so you can use it to gain access to the GitHub API endpoint.

Avertissement

Sécurisez votre jeton d’accès personnel.Keep your personal access token secure. Tous les logiciels disposant de votre jeton d’accès personnel peuvent effectuer des appels d’API GitHub à l’aide de vos droits d’accès.Any software with your personal access token could make GitHub API calls using your access rights.

Ce tutoriel suppose de connaître C# et .NET, y compris Visual Studio ou l’interface CLI .NET Core.This tutorial assumes you're familiar with C# and .NET, including either Visual Studio or the .NET Core CLI.

Exécutez l’application de démarrageRun the starter application

Vous pouvez obtenir le code pour l’application de démarrage utilisée dans ce tutoriel dans notre référentiel dotnet/samples, dossier csharp/tutorials/AsyncStreams.You can get the code for the starter application used in this tutorial from our dotnet/samples repository in the csharp/tutorials/AsyncStreams folder.

L’application de démarrage est une application console qui utilise l’interface GitHub GraphQL pour récupérer des problèmes récents écrits dans le référentiel dotnet/docs.The starter application is a console application that uses the GitHub GraphQL interface to retrieve recent issues written in the dotnet/docs repository. Commencez par examiner le code suivant pour la méthode Main de l’application de démarrage :Start by looking at the following code for the starter app Main method:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await runPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Vous pouvez soit définir une variable d’environnement GitHubKey sur votre jeton d’accès personnel, soit remplacer le dernier argument dans l’appel par GenEnvVariable avec votre jeton d’accès personnel.You can either set a GitHubKey environment variable to your personal access token, or you can replace the last argument in the call to GenEnvVariable with your personal access token. Ne placez pas votre code d’accès dans le code source si vous envisagez d’enregistrer la source avec d’autres utilisateurs, ou de la placer dans un référentiel de code source partagé.Don't put your access code in source code if you'll be saving the source with others, or putting it in a shared source repository.

Après la création du client de GitHub, le code dans Main crée un objet de rapport de progression et un jeton d’annulation.After creating the GitHub client, the code in Main creates a progress reporting object and a cancellation token. Une fois que ces objets sont créés, Main appelle runPagedQueryAsync pour récupérer les 250 problèmes créés les plus récents.Once those objects are created, Main calls runPagedQueryAsync to retrieve the most recent 250 created issues. Une fois cette tâche terminée, les résultats sont affichés.After that task has finished, the results are displayed.

Lorsque vous exécutez l’application de démarrage, vous pouvez faire quelques observations importantes concernant son fonctionnement.When you run the starter application, you can make some important observations about how this application runs. Vous voyez la progression signalée pour chaque page retournée à partir de GitHub.You'll see progress reported for each page returned from GitHub. Vous pouvez observer un temps de pause avant le retour de chaque nouvelle page de problèmes par GitHub.You can observe a noticeable pause before GitHub returns each new page of issues. Enfin, les problèmes ne sont affichés que lorsque les 10 pages ont été récupérées à partir de GitHub.Finally, the issues are displayed only after all 10 pages have been retrieved from GitHub.

Examinez l’implémentationExamine the implementation

L’implémentation révèle pourquoi vous avez observé le comportement décrit dans la section précédente.The implementation reveals why you observed the behavior discussed in the previous section. Examinez the code for runPagedQueryAsync :Examine the code for runPagedQueryAsync:

private static async Task<JArray> runPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString());

        int totalCount = (int)issues(results)["totalCount"];
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"];
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"].ToString();
        issuesReturned += issues(results)["nodes"].Count();
        finalResults.Merge(issues(results)["nodes"]);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]["repository"]["issues"];
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"];
}

Concentrons-nous sur l’algorithme de pagination et sur la structure asynchrone du code précédent.Let's concentrate on the paging algorithm and async structure of the preceding code. (Vous pouvez consulter la documentation de GitHub GraphQL pour plus d’informations sur l’API GraphQL github.) La méthode runPagedQueryAsync énumère les problèmes de la plus récente à la plus ancienne.(You can consult the GitHub GraphQL documentation for details on the GitHub GraphQL API.) The runPagedQueryAsync method enumerates the issues from most recent to oldest. Elle a besoin de 25 problèmes par page et examine la structure pageInfo de la réponse pour continuer avec la page précédente.It requests 25 issues per page and examines the pageInfo structure of the response to continue with the previous page. Cela suit la prise en charge standard de la pagination de GraphQL pour les réponses multipages.That follows GraphQL's standard paging support for multi-page responses. La réponse inclut un objet pageInfo qui contient une valeur hasPreviousPages et une valeur startCursor utilisées pour demander la page précédente.The response includes a pageInfo object that includes a hasPreviousPages value and a startCursor value used to request the previous page. Les problèmes se trouvent dans le tableau nodes.The issues are in the nodes array. La méthode runPagedQueryAsync ajoute ces nœuds à un tableau qui contient tous les résultats de toutes les pages.The runPagedQueryAsync method appends these nodes to an array that contains all the results from all pages.

Après la récupération et la restauration d’une page de résultats, runPagedQueryAsync signale la progression et vérifie l’annulation.After retrieving and restoring a page of results, runPagedQueryAsync reports progress and checks for cancellation. Si l’annulation a été demandée, runPagedQueryAsync lève une OperationCanceledException.If cancellation has been requested, runPagedQueryAsync throws an OperationCanceledException.

Plusieurs éléments de ce code peuvent être améliorés.There are several elements in this code that can be improved. Plus important encore, runPagedQueryAsync doit allouer du stockage pour tous les problèmes retournés.Most importantly, runPagedQueryAsync must allocate storage for all the issues returned. Cet exemple s’arrête à 250 problèmes, car la récupération de tous les problèmes ouverts nécessiterait beaucoup plus de mémoire pour stocker tous les problèmes récupérées.This sample stops at 250 issues because retrieving all open issues would require much more memory to store all the retrieved issues. De plus, les protocoles pour la prise en charge de la progression et de l’annulation rendent l’algorithme plus difficile à comprendre lors de sa première lecture.In addition, the protocols for supporting progress and supporting cancellation make the algorithm harder to understand on its first reading. Vous devez rechercher la classe de progression pour trouver le rapport de progression.You must look for the progress class to find where progress is reported. Vous devez également suivre les communications via le CancellationTokenSource et son CancellationToken associé pour comprendre où l’annulation est demandée et où elle est accordée.You also have to trace the communications through the CancellationTokenSource and its associated CancellationToken to understand where cancellation is requested and where it's granted.

Les flux asynchrones sont mieux adaptésAsync streams provide a better way

Les flux asynchrones et la prise en charge associée du langage résolvent tous ces problèmes.Async streams and the associated language support address all those concerns. Le code qui génère la séquence peut désormais utiliser yield return pour retourner des éléments dans une méthode qui a été déclarée avec le modificateur async.The code that generates the sequence can now use yield return to return elements in a method that was declared with the async modifier. Vous pouvez consommer un flux asynchrone à l’aide une boucle await foreach tout comme vous consommez n’importe quelle séquence à l’aide d’une boucle foreach.You can consume an async stream using an await foreach loop just as you consume any sequence using a foreach loop.

Ces nouvelles fonctionnalités de langage dépendent de trois nouvelles interfaces ajoutées à .NET Standard 2.1 et implémentées dans .NET Core 3.0 :These new language features depend on three new interfaces added to .NET Standard 2.1 and implemented in .NET Core 3.0:

namespace System.Collections.Generic
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        T Current { get; }

        ValueTask<bool> MoveNextAsync();
    }
}

namespace System
{
    public interface IAsyncDisposable
    {
        ValueTask DisposeAsync();
    }
}

Ces trois interfaces sont très certainement familières à la plupart des développeurs C#.These three interfaces should be familiar to most C# developers. Elles se comportent de manière similaire à leurs équivalents synchrones :They behave in a manner similar to their synchronous counterparts:

Il est possible que le type System.Threading.Tasks.ValueTask ne soit pas familier.One type that may be unfamiliar is System.Threading.Tasks.ValueTask. Le struct ValueTask fournit une API similaire à la classe System.Threading.Tasks.Task.The ValueTask struct provides a similar API to the System.Threading.Tasks.Task class. ValueTask est utilisé dans ces interfaces pour des raisons de performances.ValueTask is used in these interfaces for performance reasons.

Convertir en flux asynchronesConvert to async streams

Ensuite, convertissez la méthode runPagedQueryAsync pour générer un flux asynchrone.Next, convert the runPagedQueryAsync method to generate an async stream. Tout d’abord, modifiez la signature de runPagedQueryAsync pour retourner un IAsyncEnumerable<JToken> et supprimer les objets de jeton d’annulation et de progression de la liste de paramètres comme indiqué dans le code suivant :First, change the signature of runPagedQueryAsync to return an IAsyncEnumerable<JToken>, and remove the cancellation token and progress objects from the parameter list as shown in the following code:

private static async IAsyncEnumerable<JToken> runPagedQueryAsync(GitHubClient client, 
    string queryText, string repoName)

Le code de démarrage traite chaque page lorsqu’elle est récupérée, comme indiqué dans le code suivant :The starter code processes each page as the page is retrieved, as shown in the following code:

finalResults.Merge(issues(results)["nodes"]);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Remplacez ces trois lignes par le code suivant :Replace those three lines with the following code:

foreach (JObject issue in issues(results)["nodes"])
    yield return issue;

Vous pouvez également supprimer la déclaration de finalResults plus tôt dans cette méthode et l’instruction return qui suit la boucle que vous avez modifiée.You can also remove the declaration of finalResults earlier in this method and the return statement that follows the loop you modified.

Vous avez terminé les modifications permettant de générer un flux asynchrone.You've finished the changes to generate an async stream. La méthode terminée doit ressembler au code ci-dessous :The finished method should resemble the code below:

private static async IAsyncEnumerable<JToken> runPagedQueryAsync(GitHubClient client, 
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString());

        int totalCount = (int)issues(results)["totalCount"];
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"];
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"].ToString();
        issuesReturned += issues(results)["nodes"].Count();

        foreach (JObject issue in issues(results)["nodes"])
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]["repository"]["issues"];
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"];
}

Ensuite, vous modifiez le code qui utilise la collection pour consommer le flux de données asynchrone.Next, you change the code that consumes the collection to consume the async stream. Recherchez dans Main le code suivant, qui traite l’ensemble des problèmes :Find the following code in Main that processes the collection of issues:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await runPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Remplacez-le par la boucle await foreach suivante :Replace that code with the following await foreach loop:

int num = 0;
await foreach (var issue in runPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

Vous pouvez obtenir le code du tutoriel terminé dans le référentiel dotnet/samples, dossier csharp/tutorials/AsyncStreams.You can get the code for the finished tutorial from the dotnet/samples repository in the csharp/tutorials/AsyncStreams folder.

Exécutez l'application terminéeRun the finished application

Exécutez de nouveau l'application.Run the application again. Comparez son comportement avec le comportement de l’application de démarrage.Contrast its behavior with the behavior of the starter application. La première page de résultats est énumérée dès qu’elle est disponible.The first page of results is enumerated as soon as it's available. Une pause peut être observée lorsque chaque nouvelle page est demandée et récupérée, puis les résultats de la page suivante sont rapidement énumérés.There's an observable pause as each new page is requested and retrieved, then the next page's results are quickly enumerated. Le bloc try / catch n’est pas nécessaire pour gérer l’annulation : l’appelant peut arrêter l’énumération de la collection.The try / catch block isn't needed to handle cancellation: the caller can stop enumerating the collection. Le rapport de progression est clair, car le flux asynchrone génère des résultats à mesure que chaque page est téléchargée.Progress is clearly reported because the async stream generates results as each page is downloaded. L’état de chaque problème renvoyé est inclus en toute transparence dans la boucle await foreach.The status for each issue returned is seamlessly included in the await foreach loop. Vous n’avez pas besoin d’un objet de rappel pour suivre la progression.You don't need a callback object to track progress.

Vous pouvez voir des améliorations lors de l’utilisation de mémoire en examinant le code.You can see improvements in memory use by examining the code. Vous n’avez plus besoin d’allouer une collection pour stocker tous les résultats avant qu’ils ne soient énumérés.You no longer need to allocate a collection to store all the results before they're enumerated. L’appelant peut déterminer comment utiliser les résultats et si une collection de stockage est nécessaire.The caller can determine how to consume the results and if a storage collection is needed.

Exécutez les applications de démarrage et les applications terminées. Ceci vous permettra d’observer les différences entre les implémentations pour vous.Run both the starter and finished applications and you can observe the differences between the implementations for yourself. À la fin de ce tutoriel, vous pouvez supprimer le jeton d’accès GitHub que vous avez créé au début.You can delete the GitHub access token you created when you started this tutorial after you've finished. Si un attaquant arrive à accéder à ce jeton, il pourrait accéder aux API GitHub à l’aide de vos informations d’identification.If an attacker gained access to that token, they could access GitHub APIs using your credentials.