Share via


Walkthrough: Een gegevensstroompijplijn maken

Hoewel u de DataflowBlock.Receivemethoden , DataflowBlock.ReceiveAsyncen DataflowBlock.TryReceive kunt gebruiken om berichten van bronblokken te ontvangen, kunt u ook berichtblokken verbinden om een gegevensstroompijplijn te vormen. Een gegevensstroompijplijn is een reeks onderdelen of gegevensstroomblokken, die elk een specifieke taak uitvoeren die bijdraagt aan een groter doel. Elk gegevensstroomblok in een gegevensstroompijplijn werkt wanneer het een bericht van een ander gegevensstroomblok ontvangt. Een analogie hierop is een assemblagelijn voor de automobielindustrie. Als elk voertuig door de assemblagelijn gaat, wordt het ene station het frame geassembleerd, het volgende installeert de motor, enzovoort. Omdat met een assemblagelijn meerdere voertuigen tegelijk kunnen worden geassembleerd, biedt het een betere doorvoer dan het een voor een assembleren van complete voertuigen.

Dit document toont een gegevensstroompijplijn waarmee het boek De Ilias van Homer van een website wordt gedownload en de tekst wordt gezocht om afzonderlijke woorden te vinden met woorden die de tekens van het eerste woord omkeren. De vorming van de gegevensstroompijplijn in dit document bestaat uit de volgende stappen:

  1. Maak de gegevensstroomblokken die deel uitmaken van de pijplijn.

  2. Verbind elk gegevensstroomblok met het volgende blok in de pijplijn. Elk blok ontvangt als invoer de uitvoer van het vorige blok in de pijplijn.

  3. Maak voor elk gegevensstroomblok een vervolgtaak waarmee het volgende blok wordt ingesteld op de voltooide status nadat het vorige blok is voltooid.

  4. Plaats gegevens op het hoofd van de pijplijn.

  5. Markeer het hoofd van de pijplijn als voltooid.

  6. Wacht totdat de pijplijn alle werkzaamheden heeft voltooid.

Vereisten

Lees Gegevensstroom voordat u aan deze procedure begint.

Een consoletoepassing maken

Maak in Visual Studio een Visual C#- of Visual Basic-consoletoepassingsproject. Installeer het NuGet-pakket System.Threading.Tasks.Dataflow.

Notitie

De TPL-gegevensstroombibliotheek (de System.Threading.Tasks.Dataflow naamruimte) wordt niet gedistribueerd met .NET. Als u de System.Threading.Tasks.Dataflow naamruimte in Visual Studio wilt installeren, opent u uw project, kiest u NuGet-pakketten beheren in het menu Project en zoekt u online naar het System.Threading.Tasks.Dataflow pakket. U kunt het ook installeren met behulp van de .NET Core CLI door uit te voeren dotnet add package System.Threading.Tasks.Dataflow.

Voeg de volgende code toe aan uw project om de basistoepassing te maken.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

De gegevensstroomblokken maken

Voeg de volgende code toe aan de Main methode om de gegevensstroomblokken te maken die deel uitmaken van de pijplijn. De volgende tabel bevat een overzicht van de rol van elk lid van de pijplijn.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Lid Type Beschrijving
downloadString TransformBlock<TInput,TOutput> Hiermee downloadt u de boektekst van het web.
createWordList TransformBlock<TInput,TOutput> Hiermee wordt de boektekst gescheiden in een matrix met woorden.
filterWordList TransformBlock<TInput,TOutput> Hiermee verwijdert u korte woorden en dubbele waarden uit het woordmatrix.
findReversedWords TransformManyBlock<TInput,TOutput> Hiermee vindt u alle woorden in de gefilterde woordmatrixverzameling waarvan het omgekeerde ook voorkomt in het woord matrix.
printReversedWords ActionBlock<TInput> Geeft woorden en de bijbehorende omgekeerde woorden weer in de console.

Hoewel u in dit voorbeeld meerdere stappen in de gegevensstroompijplijn in één stap kunt combineren, illustreert het voorbeeld het concept van het opstellen van meerdere onafhankelijke gegevensstroomtaken om een grotere taak uit te voeren. In het voorbeeld wordt gebruikt TransformBlock<TInput,TOutput> om elk lid van de pijplijn in staat te stellen een bewerking uit te voeren op de invoergegevens en de resultaten naar de volgende stap in de pijplijn te verzenden. Het findReversedWords lid van de pijplijn is een TransformManyBlock<TInput,TOutput> -object omdat het meerdere onafhankelijke uitvoer voor elke invoer produceert. De staart van de pijplijn, printReversedWords, is een ActionBlock<TInput> object omdat het een actie uitvoert op de invoer en geen resultaat produceert.

De pijplijn vormen

Voeg de volgende code toe om elk blok te verbinden met het volgende blok in de pijplijn.

Wanneer u de LinkTo methode aanroept om een brongegevensstroomblok te verbinden met een doelgegevensstroomblok, geeft het brongegevensstroomblok gegevens door aan het doelblok zodra er gegevens beschikbaar zijn. Als u ook DataflowLinkOptions instelt op PropagateCompletion waar, zorgt een geslaagde of mislukte voltooiing van een blok in de pijplijn ervoor dat het volgende blok in de pijplijn wordt voltooid.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

Gegevens posten in de pijplijn

Voeg de volgende code toe om de URL van het boek The Ilias of Homer te posten op het hoofd van de gegevensstroompijplijn.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

In dit voorbeeld wordt gebruikt DataflowBlock.Post om gegevens synchroon naar het hoofd van de pijplijn te verzenden. Gebruik de DataflowBlock.SendAsync methode wanneer u asynchroon gegevens naar een gegevensstroomknooppunt moet verzenden.

Pijplijnactiviteit voltooien

Voeg de volgende code toe om de kop van de pijplijn als voltooid te markeren. De kop van de pijplijn voert de voltooiing door nadat alle gebufferde berichten zijn verwerkt.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

In dit voorbeeld wordt één URL verzonden via de gegevensstroompijplijn die moet worden verwerkt. Als u meer dan één invoer via een pijplijn verzendt, roept u de IDataflowBlock.Complete methode aan nadat u alle invoer hebt verzonden. U kunt deze stap weglaten als uw toepassing geen goed gedefinieerd punt heeft waarop gegevens niet meer beschikbaar zijn of als de toepassing niet hoeft te wachten tot de pijplijn is voltooid.

Wachten tot de pijplijn is voltooid

Voeg de volgende code toe om te wachten tot de pijplijn is voltooid. De algehele bewerking is voltooid wanneer het einde van de pijplijn is voltooid.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

U kunt wachten tot de gegevensstroom is voltooid vanuit een thread of vanuit meerdere threads tegelijk.

Het volledige voorbeeld

In het volgende voorbeeld ziet u de volledige code voor dit scenario.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine("Downloading '{0}'...", uri);

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine("Found reversed words {0}/{1}",
            reversedWord, new string(reversedWord.Reverse().ToArray()));
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Volgende stappen

In dit voorbeeld wordt één URL verzonden om te verwerken via de gegevensstroompijplijn. Als u meer dan één invoerwaarde via een pijplijn verzendt, kunt u een vorm van parallelle uitvoering in uw toepassing introduceren die lijkt op hoe onderdelen zich door een autofabriek kunnen verplaatsen. Wanneer het eerste lid van de pijplijn het resultaat naar het tweede lid verzendt, kan het een ander item parallel verwerken terwijl het tweede lid het eerste resultaat verwerkt.

De parallelle uitvoering die wordt bereikt door het gebruik van gegevensstroompijplijnen wordt grof parallellisme genoemd, omdat deze doorgaans uit minder, grotere taken bestaat. U kunt ook een meer verfijnde parallelle uitvoering van kleinere, kortlopende taken in een gegevensstroompijplijn gebruiken. In dit voorbeeld gebruikt het findReversedWords lid van de pijplijn PLINQ om meerdere items in de werklijst parallel te verwerken. Het gebruik van fijnmazig parallellisme in een grofkorrelige pijplijn kan de algehele doorvoer verbeteren.

U kunt ook een brongegevensstroomblok verbinden met meerdere doelblokken om een gegevensstroomnetwerk te maken. De overbelaste versie van de LinkTo methode gebruikt een Predicate<T> -object dat bepaalt of het doelblok elk bericht accepteert op basis van de waarde. De meeste typen gegevensstroomblokken die als bron fungeren, bieden berichten aan alle verbonden doelblokken aan, in de volgorde waarin ze zijn verbonden, totdat een van de blokken dat bericht accepteert. Met behulp van dit filtermechanisme kunt u systemen van verbonden gegevensstroomblokken maken die bepaalde gegevens via het ene pad en andere gegevens via een ander pad leiden. Zie Walkthrough: Using Dataflow in a Windows Forms Application (Stapsgewijze instructies: Gegevensstroom gebruiken in een Windows Forms-toepassing) voor een voorbeeld waarin filteren wordt gebruikt om een gegevensstroomnetwerk te maken.

Zie ook