İzlenecek yol: Veri Akışı Ardışık Düzeni Oluşturma

Kaynak bloklardan ileti almak için , DataflowBlock.ReceiveAsyncve DataflowBlock.TryReceive yöntemlerini kullanabilirsinizDataflowBlock.Receive, ancak bir veri akışı işlem hattı oluşturmak için ileti bloklarını da bağlayabilirsiniz. Veri akışı işlem hattı, her biri daha büyük bir hedefe katkıda bulunan belirli bir görevi gerçekleştiren bir dizi bileşen veya veri akışı bloğudur. Veri akışı işlem hattındaki her veri akışı bloğu, başka bir veri akışı bloğundan ileti aldığında iş gerçekleştirir. Buna benzetme, otomobil üretimi için bir montaj hattıdır. Her araç montaj hattından geçerken, bir istasyon çerçeveyi monte eder, bir sonraki motoru yükler ve bu şekilde devam eder. Montaj hattı aynı anda birden fazla aracın monte edilmesine olanak sağladığından, tam araçların teker teker montajından daha iyi aktarım hızı sağlar.

Bu belgede, Homer'ın İlyadası kitabını bir web sitesinden indiren ve tek tek sözcükleri ilk sözcüğün karakterlerini ters çeviren sözcüklerle eşleştirmek için metinde arama yapılan bir veri akışı işlem hattı gösterilmektedir. Bu belgede veri akışı işlem hattının oluşumu aşağıdaki adımlardan oluşur:

  1. İşlem hattına katılan veri akışı bloklarını oluşturun.

  2. Her veri akışı bloğunu işlem hattındaki bir sonraki bloğa bağlayın. Her blok, işlem hattındaki önceki bloğun çıkışını girdi olarak alır.

  3. Her veri akışı bloğu için, bir sonraki bloğu önceki blok tamamlandıktan sonra tamamlanmış duruma ayarlayan bir devamlılık görevi oluşturun.

  4. Verileri işlem hattının başına gönderin.

  5. İşlem hattının başını tamamlandı olarak işaretleyin.

  6. İşlem hattının tüm çalışmayı tamamlanmasını bekleyin.

Önkoşullar

Bu kılavuza başlamadan önce Veri Akışı'nı okuyun.

Konsol Uygulaması Oluşturma

Visual Studio'da bir Visual C# veya Visual Basic Konsol Uygulaması projesi oluşturun. System.Threading.Tasks.Dataflow NuGet paketini yükleyin.

Not

TPL Veri Akışı Kitaplığı ( System.Threading.Tasks.Dataflow ad alanı) .NET ile dağıtılmaz. Ad alanını System.Threading.Tasks.Dataflow Visual Studio'ya yüklemek için projenizi açın, Projemenüsünden NuGet Paketlerini Yönet'i seçin ve çevrimiçi ortamda System.Threading.Tasks.Dataflow paketi arayın. Alternatif olarak, .NET Core CLI kullanarak yüklemek için komutunu çalıştırın dotnet add package System.Threading.Tasks.Dataflow.

Temel uygulamayı oluşturmak için projenize aşağıdaki kodu ekleyin.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

Veri Akışı Blokları Oluşturma

İşlem hattına Main katılan veri akışı bloklarını oluşturmak için yöntemine aşağıdaki kodu ekleyin. Aşağıdaki tabloda işlem hattının her üyesinin rolü özetlenir.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Üye Tür Description
downloadString TransformBlock<TInput,TOutput> Kitap metnini Web'den indirir.
createWordList TransformBlock<TInput,TOutput> Kitap metnini bir sözcük dizisine ayırır.
filterWordList TransformBlock<TInput,TOutput> Kısa sözcükleri ve yinelenenleri sözcük dizisinden kaldırır.
findReversedWords TransformManyBlock<TInput,TOutput> Filtrelenmiş sözcük dizisi koleksiyonundaki, tam tersi de sözcük dizisinde olan tüm sözcükleri bulur.
printReversedWords ActionBlock<TInput> Konsolda sözcükleri ve karşılık gelen ters sözcükleri görüntüler.

Bu örnekteki veri akışı işlem hattındaki birden çok adımı tek adımda birleştirebilse de, örnekte daha büyük bir görevi gerçekleştirmek için birden çok bağımsız veri akışı görevi oluşturma kavramı gösterilmektedir. Örnek, işlem hattının her üyesinin giriş verileri üzerinde bir işlem gerçekleştirmesini ve sonuçları işlem hattındaki bir sonraki adıma göndermesini sağlamak için kullanır TransformBlock<TInput,TOutput> . İşlem findReversedWords hattının üyesi, her giriş için birden çok bağımsız çıkış ürettiğinden bir TransformManyBlock<TInput,TOutput> nesnedir. İşlem hattının kuyruğu, printReversedWordsgirişinde bir ActionBlock<TInput> eylem gerçekleştirdiğinden ve bir sonuç üretmediğinden bir nesnedir.

İşlem Hattını Oluşturma

Her bloğu işlem hattındaki bir sonraki bloğa bağlamak için aşağıdaki kodu ekleyin.

Bir kaynak veri akışı bloğunu LinkTo hedef veri akışı bloğuna bağlamak için yöntemini çağırdığınızda, kaynak veri akışı bloğu verileri kullanılabilir hale geldikçe hedef bloğa yayılır. Ayrıca, işlem hattındaki bir bloğun true, başarılı veya başarısız bir şekilde tamamlanmasını sağlarsanız DataflowLinkOptionsPropagateCompletion işlem hattında bir sonraki bloğun tamamlanmasına neden olur.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

İşlem Hattına Veri Gönderme

The Iliad of Homer kitabının URL'sini veri akışı işlem hattının başına göndermek için aşağıdaki kodu ekleyin.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

Bu örnek, işlem hattının başına zaman uyumlu olarak veri göndermek için kullanır DataflowBlock.Post . Veri akışı düğümüne DataflowBlock.SendAsync zaman uyumsuz olarak veri göndermeniz gerektiğinde yöntemini kullanın.

İşlem Hattı Etkinliği Tamamlanıyor

İşlem hattının başını tamamlandı olarak işaretlemek için aşağıdaki kodu ekleyin. İşlem hattının başı, arabelleğe alınan tüm iletileri işledikten sonra tamamlanmasını yayılır.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

Bu örnek, işlenecek veri akışı işlem hattı üzerinden bir URL gönderir. bir işlem hattı üzerinden birden fazla giriş gönderirseniz, tüm girişi gönderdikten sonra yöntemini çağırın IDataflowBlock.Complete . Uygulamanızın artık kullanılabilir durumda olmayan iyi tanımlanmış bir noktası yoksa veya uygulamanın işlem hattının bitmesini beklemesi gerekmediyse bu adımı atlayabilirsiniz.

İşlem Hattının Bitmeye Hazır Olması

İşlem hattının bitmesini beklemek için aşağıdaki kodu ekleyin. İşlem hattının kuyruğu tamamlandığında genel işlem tamamlanır.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

Herhangi bir iş parçacığından veya birden çok iş parçacığından aynı anda veri akışının tamamlanmasını bekleyebilirsiniz.

Tam Örnek

Aşağıdaki örnekte bu kılavuzun tam kodu gösterilmektedir.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine("Downloading '{0}'...", uri);

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine("Found reversed words {0}/{1}",
            reversedWord, new string(reversedWord.Reverse().ToArray()));
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Sonraki Adımlar

Bu örnek, veri akışı işlem hattı aracılığıyla işlenmek üzere bir URL gönderir. İşlem hattı üzerinden birden fazla giriş değeri gönderirseniz, uygulamanıza parçaların bir otomobil fabrikasında nasıl hareket ettirebileceğine benzeyen bir paralellik biçimi ekleyebilirsiniz. İşlem hattının ilk üyesi sonucunu ikinci üyeye gönderdiğinde, ikinci üye ilk sonucu işlerken başka bir öğeyi paralel olarak işleyebilir.

Veri akışı işlem hatları kullanılarak elde edilen paralellik, genellikle daha az, daha büyük görevlerden oluştuğundan kaba ayrıntılı paralellik olarak bilinir. Ayrıca, veri akışı işlem hattında daha küçük, kısa çalışan görevlerin daha ayrıntılı paralelliğini de kullanabilirsiniz. Bu örnekte işlem hattının findReversedWords üyesi, iş listesindeki birden çok öğeyi paralel olarak işlemek için PLINQ kullanır. Kaba bir işlem hattında ince ayrıntılı paralellik kullanılması genel aktarım hızını geliştirebilir.

Veri akışı ağı oluşturmak için bir kaynak veri akışı bloğunu birden çok hedef bloğuna da bağlayabilirsiniz. yönteminin LinkTo aşırı yüklenmiş sürümü, hedef bloğun değerine göre her iletiyi kabul edip etmediğini tanımlayan bir Predicate<T> nesnesi alır. Kaynak olarak davranan veri akışı bloğu türlerinin çoğu, bloklardan biri bu iletiyi kabul edene kadar, bağlı oldukları sırada tüm bağlı hedef bloklara ileti sunar. Bu filtreleme mekanizmasını kullanarak, belirli verileri bir yol ve diğer verileri başka bir yol üzerinden yönlendiren bağlı veri akışı blokları sistemleri oluşturabilirsiniz. Veri akışı ağı oluşturmak için filtreleme kullanan bir örnek için bkz. İzlenecek Yol: Windows Forms Bir Uygulamada Veri Akışı Kullanma.

Ayrıca bkz.