Didacticiel : générer et utiliser des flux asynchrones à l’aide de C# 8,0 et .NET Core 3,0

C# 8,0 introduit des flux asynchrones, qui modélisent une source de données de diffusion en continu. Les flux de données récupèrent et génèrent souvent des éléments de façon asynchrone. Les flux asynchrones s’appuient sur les nouvelles interfaces introduites dans .NET Standard 2,1. Ces interfaces sont prises en charge dans .NET Core 3,0 et versions ultérieures. Ils fournissent un modèle de programmation naturel pour les sources de données de streaming asynchrones.

Dans ce tutoriel, vous apprendrez à :

  • créer une source de données qui génère une séquence d’éléments de données de façon asynchrone ;
  • consommer cette source de données de façon asynchrone ;
  • Prendre en charge l’annulation et les contextes capturés pour les flux asynchrones.
  • reconnaître quand l’interface et la source de données nouvelles sont préférables aux séquences de données synchrones précédentes.

Prérequis

Vous devez configurer votre ordinateur pour exécuter .NET Core, y compris le compilateur C# 8,0. Le compilateur C# 8 est disponible à partir de Visual Studio 2019 version 16,3 ou du Kit de développement logiciel (SDK) .net Core 3,0.

Vous devrez créer un jeton d’accès GitHub afin de pouvoir accéder au point de terminaison GitHub GraphQL. Sélectionnez les autorisations suivantes pour votre jeton d’accès GitHub :

  • repo:status
  • public_repo

Enregistrez le jeton d’accès à un endroit sûr afin de pouvoir l’utiliser pour accéder au point de terminaison de l’API GitHub.

Avertissement

Sécurisez votre jeton d’accès personnel. Tous les logiciels disposant de votre jeton d’accès personnel peuvent effectuer des appels d’API GitHub à l’aide de vos droits d’accès.

Ce tutoriel suppose de connaître C# et .NET, y compris Visual Studio ou l’interface CLI .NET Core.

Exécutez l’application de démarrage

Vous pouvez obtenir le code de l’application de démarrage utilisée dans ce didacticiel à partir du dépôt dotnet/docs dans le dossier CSharp/guide-Nouveautés/didacticiels .

L’application de démarrage est une application console qui utilise l’interface GitHub GraphQL pour récupérer des problèmes récents écrits dans le référentiel dotnet/docs. Commencez par examiner le code suivant pour la méthode Main de l’application de démarrage :

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await runPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Vous pouvez soit définir une variable d’environnement GitHubKey sur votre jeton d’accès personnel, soit remplacer le dernier argument dans l’appel par GetEnvVariable avec votre jeton d’accès personnel. Ne placez pas votre code d’accès dans le code source si vous souhaitez partager la source avec d’autres utilisateurs. Ne téléchargez jamais de codes d’accès dans un référentiel source partagé.

Après la création du client de GitHub, le code dans Main crée un objet de rapport de progression et un jeton d’annulation. Une fois que ces objets sont créés, Main appelle runPagedQueryAsync pour récupérer les 250 problèmes créés les plus récents. Une fois cette tâche terminée, les résultats sont affichés.

Lorsque vous exécutez l’application de démarrage, vous pouvez faire quelques observations importantes concernant son fonctionnement. Vous voyez la progression signalée pour chaque page retournée à partir de GitHub. Vous pouvez observer un temps de pause avant le retour de chaque nouvelle page de problèmes par GitHub. Enfin, les problèmes ne sont affichés que lorsque les 10 pages ont été récupérées à partir de GitHub.

Examinez l’implémentation

L’implémentation révèle pourquoi vous avez observé le comportement décrit dans la section précédente. Examinez the code for runPagedQueryAsync :

private static async Task<JArray> runPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString());

        int totalCount = (int)issues(results)["totalCount"];
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"];
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"].ToString();
        issuesReturned += issues(results)["nodes"].Count();
        finalResults.Merge(issues(results)["nodes"]);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]["repository"]["issues"];
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"];
}

Concentrons-nous sur l’algorithme de pagination et sur la structure asynchrone du code précédent. (Vous pouvez consulter la documentation de GitHub GraphQL pour plus d’informations sur l’API GraphQL github.) La runPagedQueryAsync méthode énumère les problèmes du plus récent au plus ancien. Elle a besoin de 25 problèmes par page et examine la structure pageInfo de la réponse pour continuer avec la page précédente. Cela suit la prise en charge standard de la pagination de GraphQL pour les réponses multipages. La réponse inclut un objet pageInfo qui contient une valeur hasPreviousPages et une valeur startCursor utilisées pour demander la page précédente. Les problèmes se trouvent dans le tableau nodes. La méthode runPagedQueryAsync ajoute ces nœuds à un tableau qui contient tous les résultats de toutes les pages.

Après la récupération et la restauration d’une page de résultats, runPagedQueryAsync signale la progression et vérifie l’annulation. Si l’annulation a été demandée, runPagedQueryAsync lève une OperationCanceledException.

Plusieurs éléments de ce code peuvent être améliorés. Plus important encore, runPagedQueryAsync doit allouer du stockage pour tous les problèmes retournés. Cet exemple s’arrête à 250 problèmes, car la récupération de tous les problèmes ouverts nécessiterait beaucoup plus de mémoire pour stocker tous les problèmes récupérées. Les protocoles de prise en charge des rapports de progression et de l’annulation rendent l’algorithme plus difficile à comprendre lors de sa première lecture. D’autres types et API sont impliqués. Vous devez suivre les communications via le CancellationTokenSource et son associé CancellationToken pour comprendre où l’annulation est demandée et où elle est accordée.

Les flux asynchrones sont mieux adaptés

Les flux asynchrones et la prise en charge associée du langage résolvent tous ces problèmes. Le code qui génère la séquence peut désormais utiliser yield return pour retourner des éléments dans une méthode qui a été déclarée avec le modificateur async. Vous pouvez consommer un flux asynchrone à l’aide une boucle await foreach tout comme vous consommez n’importe quelle séquence à l’aide d’une boucle foreach.

Ces nouvelles fonctionnalités de langage dépendent de trois nouvelles interfaces ajoutées à .NET Standard 2.1 et implémentées dans .NET Core 3.0 :

Ces trois interfaces sont très certainement familières à la plupart des développeurs C#. Elles se comportent de manière similaire à leurs équivalents synchrones :

Il est possible que le type System.Threading.Tasks.ValueTask ne soit pas familier. Le struct ValueTask fournit une API similaire à la classe System.Threading.Tasks.Task. ValueTask est utilisé dans ces interfaces pour des raisons de performances.

Convertir en flux asynchrones

Ensuite, convertissez la méthode runPagedQueryAsync pour générer un flux asynchrone. Tout d’abord, modifiez la signature de runPagedQueryAsync pour retourner un IAsyncEnumerable<JToken> et supprimer les objets de jeton d’annulation et de progression de la liste de paramètres comme indiqué dans le code suivant :

private static async IAsyncEnumerable<JToken> runPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

Le code de démarrage traite chaque page lorsqu’elle est récupérée, comme indiqué dans le code suivant :

finalResults.Merge(issues(results)["nodes"]);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Remplacez ces trois lignes par le code suivant :

foreach (JObject issue in issues(results)["nodes"])
    yield return issue;

Vous pouvez également supprimer la déclaration de finalResults plus tôt dans cette méthode et l’instruction return qui suit la boucle que vous avez modifiée.

Vous avez terminé les modifications permettant de générer un flux asynchrone. La méthode terminée doit ressembler au code suivant :

private static async IAsyncEnumerable<JToken> runPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString());

        int totalCount = (int)issues(results)["totalCount"];
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"];
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"].ToString();
        issuesReturned += issues(results)["nodes"].Count();

        foreach (JObject issue in issues(results)["nodes"])
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]["repository"]["issues"];
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"];
}

Ensuite, vous modifiez le code qui utilise la collection pour consommer le flux de données asynchrone. Recherchez dans Main le code suivant, qui traite l’ensemble des problèmes :

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await runPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Remplacez-le par la boucle await foreach suivante :

int num = 0;
await foreach (var issue in runPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

La nouvelle interface IAsyncEnumerator<T> dérive de IAsyncDisposable . Cela signifie que la boucle précédente disposera de façon asynchrone le flux lorsque la boucle se terminera. Vous pouvez imaginer que la boucle ressemble au code suivant :

int num = 0;
var enumerator = runPagedQueryAsync(client, PagedIssueQuery, "docs").GetEnumeratorAsync();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Par défaut, les éléments de flux sont traités dans le contexte capturé. Si vous souhaitez désactiver la capture du contexte, utilisez la TaskAsyncEnumerableExtensions.ConfigureAwait méthode d’extension. Pour plus d’informations sur les contextes de synchronisation et la capture du contexte actuel, consultez l’article sur l' utilisation du modèle asynchrone basésur des tâches.

Les flux asynchrones prennent en charge l’annulation en utilisant le même protocole que les autres async méthodes. Vous pouvez modifier la signature de la méthode d’itérateur Async comme suit pour prendre en charge l’annulation :

private static async IAsyncEnumerable<JToken> runPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString());

        int totalCount = (int)issues(results)["totalCount"];
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"];
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"].ToString();
        issuesReturned += issues(results)["nodes"].Count();

        foreach (JObject issue in issues(results)["nodes"])
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]["repository"]["issues"];
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"];
}

L' EnumeratorCancellationAttribute attribut force le compilateur à générer du code pour le IAsyncEnumerator<T> qui rend le jeton passé GetAsyncEnumerator visible au corps de l’itérateur Async comme cet argument. Dans runQueryAsync , vous pouvez examiner l’état du jeton et annuler un travail supplémentaire si nécessaire.

Vous utilisez une autre méthode d’extension, WithCancellation , pour transmettre le jeton d’annulation au flux asynchrone. Vous pouvez modifier la boucle en énumérant les problèmes comme suit :

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in runPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Vous pouvez obtenir le code du didacticiel terminé à partir du dépôt dotnet/docs dans le dossier CSharp/guide-Nouveautés/didacticiels .

Exécutez l'application terminée

Exécutez de nouveau l'application. Comparez son comportement avec le comportement de l’application de démarrage. La première page de résultats est énumérée dès qu’elle est disponible. Une pause peut être observée lorsque chaque nouvelle page est demandée et récupérée, puis les résultats de la page suivante sont rapidement énumérés. Le bloc try / catch n’est pas nécessaire pour gérer l’annulation : l’appelant peut arrêter l’énumération de la collection. Le rapport de progression est clair, car le flux asynchrone génère des résultats à mesure que chaque page est téléchargée. L’état de chaque problème renvoyé est inclus en toute transparence dans la await foreach boucle. Vous n’avez pas besoin d’un objet de rappel pour suivre la progression.

Vous pouvez voir des améliorations lors de l’utilisation de mémoire en examinant le code. Vous n’avez plus besoin d’allouer une collection pour stocker tous les résultats avant qu’ils ne soient énumérés. L’appelant peut déterminer comment utiliser les résultats et si une collection de stockage est nécessaire.

Exécutez les applications de démarrage et les applications terminées. Ceci vous permettra d’observer les différences entre les implémentations pour vous. À la fin de ce tutoriel, vous pouvez supprimer le jeton d’accès GitHub que vous avez créé au début. Si un attaquant arrive à accéder à ce jeton, il pourrait accéder aux API GitHub à l’aide de vos informations d’identification.