Пошаговое руководство. Создание конвейера потока данных

Чтобы получать сообщения из блоков источника, можно использовать методы DataflowBlock.Receive, DataflowBlock.ReceiveAsync и DataflowBlock.TryReceive или же объединить блоки сообщений в конвейер потока данных. Конвейер потока данных — это цепочка компонентов, или блоков потока данных, каждый из которых выполняет конкретную задачу в рамках более крупной цели. Каждый блок потока данных в конвейере потока данных выполняет работу, когда получает сообщение от другого блока потока данных. Можно сравнить это с линией сборки автомобилей. Как при продвижении автомобиля по сборочной линии одна станция собирает раму, следующая — устанавливает двигатель и так далее. Так как при этом можно собирать одновременно много автомобилей, линия сборки обеспечивает большую производительность, чем полная сборка автомобилей по одному.

В этом документе демонстрируется конвейер потока данных, который скачивает с веб-сайта книгу "Илиада" Гомера и выполняет по ее тексту поиск, сопоставляющий каждое слово с другими словами, содержащими первые символы этого слова в обратном порядке. Формирование конвейера потока данных в этом документе состоит из следующих шагов.

  1. Создание блоков потока данных, которые участвуют в конвейере.

  2. Подсоединение каждого блока потока данных к следующему блоку в конвейере. Каждый блок получает в качестве входных данных выходные данные предыдущего блока в конвейере.

  3. Для каждого блока потока данных создайте задачу продолжения, которая переводит следующий блок в завершенное состояние, после того как заканчивается выполнение предыдущего блока.

  4. Отправьте данные в начало конвейера.

  5. Пометьте начало конвейера как завершенное.

  6. Подождите, пока конвейер не завершит всю работу.

Предварительные требования

Прежде чем начать выполнение этого пошагового руководства, ознакомьтесь с документом Поток данных.

Создание консольного приложения

В Visual Studio создайте проект "Консольное приложение" на Visual C# или Visual Basic. Установите пакет System.Threading.Tasks.Dataflow NuGet.

Примечание

Библиотека потоков данных TPL (пространство имен System.Threading.Tasks.Dataflow) не поставляется с .NET. Чтобы установить пространство имен System.Threading.Tasks.Dataflow в Visual Studio, откройте проект, выберите Управление пакетами NuGet в меню Проект и выполните поиск пакета System.Threading.Tasks.Dataflow в Интернете. Вы также можете установить его, выполнив в .NET Core CLI команду dotnet add package System.Threading.Tasks.Dataflow.

Добавьте следующий код в проект для создания простого приложения.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

Создание блоков потоков данных

Добавьте следующий код в метод Main для создания блоков потока данных, которые участвуют в конвейере. Таблица ниже описывает роль каждой части конвейера.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Член Type Описание
downloadString TransformBlock<TInput,TOutput> Загружает текст книги из Интернета.
createWordList TransformBlock<TInput,TOutput> Разбивает текст книги на массив слов.
filterWordList TransformBlock<TInput,TOutput> Удаляет из массива слов короткие слова и повторяющиеся значения.
findReversedWords TransformManyBlock<TInput,TOutput> Находит все слова в отфильтрованной коллекции массива слов, которые также встречаются в массиве слов с обратным порядком символов.
printReversedWords ActionBlock<TInput> Выводит на консоль слова и соответствующие обратные слова.

Хотя можно объединить несколько шагов конвейера потока данных в этом примере в один шаг, на примере демонстрируется концепция создания нескольких независимых задач потока данных для выполнения более крупной задачи. В примере используется TransformBlock<TInput,TOutput>, чтобы позволить каждой части конвейера выполнить операцию с ее входными данными и отправить результат на следующий этап конвейера. Часть конвейера findReversedWords — это объект TransformManyBlock<TInput,TOutput>, поскольку он создает несколько независимых выходных данных для каждого входного значения. Конец конвейера printReversedWords — это объект ActionBlock<TInput>, так как он выполняет действие со входными данными и не создает выходных данных.

Формирование конвейера

Добавьте следующий код, чтобы подсоединить каждый блок к следующему блоку конвейера.

При вызове метода LinkTo для подсоединения блока потока данных источника к блоку потока данных целевого объекта, блок потока данных источника передает данные блокам целевых объектов, как только данные становятся доступны. Если вы предоставите DataflowLinkOptions, у которого PropagateCompletion имеет значение true, успешное или неуспешное завершение одного блока в конвейере приведет к завершению следующего блока в конвейере.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

Передача данных в конвейере

Добавьте следующий код, чтобы поместить url-адрес книги "Илиада" Гомера в начало конвейера потока данных.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

В этом примере используется DataflowBlock.Post для синхронной отправки данных в начало конвейера. Используйте метод DataflowBlock.SendAsync при необходимости асинхронной отправки данных в узел потока данных.

Завершение работы конвейера

Добавьте следующий код, чтобы пометить начало конвейера как завершенное. Начало конвейера отмечает свое выполнение, когда завершит обработку всех накопленных в буфере сообщений.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

Этот пример отправляет один url-адрес через конвейер потока данных для обработки. Если на вход подается более одного элемента данных, вызовите метод IDataflowBlock.Complete после передачи всех входных данных. Этот шаг можно пропустить, если в вашем приложении нет определенной точки, в которой данные более недоступны или приложение не должно ожидать завершения работы конвейера.

Ожидание завершения работы конвейера

Добавьте следующий код, чтобы ожидать завершения работы конвейера. Операция в целом считается завершенной, когда завершит работу конец конвейера.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

Можно ожидать завершения потока данных из любого потока или из нескольких потоков одновременно.

Полный пример

В следующем примере приведен полный код для этого руководства.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine("Downloading '{0}'...", uri);

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine("Found reversed words {0}/{1}",
            reversedWord, new string(reversedWord.Reverse().ToArray()));
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Следующие шаги

В этом примере один url-адрес отправляется на обработку через конвейер потока данных. Если через конвейер отправляется более одного входного значения, можно внедрить в приложении форму параллелизма, которая напоминает перемещение деталей по автомобильному заводу. Когда первая часть конвейера отправляет свой результат второй части, она может обрабатывать другой элемент параллельно с тем, как вторая часть обрабатывает первый результат.

Параллелизм, достигаемый за счет применения конвейеров потока данных, обозначается как грубый параллелизм, поскольку он включает небольшое количество крупных задач. Можно также использовать более точный параллелизм, включив в конвейер потока данных задачи меньшего размера, выполняющиеся быстрее. В этот пример конвейера включен элемент findReversedWords, который использует метод PLINQ для параллельной обработки нескольких элементов в списке работ. Использование точного параллелизма в конвейере с несколькими крупными функциями может повысить общую пропускную способность.

Вы также можете подсоединить блок-источник потока данных к нескольким целевым блокам для создания сети потока данных. Перегруженная версия метода LinkTo принимает объект Predicate<T>, который определяет, примет ли целевой объект сообщение, основываясь на его значении. Большинство типов блоков потока данных, выполняющих роль источников, предлагают сообщения всем подключенным целевым блокам в порядке их подсоединения, пока один из блоков не примет это сообщение. С помощью этого механизма фильтрации можно создавать системы связанных блоков потока данных, которые будут направлять определенные по одному пути, а другие данные — по другому пути. Пример, использующий фильтрацию для создания сети потока данных, см. в статье Пошаговое руководство. Использование потока данных в приложении Windows Forms

См. также