Przewodnik: Tworzenie potoku przepływu danych

Chociaż można używać DataflowBlock.Receivemetod , DataflowBlock.ReceiveAsynci DataflowBlock.TryReceive do odbierania komunikatów z bloków źródłowych, można również połączyć bloki komunikatów w celu utworzenia potoku przepływu danych. Potok przepływu danych to seria składników lub bloków przepływu danych, z których każdy wykonuje określone zadanie, które przyczynia się do większego celu. Każdy blok przepływu danych w potoku przepływu danych wykonuje pracę, gdy odbiera komunikat z innego bloku przepływu danych. Analogią do tego jest linia montażowa do produkcji samochodów. Gdy każdy pojazd przechodzi przez linię montażową, jedna stacja montuje ramkę, następny instaluje silnik itd. Ponieważ linia montażowa umożliwia montaż wielu pojazdów jednocześnie, zapewnia lepszą przepływność niż montaż kompletnych pojazdów jednocześnie.

Ten dokument przedstawia potok przepływu danych, który pobiera książkę Iliad of Homer z witryny internetowej i wyszukuje tekst, aby dopasować poszczególne wyrazy do wyrazów, które odwracają znaki pierwszego słowa. Tworzenie potoku przepływu danych w tym dokumencie składa się z następujących kroków:

  1. Utwórz bloki przepływu danych, które uczestniczą w potoku.

  2. Połącz każdy blok przepływu danych z następnym blokiem w potoku. Każdy blok otrzymuje jako dane wejściowe danych wyjściowych poprzedniego bloku w potoku.

  3. Dla każdego bloku przepływu danych utwórz zadanie kontynuacji, które ustawia następny blok na stan ukończony po zakończeniu poprzedniego bloku.

  4. Opublikuj dane w nagłówku potoku.

  5. Oznacz głowę potoku jako ukończoną.

  6. Poczekaj, aż potok zakończy całą pracę.

Wymagania wstępne

Przeczytaj przepływ danych przed rozpoczęciem tego przewodnika.

Tworzenie aplikacji konsoli

W programie Visual Studio utwórz projekt aplikacji konsolowej Visual C# lub Visual Basic. Zainstaluj pakiet NuGet System.Threading.Tasks.Dataflow.

Uwaga

Biblioteka przepływu danych TPL ( System.Threading.Tasks.Dataflow przestrzeń nazw) nie jest dystrybuowana za pomocą platformy .NET. Aby zainstalować System.Threading.Tasks.Dataflow przestrzeń nazw w programie Visual Studio, otwórz projekt, wybierz pozycję Zarządzaj pakietami NuGet z menu Project i wyszukaj pakiet w trybie online System.Threading.Tasks.Dataflow . Alternatywnie, aby zainstalować go przy użyciu interfejsu wiersza polecenia platformy .NET Core, uruchom polecenie dotnet add package System.Threading.Tasks.Dataflow.

Dodaj następujący kod do projektu, aby utworzyć podstawową aplikację.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

Tworzenie bloków przepływu danych

Dodaj następujący kod do metody , Main aby utworzyć bloki przepływu danych, które uczestniczą w potoku. Poniższa tabela zawiera podsumowanie roli każdego elementu członkowskiego potoku.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Członek Typ Opis
downloadString TransformBlock<TInput,TOutput> Pobiera tekst książki z Sieci Web.
createWordList TransformBlock<TInput,TOutput> Oddziela tekst książki na tablicę wyrazów.
filterWordList TransformBlock<TInput,TOutput> Usuwa krótkie wyrazy i duplikaty z tablicy wyrazów.
findReversedWords TransformManyBlock<TInput,TOutput> Znajduje wszystkie wyrazy w przefiltrowanej kolekcji tablicy wyrazów, których odwrotnie występuje również w tablicy wyrazów.
printReversedWords ActionBlock<TInput> Wyświetla wyrazy i odpowiadające im odwrotne wyrazy do konsoli.

Chociaż można połączyć wiele kroków w potoku przepływu danych w tym przykładzie w jednym kroku, w przykładzie przedstawiono koncepcję tworzenia wielu niezależnych zadań przepływu danych w celu wykonania większego zadania. W przykładzie użyto TransformBlock<TInput,TOutput> polecenia , aby umożliwić każdemu elementowi członkowskiemu potoku wykonanie operacji na danych wejściowych i wysłanie wyników do następnego kroku w potoku. Element findReversedWords członkowski potoku jest obiektem TransformManyBlock<TInput,TOutput> , ponieważ generuje wiele niezależnych danych wyjściowych dla każdego danych wejściowych. Ogon potoku , jest obiektemActionBlock<TInput>, printReversedWordsponieważ wykonuje akcję na danych wejściowych i nie generuje wyniku.

Tworzenie potoku

Dodaj następujący kod, aby połączyć każdy blok z następnym blokiem w potoku.

Po wywołaniu LinkTo metody w celu połączenia bloku przepływu danych źródłowego z docelowym blokiem przepływu danych blok przepływu danych źródłowego propaguje dane do bloku docelowego, gdy dane staną się dostępne. Jeśli zostanie również podane ustawienie DataflowLinkOptionsPropagateCompletion true, pomyślne lub nieudane ukończenie jednego bloku w potoku spowoduje ukończenie następnego bloku w potoku.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

Publikowanie danych w potoku

Dodaj następujący kod, aby opublikować adres URL książki Iliad of Homer na czele potoku przepływu danych.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

W tym przykładzie użyto DataflowBlock.Post do synchronicznego wysyłania danych do głowy potoku. DataflowBlock.SendAsync Użyj metody , gdy musisz asynchronicznie wysyłać dane do węzła przepływu danych.

Kończenie działania potoku

Dodaj następujący kod, aby oznaczyć nagłówek potoku jako ukończony. Szef potoku propaguje jego ukończenie po przetwarzaniu wszystkich buforowanych komunikatów.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

Ten przykład wysyła jeden adres URL za pośrednictwem potoku przepływu danych do przetworzenia. Jeśli wysyłasz więcej niż jedno dane wejściowe za pośrednictwem potoku, wywołaj IDataflowBlock.Complete metodę po przesłaniu wszystkich danych wejściowych. Ten krok można pominąć, jeśli aplikacja nie ma dobrze zdefiniowanego punktu, w którym dane nie są już dostępne lub aplikacja nie musi czekać na zakończenie potoku.

Oczekiwanie na zakończenie potoku

Dodaj następujący kod, aby poczekać na zakończenie potoku. Ogólna operacja jest zakończona po zakończeniu końca potoku.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

Możesz poczekać na ukończenie przepływu danych z dowolnego wątku lub z wielu wątków jednocześnie.

Kompletny przykład

Poniższy przykład przedstawia kompletny kod dla tego przewodnika.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine("Downloading '{0}'...", uri);

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine("Found reversed words {0}/{1}",
            reversedWord, new string(reversedWord.Reverse().ToArray()));
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Następne kroki

W tym przykładzie jest wysyłany jeden adres URL do przetwarzania za pośrednictwem potoku przepływu danych. Jeśli wysyłasz więcej niż jedną wartość wejściową za pośrednictwem potoku, możesz wprowadzić formę równoległości do aplikacji, która przypomina sposób przechodzenia części przez fabrykę samochodów. Gdy pierwszy element członkowski potoku wyśle jego wynik do drugiego elementu członkowskiego, może przetworzyć inny element równolegle, ponieważ drugi element członkowski przetwarza pierwszy wynik.

Równoległość, która jest osiągana przy użyciu potoków przepływu danych, jest nazywana gruboziarnistym równoległością , ponieważ zwykle składa się z mniejszej liczby większych zadań. Można również użyć bardziej szczegółowego równoległości mniejszych, krótko działających zadań w potoku przepływu danych. W tym przykładzie findReversedWords element członkowski potoku używa plINQ do przetwarzania wielu elementów na liście roboczej równolegle. Zastosowanie precyzyjnego równoległości w grubo ziarnistym potoku może poprawić ogólną przepływność.

Blok przepływu danych źródłowych można również połączyć z wieloma blokami docelowymi w celu utworzenia sieci przepływu danych. Przeciążona wersja LinkTo metody przyjmuje Predicate<T> obiekt określający, czy blok docelowy akceptuje każdy komunikat na podstawie jego wartości. Większość typów bloków przepływu danych, które działają jako źródła, oferują komunikaty do wszystkich połączonych bloków docelowych, w kolejności, w której zostały połączone, dopóki jeden z bloków nie zaakceptuje tego komunikatu. Za pomocą tego mechanizmu filtrowania można tworzyć systemy połączonych bloków przepływu danych, które kierują określone dane za pośrednictwem jednej ścieżki i innych danych przez inną ścieżkę. Aby zapoznać się z przykładem, który używa filtrowania do tworzenia sieci przepływu danych, zobacz Przewodnik: używanie przepływu danych w aplikacji Windows Forms.

Zobacz też