Samouczek: generowanie i używanie strumieni asynchronicznych przy użyciu języka C# i platformy .NET

Asynchroniczne strumienie modeluje źródło danych przesyłanych strumieniowo. Strumienie danych często pobierają lub generują elementy asynchronicznie. Zapewniają one naturalny model programowania dla asynchronicznych źródeł danych przesyłanych strumieniowo.

Z tego samouczka dowiesz się, jak wykonywać następujące czynności:

  • Utwórz źródło danych, które generuje sekwencję elementów danych asynchronicznie.
  • Zużyj to źródło danych asynchronicznie.
  • Obsługa anulowania i przechwyconych kontekstów dla strumieni asynchronicznych.
  • Rozpoznaj, kiedy nowy interfejs i źródło danych są preferowane do wcześniejszych synchronicznych sekwencji danych.

Wymagania wstępne

Musisz skonfigurować maszynę do uruchamiania platformy .NET, w tym kompilatora języka C#. Kompilator języka C# jest dostępny w programie Visual Studio 2022 lub zestawie .NET SDK.

Należy utworzyć token dostępu usługi GitHub, aby uzyskać dostęp do punktu końcowego gitHub GraphQL. Wybierz następujące uprawnienia dla tokenu dostępu usługi GitHub:

  • repo:status
  • public_repo

Zapisz token dostępu w bezpiecznym miejscu, aby można było go użyć do uzyskania dostępu do punktu końcowego interfejsu API usługi GitHub.

Ostrzeżenie

Zachowaj bezpieczeństwo osobistego tokenu dostępu. Każde oprogramowanie z osobistym tokenem dostępu może wykonywać wywołania interfejsu API usługi GitHub przy użyciu praw dostępu.

W tym samouczku założono, że znasz języki C# i .NET, w tym program Visual Studio lub interfejs wiersza polecenia platformy .NET.

Uruchamianie aplikacji startowej

Kod aplikacji początkowej używanej w tym samouczku można pobrać z repozytorium dotnet/docs w folderze asynchronous-programming/snippets .

Aplikacja startowa to aplikacja konsolowa, która używa interfejsu GitHub GraphQL do pobierania ostatnich problemów napisanych w repozytorium dotnet/docs . Zacznij od przyjrzenia się następującej kodzie dla metody aplikacji Main początkowej:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Możesz ustawić zmienną środowiskową GitHubKey na osobisty token dostępu lub zamienić ostatni argument wywołania na GetEnvVariable osobisty token dostępu. Nie umieszczaj kodu dostępu w kodzie źródłowym, jeśli udostępnisz źródło innym osobom. Nigdy nie przekazuj kodów dostępu do udostępnionego repozytorium źródłowego.

Po utworzeniu klienta usługi GitHub kod w Main programie tworzy obiekt raportowania postępu i token anulowania. Po utworzeniu Main tych obiektów wywołania RunPagedQueryAsync w celu pobrania najnowszych 250 utworzonych problemów. Po zakończeniu tego zadania zostaną wyświetlone wyniki.

Po uruchomieniu aplikacji startowej możesz wykonać pewne ważne obserwacje dotyczące sposobu działania tej aplikacji. Zobaczysz postęp zgłoszony dla każdej strony zwróconej z usługi GitHub. Możesz zaobserwować zauważalną przerwę, zanim usługa GitHub zwróci każdą nową stronę problemów. Na koniec problemy są wyświetlane dopiero po pobraniu wszystkich 10 stron z usługi GitHub.

Badanie implementacji

Implementacja pokazuje, dlaczego zaobserwowano zachowanie omówione w poprzedniej sekcji. Sprawdź kod dla RunPagedQueryAsyncelementu :

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Pierwszą rzeczą, jaką wykonuje ta metoda, jest utworzenie obiektu POST przy użyciu GraphQLRequest klasy :

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

który pomaga utworzyć treść obiektu POST i poprawnie przekonwertować go na kod JSON przedstawiony jako pojedynczy ciąg za ToJsonText pomocą metody , która usuwa wszystkie znaki nowego wiersza z treści żądania oznaczając je znakiem \ ucieczki (ukośnik odwrotny).

Skoncentrujmy się na algorytmie stronicowania i strukturze asynchronicznego poprzedniego kodu. (Możesz skonsultować się z Dokumentacja narzędzia GitHub GraphQL zawierająca szczegółowe informacje na temat interfejsu API języka GraphQL w usłudze GitHub). Metoda RunPagedQueryAsync wylicza problemy od najnowszych do najstarszych. Żąda 25 problemów na stronę i analizuje pageInfo strukturę odpowiedzi, aby kontynuować pracę z poprzednią stroną. Jest to zgodne ze standardową obsługą stronicowania programu GraphQL dla odpowiedzi wielostronicowych. Odpowiedź zawiera pageInfo obiekt zawierający hasPreviousPages wartość i startCursor wartość używaną do żądania poprzedniej strony. Problemy znajdują się w tablicy nodes . Metoda RunPagedQueryAsync dołącza te węzły do tablicy zawierającej wszystkie wyniki ze wszystkich stron.

Po pobraniu i przywróceniu strony wyników RunPagedQueryAsync raporty postępu i sprawdzania anulowania. Jeśli zażądano anulowania, RunPagedQueryAsync zgłasza błąd OperationCanceledException.

Ten kod zawiera kilka elementów, które można ulepszyć. Co najważniejsze, RunPagedQueryAsync należy przydzielić magazyn dla wszystkich zwróconych problemów. Ten przykład zatrzymuje się na 250 problemach, ponieważ pobieranie wszystkich otwartych problemów wymagałoby znacznie większej ilości pamięci do przechowywania wszystkich pobranych problemów. Protokoły do obsługi raportów postępu i anulowania utrudniają zrozumienie algorytmu podczas pierwszego czytania. Jest zaangażowanych więcej typów i interfejsów API. Należy prześledzić komunikację za pośrednictwem elementu i skojarzonego CancellationTokenSourceCancellationToken z nim, aby zrozumieć, gdzie zażądano anulowania i gdzie zostało przyznane.

Strumienie asynchroniczne zapewniają lepszy sposób

Strumienie asynchroniczne i obsługa skojarzonego języka dotyczą wszystkich tych problemów. Kod, który generuje sekwencję, może teraz służyć yield return do zwracania elementów w metodzie, która została zadeklarowana za pomocą async modyfikatora. Możesz korzystać ze strumienia asynchronicznego przy użyciu await foreach pętli, tak jak w przypadku korzystania z dowolnej sekwencji przy użyciu foreach pętli.

Te nowe funkcje językowe zależą od trzech nowych interfejsów dodanych do platformy .NET Standard 2.1 i zaimplementowanych na platformie .NET Core 3.0:

Te trzy interfejsy powinny być znane większości deweloperów języka C#. Zachowują się w sposób podobny do ich synchronicznych odpowiedników:

Jednym z typów, które mogą być nieznane, jest System.Threading.Tasks.ValueTask. Struktura ValueTask udostępnia podobny interfejs API do System.Threading.Tasks.Task klasy . ValueTask jest używany w tych interfejsach ze względu na wydajność.

Konwertowanie na strumienie asynchroniczne

Następnie przekonwertuj metodę RunPagedQueryAsync , aby wygenerować strumień asynchroniczny. Najpierw zmień podpis RunPagedQueryAsync , aby zwrócić IAsyncEnumerable<JToken>element , i usuń token anulowania i obiekty postępu z listy parametrów, jak pokazano w poniższym kodzie:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

Kod początkowy przetwarza każdą stronę podczas pobierania strony, jak pokazano w poniższym kodzie:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Zastąp te trzy wiersze następującym kodem:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

Można również usunąć deklarację wcześniejszej finalResults części tej metody i return instrukcję zgodną z zmodyfikowaną pętlą.

Ukończono zmiany w celu wygenerowania strumienia asynchronicznego. Zakończona metoda powinna przypominać następujący kod:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Następnie zmienisz kod, który używa kolekcji do korzystania ze strumienia asynchronicznego. Znajdź w tym pliku następujący kod Main , który przetwarza kolekcję problemów:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Zastąp ten kod następującą await foreach pętlą:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

Nowy interfejs IAsyncEnumerator<T> pochodzi z klasy IAsyncDisposable. Oznacza to, że poprzednia pętla asynchronicznie usunie strumień po zakończeniu pętli. Możesz sobie wyobrazić, że pętla wygląda podobnie do następującego kodu:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Domyślnie elementy strumienia są przetwarzane w przechwyconym kontekście. Jeśli chcesz wyłączyć przechwytywanie kontekstu, użyj TaskAsyncEnumerableExtensions.ConfigureAwait metody rozszerzenia. Aby uzyskać więcej informacji na temat kontekstów synchronizacji i przechwytywania bieżącego kontekstu, zobacz artykuł dotyczący korzystania ze wzorca asynchronicznego opartego na zadaniach.

Strumienie asynchroniczne obsługują anulowanie przy użyciu tego samego protokołu co inne async metody. Podpis metody iteratora asynchronicznego można zmodyfikować w następujący sposób, aby obsługiwać anulowanie:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Atrybut System.Runtime.CompilerServices.EnumeratorCancellationAttribute powoduje, że kompilator generuje kod dla IAsyncEnumerator<T> elementu , który sprawia, że token jest przekazywany do GetAsyncEnumerator widocznej treści iteratora asynchronicznego jako tego argumentu. Wewnątrz runQueryAsyncprogramu można sprawdzić stan tokenu i anulować dalszą pracę, jeśli jest to wymagane.

Aby przekazać token anulowania do strumienia asynchronicznego, należy użyć innej metody rozszerzenia . WithCancellation Zmodyfikujesz pętlę wyliczającą problemy w następujący sposób:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Kod ukończonego samouczka można pobrać z repozytorium dotnet/docs w folderze asynchronous-programming/snippets .

Uruchamianie gotowej aplikacji

Uruchom ponownie aplikację. Kontrastuje z zachowaniem aplikacji początkowej. Pierwsza strona wyników jest wyliczana natychmiast po udostępnieniu. Istnieje zauważalna pauza, ponieważ każda nowa strona jest żądana i pobierana, a wyniki następnej strony są szybko wyliczane. Blok try / catch nie jest wymagany do obsługi anulowania: obiekt wywołujący może zatrzymać wyliczanie kolekcji. Postęp jest wyraźnie zgłaszany, ponieważ strumień asynchroniczny generuje wyniki podczas pobierania każdej strony. Stan każdego zwróconego problemu await foreach jest bezproblemowo uwzględniony w pętli. Do śledzenia postępu nie jest potrzebny obiekt wywołania zwrotnego.

Ulepszenia użycia pamięci można zobaczyć, sprawdzając kod. Nie trzeba już przydzielać kolekcji do przechowywania wszystkich wyników przed ich wyliczeniem. Obiekt wywołujący może określić, jak korzystać z wyników i czy potrzebna jest kolekcja magazynu.

Uruchom zarówno aplikacje początkowe, jak i gotowe, a różnice między implementacjami można zaobserwować samodzielnie. Po zakończeniu tego samouczka możesz usunąć token dostępu usługi GitHub utworzony podczas pracy z tym samouczkiem. Jeśli osoba atakująca uzyskała dostęp do tego tokenu, może uzyskać dostęp do interfejsów API usługi GitHub przy użyciu poświadczeń.

W tym samouczku użyto strumieni asynchronicznych do odczytania poszczególnych elementów z interfejsu API sieciowego zwracającego strony danych. Strumienie asynchroniczne mogą również odczytywać ze strumieni "nigdy nie kończących się", takich jak znacznik giełdowy lub urządzenie czujnika. Wywołanie , aby MoveNextAsync zwrócić następny element, gdy tylko jest dostępny.