Oktatóanyag: Aszinkron streamek létrehozása és felhasználása c# és .NET használatával

Az aszinkron streamek adatstreamelési forrást modellezhetnek. Az adatfolyamok gyakran aszinkron módon kérik le vagy generálják az elemeket. Természetes programozási modellt biztosítanak az aszinkron streamelési adatforrásokhoz.

Az oktatóanyag segítségével megtanulhatja a következőket:

  • Hozzon létre egy adatforrást, amely aszinkron módon hoz létre adatelemek sorozatát.
  • Használja az adatforrást aszinkron módon.
  • Aszinkron streamek lemondásának és rögzített környezeteinek támogatása.
  • Ismerje fel, hogy az új felület és adatforrás mikor előnyösebb a korábbi szinkron adatütemezésekhez.

Előfeltételek

Be kell állítania a gépet a .NET futtatására, beleértve a C# fordítót is. A C# fordító a Visual Studio 2022-ben vagy a .NET SDK-val érhető el.

Létre kell hoznia egy GitHub hozzáférési jogkivonatot , hogy hozzáférhessen a GitHub GraphQL-végponthoz. Válassza ki a következő engedélyeket a GitHub hozzáférési jogkivonatához:

  • adattár:állapot
  • public_repo

Mentse a hozzáférési jogkivonatot biztonságos helyre, hogy ezzel hozzáférést kapjon a GitHub API-végponthoz.

Figyelmeztetés

A személyes hozzáférési jogkivonat védelme. A személyes hozzáférési jogkivonattal rendelkező szoftverek a Hozzáférési jogosultságok használatával indíthatnak GitHub API-hívásokat.

Ez az oktatóanyag feltételezi, hogy ismeri a C# és a .NET használatát, beleértve a Visual Studiót vagy a .NET CLI-t is.

A kezdőalkalmazás futtatása

Az oktatóanyagban használt kezdőalkalmazás kódját az aszinkron-programozás/kódrészletek mappában található dotnet/docs-adattárbólszerezheti be.

A kezdőalkalmazás egy konzolalkalmazás, amely a GitHub GraphQL felületét használja a dotnet/docs-adattárban írt legutóbbi problémák lekéréséhez. Első lépésként tekintse meg a kezdőalkalmazás-metódus Main következő kódját:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Beállíthat egy környezeti változót GitHubKey a személyes hozzáférési jogkivonatra, vagy lecserélheti a hívás GetEnvVariable utolsó argumentumát a személyes hozzáférési jogkivonatára. Ne helyezze a hozzáférési kódot a forráskódba, ha megosztja a forrást másokkal. Soha ne töltsön fel hozzáférési kódokat egy megosztott forrásadattárba.

A GitHub-ügyfél létrehozása után a benne lévő Main kód létrehoz egy folyamatjelentési objektumot és egy lemondási jogkivonatot. Az objektumok létrehozása Main után a legutóbbi 250 létrehozott probléma lekérésére hív meg hívásokat RunPagedQueryAsync . A tevékenység befejezése után megjelennek az eredmények.

A kezdőalkalmazás futtatásakor fontos megfigyeléseket tehet az alkalmazás működéséről. A GitHubról visszaadott minden egyes oldal előrehaladását láthatja. Észrevehető szüneteltetés figyelhető meg, mielőtt a GitHub visszaadja a problémák minden új oldalát. Végül a problémák csak akkor jelennek meg, ha mind a 10 oldalt lekértük a GitHubról.

A megvalósítás vizsgálata

Az implementációból kiderül, hogy miért figyelte meg az előző szakaszban tárgyalt viselkedést. Vizsgálja meg a következőhöz tartozó RunPagedQueryAsynckódot:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

A módszer első lépéseként hozza létre a POST objektumot az GraphQLRequest osztály használatával:

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

amely segít létrehozni a POST objektum törzsét, és megfelelően átalakítani a metódussal egyetlen sztringként ToJsonText megjelenített JSON-ra, amely eltávolítja az összes újvonalas karaktert a kérelem törzséből, és megjelöli őket a \ (fordított perjel) feloldó karakterrel.

Koncentráljunk az előző kód lapozási algoritmusára és aszinkron szerkezetére. (A A GitHub GraphQL dokumentációja a GitHub GraphQL API-val kapcsolatos részletekért.) A RunPagedQueryAsync metódus a legújabbtól a legrégebbiig sorolja fel a problémákat. Oldalanként 25 problémát kér, és megvizsgálja a pageInfo válasz szerkezetét, hogy az előző oldallal folytatódjon. Ez a GraphQL szabványos lapozási támogatását követi a többoldalas válaszokhoz. A válasz tartalmaz egy pageInfo objektumot, amely tartalmaz egy hasPreviousPages értéket és egy startCursor értéket, amelyet az előző oldal kéréséhez használnak. A problémák a nodes tömbben vannak. A RunPagedQueryAsync metódus hozzáfűzi ezeket a csomópontokat egy tömbhöz, amely az összes oldal összes eredményét tartalmazza.

Az eredmények egy oldalának lekérése és visszaállítása után jelentést készít a folyamat előrehaladásáról, RunPagedQueryAsync és ellenőrzi a lemondást. Ha a lemondást kérték, RunPagedQueryAsync a rendszer egy OperationCanceledException.

A kód több olyan elemet is tartalmazhat, amelyek javíthatók. A legfontosabb, hogy RunPagedQueryAsync minden visszaadott problémához ki kell osztania a tárterületet. Ez a minta 250-nél leáll, mert az összes nyitott probléma beolvasása sokkal több memóriát igényel a beolvasott problémák tárolásához. Az előrehaladási jelentések és lemondások támogatásának protokolljai megnehezítik az algoritmus megértését az első olvasatban. További típusok és API-k is érintettek. Nyomon kell követnie a kommunikációt a kapcsolódó és CancellationToken azon CancellationTokenSource keresztül, hogy megértse, hol kérik a lemondást, és hol kapják meg.

Az aszinkron streamek jobb módot biztosítanak

Az aszinkron streamek és a hozzá tartozó nyelvi támogatás az összes ilyen problémát kezelik. A sorozatot létrehozó kód mostantól a módosítóval async deklarált metódus elemeinek visszaadására használhatóyield return. Az aszinkron streameket ugyanúgy használhatja hurkok await foreach használatával, mint bármely sorozatot egy foreach hurok használatával.

Ezek az új nyelvi funkciók a .NET Standard 2.1-hez hozzáadott és a .NET Core 3.0-ban implementált három új felülettől függnek:

Ez a három felület a legtöbb C#-fejlesztő számára ismerős lehet. A szinkron megfelelőikhez hasonló módon viselkednek:

Az egyik ismeretlen típus a .System.Threading.Tasks.ValueTask A ValueTask szerkezet az osztályhoz System.Threading.Tasks.Task hasonló API-t biztosít. ValueTask a rendszer teljesítménybeli okokból használja ezeket az interfészeket.

Konvertálás aszinkron streamekké

Ezután konvertálja a metódust RunPagedQueryAsync aszinkron stream létrehozásához. Először módosítsa az aláírást RunPagedQueryAsync , hogy visszaadjon egy IAsyncEnumerable<JToken>, és távolítsa el a lemondási jogkivonatot és a folyamatobjektumokat a paraméterlistából az alábbi kódban látható módon:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

A kezdőkód az egyes oldalakat az oldal lekérésekor dolgozza fel, ahogyan az a következő kódban is látható:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Cserélje le ezt a három sort a következő kódra:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

A metódus korábbi deklarációját finalResults és a return módosított ciklust követő utasítást is eltávolíthatja.

Befejezte a módosításokat az aszinkron stream létrehozásához. A kész metódusnak a következő kódhoz kell hasonlítania:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Ezután módosítja a gyűjteményt használó kódot, hogy az aszinkron streamet használja. Keresse meg a következő kódot, amely Main feldolgozza a problémák gyűjteményét:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Cserélje le a kódot a következő await foreach hurokra:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

Az új felület IAsyncEnumerator<T> a következőből IAsyncDisposableszármazik: . Ez azt jelenti, hogy az előző ciklus aszinkron módon törli a streamet a ciklus befejeződésekor. El tudja képzelni, hogy a hurok a következő kódhoz hasonlóan néz ki:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Alapértelmezés szerint a streamelemek feldolgozása a rögzített környezetben történik. Ha le szeretné tiltani a környezet rögzítését, használja a TaskAsyncEnumerableExtensions.ConfigureAwait bővítménymetódust. A szinkronizálási környezetekről és az aktuális környezet rögzítéséről a tevékenységalapú aszinkron minta felhasználásáról szóló cikkben talál további információt.

Az aszinkron streamek ugyanazt a protokollt használják, mint más async metódusok. Az aszinkron iterátor metódus aláírását az alábbiak szerint módosítaná a lemondás támogatásához:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Az System.Runtime.CompilerServices.EnumeratorCancellationAttribute attribútum hatására a fordító olyan kódot hoz létre, IAsyncEnumerator<T> amely az aszinkron iterátor törzsének láthatóvá teszi a jogkivonatot GetAsyncEnumerator argumentumként. Belül runQueryAsyncmegvizsgálhatja a jogkivonat állapotát, és igény esetén megszakíthatja a további munkát.

Egy másik bővítménymetódus WithCancellationhasználatával adja át a lemondási jogkivonatot az aszinkron streamnek. A problémákat számba adó hurkot az alábbiak szerint módosítaná:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

A kész oktatóanyag kódját az aszinkron-programozási/kódrészletek mappában található dotnet/docs-adattárbólszerezheti be.

A kész alkalmazás futtatása

Futtassa ismét az alkalmazást. A viselkedése és a kezdőalkalmazás viselkedésének ellentéte. A találatok első oldala a rendelkezésre állás után enumerálódik. Megfigyelhető szüneteltetés történik az új lapok kérése és lekérése során, majd a következő lap eredményei gyorsan számba lesznek véve. A try / catch letiltás nem szükséges a lemondás kezeléséhez: a hívó leállíthatja a gyűjtemény számbavételét. A folyamat előrehaladását egyértelműen jelenti a rendszer, mert az aszinkron stream az egyes lapok letöltésekor eredményeket hoz létre. Az egyes visszaadott problémák állapota zökkenőmentesen szerepel a await foreach ciklusban. A folyamat nyomon követéséhez nincs szükség visszahívási objektumra.

A kód vizsgálatával a memóriahasználat fejlesztései láthatók. A továbbiakban nem kell lefoglalnia egy gyűjteményt az összes eredmény tárolásához a számbavétel előtt. A hívó meghatározhatja, hogyan használhatja fel az eredményeket, és hogy szükség van-e tárgyűjteményre.

Futtassa az indítási és a befejezett alkalmazásokat is, és megfigyelheti az implementációk közötti különbségeket. Az oktatóanyag befejezése után törölheti a gitHub hozzáférési jogkivonatát, amelyet akkor hozott létre, amikor elindította ezt az oktatóanyagot. Ha egy támadó hozzáfér ehhez a jogkivonathoz, az Ön hitelesítő adataival férhet hozzá a GitHub API-khoz.

Ebben az oktatóanyagban aszinkron streamekkel olvasta be az egyes elemeket egy hálózati API-ból, amely adatoldalakat ad vissza. Az aszinkron streamek "soha véget nem érő streamekből" is olvashatnak, például tőzsdei ketyegőből vagy érzékelőeszközből. A hívás MoveNextAsync a következő elemet adja vissza, amint elérhető.