How to: Implement a Producer-Consumer Dataflow Pattern

This document describes how to use the TPL Dataflow Library to implement a producer-consumer pattern. In this pattern, the producer sends messages to a message block, and the consumer reads messages from that block.

Note

The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it using the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.

Example

The following example demonstrates a basic producer-consumer model that uses dataflow. The Produce method writes arrays that contain random bytes of data to a System.Threading.Tasks.Dataflow.ITargetBlock<TInput> object, and the ConsumeAsync method reads those arrays from a System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> object. Because both methods act on the ISourceBlock<TOutput> and ITargetBlock<TInput> interfaces instead of their derived types, the code is reusable across a variety of dataflow block types. This example uses the BufferBlock<T> class. Because the BufferBlock<T> class acts as both a source block and a target block, the producer and the consumer can use a shared object to transfer data.

The Produce method calls the Post method in a loop to synchronously write data to the target block. After the Produce method writes all data to the target block, it calls the Complete method to indicate that the block will never have additional data available. The ConsumeAsync method uses the async and await keywords (Async and Await in Visual Basic) to asynchronously compute the total number of bytes that are received from the ISourceBlock<TOutput> object. To act asynchronously, the ConsumeAsync method calls the OutputAvailableAsync method, which completes with true when the source block has data available and with false when the source block will never have additional data available.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates a basic producer and consumer pattern that uses dataflow.
class DataflowProducerConsumer
{
   // Demonstrates the production end of the producer and consumer pattern.
   static void Produce(ITargetBlock<byte[]> target)
   {
      // Create a Random object to generate random data.
      Random rand = new Random();

      // In a loop, fill a buffer with random data and
      // post the buffer to the target block.
      for (int i = 0; i < 100; i++)
      {
         // Create an array to hold random byte data.
         byte[] buffer = new byte[1024];

         // Fill the buffer with random bytes.
         rand.NextBytes(buffer);

         // Post the result to the message block.
         target.Post(buffer);
      }

      // Set the target to the completed state to signal to the consumer
      // that no more data will be available.
      target.Complete();
   }

   // Demonstrates the consumption end of the producer and consumer pattern.
   static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
   {
      // Initialize a counter to track the number of bytes that are processed.
      int bytesProcessed = 0;

      // Read from the source buffer until the source buffer has no
      // available output data.
      while (await source.OutputAvailableAsync())
      {
         byte[] data = source.Receive();

         // Increment the count of bytes received.
         bytesProcessed += data.Length;
      }

      return bytesProcessed;
   }

   static void Main(string[] args)
   {
      // Create a BufferBlock<byte[]> object. This object serves as the
      // target block for the producer and the source block for the consumer.
      var buffer = new BufferBlock<byte[]>();

      // Start the consumer. The ConsumeAsync method runs asynchronously.
      var consumer = ConsumeAsync(buffer);

      // Post source data to the dataflow block.
      Produce(buffer);

      // Wait for the consumer to process all data.
      consumer.Wait();

      // Print the count of bytes processed to the console.
      Console.WriteLine("Processed {0} bytes.", consumer.Result);
   }
}

/* Output:
Processed 102400 bytes.
*/

Imports System
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates a basic producer and consumer pattern that uses dataflow.
Friend Class DataflowProducerConsumer
    ' Demonstrates the production end of the producer and consumer pattern.
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        ' Create a Random object to generate random data.
        Dim rand As New Random()

        ' In a loop, fill a buffer with random data and
        ' post the buffer to the target block.
        For i As Integer = 0 To 99
            ' Create an array to hold random byte data.
            Dim buffer(1023) As Byte

            ' Fill the buffer with random bytes.
            rand.NextBytes(buffer)

            ' Post the result to the message block.
            target.Post(buffer)
        Next i

        ' Set the target to the completed state to signal to the consumer
        ' that no more data will be available.
        target.Complete()
    End Sub

    ' Demonstrates the consumption end of the producer and consumer pattern.
    Private Shared Async Function ConsumeAsync(ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        ' Initialize a counter to track the number of bytes that are processed.
        Dim bytesProcessed As Integer = 0

        ' Read from the source buffer until the source buffer has no 
        ' available output data.
        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = source.Receive()

            ' Increment the count of bytes received.
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main(ByVal args() As String)
        ' Create a BufferBlock(Of Byte()) object. This object serves as the
        ' target block for the producer and the source block for the consumer.
        Dim buffer = New BufferBlock(Of Byte())()

        ' Start the consumer. The ConsumeAsync method runs asynchronously.
        Dim consumer = ConsumeAsync(buffer)

        ' Post source data to the dataflow block.
        Produce(buffer)

        ' Wait for the consumer to process all data.
        consumer.Wait()

        ' Print the count of bytes processed to the console.
        Console.WriteLine("Processed {0} bytes.", consumer.Result)
    End Sub
End Class

' Output:
'Processed 102400 bytes.
'
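
Because Produce accepts any ITargetBlock<byte[]>, the same producer can feed block types other than BufferBlock<T>. The following C# sketch is an illustration rather than part of the original sample: it assumes an ActionBlock<byte[]> as the target, and the class name ReusableProducerSketch and the inline byte counter are hypothetical names chosen for this example.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch: reuses the producer with a different block type.
public class ReusableProducerSketch
{
   // Same shape as the Produce method in the preceding example.
   public static void Produce(ITargetBlock<byte[]> target)
   {
      Random rand = new Random();
      for (int i = 0; i < 100; i++)
      {
         byte[] buffer = new byte[1024];
         rand.NextBytes(buffer);
         target.Post(buffer);
      }

      // Signal that no more data will be posted.
      target.Complete();
   }

   public static async Task Main()
   {
      int bytesProcessed = 0;

      // An ActionBlock<T> is a target block that runs a delegate for each
      // message. By default it processes one message at a time, so the
      // counter is not updated concurrently.
      var counter = new ActionBlock<byte[]>(data => bytesProcessed += data.Length);

      Produce(counter);

      // Completion finishes after every posted message has been processed.
      await counter.Completion;

      Console.WriteLine("Processed {0} bytes.", bytesProcessed);
   }
}
```

With an ActionBlock<T>, no separate consumer method is needed, because the block itself consumes each message. The Produce method is unchanged.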

Robust Programming

The preceding example uses just one consumer to process the source data. If you have multiple consumers in your application, use the TryReceive method to read data from the source block, as shown in the following example.

// Demonstrates the consumption end of the producer and consumer pattern.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
   // Initialize a counter to track the number of bytes that are processed.
   int bytesProcessed = 0;

   // Read from the source buffer until the source buffer has no
   // available output data.
   while (await source.OutputAvailableAsync())
   {
      byte[] data;
      while (source.TryReceive(out data))
      {
         // Increment the count of bytes received.
         bytesProcessed += data.Length;
      }
   }

   return bytesProcessed;
}

' Demonstrates the consumption end of the producer and consumer pattern.
Private Shared Async Function ConsumeAsync(ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    ' Initialize a counter to track the number of bytes that are processed.
    Dim bytesProcessed As Integer = 0

    ' Read from the source buffer until the source buffer has no 
    ' available output data.
    Do While Await source.OutputAvailableAsync()
        ' Initialize to Nothing to avoid a use-before-assignment warning.
        Dim data() As Byte = Nothing
        Do While source.TryReceive(data)
            ' Increment the count of bytes received.
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

The TryReceive method returns false (False in Visual Basic) when no data is available, instead of blocking. When multiple consumers read from the source block concurrently, another consumer can remove an item between the call to OutputAvailableAsync and the read. By using TryReceive, each consumer checks that data is still available after the call to OutputAvailableAsync and waits again if it is not.
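
To illustrate the multiple-consumer case, the following C# sketch starts two consumers on one shared BufferBlock<byte[]>. It is a minimal example under stated assumptions, not part of the original sample: the class name MultipleConsumersSketch and the use of Task.WhenAll to combine the results are choices made for this illustration, and the producer posts zero-filled buffers for brevity.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch: two consumers share one source block.
public class MultipleConsumersSketch
{
   // Same logic as the TryReceive-based consumer shown above.
   public static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
   {
      int bytesProcessed = 0;

      while (await source.OutputAvailableAsync())
      {
         // Another consumer may already have taken the item, in which
         // case TryReceive returns false and this consumer waits again.
         while (source.TryReceive(out byte[] data))
         {
            bytesProcessed += data.Length;
         }
      }

      return bytesProcessed;
   }

   public static async Task Main()
   {
      var buffer = new BufferBlock<byte[]>();

      // Start two consumers that read from the same block.
      Task<int> first = ConsumeAsync(buffer);
      Task<int> second = ConsumeAsync(buffer);

      // Post 100 buffers of 1024 bytes each, then signal completion.
      for (int i = 0; i < 100; i++)
      {
         buffer.Post(new byte[1024]);
      }
      buffer.Complete();

      // Each buffer is delivered to exactly one consumer, so the totals
      // add up to the amount posted, however the work is split.
      int[] totals = await Task.WhenAll(first, second);
      Console.WriteLine("Processed {0} bytes in total.", totals[0] + totals[1]);
   }
}
```

How the buffers are divided between the two consumers varies from run to run. Only the combined total is fixed.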

See also