Samouczek: generowanie strumieni asynchronicznych i korzystanie z tych strumieni przy użyciu języka C# 8.0 i .NET Core 3.0

W języku C# 8.0 wprowadzono strumienie asynchroniczne, które modelować źródło danych przesyłania strumieniowego. Strumienie danych często pobierają lub generują elementy asynchronicznie. Strumienie asynchroniczne korzystają z nowych interfejsów wprowadzonych w .NET Standard 2.1. Te interfejsy są obsługiwane w programie .NET Core 3.0 i nowszych. Zapewniają naturalny model programowania dla asynchronicznych źródeł danych przesyłania strumieniowego.

Z tego samouczka dowiesz się, jak wykonywać następujące czynności:

  • Utwórz źródło danych, które generuje sekwencję elementów danych asynchronicznie.
  • Asynchroniczne wykorzystanie tego źródła danych.
  • Obsługa anulowania i przechwyconych kontekstów dla strumieni asynchronicznych.
  • Rozpoznaj, kiedy nowy interfejs i źródło danych są preferowane przez wcześniejsze synchroniczne sekwencje danych.

Wymagania wstępne

Musisz skonfigurować maszynę do uruchamiania .NET Core, w tym kompilator języka C# 8.0. Kompilator języka C# 8 jest dostępny od wersji Visual Studio 2019 w wersji 16.3 lub zestawu .NET Core 3.0 SDK.

Musisz utworzyć token dostępu do GitHub, aby uzyskać dostęp do punktu końcowego GitHub GraphQL. Wybierz następujące uprawnienia dla tokenu GitHub access:

  • repo:status
  • public_repo

Zapisz token dostępu w bezpiecznym miejscu, aby można go było użyć do uzyskania dostępu do punktu końcowego GitHub API.

Ostrzeżenie

Dbaj o bezpieczeństwo osobistego tokenu dostępu. Każde oprogramowanie korzystające z osobistego tokenu dostępu może GitHub wywołań interfejsu API przy użyciu Twoich praw dostępu.

W tym samouczku założono, że znasz język C# i program .NET, w tym język Visual Studio lub interfejs wiersza polecenia platformy .NET Core.

Uruchamianie aplikacji startowej

Kod aplikacji startowej używanej w tym samouczku można uzyskać z repozytorium dotnet/docs w folderze csharp/whats-new/tutorials.

Aplikacja startowa to aplikacja konsolowa, która używa interfejsu GitHub GraphQL do pobierania ostatnich problemów napisanych w repozytorium dotnet/docs. Zacznij od oglądania następującego kodu dla metody aplikacji Main startowej:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await runPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Możesz ustawić zmienną środowiskową na osobisty token dostępu lub zastąpić ostatni argument w wywołaniu funkcji osobistym GitHubKey GetEnvVariable tokenem dostępu. Nie umieszczaj kodu dostępu w kodzie źródłowym, jeśli będziesz udostępniać źródło innym osobom. Nigdy nie należy przekazywać kodów dostępu do udostępnionego repozytorium źródłowego.

Po utworzeniu GitHub programu kod w programie tworzy obiekt raportowania postępu i Main token anulowania. Po utworzeniu tych obiektów wywołuje w celu Main runPagedQueryAsync pobrania ostatnich 250 utworzonych problemów. Po zakończeniu tego zadania zostaną wyświetlone wyniki.

Po uruchomieniu aplikacji startowej można dokonać pewnych ważnych obserwacji dotyczących sposobu uruchamiania tej aplikacji. Zobaczysz postęp zgłoszony dla każdej strony zwróconej z GitHub. Możesz obserwować zauważalną przerwę przed GitHub zwraca każdą nową stronę problemów. Na koniec problemy są wyświetlane dopiero po pobraniu wszystkich 10 stron z GitHub.

Badanie implementacji

Implementacja pokazuje, dlaczego zaobserwowano zachowanie omówione w poprzedniej sekcji. Sprawdź kod dla runPagedQueryAsync :

private static async Task<JArray> runPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString());

        int totalCount = (int)issues(results)["totalCount"];
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"];
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"].ToString();
        issuesReturned += issues(results)["nodes"].Count();
        finalResults.Merge(issues(results)["nodes"]);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]["repository"]["issues"];
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"];
}

Skoncentrujmy się na algorytmie stronicowania i strukturze asynchronicznej poprzedniego kodu. (Szczegółowe informacje na temat interfejsu API programu GraphQ GitHub L GitHub GraphQL można znaleźć w dokumentacji programu GraphQL). Metoda runPagedQueryAsync wylicza problemy od najnowszych do najstarszych. Żąda 25 problemów na stronę i analizuje strukturę odpowiedzi, aby kontynuować pageInfo poprzednią stronę. Jest to kontynuacja standardowej obsługi stronicowania w programie GraphQL w przypadku odpowiedzi wielostronicowych. Odpowiedź zawiera pageInfo obiekt, który zawiera hasPreviousPages wartość i wartość startCursor używaną do żądania poprzedniej strony. Problemy znajdują się w nodes tablicy . Metoda runPagedQueryAsync dołącza te węzły do tablicy zawierającej wszystkie wyniki ze wszystkich stron.

Po odzyskaniu i przywróceniu strony wyników raportuje postęp i runPagedQueryAsync sprawdza, czy anulowano. Jeśli zażądano anulowania, element runPagedQueryAsync zgłasza wyjątek OperationCanceledException .

Ten kod ma kilka elementów, które można ulepszyć. Co najważniejsze, runPagedQueryAsync program musi przydzielić magazyn dla wszystkich zwróconych problemów. Ten przykład zatrzymuje się na poziomie 250 problemów, ponieważ pobranie wszystkich otwartych problemów wymagałoby znacznie więcej pamięci do przechowywania wszystkich pobranych problemów. Protokoły do obsługi raportów postępu i anulowania sprawiają, że algorytm jest trudniejszy do zrozumienia podczas pierwszego czytania. Zaangażowanych jest więcej typów i interfejsów API. Należy śledzić komunikację za pośrednictwem i skojarzonych z nią danych, aby zrozumieć, gdzie jest żądane anulowanie i gdzie CancellationTokenSource CancellationToken je udzielono.

Strumienie asynchroniczne zapewniają lepszy sposób

Strumienie asynchroniczne i skojarzony język obsługują wszystkie te problemy. Kod, który generuje sekwencję, może teraz używać funkcji do zwracania elementów w yield return metodzie, która została zadeklarowana za pomocą async modyfikatora. Strumień asynchroniczny można używać w pętli, tak jak w przypadku korzystania z await foreach dowolnej sekwencji w foreach pętli.

Te nowe funkcje językowe są zależne od trzech nowych interfejsów dodanych do programu .NET Standard 2.1 i zaimplementowanych w programie .NET Core 3.0:

Te trzy interfejsy powinny być znane większości deweloperów języka C#. Zachowują się one w sposób podobny do ich synchronicznych odpowiedników:

Jeden z typów, który może być nieznany, to System.Threading.Tasks.ValueTask . Struktura ValueTask udostępnia interfejs API podobny do klasy System.Threading.Tasks.Task . ValueTask Program jest używany w tych interfejsach ze względu na wydajność.

Konwertowanie na strumienie asynchroniczne

Następnie przekonwertuj runPagedQueryAsync metodę , aby wygenerować strumień asynchroniczny. Najpierw zmień sygnaturę obiektu , aby zwrócić element , i usuń token anulowania oraz obiekty postępu z listy parametrów, jak runPagedQueryAsync IAsyncEnumerable<JToken> pokazano w poniższym kodzie:

private static async IAsyncEnumerable<JToken> runPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

Kod początkowy przetwarza każdą stronę podczas pobierania strony, jak pokazano w poniższym kodzie:

finalResults.Merge(issues(results)["nodes"]);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Zastąp te trzy wiersze następującym kodem:

foreach (JObject issue in issues(results)["nodes"])
    yield return issue;

Można również usunąć deklarację wcześniejszej części tej metody i instrukcji , która finalResults return następuje po zmodyfikowanej pętli.

Zakończono zmiany w celu wygenerowania strumienia asynchronicznego. Gotowa metoda powinna przypominać następujący kod:

private static async IAsyncEnumerable<JToken> runPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString());

        int totalCount = (int)issues(results)["totalCount"];
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"];
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"].ToString();
        issuesReturned += issues(results)["nodes"].Count();

        foreach (JObject issue in issues(results)["nodes"])
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]["repository"]["issues"];
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"];
}

Następnie zmienisz kod, który zużywa kolekcję, aby korzystać ze strumienia asynchronicznego. Znajdź w programie następujący Main kod, który przetwarza kolekcję problemów:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await runPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Zastąp ten kod następującą await foreach pętlą:

int num = 0;
await foreach (var issue in runPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

Nowy interfejs pochodzi IAsyncEnumerator<T> od . IAsyncDisposable Oznacza to, że poprzednia pętla będzie asynchronicznie likwidować strumień po zakończeniu pętli. Możesz sobie wyobrazić, że pętla wygląda podobnie do następującego kodu:

int num = 0;
var enumerator = runPagedQueryAsync(client, PagedIssueQuery, "docs").GetEnumeratorAsync();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Domyślnie elementy strumienia są przetwarzane w przechwyconym kontekście. Jeśli chcesz wyłączyć przechwytywanie kontekstu, użyj TaskAsyncEnumerableExtensions.ConfigureAwait metody rozszerzenia . Aby uzyskać więcej informacji na temat kontekstów synchronizacji i przechwytywania bieżącego kontekstu, zobacz artykuł na temat korzystania ze wzorca asynchronicznego opartego na zadaniach.

Strumienie asynchroniczne obsługują anulowanie przy użyciu tego samego protokołu co inne async metody. Aby obsługiwać anulowanie, należy zmodyfikować podpis metody iteratora asynchronicznego w następujący sposób:

private static async IAsyncEnumerable<JToken> runPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString());

        int totalCount = (int)issues(results)["totalCount"];
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"];
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"].ToString();
        issuesReturned += issues(results)["nodes"].Count();

        foreach (JObject issue in issues(results)["nodes"])
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]["repository"]["issues"];
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"];
}

Atrybut powoduje, że kompilator generuje kod dla , który sprawia, że token przekazany do treści EnumeratorCancellationAttribute IAsyncEnumerator<T> GetAsyncEnumerator iteratora asynchronicznego jest widoczny jako ten argument. W programie można sprawdzić stan tokenu i anulować runQueryAsync dalsze prace, jeśli jest to wymagane.

Użyj innej metody rozszerzenia, WithCancellation , aby przekazać token anulowania do strumienia asynchronicznego. Pętlę wyliczając problemy można zmodyfikować w następujący sposób:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in runPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Kod do ukończonego samouczka można uzyskać z repozytorium dotnet/docs w folderze csharp/whats-new/tutorials.

Uruchamianie ukończonej aplikacji

Uruchom ponownie aplikację. Zrównowazyj jej zachowanie z zachowaniem aplikacji startowej. Pierwsza strona wyników jest wyliczana natychmiast po ich wyliczeniu. Istnieje zauważalna pauza, ponieważ każda nowa strona jest żądana i pobierana, a wyniki następnej strony są szybko wyliczane. Blok nie jest wymagany do obsługi anulowania: wywołujący może zatrzymać try / catch wyliczanie kolekcji. Postęp jest wyraźnie raportowany, ponieważ strumień asynchroniczny generuje wyniki podczas pobierania każdej strony. Stan każdego zwróconego problemu jest bezproblemowo uwzględniany w await foreach pętli. Do śledzenia postępu nie jest potrzebny obiekt wywołania zwrotnego.

Ulepszenia użycia pamięci można zobaczyć, sprawdzając kod. Nie trzeba już przydzielać kolekcji do przechowywania wszystkich wyników przed ich wyliczeniem. Wywołujący może określić, jak korzystać z wyników i czy kolekcja magazynu jest potrzebna.

Uruchom zarówno aplikacje startowe, jak i gotowe, aby samodzielnie zaobserwować różnice między implementacjami. Po zakończeniu tego samouczka GitHub token dostępu utworzony podczas rozpoczynania pracy z tym samouczkiem. Jeśli osoba atakująca uzyska dostęp do tego tokenu, będzie mogła uzyskać dostęp do GitHub API przy użyciu Twoich poświadczeń.