How to: Implement a Producer-Consumer Dataflow Pattern

This document describes how to use the TPL Dataflow Library to implement a producer-consumer pattern. In this pattern, the producer sends messages to a message block, and the consumer reads messages from that block.

Note

The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it using the .NET CLI, run dotnet add package System.Threading.Tasks.Dataflow.

Example

The following example demonstrates a basic producer-consumer model that uses dataflow. The Produce method writes arrays that contain random bytes of data to a System.Threading.Tasks.Dataflow.ITargetBlock<TInput> object, and the ConsumeAsync method reads those arrays from a System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> object. By acting on the ISourceBlock<TOutput> and ITargetBlock<TInput> interfaces, instead of their derived types, you can write reusable code that works with a variety of dataflow block types. This example uses the BufferBlock<T> class. Because the BufferBlock<T> class acts as both a source block and a target block, the producer and the consumer can use a shared object to transfer data.

The Produce method calls the Post method in a loop to synchronously write data to the target block. After the Produce method writes all data to the target block, it calls the Complete method to indicate that the block will never have additional data available. The ConsumeAsync method uses the async and await keywords (Async and Await in Visual Basic) to asynchronously compute the total number of bytes that are received from the ISourceBlock<TOutput> object. To act asynchronously, the ConsumeAsync method calls the OutputAvailableAsync method, which completes with true when the source block has data available and with false when the source block will never have additional data available.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates a basic producer and consumer pattern that uses dataflow.
class DataflowProducerConsumer
{
   // Demonstrates the production end of the producer and consumer pattern.
   static void Produce(ITargetBlock<byte[]> target)
   {
      // Create a Random object to generate random data.
      Random rand = new Random();

      // In a loop, fill a buffer with random data and
      // post the buffer to the target block.
      for (int i = 0; i < 100; i++)
      {
         // Create an array to hold random byte data.
         byte[] buffer = new byte[1024];

         // Fill the buffer with random bytes.
         rand.NextBytes(buffer);

         // Post the result to the message block.
         target.Post(buffer);
      }

      // Set the target to the completed state to signal to the consumer
      // that no more data will be available.
      target.Complete();
   }

   // Demonstrates the consumption end of the producer and consumer pattern.
   static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
   {
      // Initialize a counter to track the number of bytes that are processed.
      int bytesProcessed = 0;

      // Read from the source buffer until the source buffer has no 
      // available output data.
      while (await source.OutputAvailableAsync())
      {
         byte[] data = source.Receive();

         // Increment the count of bytes received.
         bytesProcessed += data.Length;
      }

      return bytesProcessed;
   }

   static void Main(string[] args)
   {
      // Create a BufferBlock<byte[]> object. This object serves as the 
      // target block for the producer and the source block for the consumer.
      var buffer = new BufferBlock<byte[]>();

      // Start the consumer. The ConsumeAsync method runs asynchronously.
      var consumer = ConsumeAsync(buffer);

      // Post source data to the dataflow block.
      Produce(buffer);

      // Wait for the consumer to process all data.
      consumer.Wait();

      // Print the count of bytes processed to the console.
      Console.WriteLine("Processed {0} bytes.", consumer.Result);
   }
}

/* Output:
Processed 102400 bytes.
*/

Imports System
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates a basic producer and consumer pattern that uses dataflow.
Friend Class DataflowProducerConsumer
   ' Demonstrates the production end of the producer and consumer pattern.
   Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
      ' Create a Random object to generate random data.
      Dim rand As New Random()

      ' In a loop, fill a buffer with random data and
      ' post the buffer to the target block.
      For i As Integer = 0 To 99
         ' Create an array to hold random byte data.
         Dim buffer(1023) As Byte

         ' Fill the buffer with random bytes.
         rand.NextBytes(buffer)

         ' Post the result to the message block.
         target.Post(buffer)
      Next i

      ' Set the target to the completed state to signal to the consumer
      ' that no more data will be available.
      target.Complete()
   End Sub

   ' Demonstrates the consumption end of the producer and consumer pattern.
   Private Shared Async Function ConsumeAsync(ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
      ' Initialize a counter to track the number of bytes that are processed.
      Dim bytesProcessed As Integer = 0

      ' Read from the source buffer until the source buffer has no 
      ' available output data.
      Do While Await source.OutputAvailableAsync()
         Dim data() As Byte = source.Receive()

         ' Increment the count of bytes received.
         bytesProcessed += data.Length
      Loop

      Return bytesProcessed
   End Function

   Shared Sub Main(ByVal args() As String)
      ' Create a BufferBlock(Of Byte()) object. This object serves as the
      ' target block for the producer and the source block for the consumer.
      Dim buffer = New BufferBlock(Of Byte())()

      ' Start the consumer. The ConsumeAsync method runs asynchronously.
      Dim consumer = ConsumeAsync(buffer)

      ' Post source data to the dataflow block.
      Produce(buffer)

      ' Wait for the consumer to process all data.
      consumer.Wait()

      ' Print the count of bytes processed to the console.
      Console.WriteLine("Processed {0} bytes.", consumer.Result)
   End Sub
End Class

' Output:
'Processed 102400 bytes.
'
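
The completion behavior that the example relies on can be checked in isolation. The following is a minimal sketch, not part of the original sample: the class name CompletionSketch and the method RunAsync are hypothetical, and a BufferBlock<int> stands in for the byte-array block. It posts one item, calls Complete, and shows that OutputAvailableAsync still reports the buffered item before it reports that no more data will arrive.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, used only for illustration.
static class CompletionSketch
{
   public static async Task<(bool, int, bool)> RunAsync()
   {
      var block = new BufferBlock<int>();

      // Post one message, then signal that no more messages will follow.
      block.Post(42);
      block.Complete();

      // The buffered message is still available after Complete.
      bool before = await block.OutputAvailableAsync();
      int value = block.Receive();

      // The block is now empty and completed, so no more output will arrive.
      bool after = await block.OutputAvailableAsync();

      return (before, value, after);
   }

   static async Task Main()
   {
      var (before, value, after) = await RunAsync();
      Console.WriteLine($"{before} {value} {after}");
   }
}

/* Output:
True 42 False
*/
```

This is why the consumer loop in the example drains every posted buffer before it exits: Complete stops new input but does not discard messages that are already buffered.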

Robust Programming

The preceding example uses just one consumer to process the source data. If you have multiple consumers in your application, use the TryReceive method to read data from the source block, as shown in the following example.

// Demonstrates the consumption end of the producer and consumer pattern.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
   // Initialize a counter to track the number of bytes that are processed.
   int bytesProcessed = 0;

   // Read from the source buffer until the source buffer has no 
   // available output data.
   while (await source.OutputAvailableAsync())
   {
      byte[] data;
      while (source.TryReceive(out data))
      {
         // Increment the count of bytes received.
         bytesProcessed += data.Length;
      }
   }

   return bytesProcessed;
}

' Demonstrates the consumption end of the producer and consumer pattern.
Private Shared Async Function ConsumeAsync(ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
   ' Initialize a counter to track the number of bytes that are processed.
   Dim bytesProcessed As Integer = 0

   ' Read from the source buffer until the source buffer has no 
   ' available output data.
   Do While Await source.OutputAvailableAsync()
      Dim data() As Byte = Nothing
      Do While source.TryReceive(data)
         ' Increment the count of bytes received.
         bytesProcessed += data.Length
      Loop
   Loop

   Return bytesProcessed
End Function

The TryReceive method returns False when no data is available. When multiple consumers access the source block concurrently, another consumer might take a message after the call to OutputAvailableAsync returns. Calling TryReceive lets each consumer confirm that data is still available: if it is not, the consumer goes back to waiting on OutputAvailableAsync instead of blocking in Receive.
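
As a rough illustration of the multiple-consumer case, the following sketch starts several consumers against one shared BufferBlock<byte[]> and adds up their totals. It is one possible arrangement, not the original sample: the class name MultiConsumerSketch, the method RunAsync, and the consumerCount parameter are hypothetical, and the buffers are left zero-filled because only their lengths are counted.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, used only for illustration.
static class MultiConsumerSketch
{
   // Same consumer as in the preceding example.
   static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
   {
      int bytesProcessed = 0;
      while (await source.OutputAvailableAsync())
      {
         // Another consumer may have taken the message already, in which
         // case TryReceive returns false and this consumer waits again.
         while (source.TryReceive(out byte[] data))
         {
            bytesProcessed += data.Length;
         }
      }
      return bytesProcessed;
   }

   public static async Task<int> RunAsync(int consumerCount)
   {
      var buffer = new BufferBlock<byte[]>();

      // Start all consumers on the same source block.
      var consumers = new Task<int>[consumerCount];
      for (int i = 0; i < consumerCount; i++)
      {
         consumers[i] = ConsumeAsync(buffer);
      }

      // Post 100 buffers of 1024 bytes each, then signal completion.
      for (int i = 0; i < 100; i++)
      {
         buffer.Post(new byte[1024]);
      }
      buffer.Complete();

      // Each buffer is received by exactly one consumer, so the
      // per-consumer totals add up to the number of bytes posted.
      int total = 0;
      foreach (int count in await Task.WhenAll(consumers))
      {
         total += count;
      }
      return total;
   }

   static async Task Main()
   {
      Console.WriteLine("Processed {0} bytes.", await RunAsync(3));
   }
}

/* Output:
Processed 102400 bytes.
*/
```

How the buffers are split among the consumers varies from run to run, but the combined total does not.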

See also