Esercitazione: Generare e usare flussi asincroni con C# e .NET

I flussi asincroni modellano un'origine di streaming di dati. I flussi di dati spesso recuperano o generano elementi in modo asincrono. Forniscono un modello di programmazione naturale per le origini dati di streaming asincrone.

Questa esercitazione illustra come:

  • Creare un'origine dati che genera una sequenza di elementi di dati in modo asincrono.
  • Utilizzare tale origine dati in modo asincrono.
  • Supportare l'annullamento e i contesti acquisiti per i flussi asincroni.
  • Riconoscere quando la nuova interfaccia e l'origine dati sono da preferire rispetto alle sequenze di dati sincrone precedenti.

Prerequisiti

È necessario configurare il computer per eseguire .NET, incluso il compilatore C#. Il compilatore C# è disponibile con Visual Studio 2022 o .NET SDK.

È necessario creare un token di accesso di GitHub per poter accedere all'endpoint GraphQL di GitHub. Selezionare le autorizzazioni seguenti per il token di accesso di GitHub:

  • repo:status
  • public_repo

Salvare il token di accesso in un luogo sicuro in modo da poterlo usare per ottenere l'accesso all'endpoint dell'API GitHub.

Avviso

Mantenere protetto il token di accesso personale. Qualsiasi software con il token di accesso personale può effettuare chiamate API GitHub tramite i diritti di accesso.

Per questa esercitazione si presuppone che l'utente abbia familiarità con C# e .NET, inclusa l'interfaccia della riga di comando di Visual Studio o di .NET.

Eseguire l'applicazione iniziale

È possibile ottenere il codice per l'applicazione iniziale usata in questa esercitazione dal repository dotnet/docs nella cartella asincrona-programmazione/frammenti di codice.

L'applicazione iniziale è un'applicazione console che usa l'interfaccia GraphQL di GitHub per recuperare i problemi recenti scritti nel repository dotnet/docs. Per iniziare, esaminare il codice seguente per il metodo Main dell'app iniziale:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

È possibile impostare una variabile di ambiente GitHubKey per il token di accesso personale oppure è possibile sostituire l'ultimo argomento nella chiamata a GetEnvVariable con il token di accesso personale. Non inserire il codice di accesso nel codice sorgente se si condividerà l'origine con altri utenti. Non caricare mai i codici di accesso in un repository di origine condiviso.

Dopo aver creato il client di GitHub, il codice in Main crea un oggetto di segnalazione dello stato e un token di annullamento. Dopo aver creato questi oggetti, Main chiama RunPagedQueryAsync per recuperare i 250 problemi creati più di recente. Al termine di tale attività, vengono visualizzati i risultati.

Quando si esegue l'applicazione iniziale, è possibile notare alcuni aspetti importanti della modalità di esecuzione di questa applicazione. Lo stato viene segnalato per ogni pagina restituita da GitHub. È possibile notare una notevole pausa prima che GitHub restituisca ogni nuova pagina di problemi. Infine, i problemi vengono visualizzati solo dopo aver recuperato tutte e 10 le pagine da GitHub.

Esaminare l'implementazione

L'implementazione spiega il comportamento evidenziato nella sezione precedente. Esaminare il codice per RunPagedQueryAsync:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

La prima operazione eseguita da questo metodo consiste nel creare l'oggetto POST, usando la classe GraphQLRequest:

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

che consente di formare il corpo dell'oggetto POST e di convertirlo correttamente in JSON presentato come singola stringa con il metodo ToJsonText, che rimuove tutti i caratteri di nuova riga dal corpo della richiesta contrassegnandoli con il carattere di escape \ (barra rovesciata).

Concentrarsi sull'algoritmo per la suddivisione in pagine e sulla struttura asincrona del codice precedente. (È possibile consultare la documentazione per GraphQL di GitHub per informazioni dettagliate sull'API GraphQL di GitHub.) Il metodo RunPagedQueryAsync enumera i problemi dal più recente al meno recente. Richiede 25 problemi per ogni pagina ed esamina la struttura pageInfo della risposta per continuare con la pagina precedente. Viene rispettato il supporto della suddivisione in pagine standard di GraphQL per le risposte a più pagine. La risposta include un oggetto pageInfo che include un valore hasPreviousPages e un valore startCursor usato per richiedere la pagina precedente. I problemi sono nella matrice nodes. Il metodo RunPagedQueryAsync aggiunge questi nodi in una matrice che contiene tutti i risultati da tutte le pagine.

Dopo il recupero e il ripristino di una pagina di risultati, RunPagedQueryAsync segnala lo stato e verifica se è presente una richiesta di annullamento. In caso affermativo, RunPagedQueryAsync genera un'eccezione OperationCanceledException.

Esistono diversi elementi in questo codice che possono essere migliorati. Soprattutto, RunPagedQueryAsync deve allocare spazio di archiviazione per tutti i problemi restituiti. Questo esempio si arresta dopo 250 problemi, perché il recupero di tutti i problemi richiede molta più memoria per archiviare tutti i problemi recuperati. I protocolli per supportare i report sullo stato di avanzamento e l'annullamento rendono l'algoritmo più difficile da comprendere per la prima lettura. Sono coinvolti più tipi e API. È necessario tracciare le comunicazioni tramite il CancellationTokenSource e il relativo CancellationToken associato per comprendere dove viene richiesto l'annullamento e dove viene concesso.

I flussi asincroni sono più efficaci

I flussi asincroni e il supporto del linguaggio associato offrono una risposta a tutte queste problematiche. Il codice che genera la sequenza può ora usare yield return per restituire gli elementi in un metodo dichiarato con il modificatore async. È possibile utilizzare un flusso asincrono con un ciclo await foreach, proprio come si utilizza qualsiasi sequenza con un ciclo foreach.

Queste nuove funzionalità del linguaggio dipendono da tre nuove interfacce aggiunte a .NET 2.1 Standard e implementate in .NET Core 3.0:

Queste tre interfacce dovrebbero risultare familiari alla maggior parte degli sviluppatori C#. Il comportamento è simile a quello delle relative controparti sincrone:

Un tipo che potrebbe essere poco noto è System.Threading.Tasks.ValueTask. Lo struct ValueTask fornisce un'API simile alla classe System.Threading.Tasks.Task. ValueTask viene usato in queste interfacce per motivi di prestazioni.

Convertire in flussi asincroni

A questo punto, il metodo RunPagedQueryAsync verrà convertito per generare un flusso asincrono. In primo luogo, modificare la firma di RunPagedQueryAsync per restituire IAsyncEnumerable<JToken> e rimuovere il token di annullamento e gli oggetti di stato dall'elenco di parametri, come illustrato nel codice seguente:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

Il codice iniziale elabora ogni pagina quando viene recuperata, come illustrato nel codice seguente:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Sostituire queste tre righe con il codice seguente:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

È anche possibile rimuovere la dichiarazione di finalResults più indietro in questo metodo e l'istruzione return che segue il ciclo modificato.

Sono state completate le modifiche per generare un flusso asincrono. Il metodo completato dovrebbe essere simile al codice seguente:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Verrà ora modificato il codice che utilizza la raccolta per utilizzare il flusso asincrono. Trovare il codice seguente in Main che elabora la raccolta dei problemi:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Sostituire il codice con il ciclo await foreach seguente:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

La nuova interfaccia IAsyncEnumerator<T> deriva da IAsyncDisposable. Ciò significa che il ciclo precedente eliminerà in modo asincrono il flusso al termine del ciclo. È possibile immaginare che il ciclo sia simile al codice seguente:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Per impostazione predefinita, gli elementi del flusso vengono elaborati nel contesto acquisito. Se si vuole disabilitare l'acquisizione del contesto, usare il metodo di estensione TaskAsyncEnumerableExtensions.ConfigureAwait. Per altre informazioni sui contesti di sincronizzazione e sull'acquisizione del contesto corrente, vedere l'articolo sull'uso del modello asincrono basato su attività.

I flussi asincroni supportano l'annullamento usando lo stesso protocollo di altri metodi di async. Per supportare l'annullamento, modificare la firma per il metodo iteratore asincrono:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

L'attributo System.Runtime.CompilerServices.EnumeratorCancellationAttribute fa sì che il compilatore generi codice per il IAsyncEnumerator<T> che rende il token passato a GetAsyncEnumerator visibile al corpo dell'iteratore asincrono come argomento. All'interno di runQueryAsyncè possibile esaminare lo stato del token e annullare ulteriori operazioni se richiesto.

Si usa un altro metodo di estensione, WithCancellation, per passare il token di annullamento al flusso asincrono. Modificare il ciclo che enumera i problemi come indicato di seguito:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

È possibile ottenere il codice per l'esercitazione completata dal repository dotnet/docs nella cartella asincrona-programmazione/frammenti di codice.

Eseguire l'applicazione completata

Eseguire di nuovo l'applicazione. Confrontare il comportamento con il comportamento dell'applicazione iniziale. La prima pagina di risultati viene enumerata non appena è disponibile. Esiste una pausa osservabile quando viene richiesta e recuperata ogni nuova pagina, poi i risultati della pagina successiva vengono enumerati rapidamente. Il blocco try / catch non è necessario per gestire l'annullamento: il chiamante può interrompere l'enumerazione della raccolta. Lo stato viene segnalato in modo chiaro perché il flusso asincrono genera i risultati quando viene scaricata ogni pagina. Lo stato per ogni problema restituito è incluso senza problemi nel ciclo await foreach. Non è necessario un oggetto callback per tenere traccia dello stato di avanzamento.

È possibile notare miglioramenti per l'uso della memoria esaminando il codice. Non è più necessario allocare una raccolta per archiviare tutti i risultati prima che vengano enumerati. Il chiamante può determinare come utilizzare i risultati e se è necessaria una raccolta di archiviazione.

Eseguire sia l'applicazione iniziale che quella finita per osservare in autonomia le differenze tra le implementazioni. Al termine, è possibile eliminare il token di accesso di GitHub creato all'inizio di questa esercitazione. Se un utente malintenzionato riesce ad accedere al token, potrebbe ottenere l'accesso alle a API di GitHub usando le credenziali.

In questa esercitazione sono stati usati flussi asincroni per leggere singoli elementi da un'API di rete che restituisce pagine di dati. I flussi asincroni possono anche leggere da "flussi mai terminati" come un ticker di stock o un dispositivo sensore. La chiamata a MoveNextAsync restituisce l'elemento successivo non appena è disponibile.