Självstudie: Generera och använda asynkrona strömmar med C# och .NET

Asynkrona strömmar modellerar en strömmande datakälla. Dataströmmar hämtar eller genererar ofta element asynkront. De tillhandahåller en naturlig programmeringsmodell för asynkrona strömmande datakällor.

I den här självstudien får du lära dig att:

  • Skapa en datakälla som genererar en sekvens med dataelement asynkront.
  • Använd datakällan asynkront.
  • Stöd för annullering och insamlade kontexter för asynkrona strömmar.
  • Identifiera när det nya gränssnittet och datakällan föredras framför tidigare synkrona datasekvenser.

Förutsättningar

Du måste konfigurera datorn för att köra .NET, inklusive C#-kompilatorn. C#-kompilatorn är tillgänglig med Visual Studio 2022 eller .NET SDK.

Du måste skapa en GitHub-åtkomsttoken så att du kan komma åt GitHub GraphQL-slutpunkten. Välj följande behörigheter för din GitHub-åtkomsttoken:

  • repo:status
  • public_repo

Spara åtkomsttoken på en säker plats så att du kan använda den för att få åtkomst till GitHub API-slutpunkten.

Varning

Skydda din personliga åtkomsttoken. Alla program med din personliga åtkomsttoken kan göra GitHub API-anrop med hjälp av dina åtkomsträttigheter.

Den här självstudien förutsätter att du är bekant med C# och .NET, inklusive antingen Visual Studio eller .NET CLI.

Kör startprogrammet

Du kan hämta koden för startprogrammet som används i den här självstudien från dotnet/docs-lagringsplatsen i mappen asynchronous-programming/snippets .

Startprogrammet är ett konsolprogram som använder GitHub GraphQL-gränssnittet för att hämta de senaste problemen som skrivits på dotnet/docs-lagringsplatsen . Börja med att titta på följande kod för startappmetoden Main :

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Du kan antingen ange en GitHubKey miljövariabel till din personliga åtkomsttoken eller ersätta det sista argumentet i anropet till GetEnvVariable med din personliga åtkomsttoken. Placera inte din åtkomstkod i källkoden om du ska dela källan med andra. Ladda aldrig upp åtkomstkoder till en lagringsplats för delad källa.

När du har skapat GitHub-klienten skapar koden i Main ett förloppsrapporteringsobjekt och en annulleringstoken. När dessa objekt har skapats Main anropas RunPagedQueryAsync för att hämta de senaste 250 problem som skapats. När aktiviteten har slutförts visas resultatet.

När du kör startprogrammet kan du göra några viktiga observationer om hur det här programmet körs. Förloppet rapporteras för varje sida som returneras från GitHub. Du kan observera en märkbar paus innan GitHub returnerar varje ny sida med problem. Slutligen visas problemen först när alla 10 sidor har hämtats från GitHub.

Granska implementeringen

Implementeringen visar varför du observerade beteendet som beskrevs i föregående avsnitt. Granska koden för RunPagedQueryAsync:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Det allra första den här metoden gör är att skapa POST-objektet med hjälp av GraphQLRequest klassen:

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

som hjälper till att bilda POST-objekttexten och korrekt konvertera den till JSON som presenteras som en enskild sträng med ToJsonText metoden, vilket tar bort alla nya tecken från begärandetexten \ som markerar dem med escape-tecknet (omvänt snedstreck).

Nu ska vi koncentrera oss på växlingsalgoritmen och asynkron strukturen i föregående kod. (Du kan läsa GitHub GraphQL-dokumentation för mer information om GitHub GraphQL API.) Metoden RunPagedQueryAsync räknar upp problemen från den senaste till den äldsta. Den begär 25 problem per sida och undersöker pageInfo strukturen för svaret för att fortsätta med föregående sida. Detta följer GraphQL:s standardstöd för sidindelning för svar på flera sidor. Svaret innehåller ett pageInfo objekt som innehåller ett hasPreviousPages värde och ett startCursor värde som används för att begära föregående sida. Problemen finns i matrisen nodes . Metoden RunPagedQueryAsync lägger till dessa noder i en matris som innehåller alla resultat från alla sidor.

När du har hämtat och återställt en sida med resultat RunPagedQueryAsync rapporterar du förlopp och söker efter annullering. Om annullering har begärts RunPagedQueryAsync utlöser du en OperationCanceledException.

Det finns flera element i den här koden som kan förbättras. Viktigast av allt, RunPagedQueryAsync måste allokera lagring för alla problem som returneras. Det här exemplet stoppas vid 250 problem eftersom hämtning av alla öppna problem skulle kräva mycket mer minne för att lagra alla hämtade problem. Protokollen för stöd för förloppsrapporter och annullering gör algoritmen svårare att förstå vid första behandlingen. Fler typer och API:er ingår. Du måste spåra kommunikationen via CancellationTokenSource och dess associerade CancellationToken för att förstå var annullering begärs och var den beviljas.

Asynkrona strömmar ger ett bättre sätt

Asynkrona strömmar och tillhörande språkstöd hanterar alla dessa problem. Koden som genererar sekvensen kan nu användas yield return för att returnera element i en metod som deklarerades med async modifieraren. Du kan använda en asynkron ström med hjälp av en await foreach loop precis som du använder valfri sekvens med hjälp av en foreach loop.

Dessa nya språkfunktioner beror på tre nya gränssnitt som lagts till i .NET Standard 2.1 och implementerats i .NET Core 3.0:

De här tre gränssnitten bör vara bekanta för de flesta C#-utvecklare. De beter sig på ett sätt som liknar deras synkrona motsvarigheter:

En typ som kanske inte är bekant är System.Threading.Tasks.ValueTask. Structen ValueTask tillhandahåller ett liknande API som System.Threading.Tasks.Task klassen. ValueTask används i dessa gränssnitt av prestandaskäl.

Konvertera till asynkrona strömmar

Konvertera RunPagedQueryAsync sedan metoden för att generera en asynkron ström. Ändra först signaturen RunPagedQueryAsync för för att returnera en IAsyncEnumerable<JToken>, och ta bort annulleringstoken och förloppsobjekten från parameterlistan enligt följande kod:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

Startkoden bearbetar varje sida när sidan hämtas, enligt följande kod:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Ersätt dessa tre rader med följande kod:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

Du kan också ta bort deklarationen från finalResults tidigare i den här metoden och instruktionen return som följer den loop som du ändrade.

Du har slutfört ändringarna för att generera en asynkron ström. Den färdiga metoden bör likna följande kod:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Därefter ändrar du koden som använder samlingen för att använda asynkron dataström. Hitta följande kod i Main som bearbetar insamlingen av problem:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Ersätt koden med följande await foreach loop:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

Det nya gränssnittet IAsyncEnumerator<T> härleds från IAsyncDisposable. Det innebär att föregående loop tar bort strömmen asynkront när loopen är klar. Du kan tänka dig att loopen ser ut som följande kod:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Som standard bearbetas dataströmelement i den insamlade kontexten. Om du vill inaktivera infångning av kontexten använder du TaskAsyncEnumerableExtensions.ConfigureAwait tilläggsmetoden. Mer information om synkroniseringskontexter och insamling av den aktuella kontexten finns i artikeln om hur du använder det aktivitetsbaserade asynkrona mönstret.

Async-strömmar stöder annullering med samma protokoll som andra async metoder. Du ändrar signaturen för metoden async iterator enligt följande för att stödja annullering:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Attributet System.Runtime.CompilerServices.EnumeratorCancellationAttribute gör att kompilatorn genererar kod för IAsyncEnumerator<T> som gör att token skickas till GetAsyncEnumerator synligt brödtexten i asynkron iteratorn som det argumentet. I runQueryAsynckan du undersöka tokens tillstånd och avbryta ytterligare arbete om det begärs.

Du använder en annan tilläggsmetod, WithCancellation, för att skicka annulleringstoken till asynkron dataströmmen. Du skulle ändra loopen som räknar upp problemen på följande sätt:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Du kan hämta koden för den färdiga självstudien från dotnet/docs-lagringsplatsen i mappen asynchronous-programming/snippets .

Kör det färdiga programmet

Kör programmet igen. Kontrastera dess beteende med startprogrammets beteende. Den första sidan med resultat räknas upp så snart den är tillgänglig. Det finns en observerbar paus när varje ny sida begärs och hämtas. Nästa sidas resultat räknas snabbt upp. Blocket try / catch behövs inte för att hantera annullering: anroparen kan sluta räkna upp samlingen. Förloppet rapporteras tydligt eftersom asynkron dataström genererar resultat när varje sida laddas ned. Statusen för varje problem som returneras ingår sömlöst i loopen await foreach . Du behöver inget motringningsobjekt för att spåra förloppet.

Du kan se förbättringar i minnesanvändningen genom att undersöka koden. Du behöver inte längre allokera en samling för att lagra alla resultat innan de räknas upp. Anroparen kan avgöra hur resultatet ska användas och om en lagringssamling behövs.

Kör både startprogram och färdiga program och du kan se skillnaderna mellan implementeringarna själv. Du kan ta bort github-åtkomsttoken som du skapade när du startade den här självstudien när du är klar. Om en angripare får åtkomst till denna token kan de komma åt GitHub-API:er med dina autentiseringsuppgifter.

I den här självstudien använde du asynkrona strömmar för att läsa enskilda objekt från ett nätverks-API som returnerar datasidor. Asynkrona strömmar kan också läsas från "aldrig sinande strömmar" som en aktie ticker eller sensorenhet. Anropet till MoveNextAsync returnerar nästa objekt så snart det är tillgängligt.