Tutoriel : Générer et consommer des flux asynchrones en utilisant C# et .NET

Les flux asynchrones modélisent une source de données de streaming. Souvent, les flux de données récupèrent ou génèrent des éléments de manière asynchrone. Ils fournissent un modèle de programmation naturel pour les sources de données de streaming asynchrones.

Ce didacticiel vous montre comment effectuer les opérations suivantes :

  • créer une source de données qui génère une séquence d’éléments de données de façon asynchrone ;
  • consommer cette source de données de façon asynchrone ;
  • prendre en charge l’annulation et les contextes capturés pour les flux asynchrones ;
  • reconnaître quand l’interface et la source de données nouvelles sont préférables aux séquences de données synchrones précédentes.

Prérequis

Vous devrez configurer votre ordinateur de façon à exécuter .NET, avec le compilateur C#. Le compilateur C# est accessible via Visual Studio 2022 ou le SDK .NET.

Vous devrez créer un jeton d’accès GitHub afin de pouvoir accéder au point de terminaison GitHub GraphQL. Sélectionnez les autorisations suivantes pour votre jeton d’accès GitHub :

  • repo:status
  • public_repo

Enregistrez le jeton d’accès à un endroit sûr afin de pouvoir l’utiliser pour accéder au point de terminaison de l’API GitHub.

Avertissement

Sécurisez votre jeton d’accès personnel. Tous les logiciels disposant de votre jeton d’accès personnel peuvent effectuer des appels d’API GitHub à l’aide de vos droits d’accès.

Ce tutoriel suppose de connaître C# et .NET, y compris Visual Studio ou l’interface CLI .NET.

Exécutez l’application de démarrage

Vous pouvez vous procurer le code de l’application de démarrage utilisée dans ce tutoriel dans notre référentiel dotnet/docs situé dans le dossier asynchronous-programming/snippets.

L’application de démarrage est une application console qui utilise l’interface GitHub GraphQL pour récupérer des problèmes récents écrits dans le référentiel dotnet/docs. Commencez par examiner le code suivant pour la méthode Main de l’application de démarrage :

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Vous pouvez soit définir une variable d’environnement GitHubKey sur votre jeton d’accès personnel, soit remplacer le dernier argument dans l’appel par GetEnvVariable avec votre jeton d’accès personnel. Ne placez pas votre code d’accès dans le code source si vous prévoyez de partager la source avec d’autres utilisateurs. Ne chargez jamais de codes d’accès dans un référentiel source partagé.

Après la création du client de GitHub, le code dans Main crée un objet de rapport de progression et un jeton d’annulation. Une fois que ces objets sont créés, Main appelle RunPagedQueryAsync pour récupérer les 250 problèmes créés les plus récents. Une fois cette tâche terminée, les résultats sont affichés.

Lorsque vous exécutez l’application de démarrage, vous pouvez faire quelques observations importantes concernant son fonctionnement. Vous voyez la progression signalée pour chaque page retournée à partir de GitHub. Vous pouvez observer un temps de pause avant le retour de chaque nouvelle page de problèmes par GitHub. Enfin, les problèmes ne sont affichés que lorsque les 10 pages ont été récupérées à partir de GitHub.

Examinez l’implémentation

L’implémentation révèle pourquoi vous avez observé le comportement décrit dans la section précédente. Examinez the code for RunPagedQueryAsync :

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

La première action effectuée par cette méthode est de créer l’objet POST, à l’aide de la classe GraphQLRequest :

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

qui permet de former le corps de l’objet POST et de le convertir correctement au format JSON présenté sous forme de chaîne unique avec la méthode ToJsonText. Cela supprime ainsi tous les caractères nouvelle ligne du corps de la demande en les marquant avec le caractère d’échappement \ (barre oblique inverse).

Concentrons-nous sur l’algorithme de pagination et sur la structure asynchrone du code précédent. (Vous pouvez consulter la documentation GitHub GraphQL pour obtenir des détails sur l’API GitHub GraphQL.) La méthode RunPagedQueryAsync énumère les problèmes du plus récent au plus ancien. Elle a besoin de 25 problèmes par page et examine la structure pageInfo de la réponse pour continuer avec la page précédente. Cela suit la prise en charge standard de la pagination de GraphQL pour les réponses multipages. La réponse inclut un objet pageInfo qui contient une valeur hasPreviousPages et une valeur startCursor utilisées pour demander la page précédente. Les problèmes se trouvent dans le tableau nodes. La méthode RunPagedQueryAsync ajoute ces nœuds à un tableau qui contient tous les résultats de toutes les pages.

Après la récupération et la restauration d’une page de résultats, RunPagedQueryAsync signale la progression et vérifie l’annulation. Si l’annulation a été demandée, RunPagedQueryAsync lève une OperationCanceledException.

Plusieurs éléments de ce code peuvent être améliorés. Plus important encore, RunPagedQueryAsync doit allouer du stockage pour tous les problèmes retournés. Cet exemple s’arrête à 250 problèmes, car la récupération de tous les problèmes ouverts nécessiterait beaucoup plus de mémoire pour stocker tous les problèmes récupérées. Les protocoles pour la prise en charge des rapports de progression et de l’annulation rendent l’algorithme plus difficile à comprendre lors de sa première lecture. D’autres types et API sont impliqués. Vous devez tracer les communications via le CancellationTokenSource et son CancellationToken associé pour comprendre où l’annulation est demandée et où elle est accordée.

Les flux asynchrones sont mieux adaptés

Les flux asynchrones et la prise en charge associée du langage résolvent tous ces problèmes. Le code qui génère la séquence peut désormais utiliser yield return pour retourner des éléments dans une méthode qui a été déclarée avec le modificateur async. Vous pouvez consommer un flux asynchrone à l’aide une boucle await foreach tout comme vous consommez n’importe quelle séquence à l’aide d’une boucle foreach.

Ces nouvelles fonctionnalités de langage dépendent de trois nouvelles interfaces ajoutées à .NET Standard 2.1 et implémentées dans .NET Core 3.0 :

Ces trois interfaces sont très certainement familières à la plupart des développeurs C#. Elles se comportent de manière similaire à leurs équivalents synchrones :

Il est possible que le type System.Threading.Tasks.ValueTask ne soit pas familier. Le struct ValueTask fournit une API similaire à la classe System.Threading.Tasks.Task. ValueTask est utilisé dans ces interfaces pour des raisons de performances.

Convertir en flux asynchrones

Ensuite, convertissez la méthode RunPagedQueryAsync pour générer un flux asynchrone. Tout d’abord, modifiez la signature de RunPagedQueryAsync pour retourner un IAsyncEnumerable<JToken> et supprimer les objets de jeton d’annulation et de progression de la liste de paramètres comme indiqué dans le code suivant :

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

Le code de démarrage traite chaque page lorsqu’elle est récupérée, comme indiqué dans le code suivant :

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Remplacez ces trois lignes par le code suivant :

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

Vous pouvez également supprimer la déclaration de finalResults plus tôt dans cette méthode et l’instruction return qui suit la boucle que vous avez modifiée.

Vous avez terminé les modifications permettant de générer un flux asynchrone. Une fois terminée, la méthode doit ressembler au code suivant :

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Ensuite, vous modifiez le code qui utilise la collection pour consommer le flux de données asynchrone. Recherchez dans Main le code suivant, qui traite l’ensemble des problèmes :

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Remplacez-le par la boucle await foreach suivante :

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

La nouvelle interface IAsyncEnumerator<T> est dérivée de IAsyncDisposable. Cela signifie que la boucle précédente supprime de façon asynchrone le flux à la fin de la boucle. Vous pouvez imaginer que la boucle ressemble au code suivant :

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Par défaut, les éléments de flux sont traités dans le contexte capturé. Si vous souhaitez désactiver la capture du contexte, utilisez la méthode d’extension TaskAsyncEnumerableExtensions.ConfigureAwait. Pour plus d’informations sur les contextes de synchronisation et la capture du contexte actuel, consultez l’article traitant de la consommation du modèle asynchrone basé sur les tâches.

Les flux asynchrones prennent en charge l’annulation en utilisant le même protocole que les autres méthodes async. Pour prendre en charge l’annulation, vous devez modifier la signature de la méthode d’itérateur asynchrone comme suit :

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

L’attribut System.Runtime.CompilerServices.EnumeratorCancellationAttribute amène le compilateur à générer du code pour le IAsyncEnumerator<T> qui fait que le jeton transmis à GetAsyncEnumerator est visible par le corps de l’itérateur asynchrone en tant qu’argument. À l’intérieur de runQueryAsync, vous pouvez examiner l’état du jeton et annuler tout travail supplémentaire si nécessaire.

Vous devez utiliser une autre méthode d’extension, WithCancellation, pour transmettre le jeton d’annulation au flux asynchrone. Vous devez modifier la boucle énumérant les problèmes comme suit :

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Vous pouvez vous procurer le code du tutoriel terminé dans ce tutoriel dans notre référentiel dotnet/docs situé dans le dossier asynchronous-programming/snippets.

Exécutez l'application terminée

Exécutez de nouveau l'application. Comparez son comportement avec le comportement de l’application de démarrage. La première page de résultats est énumérée dès qu’elle est disponible. Une pause peut être observée lorsque chaque nouvelle page est demandée et récupérée, puis les résultats de la page suivante sont rapidement énumérés. Le bloc try / catch n’est pas nécessaire pour gérer l’annulation : l’appelant peut arrêter l’énumération de la collection. Le rapport de progression est clair, car le flux asynchrone génère des résultats à mesure que chaque page est téléchargée. Le statut de chaque problème retourné est inclus de façon fluide dans la boucle await foreach. Vous n’avez pas besoin d’un objet de rappel pour suivre la progression.

Vous pouvez voir des améliorations lors de l’utilisation de mémoire en examinant le code. Vous n’avez plus besoin d’allouer une collection pour stocker tous les résultats avant qu’ils ne soient énumérés. L’appelant peut déterminer comment utiliser les résultats et si une collection de stockage est nécessaire.

Exécutez les applications de démarrage et les applications terminées. Ceci vous permettra d’observer les différences entre les implémentations pour vous. À la fin de ce tutoriel, vous pouvez supprimer le jeton d’accès GitHub que vous avez créé au début. Si un attaquant arrive à accéder à ce jeton, il pourrait accéder aux API GitHub à l’aide de vos informations d’identification.

Dans ce didacticiel, vous avez utilisé des flux asynchrones pour lire des éléments individuels à partir d’une API réseau qui retourne des pages de données. Les flux asynchrones peuvent également lire à partir de « flux sans fin », comme un ticker boursier ou un dispositif de capteur. L’appel à MoveNextAsync retourne l’élément suivant dès qu’il est disponible.