How to: Implement a Producer-Consumer Dataflow Pattern

This document describes how to use the TPL Dataflow Library to implement a producer-consumer pattern. In this pattern, the producer sends messages to a message block, and the consumer reads messages from that block.

Note

The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it using the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.

Example

The following example demonstrates a basic producer-consumer model that uses dataflow. The Produce method writes arrays that contain random bytes of data to a System.Threading.Tasks.Dataflow.ITargetBlock<TInput> object, and the ConsumeAsync method reads bytes from a System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> object. By acting on the ISourceBlock<TOutput> and ITargetBlock<TInput> interfaces, instead of their derived types, you can write reusable code that can act on a variety of dataflow block types. This example uses the BufferBlock<T> class. Because the BufferBlock<T> class acts as both a source block and a target block, the producer and the consumer can use a shared object to transfer data.

The Produce method calls the Post method in a loop to synchronously write data to the target block. After the Produce method writes all data to the target block, it calls the Complete method to indicate that the block will never have additional data available. The ConsumeAsync method uses the async and await keywords (Async and Await in Visual Basic) to asynchronously compute the total number of bytes that are received from the ISourceBlock<TOutput> object. To act asynchronously, the ConsumeAsync method calls the OutputAvailableAsync method, which notifies it either when the source block has data available or when the source block will never have additional data available.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates a basic producer and consumer pattern that uses dataflow.
class DataflowProducerConsumer
{
   // Demonstrates the production end of the producer and consumer pattern.
   static void Produce(ITargetBlock<byte[]> target)
   {
      // Create a Random object to generate random data.
      Random rand = new Random();

      // In a loop, fill a buffer with random data and
      // post the buffer to the target block.
      for (int i = 0; i < 100; i++)
      {
         // Create an array to hold random byte data.
         byte[] buffer = new byte[1024];

         // Fill the buffer with random bytes.
         rand.NextBytes(buffer);

         // Post the result to the message block.
         target.Post(buffer);
      }

      // Set the target to the completed state to signal to the consumer
      // that no more data will be available.
      target.Complete();
   }

   // Demonstrates the consumption end of the producer and consumer pattern.
   static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
   {
      // Initialize a counter to track the number of bytes that are processed.
      int bytesProcessed = 0;

      // Read from the source buffer until the source buffer has no 
      // available output data.
      while (await source.OutputAvailableAsync())
      {
         byte[] data = source.Receive();

         // Increment the count of bytes received.
         bytesProcessed += data.Length;
      }

      return bytesProcessed;
   }

   static void Main(string[] args)
   {
      // Create a BufferBlock<byte[]> object. This object serves as the 
      // target block for the producer and the source block for the consumer.
      var buffer = new BufferBlock<byte[]>();

      // Start the consumer. The ConsumeAsync method runs asynchronously.
      var consumer = ConsumeAsync(buffer);

      // Post source data to the dataflow block.
      Produce(buffer);

      // Wait for the consumer to process all data.
      consumer.Wait();

      // Print the count of bytes processed to the console.
      Console.WriteLine("Processed {0} bytes.", consumer.Result);
   }
}

/* Output:
Processed 102400 bytes.
*/
Imports System
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates a basic producer and consumer pattern that uses dataflow.
Friend Class DataflowProducerConsumer
   ' Demonstrates the production end of the producer and consumer pattern.
   Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
      ' Create a Random object to generate random data.
      Dim rand As New Random()

      ' In a loop, fill a buffer with random data and
      ' post the buffer to the target block.
      For i As Integer = 0 To 99
         ' Create an array to hold random byte data.
         Dim buffer(1023) As Byte

         ' Fill the buffer with random bytes.
         rand.NextBytes(buffer)

         ' Post the result to the message block.
         target.Post(buffer)
      Next i

      ' Set the target to the completed state to signal to the consumer
      ' that no more data will be available.
      target.Complete()
   End Sub

   ' Demonstrates the consumption end of the producer and consumer pattern.
   Private Shared async Function ConsumeAsync(ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
      ' Initialize a counter to track the number of bytes that are processed.
      Dim bytesProcessed As Integer = 0

      ' Read from the source buffer until the source buffer has no 
      ' available output data.
      Do While await source.OutputAvailableAsync()
         Dim data() As Byte = source.Receive()

         ' Increment the count of bytes received.
         bytesProcessed += data.Length
      Loop

      Return bytesProcessed
   End Function

   Shared Sub Main(ByVal args() As String)
      ' Create a BufferBlock(Of Byte()) object. This object serves as the
      ' target block for the producer and the source block for the consumer.
      Dim buffer = New BufferBlock(Of Byte())()

      ' Start the consumer. The ConsumeAsync method runs asynchronously.
      Dim consumer = ConsumeAsync(buffer)

      ' Post source data to the dataflow block.
      Produce(buffer)

      ' Wait for the consumer to process all data.
      consumer.Wait()

      ' Print the count of bytes processed to the console.
      Console.WriteLine("Processed {0} bytes.", consumer.Result)
   End Sub
End Class

' Output:
'Processed 102400 bytes.
'
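Because Produce accepts any ITargetBlock<byte[]>, it can be reused with other block types. The following is a minimal sketch, not part of the original example, that passes an ActionBlock<byte[]> to the same Produce logic. The class name ReusableProducerSketch and the helper ProduceIntoActionBlock are hypothetical names chosen for illustration.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Illustrative sketch: reuses the Produce logic with a different target block type.
class ReusableProducerSketch
{
   // Same logic as the Produce method in the preceding example.
   static void Produce(ITargetBlock<byte[]> target)
   {
      var rand = new Random();
      for (int i = 0; i < 100; i++)
      {
         byte[] buffer = new byte[1024];
         rand.NextBytes(buffer);
         target.Post(buffer);
      }
      target.Complete();
   }

   // Hypothetical helper: runs Produce against an ActionBlock<byte[]>
   // and returns the number of bytes the block processed.
   public static int ProduceIntoActionBlock()
   {
      int bytesProcessed = 0;

      // ActionBlock<T> implements ITargetBlock<T>. With its default options it
      // processes one message at a time, so the counter needs no extra locking.
      var counter = new ActionBlock<byte[]>(data => { bytesProcessed += data.Length; });

      Produce(counter);

      // Completion finishes after every posted message has been processed.
      counter.Completion.Wait();
      return bytesProcessed;
   }

   static void Main()
   {
      Console.WriteLine("Processed {0} bytes.", ProduceIntoActionBlock());
   }
}
```

In this sketch the ActionBlock<byte[]> plays the role of the consumer, so no separate ConsumeAsync method is needed. Under the same assumptions as the original example (100 buffers of 1024 bytes), it reports 102400 bytes.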

Compiling the Code

Copy the example code and paste it in a Visual Studio project, or paste it in a file that is named DataflowProducerConsumer.cs (DataflowProducerConsumer.vb for Visual Basic), and then run the following command in a Visual Studio Command Prompt window.

Visual C#

csc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowProducerConsumer.cs

Visual Basic

vbc.exe /r:System.Threading.Tasks.Dataflow.dll DataflowProducerConsumer.vb

Robust Programming

The preceding example uses just one consumer to process the source data. If you have multiple consumers in your application, use the TryReceive method to read data from the source block, as shown in the following example.

// Demonstrates the consumption end of the producer and consumer pattern.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
   // Initialize a counter to track the number of bytes that are processed.
   int bytesProcessed = 0;

   // Read from the source buffer until the source buffer has no 
   // available output data.
   while (await source.OutputAvailableAsync())
   {
      byte[] data;
      while (source.TryReceive(out data))
      {
         // Increment the count of bytes received.
         bytesProcessed += data.Length;
      }
   }

   return bytesProcessed;
}
' Demonstrates the consumption end of the producer and consumer pattern.
Private Shared async Function ConsumeAsync(ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
   ' Initialize a counter to track the number of bytes that are processed.
   Dim bytesProcessed As Integer = 0

   ' Read from the source buffer until the source buffer has no 
   ' available output data.
   Do While await source.OutputAvailableAsync()
      Dim data() As Byte
      Do While source.TryReceive(data)
         ' Increment the count of bytes received.
         bytesProcessed += data.Length
      Loop
   Loop

   Return bytesProcessed
End Function

The TryReceive method returns false when no data is available. When multiple consumers access the source block concurrently, another consumer might take a message after the call to OutputAvailableAsync completes but before the current consumer reads it. Calling TryReceive in a loop lets each consumer handle that case, because it processes data only when a message is actually still available.
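To see this with more than one consumer, the following minimal sketch starts two consumers that read from the same BufferBlock<byte[]>. It is an illustration under stated assumptions rather than part of the original example: the class name MultipleConsumersSketch and the RunAsync helper are hypothetical, Task.Run is used here only to start each consumer on the thread pool, and the async Main method assumes C# 7.1 or later.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustrative sketch: two consumers share one source block.
class MultipleConsumersSketch
{
   // Same logic as the Produce method in the first example.
   static void Produce(ITargetBlock<byte[]> target)
   {
      var rand = new Random();
      for (int i = 0; i < 100; i++)
      {
         byte[] buffer = new byte[1024];
         rand.NextBytes(buffer);
         target.Post(buffer);
      }
      target.Complete();
   }

   // Same logic as the TryReceive-based ConsumeAsync method above.
   static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
   {
      int bytesProcessed = 0;
      while (await source.OutputAvailableAsync())
      {
         // Another consumer may already have taken the message,
         // in which case TryReceive returns false and the loop ends.
         while (source.TryReceive(out byte[] data))
         {
            bytesProcessed += data.Length;
         }
      }
      return bytesProcessed;
   }

   // Hypothetical helper: returns the byte count seen by each consumer.
   public static async Task<int[]> RunAsync()
   {
      var buffer = new BufferBlock<byte[]>();

      // Start two consumers that compete for messages from the same block.
      Task<int> first = Task.Run(() => ConsumeAsync(buffer));
      Task<int> second = Task.Run(() => ConsumeAsync(buffer));

      Produce(buffer);

      return await Task.WhenAll(first, second);
   }

   static async Task Main()
   {
      int[] counts = await RunAsync();
      Console.WriteLine("Consumer 1: {0} bytes, consumer 2: {1} bytes, total: {2} bytes.",
         counts[0], counts[1], counts[0] + counts[1]);
   }
}
```

How the messages are split between the two consumers depends on scheduling and can differ from run to run. Each message is delivered to only one consumer, so the two counts should add up to the 102400 bytes that Produce posts.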

See Also

Dataflow