Tutorial: Generieren und Nutzen asynchroner Datenströme mit C# 8.0 und .NET Core 3.0Tutorial: Generate and consume async streams using C# 8.0 and .NET Core 3.0

In C# 8.0 werden asynchrone Datenströme eingeführt. Diese modellieren eine Datenstromquelle, wenn die Elemente im Datenstrom asynchron abgerufen oder generiert werden können.C# 8.0 introduces async streams, which model a streaming source of data when the elements in the data stream may be retrieved or generated asynchronously. Asynchrone Datenströme beruhen auf Schnittstellen, die in .NET Standard 2.1 neu eingeführt und in .NET Core 3.0 implementiert wurden, um ein natürliches Programmiermodell für Datenquellen für asynchrone Datenströme bereitzustellen.Async streams rely on new interfaces introduced in .NET Standard 2.1 and implemented in .NET Core 3.0 to provide a natural programming model for asynchronous streaming data sources.

In diesem Tutorial lernen Sie, wie die folgenden Aufgaben ausgeführt werden:In this tutorial, you'll learn how to:

  • Erstellen einer Datenquelle, die eine Sequenz von Datenelementen asynchron generiertCreate a data source that generates a sequence of data elements asynchronously.
  • Asynchrones Nutzen dieser DatenquelleConsume that data source asynchronously.
  • Erkennen, wenn die neue Schnittstelle und Datenquelle früheren synchronen Datensequenzen vorgezogen werdenRecognize when the new interface and data source are preferred to earlier synchronous data sequences.

Erforderliche KomponentenPrerequisites

Sie müssen Ihren Computer zur Ausführung von .NET Core einrichten, einschließlich des C# 8.0-Compilers.You’ll need to set up your machine to run .NET Core, including the C# 8.0 compiler. Der C# 8-Compiler steht ab Visual Studio 2019 Version 16.3 oder mit dem .NET Core 3.0 SDK zur Verfügung.The C# 8 compiler is available starting with Visual Studio 2019 version 16.3 or .NET Core 3.0 SDK.

Sie müssen ein GitHub-Zugriffstoken erstellen, damit Sie auf den GitHub GraphQL-Endpunkt zugreifen können.You'll need to create a GitHub access token so that you can access the GitHub GraphQL endpoint. Wählen Sie die folgenden Berechtigungen für Ihr GitHub-Zugriffstoken aus:Select the following permissions for your GitHub Access Token:

  • repo:statusrepo:status
  • public_repopublic_repo

Speichern Sie das Zugriffstoken an einem sicheren Ort, damit Sie es für den Zugriff auf den GitHub-API-Endpunkt verwenden können.Save the access token in a safe place so you can use it to gain access to the GitHub API endpoint.

Warnung

Schützen Sie Ihr persönliches Zugriffstoken.Keep your personal access token secure. Jede Software mit Ihrem persönlichen Zugriffstoken kann mit Ihren Zugriffsrechten GitHub-API-Aufrufe ausführen.Any software with your personal access token could make GitHub API calls using your access rights.

In diesem Tutorial wird vorausgesetzt, dass Sie C# und .NET, einschließlich Visual Studio oder die .NET Core-CLI kennen.This tutorial assumes you're familiar with C# and .NET, including either Visual Studio or the .NET Core CLI.

Ausführen der StartanwendungRun the starter application

Sie können den Code für die in diesem Tutorial verwendete Startanwendung aus unserem Repository dotnet/samples im Ordner csharp/tutorials/AsyncStreams abrufen.You can get the code for the starter application used in this tutorial from our dotnet/samples repository in the csharp/tutorials/AsyncStreams folder.

Die Startanwendung ist eine Konsolenanwendung, die die GitHub GraphQL-Schnittstelle zum Abrufen aktueller Issues verwendet, die in das Repository dotnet/docs geschrieben wurden.The starter application is a console application that uses the GitHub GraphQL interface to retrieve recent issues written in the dotnet/docs repository. Sehen Sie sich zunächst folgenden Code für die Main-Methode der Starter-App an:Start by looking at the following code for the starter app Main method:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await runPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Sie können entweder eine GitHubKey-Umgebungsvariable auf Ihr persönliches Zugriffstoken festlegen, oder Sie können das letzte Argument im Aufruf von GenEnvVariable durch Ihr persönliches Zugriffstoken ersetzen.You can either set a GitHubKey environment variable to your personal access token, or you can replace the last argument in the call to GenEnvVariable with your personal access token. Schließen Sie Ihren Zugriffscode nicht im Quellcode ein, wenn Sie die Quelle zusammen mit anderen nutzen oder in einem Repository mit gemeinsamer Quelle ablegen.Don't put your access code in source code if you'll be saving the source with others, or putting it in a shared source repository.

Nach dem Erstellen des GitHub-Clients werden durch den Code in Main ein Objekt für Fortschrittsberichte und ein Abbruchtoken erstellt.After creating the GitHub client, the code in Main creates a progress reporting object and a cancellation token. Nachdem die Objekte erstellt wurden, wird runPagedQueryAsync durch Main aufgerufen, um die neuesten 250 Issues abzurufen.Once those objects are created, Main calls runPagedQueryAsync to retrieve the most recent 250 created issues. Nach Abschluss dieser Aufgabe werden die Ergebnisse angezeigt.After that task has finished, the results are displayed.

Bei Ausführung der Startanwendung können Sie einige wichtige Details zur Ausführung dieser Anwendung beobachten.When you run the starter application, you can make some important observations about how this application runs. Für jede von GitHub zurückgegebene Seite wird der Fortschritt gemeldet.You'll see progress reported for each page returned from GitHub. Sie können eine deutliche Pause beobachten, bevor GitHub eine weitere neue Seite mit Issues zurückgibt.You can observe a noticeable pause before GitHub returns each new page of issues. Die Issues werden erst angezeigt, nachdem alle zehn Seiten aus GitHub abgerufen wurden.Finally, the issues are displayed only after all 10 pages have been retrieved from GitHub.

Untersuchen der ImplementierungExamine the implementation

Die Implementierung zeigt, warum Sie das im vorherigen Abschnitt beschriebene Verhalten beobachten konnten.The implementation reveals why you observed the behavior discussed in the previous section. Untersuchen Sie den Code für runPagedQueryAsync:Examine the code for runPagedQueryAsync:

private static async Task<JArray> runPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString());

        int totalCount = (int)issues(results)["totalCount"];
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"];
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"].ToString();
        issuesReturned += issues(results)["nodes"].Count();
        finalResults.Merge(issues(results)["nodes"]);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]["repository"]["issues"];
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"];
}

Konzentrieren wir uns auf den Paginierungsalgorithmus und die asynchrone Struktur des obigen Codes.Let's concentrate on the paging algorithm and async structure of the preceding code. (Details zur GitHub GraphQL-API finden Sie in der GitHub GraphQL-Dokumentation.) Die runPagedQueryAsync-Methode listet die Issues vom neuesten zum ältesten auf.(You can consult the GitHub GraphQL documentation for details on the GitHub GraphQL API.) The runPagedQueryAsync method enumerates the issues from most recent to oldest. Sie fordert 25 Issues pro Seite an und untersucht die pageInfo-Struktur der Antwort, um mit der vorherigen Seite fortzufahren.It requests 25 issues per page and examines the pageInfo structure of the response to continue with the previous page. Dies entspricht der GraphQL-Standardpaginierungsunterstützung für mehrseitige Antworten.That follows GraphQL's standard paging support for multi-page responses. Die Antwort enthält ein pageInfo-Objekt mit einem hasPreviousPages-Wert und einem startCursor-Wert zum Anfordern der vorherigen Seite.The response includes a pageInfo object that includes a hasPreviousPages value and a startCursor value used to request the previous page. Die Issues befinden sich im nodes-Array.The issues are in the nodes array. Die runPagedQueryAsync-Methode fügt diese Knoten einem Array an, das alle Ergebnisse aus allen Seiten enthält.The runPagedQueryAsync method appends these nodes to an array that contains all the results from all pages.

Nach dem Abrufen und Wiederherstellen einer Seite mit Ergebnissen meldet runPagedQueryAsync den Fortschritt und prüft auf Abbruch.After retrieving and restoring a page of results, runPagedQueryAsync reports progress and checks for cancellation. Wenn ein Abbruch angefordert wurde, löst runPagedQueryAsync eine OperationCanceledException aus.If cancellation has been requested, runPagedQueryAsync throws an OperationCanceledException.

Es gibt mehrere Elemente in diesem Code, die verbessert werden können.There are several elements in this code that can be improved. Vor allem muss runPagedQueryAsync Speicherplatz für alle zurückgegebenen Issues zuordnen.Most importantly, runPagedQueryAsync must allocate storage for all the issues returned. In diesem Beispiel wird der Vorgang bei 250 Issues beendet, weil das Abrufen aller offenen Issues wesentlich mehr Arbeitsspeicher zum Speichern aller abgerufenen Issues erfordern würde.This sample stops at 250 issues because retrieving all open issues would require much more memory to store all the retrieved issues. Zudem ist der Algorithmus durch die Protokolle zur Unterstützung von Fortschritt und Abbruch beim ersten Lesen schwieriger zu verstehen.In addition, the protocols for supporting progress and supporting cancellation make the algorithm harder to understand on its first reading. Sie müssen die Fortschrittsklasse suchen, um herauszufinden, wo der Fortschritt gemeldet wird.You must look for the progress class to find where progress is reported. Außerdem müssen Sie die Kommunikation über die CancellationTokenSource und das zugehörige CancellationToken verfolgen, um nachzuvollziehen, wo der Abbruch angefordert und wo er gewährt wird.You also have to trace the communications through the CancellationTokenSource and its associated CancellationToken to understand where cancellation is requested and where it's granted.

Bessere Möglichkeiten durch asynchrone DatenströmeAsync streams provide a better way

Mit asynchronen Datenströmen und der zugehörigen Sprachunterstützung lassen sich diese Probleme beheben.Async streams and the associated language support address all those concerns. Der Code, der die Sequenz generiert, kann mit yield return jetzt Elemente in einer Methode zurückgeben, die mit dem async-Modifizierer deklariert wurde.The code that generates the sequence can now use yield return to return elements in a method that was declared with the async modifier. Sie können einen asynchronen Datenstrom mit einer await foreach-Schleife genau so nutzen, wie Sie eine beliebige Sequenz mit einer foreach-Schleife einsetzen.You can consume an async stream using an await foreach loop just as you consume any sequence using a foreach loop.

Diese neuen Sprachfeatures hängen von drei neuen Schnittstellen ab, die dem .NET Standard 2.1 hinzugefügt und in .NET Core 3.0 implementiert wurden:These new language features depend on three new interfaces added to .NET Standard 2.1 and implemented in .NET Core 3.0:

namespace System.Collections.Generic
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        T Current { get; }

        ValueTask<bool> MoveNextAsync();
    }
}

namespace System
{
    public interface IAsyncDisposable
    {
        ValueTask DisposeAsync();
    }
}

Diese drei Schnittstellen sollten den meisten C#-Entwicklern vertraut sein.These three interfaces should be familiar to most C# developers. Sie verhalten sich ähnlich wie ihre synchronen Gegenstücke:They behave in a manner similar to their synchronous counterparts:

Ein möglicherweise weniger bekannter Typ ist System.Threading.Tasks.ValueTask.One type that may be unfamiliar is System.Threading.Tasks.ValueTask. Die ValueTask-Struktur bietet eine ähnliche API für die System.Threading.Tasks.Task-Klasse.The ValueTask struct provides a similar API to the System.Threading.Tasks.Task class. ValueTask wird in diesen Schnittstellen aus Leistungsgründen verwendet.ValueTask is used in these interfaces for performance reasons.

Konvertieren in asynchrone DatenströmeConvert to async streams

Als Nächstes konvertieren Sie die runPagedQueryAsync-Methode, um einen asynchronen Datenstrom zu generieren.Next, convert the runPagedQueryAsync method to generate an async stream. Ändern Sie zunächst die Signatur von runPagedQueryAsync so, dass ein IAsyncEnumerable<JToken> zurückgegeben wird, und entfernen Sie das Abbruchtoken und die Fortschrittsobjekte aus der Parameterliste, wie im folgenden Code gezeigt:First, change the signature of runPagedQueryAsync to return an IAsyncEnumerable<JToken>, and remove the cancellation token and progress objects from the parameter list as shown in the following code:

private static async IAsyncEnumerable<JToken> runPagedQueryAsync(GitHubClient client, 
    string queryText, string repoName)

Der Startcode verarbeitet die einzelnen Seiten, während sie abgerufen werden, wie im folgenden Code gezeigt:The starter code processes each page as the page is retrieved, as shown in the following code:

finalResults.Merge(issues(results)["nodes"]);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Ersetzen Sie diese drei Zeilen durch den folgenden Code:Replace those three lines with the following code:

foreach (JObject issue in issues(results)["nodes"])
    yield return issue;

Sie können auch die Deklaration von finalResults weiter oben in dieser Methode sowie die return-Anweisung entfernen, die der von Ihnen geänderten Schleife folgt.You can also remove the declaration of finalResults earlier in this method and the return statement that follows the loop you modified.

Sie haben die Änderungen zum Generieren eines asynchronen Datenstroms abgeschlossen.You've finished the changes to generate an async stream. Die endgültige Methode sollte dem folgenden Code entsprechen:The finished method should resemble the code below:

private static async IAsyncEnumerable<JToken> runPagedQueryAsync(GitHubClient client, 
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString());

        int totalCount = (int)issues(results)["totalCount"];
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"];
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"].ToString();
        issuesReturned += issues(results)["nodes"].Count();

        foreach (JObject issue in issues(results)["nodes"])
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]["repository"]["issues"];
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"];
}

Als Nächstes ändern Sie den Code, der die Sammlung nutzt, um den asynchronen Datenstrom zu verwenden.Next, you change the code that consumes the collection to consume the async stream. Suchen Sie in Main den folgenden Code, der die Sammlung der Issues verarbeitet:Find the following code in Main that processes the collection of issues:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await runPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Ersetzen Sie den Code durch die folgende await foreach-Schleife:Replace that code with the following await foreach loop:

int num = 0;
await foreach (var issue in runPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

Sie können den Code für das abgeschlossene Tutorial aus dem Repository dotnet/samples im Ordner csharp/tutorials/AsyncStreams abrufen.You can get the code for the finished tutorial from the dotnet/samples repository in the csharp/tutorials/AsyncStreams folder.

Ausführen der fertig gestellten AnwendungRun the finished application

Führen Sie die Anwendung erneut aus.Run the application again. Vergleichen Sie deren Verhalten mit dem Verhalten der Startanwendung.Contrast its behavior with the behavior of the starter application. Die erste Seite mit Ergebnissen wird aufgelistet, sobald sie verfügbar ist.The first page of results is enumerated as soon as it's available. Es gibt eine wahrnehmbare Pause, wenn eine neue Seite angefordert und abgerufen wird, und dann werden die Ergebnisse der nächsten Seite schnell aufgelistet.There's an observable pause as each new page is requested and retrieved, then the next page's results are quickly enumerated. Der try / catch-Block ist zur Verarbeitung eines Abbruchs nicht erforderlich: Der Aufrufer kann das Auflisten der Sammlung beenden.The try / catch block isn't needed to handle cancellation: the caller can stop enumerating the collection. Der Fortschritt wird deutlich gemeldet, weil der asynchrone Datenstrom die Ergebnisse generiert, während die einzelnen Seiten heruntergeladen werden.Progress is clearly reported because the async stream generates results as each page is downloaded. Der Status jedes zurückgegebenen Problems ist nahtlos in der await foreach-Schleife enthalten.The status for each issue returned is seamlessly included in the await foreach loop. Sie benötigen kein Rückrufobjekt, um den Fortschritt nachzuverfolgen.You don't need a callback object to track progress.

Sie können Verbesserungen in der Arbeitsspeichernutzung erkennen, indem Sie den Code untersuchen.You can see improvements in memory use by examining the code. Sie müssen eine Sammlung nicht mehr zuordnen, um alle Ergebnisse zu speichern, bevor sie aufgelistet werden.You no longer need to allocate a collection to store all the results before they're enumerated. Der Aufrufer kann festlegen, wie die Ergebnisse genutzt werden und ob eine Speichersammlung erforderlich ist.The caller can determine how to consume the results and if a storage collection is needed.

Führen Sie sowohl die Startanwendung als auch die fertig gestellte Anwendung aus, um die Unterschiede zwischen den Implementierungen selbst zu beobachten.Run both the starter and finished applications and you can observe the differences between the implementations for yourself. Wenn Sie fertig sind, können Sie das zu Beginn des Tutorials erstellte GitHub-Zugriffstoken löschen.You can delete the GitHub access token you created when you started this tutorial after you've finished. Wenn ein Angreifer Zugriff auf dieses Token erlangt hat, kann er mit Ihren Anmeldeinformationen auf GitHub-APIs zugreifen.If an attacker gained access to that token, they could access GitHub APIs using your credentials.