Flujo de datos (biblioteca TPL)

La biblioteca TPL (Task Parallel Library, biblioteca de procesamiento paralelo basado en tareas) proporciona componentes de flujo de datos que ayudan a aumentar la solidez de aplicaciones habilitadas para simultaneidad. Se conoce colectivamente a estos componentes de flujo de datos como biblioteca TDF (biblioteca de TPL Dataflow) pero nos referiremos descriptivamente a ella como "biblioteca de flujos de datos TPL". Este modelo de flujo de datos promueve una programación basada en actores mediante el paso de mensajes en proceso para tareas de canalización y de flujo de datos de grano grueso. Los componentes de flujo de datos se basan en los tipos y la infraestructura de programación de la biblioteca TPL y se integran con la compatibilidad de los lenguajes C#, Visual Basic y F# para proporcionar programación asincrónica. Estos componentes de flujo de datos son útiles cuando se tienen varias operaciones que deben comunicarse entre sí de forma asincrónica, o cuando se desea procesar datos a medida que estén disponibles. Por ejemplo, piense en una aplicación que procesa datos de imagen de una cámara web. Con el modelo de flujo de datos, la aplicación puede procesar fotogramas de imagen a medida que estén disponibles. Si la aplicación mejora fotogramas de imagen, por ejemplo, corrigiendo la luz o reduciendo ojos rojos, puede crear una canalización de los componentes de flujo de datos. Cada fase de la canalización puede utilizar más funcionalidad de paralelismo de grano grueso, como la funcionalidad proporcionada por la biblioteca TPL, para transformar la imagen.

En este documento se proporciona información general sobre la biblioteca de flujos de datos TPL. Se describe el modelo de programación, los tipos de bloques de flujo de datos predefinidos y cómo configurar bloques de flujo de datos para satisfacer las necesidades específicas de las aplicaciones.

Nota

La biblioteca de flujos de datos TPL (el espacio de nombres System.Threading.Tasks.Dataflow) no se distribuye con .NET. Para instalar el espacio de nombres System.Threading.Tasks.Dataflow en Visual Studio, abra el proyecto, seleccione Administrar paquetes NuGet en el menú Proyecto y busque en línea el paquete System.Threading.Tasks.Dataflow. Como alternativa, para realizar la instalación con la CLI de .Net Core, ejecute dotnet add package System.Threading.Tasks.Dataflow.

Modelo de programación

La biblioteca de flujos de datos TPL proporciona una base para el paso de mensajes y para paralelizar aplicaciones que consumen mucha CPU, así como aplicaciones intensivas de entrada y salida con alto rendimiento y latencia baja. También ofrece el control explícito sobre cómo almacenar los datos en búfer y desplazarlos alrededor del sistema. Para entender mejor el modelo de programación de flujo de datos, piense en una aplicación que de forma asincrónica carga imágenes desde el disco y crea un compuesto de esas imágenes. Los modelos de programación tradicionales suelen usar devoluciones de llamada y objetos de sincronización, como bloqueos, para coordinar tareas y tener acceso a datos compartidos. Con el modelo de programación de flujo de datos, puede crear objetos de flujo de datos que procesan las imágenes como se leen desde el disco. Bajo el modelo de flujo de datos, se declara cómo se controlan los datos cuando están disponibles, así como las dependencias entre datos. Dado que el runtime administra las dependencias entre datos, se puede evitar a menudo la necesidad de sincronizar el acceso a los datos compartidos. Además, dado que el runtime programa el trabajo según la llegada asincrónica de datos, el flujo de datos puede mejorar la capacidad de respuesta y el nivel de rendimiento administrando de forma eficaz los subprocesos subyacentes. Para ver un ejemplo en donde se usa el modelo de programación de flujo de datos para implementar procesamiento de imágenes en una aplicación de Windows Forms, vea Tutorial: Uso de flujos de datos en aplicaciones de Windows Forms.

Orígenes y destinos

La biblioteca de flujos de datos TPL consta de bloques de flujo de datos, que son estructuras de datos que almacenan datos en búfer y procesan datos. La biblioteca TPL define tres tipos de bloques de flujos de datos: bloques de origen, bloques de destino y bloques propagadores. Un bloque de origen actúa como un origen de datos y se puede leer desde él. Un bloque de destino actúa como un receptor de datos y se puede escribir en él. Un bloque propagador actúa como un bloque de origen y un bloque de destino, y se puede leer desde él y escribir en él. La biblioteca TPL define la interfaz System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> para representar orígenes, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> para representar destinos y System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> para representar propagadores. IPropagatorBlock<TInput,TOutput> hereda de ISourceBlock<TOutput> y de ITargetBlock<TInput>.

La biblioteca de flujos de datos TPL proporciona varios tipos de bloques de flujo de datos predefinidos que implementan las interfaces ISourceBlock<TOutput>, ITargetBlock<TInput> y IPropagatorBlock<TInput,TOutput>. Estos tipos de bloques de flujo de datos se describen en este documento en la sección Tipos de bloques de flujo de datos predefinidos.

Conectar bloques

Puede conectar los bloques de flujo de datos para establecer canalizaciones, que son secuencias lineales de bloques de flujo de datos, o redes, que son gráficos de bloques de flujo de datos. Una canalización es una forma de red. En una canalización o red, los orígenes propagan datos de forma asincrónica en destinos a medida que esos datos están disponibles. El método ISourceBlock<TOutput>.LinkTo vincula un bloque de flujo de datos de origen a un bloque de destino. Un origen puede vincularse a cero o más destinos; los destinos se pueden vincular a partir de cero o más orígenes. Puede agregar o quitar bloques de flujo de datos hacia o desde una canalización o red simultáneamente. Los tipos de bloques de flujo de datos predefinidos controlan todos los aspectos de la seguridad para subprocesos de vinculación y desvinculación.

Para ver un ejemplo en donde se conectan bloques de flujo de datos para formar una canalización básica, vea Tutorial: Creación de una canalización de flujos de datos. Para ver un ejemplo en donde se conectan bloques de flujo de datos para formar una red más compleja, vea Tutorial: Uso de flujos de datos en aplicaciones de Windows Forms. Para obtener un ejemplo en donde un destino se desvincula de un origen después de que el origen le ofrezca un mensaje, vea Procedimiento: Desvinculación de bloques de flujo de datos.

Filtrado

Cuando se llama al método ISourceBlock<TOutput>.LinkTo para vincular un origen a un destino, puede proporcionar un delegado que determina si el bloque de destino acepta o rechaza un mensaje basado en el valor de ese mensaje. Este mecanismo de filtrado resulta útil para garantizar que un bloque de flujo de datos recibe solo ciertos valores. Para la mayoría de los tipos de bloques de flujo de datos predefinidos, si un bloque de origen está conectado a varios bloques de destino, cuando un bloque de destino rechaza un mensaje, el origen proporciona ese mensaje al destino siguiente. El orden en el que un origen proporciona mensajes a los destinos se define mediante el origen y puede variar según el tipo de origen. La mayoría de los tipos de bloques de origen dejan de proporcionar un mensaje después de que un destino acepta ese mensaje. Una excepción a esta regla es la clase BroadcastBlock<T>, que proporciona cada mensaje a todos los destinos, aunque algunos destinos rechacen el mensaje. Para ver un ejemplo en donde se usa el filtrado para procesar únicamente determinados mensajes, consulte Tutorial: Uso de flujos de datos en aplicaciones de Windows Forms.

Importante

Dado que cada tipo de bloque de flujo de datos de origen predefinido garantiza que los mensajes se propaguen en el orden en que se reciben, se debe leer cada mensaje desde el bloque de origen antes de que el bloque de origen pueda procesar el mensaje siguiente. Por consiguiente, si usa el filtrado para conectarse varios destinos a un origen, asegúrese de que al menos un bloque de destino recibe cada mensaje. De lo contrario, la aplicación podría generar un interbloqueo.

Paso de mensajes

El modelo de programación basado en el flujo de datos está relacionado con el concepto paso de mensajes, donde los componentes independientes de un programa comunican entre sí enviándose mensajes. Una manera de propagar mensajes entre componentes de la aplicación es llamar a los métodos Post (sincrónico) y SendAsync (asincrónico) para enviar mensajes a los bloques de flujo de datos de destino, y a los métodos Receive, ReceiveAsync y TryReceive para recibir mensajes de los bloques de origen. Puede combinar estos métodos con las canalizaciones o redes de flujo de datos enviando datos de entrada al nodo principal (un bloque de destino) y recibiendo datos de salida del nodo terminal de la canalización o de los nodos terminales de la red (uno o varios bloques de origen). También puede utilizar el método Choose para leer desde el primero de los orígenes proporcionados siempre que tenga datos disponibles y realice acciones sobre esos datos.

Los bloques de origen proporcionan datos a los bloques de destino llamando al método ITargetBlock<TInput>.OfferMessage. El bloque de destino responde a un mensaje proporcionado de una de estas tres maneras: puede aceptar el mensaje, rechazar el mensaje o posponer el mensaje. Cuando el destino acepta el mensaje, el método OfferMessage devuelve Accepted. Cuando el destino rechaza el mensaje, el método OfferMessage devuelve Declined. Cuando el destino requiere que ya no recibe ningún mensaje del origen, OfferMessage devuelve DecliningPermanently. Los tipos de bloques de origen predefinidos no proporcionan mensajes a los destinos vinculados después de recibir este valor devuelto, y automáticamente se desvinculan de estos destinos.

Cuando un bloque de destino pospone el mensaje para su uso posterior, el método OfferMessage devuelve Postponed. Un bloque de destino que pospone un mensaje puede llamar posteriormente al método ISourceBlock<TOutput>.ReserveMessage para tratar de reservar el mensaje proporcionado. En este punto, el mensaje todavía permanece disponible y lo puede usar el bloque de destino, o puede que otro destino haya tomado el mensaje. Cuando el bloque de destino requiere el mensaje posteriormente o cuando ya no necesita el mensaje, llama al método ISourceBlock<TOutput>.ConsumeMessage o ReleaseReservation, respectivamente. La reserva de mensajes la utilizan normalmente los tipos de bloques de flujo de datos que trabajan en modo no expansivo. El modo no expansivo se explica más adelante en este documento. En lugar de reservar un mensaje pospuesto, un bloque de destino puede utilizar el método ISourceBlock<TOutput>.ConsumeMessage para intentar utilizar directamente el mensaje pospuesto.

Finalización del bloque de flujo de datos

Los bloques de flujo de datos también admiten el concepto de finalización. Un bloque de flujo de datos que está en estado completado no realiza ningún trabajo posterior. Cada bloque de flujo de datos tiene un objeto System.Threading.Tasks.Task asociado, conocido como tarea de finalización, que representa el estado de finalización del bloque. Dado que para finalizar se puede esperar un objeto Task, mediante tareas de finalización, para finalizar se pueden esperar uno o más nodos terminales de una red de flujo de datos. La interfaz IDataflowBlock define el método Complete, que informa al bloque de flujo de datos de una solicitud para que se complete, y la propiedad Completion, que devuelve la tarea de finalización para el bloque de flujo de datos. Tanto ISourceBlock<TOutput> como ITargetBlock<TInput> heredan de la interfaz IDataflowBlock.

Hay dos maneras de determinar si un bloque de flujo de datos se completó sin error, encontró uno o más errores, o se canceló. La primera manera consiste en llamar al método Task.Wait en la tarea de finalización en un bloque try-catch (Try-Catch en Visual Basic). En el siguiente ejemplo se crea un objeto ActionBlock<TInput> que produce ArgumentOutOfRangeException si su valor de entrada es menor que cero. AggregateException se produce cuando este ejemplo llama a Wait en la tarea de finalización. Se obtiene acceso a ArgumentOutOfRangeException mediante la propiedad InnerExceptions del objeto AggregateException.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

En este ejemplo se muestra el caso en el que una excepción no está controlada en el delegado de un bloque de flujo de datos de ejecución. Se recomienda controlar las excepciones en los cuerpos de estos bloques. Sin embargo, si no puede hacerlo, el bloque se comporta como si estuviera cancelado y no procesa los mensajes entrantes.

Cuando un bloque de flujo de datos se cancela explícitamente, el objeto AggregateException contiene OperationCanceledException en la propiedad InnerExceptions. Para obtener más información sobre la cancelación del flujo de datos, vea la sección Habilitar la cancelación.

La segunda manera de determinar el estado de finalización de un bloque de flujo de datos es usar una continuación de la tarea de finalización o utilizar las características de lenguaje asincrónicas de C# y Visual Basic para esperar a la tarea de finalización de forma asincrónica. El delegado que se proporciona al método Task.ContinueWith toma un objeto Task que representa la tarea anterior. En el caso de la propiedad Completion, el delegado de continuación toma la propia tarea de finalización. El siguiente ejemplo se parece el anterior, con la salvedad de que también utiliza el método ContinueWith para crear una tarea de continuación que imprime el estado de la operación total de flujo de datos.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine("The status of the completion task is '{0}'.",
      task.Status);
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

También puede usar propiedades como IsCanceled en el cuerpo de la tarea de continuación para determinar información adicional sobre el estado de finalización de un bloque de flujo de datos. Para obtener más información sobre las tareas de continuación y cómo se relacionan con la cancelación y el control de errores, vea Encadenar tareas mediante tareas de continuación, Cancelación de tareas y Control de excepciones.

Tipos de bloques de flujo de datos predefinidos

La biblioteca de flujos de datos TPL proporciona varios tipos de bloques de flujo de datos predefinidos. Estos tipos se dividen en tres categorías: bloques de almacenamiento en búfer, bloques de ejecución y bloques de agrupación. En las secciones siguientes se describen los tipos de bloques que componen estas categorías.

Bloques de almacenamiento en búfer

Los bloques de almacenamiento en búfer contiene datos para su uso por los consumidores de datos. La biblioteca de flujos de datos TPL proporciona tres tipos de bloques de almacenamiento en búfer: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T> y System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

La clase BufferBlock<T> representa una estructura de mensajería asincrónica de uso general. Esta clase almacena una cola FIFO (primero en entrar, primero en salir) de mensajes donde varios orígenes pueden escribir o de los que varios destinos pueden leer. Cuando un destino recibe un mensaje de un objeto BufferBlock<T>, ese mensaje se quita de la cola de mensajes. Por tanto, aunque un objeto BufferBlock<T> puede tener varios destinos, solo uno recibirá cada mensaje. La clase BufferBlock<T> resulta útil si desea pasar varios mensajes a otro componente, y ese componente debe recibir cada mensaje.

En el siguiente ejemplo básico se exponen varios valores Int32 a un objeto BufferBlock<T> y después se leen esos valores desde ese objeto.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Para obtener un ejemplo completo en donde se muestra cómo escribir y leer mensajes desde un objeto BufferBlock<T>, vea Procedimiento: Escritura y lectura de mensajes en un bloque de flujo de datos.

BroadcastBlock<T>

La clase BroadcastBlock<T> resulta útil si debe pasar varios mensajes a otro componente, pero este componente solo necesita el valor más reciente. Esta clase también resulta útil si desea difundir un mensaje a varios componentes.

En el siguiente ejemplo básico se publica un valor Double a un objeto BroadcastBlock<T> y después se lee ese valor desde el objeto varias veces. Dado que los valores no se quitan de los objetos BroadcastBlock<T> después de leerlos, el mismo valor está disponible cada vez.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Para obtener un ejemplo completo en donde se muestra cómo utilizar BroadcastBlock<T> para difundir un mensaje a varios bloques de destino, vea Procedimiento: Especificación de un Programador de tareas en un bloque de flujo de datos.

WriteOnceBlock<T>

La clase WriteOnceBlock<T> se asemeja a la clase BroadcastBlock<T>, salvo que un objeto WriteOnceBlock<T> se puede escribir una sola una vez. Puede pensar que WriteOnceBlock<T> es similar a la palabra clave de C# readonly (Readonly en Visual Basic), salvo que un objeto WriteOnceBlock<T> se vuelve inalterable después de recibir un valor en lugar de una construcción. Al igual que la clase BroadcastBlock<T>, cuando un destino recibe un mensaje de un objeto WriteOnceBlock<T>, el mensaje no se quita de dicho objeto. Por tanto, varios destinos reciben una copia del mensaje. La clase WriteOnceBlock<T> es útil si desea difundir solamente el primero de varios de mensajes.

En el siguiente ejemplo básico se exponen varios valores String a un objeto WriteOnceBlock<T> y después se lee ese valor desde el objeto. Dado que un objeto WriteOnceBlock<T> solo se puede escribir una vez, a partir de que un objeto WriteOnceBlock<T> recibe un mensaje, ese objeto descarta los mensajes subsiguientes.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Para ver un ejemplo completo en donde se muestra cómo utilizar WriteOnceBlock<T> para recibir el valor de la primera operación que finaliza, consulte Procedimiento: Desvinculación de bloques de flujo de datos.

Bloques de ejecución

Los bloques de ejecución llaman a un delegado proporcionado por el usuario para cada fragmento de datos recibidos. La biblioteca de flujos de datos TPL proporciona tres tipos de bloques de ejecución: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput> y System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

La clase ActionBlock<TInput> es un bloque de destino que llama a un delegado cuando recibe datos. Piense en un objeto ActionBlock<TInput> como un delegado que se ejecuta de forma asincrónica cuando los datos están disponibles. El delegado que se proporciona a un objeto ActionBlock<TInput> puede ser de tipo Action<T> o tipo System.Func<TInput, Task>. Si se utiliza un objeto ActionBlock<TInput> con Action<T>, se considera que el procesamiento de cada elemento de entrada se ha completado cuando devuelve el delegado. Si se utiliza un objeto ActionBlock<TInput> con System.Func<TInput, Task>, se considera que el procesamiento de cada elemento de entrada se ha completado solamente cuando el objeto devuelto Task está completo. Mediante estos dos mecanismos, se puede utilizar ActionBlock<TInput> para el procesamiento sincrónico y asincrónico de cada elemento de entrada.

En el siguiente ejemplo básico se exponen varios valores Int32 a un objeto ActionBlock<TInput>. El objeto ActionBlock<TInput> imprime esos valores en la consola. Después, en este ejemplo se establece el bloque en estado completado y se espera hasta que finalicen todas las tareas de flujo de datos.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Para obtener ejemplos completos en donde se muestra cómo usar delegados con la clase ActionBlock<TInput>, vea Procedimiento: Ejecución de una acción cuando un bloque de flujo de datos recibe datos.

TransformBlock<TInput, TOutput>

La clase TransformBlock<TInput,TOutput> es similar a la clase ActionBlock<TInput>, salvo que actúa como origen y como destino. El delegado que pasa a un objeto TransformBlock<TInput,TOutput> devuelve un valor de tipo TOutput. El delegado que se proporciona a un objeto TransformBlock<TInput,TOutput> puede ser de tipo System.Func<TInput, TOutput> o tipo System.Func<TInput, Task<TOutput>>. Si se utiliza un objeto TransformBlock<TInput,TOutput> con System.Func<TInput, TOutput>, se considera que el procesamiento de cada elemento de entrada se ha completado cuando devuelve el delegado. Si se utiliza un objeto TransformBlock<TInput,TOutput> que se usa con System.Func<TInput, Task<TOutput>>, se considera que el procesamiento de cada elemento de entrada se ha completado solamente cuando el objeto devuelto Task<TResult> está completo. Al igual que sucede con ActionBlock<TInput>, mediante estos dos mecanismos, se puede utilizar TransformBlock<TInput,TOutput> para el procesamiento sincrónico y asincrónico de cada elemento de entrada.

En el siguiente ejemplo básico se crea un objeto TransformBlock<TInput,TOutput> que calcula la raíz cuadrada de la entrada. El objeto TransformBlock<TInput,TOutput> toma valores Int32 como entrada y genera valores Double como salida.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Para obtener ejemplos completos que utilizan TransformBlock<TInput,TOutput> en una red de bloques de flujo de datos que realiza procesamiento de imágenes en una aplicación de Windows Forms, vea Tutorial: Uso de flujos de datos en aplicaciones de Windows Forms.

TransformManyBlock<TInput, TOutput>

La clase TransformManyBlock<TInput,TOutput> es similar a la clase TransformBlock<TInput,TOutput>, salvo que TransformManyBlock<TInput,TOutput> genere cero o más valores de salida por cada valor de entrada, en lugar de generar un solo valor de salida por cada valor de entrada. El delegado que se proporciona a un objeto TransformManyBlock<TInput,TOutput> puede ser de tipo System.Func<TInput, IEnumerable<TOutput>> o tipo System.Func<TInput, Task<IEnumerable<TOutput>>>. Si se utiliza un objeto TransformManyBlock<TInput,TOutput> con System.Func<TInput, IEnumerable<TOutput>>, se considera que el procesamiento de cada elemento de entrada se ha completado cuando devuelve el delegado. Si se utiliza un objeto TransformManyBlock<TInput,TOutput> con System.Func<TInput, Task<IEnumerable<TOutput>>>, se considera que el procesamiento de cada elemento de entrada se ha completado solo cuando el objeto devuelto System.Threading.Tasks.Task<IEnumerable<TOutput>> está completo.

En el siguiente ejemplo básico se crea un objeto TransformManyBlock<TInput,TOutput> que divide las cadenas en sus secuencias de caracteres individuales. El objeto TransformManyBlock<TInput,TOutput> toma valores String como entrada y genera valores Char como salida.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Para obtener ejemplos completos que usan TransformManyBlock<TInput,TOutput> para generar varios resultados independientes para cada entrada en una canalización de flujo de datos, vea Tutorial: Creación de una canalización de flujos de datos.

Grado de paralelismo

Cada uno de los objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput> y TransformManyBlock<TInput,TOutput> almacena en búfer los mensajes de entrada hasta que el bloque está listo para procesarlos. De forma predeterminada, estas clases procesan los mensajes en el orden en el que se recibieron, un mensaje cada vez. También puede especificar el grado de paralelismo para permitir que los objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput> y TransformManyBlock<TInput,TOutput> puedan procesar varios mensajes simultáneamente. Para obtener más información sobre la ejecución simultánea, vea la sección Especificar el grado de paralelismo más adelante en este documento. Para obtener un ejemplo en donde se establece el grado de paralelismo que permite que un bloque de flujo de datos de ejecución pueda procesar varios mensajes al mismo tiempo, vea Procedimiento: Especificación del grado de paralelismo en un bloque de flujo de datos.

Resumen de tipos de delegado

En la tabla siguiente se resumen los tipos de delegado que puede proporcionar a los objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput> y TransformManyBlock<TInput,TOutput>. En esta tabla también se especifica si el tipo de delegado funciona de forma sincrónica o asincrónica.

Tipo Tipo de delegado sincrónico Tipo de delegado asincrónico
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

También puede utilizar expresiones lambda cuando trabaja con tipos de bloque de ejecución. Para obtener un ejemplo en donde se muestra cómo usar una expresión lambda con un bloque de ejecución, vea Procedimiento: Ejecución de una acción cuando un bloque de flujo de datos recibe datos.

Bloques de agrupación

Los bloques de agrupación combinan datos de uno o más orígenes y con distintas restricciones. La biblioteca de flujos de datos TPL proporciona tres tipos de bloques de combinación: BatchBlock<T>, JoinBlock<T1,T2> y BatchedJoinBlock<T1,T2>.

BatchBlock<T>

La clase BatchBlock<T> combina conjuntos de datos de entrada, que se conocen como lotes, en las matrices de datos de salida. Especifique el tamaño de cada lote cuando crea un objeto BatchBlock<T>. Cuando el objeto BatchBlock<T> recibe el número especificado de elementos de entrada, propaga de forma asincrónica una matriz que contiene esos elementos. Si un objeto BatchBlock<T> se establece en el estado completado pero no contiene elementos suficientes para formar un lote, propaga una matriz final que contiene los elementos de entrada restantes.

La clase BatchBlock<T> funciona en modo expansivo o no expansivo. En modo expansivo, que es el valor predeterminado, un objeto BatchBlock<T> acepta cada mensaje que se proporciona y propaga una matriz después de recibir el número especificado de elementos. En modo no expansivo, un objeto BatchBlock<T> pospone todos los mensajes entrantes hasta que haya suficientes orígenes que proporcionen mensajes al bloque para formar un lote. Normalmente, el modo expansivo se comporta mejor que el modo no expansivo porque requiere menos sobrecarga de procesamiento. Sin embargo, se puede usar el modo no expansivo cuando se debe coordinar el consumo de varios orígenes en modo atómico. Especifique el modo no expansivo estableciendo Greedy en False en el parámetro dataflowBlockOptions del constructor BatchBlock<T>.

En el siguiente ejemplo básico se exponen varios valores Int32 a un objeto BatchBlock<T> que contiene diez elementos en un lote. Para garantizar que todos los valores se propagan fuera de BatchBlock<T>, este ejemplo llama al método Complete. El método Complete establece el objeto BatchBlock<T> en el estado completado y, por consiguiente, el objeto BatchBlock<T> propaga cualquier elemento restante como un lote final.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
   batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
   batchBlock.Receive().Sum());

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Para obtener un ejemplo completo que usa BatchBlock<T> para mejorar la eficacia de las operaciones de inserción de la base de datos, vea Tutorial: Uso de BatchBlock y BatchedJoinBlock para mejorar la eficacia.

JoinBlock<T1, T2, ...>

Las clases JoinBlock<T1,T2> y JoinBlock<T1,T2,T3> obtienen elementos de entrada y propagan objetos System.Tuple<T1,T2> o System.Tuple<T1,T2,T3> que contienen esos elementos. Las clases JoinBlock<T1,T2> y JoinBlock<T1,T2,T3> no heredan de ITargetBlock<TInput>. En su lugar, proporcionan propiedades, Target1, Target2 y Target3, que implementan ITargetBlock<TInput>.

Al igual que BatchBlock<T>, JoinBlock<T1,T2> y JoinBlock<T1,T2,T3> funcionan en modo expansivo o no expansivo. En modo expansivo, que es el valor predeterminado, un objeto JoinBlock<T1,T2> o JoinBlock<T1,T2,T3> acepta cada mensaje que se proporciona y propaga una tupla después de que cada uno de sus destinos reciba por lo menos un mensaje. En modo no expansivo, un objeto JoinBlock<T1,T2> o JoinBlock<T1,T2,T3> pospone todos los mensajes entrantes hasta que todos los destinos han proporcionado los datos necesarios para crear una tupla. En este punto, el bloque se involucra en un protocolo de confirmación en dos fases para recuperar atómicamente todos los elementos necesarios de los orígenes. Este aplazamiento permite que, mientras tanto, otra entidad consuma datos, para permitir que el sistema global progrese.

En el siguiente ejemplo básico se muestra un caso en el que un objeto JoinBlock<T1,T2,T3> requiere varios datos para calcular un valor. En este ejemplo se crea un objeto JoinBlock<T1,T2,T3> que requiere dos valores Int32 y un valor Char para realizar una operación aritmética.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine("{0} + {1} = {2}",
            data.Item1, data.Item2, data.Item1 + data.Item2);
         break;
      case '-':
         Console.WriteLine("{0} - {1} = {2}",
            data.Item1, data.Item2, data.Item1 - data.Item2);
         break;
      default:
         Console.WriteLine("Unknown operator '{0}'.", data.Item3);
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Para obtener un ejemplo completo en donde se usan objetos JoinBlock<T1,T2> en modo no expansivo para compartir conjuntamente un recurso, vea Procedimiento: Uso de JoinBlock para leer datos de varios orígenes.

BatchedJoinBlock<T1, T2, ...>

Las clases BatchedJoinBlock<T1,T2> y BatchedJoinBlock<T1,T2,T3> obtienen lotes de elementos de entrada y propagan objetos System.Tuple(IList(T1), IList(T2)) o System.Tuple(IList(T1), IList(T2), IList(T3)) que contienen esos elementos. Piense en BatchedJoinBlock<T1,T2> como una combinación de BatchBlock<T> y JoinBlock<T1,T2>. Especifique el tamaño de cada lote cuando crea un objeto BatchedJoinBlock<T1,T2>. BatchedJoinBlock<T1,T2> también proporciona propiedades, Target1 y Target2, que implementan ITargetBlock<TInput>. Cuando el número especificado de elementos de entrada se recibe a través de todos los destinos, el objeto BatchedJoinBlock<T1,T2> propaga de forma asincrónica un objeto System.Tuple(IList(T1), IList(T2)) que contiene esos elementos.

En el siguiente ejemplo básico se crea un objeto BatchedJoinBlock<T1,T2> que contiene resultados, valores Int32 y errores que son objetos Exception. En este ejemplo se realizan varias operaciones y se escriben los resultados en la propiedad Target1, y los errores en la propiedad Target2, del objeto BatchedJoinBlock<T1,T2>. Dado que el número de operaciones correctas y las que dieron error no se conoce de antemano, los objetos IList<T> permiten que cada destino reciba cero o más valores.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Para obtener un ejemplo completo que usa BatchedJoinBlock<T1,T2> para capturar resultados y cualquier excepción que se produzca mientras el programa lee de una base de datos, vea Tutorial: Uso de BatchBlock y BatchedJoinBlock para mejorar la eficacia.

Configurar el comportamiento del bloque de flujo de datos

Puede habilitar opciones adicionales si proporciona un objeto System.Threading.Tasks.Dataflow.DataflowBlockOptions al constructor de los tipos de bloques de flujo de datos. Estas opciones controlan el comportamiento, como el del programador que administra la tarea subyacente, y el grado de paralelismo. El objeto DataflowBlockOptions también tiene tipos derivados que especifican el comportamiento específico de ciertos tipos de bloques de flujo de datos. En la tabla siguiente se resumen los tipo de opciones que se asocian a cada tipo de bloques de flujo de datos.

Tipo de bloques de flujo de datos Tipo de DataflowBlockOptions
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

Las secciones siguientes proporcionan información adicional sobre las clases importantes de opciones de bloques de flujo de datos que están disponibles a través de las clases System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions y System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions.

Especificar el programador de tareas

Cada bloque de flujo de datos predefinido utiliza el mecanismo de programación de tareas de la biblioteca TPL para realizar actividades como propagar datos a un destino, recibir datos de un origen y ejecutar delegados definido por el usuario si los datos están disponibles. TaskScheduler es una clase abstracta que representa un programador de tareas que pone en cola las tareas en los subprocesos. El programador de tareas predeterminado, Default, utiliza la clase ThreadPool para poner en cola y ejecutar el trabajo. Puede reemplazar el programador de tareas predeterminado estableciendo la propiedad TaskScheduler al crear un objeto de bloque de flujo de datos.

Cuando el mismo programador de tareas administra varios bloques de flujo de datos, puede aplicar directivas entre ellos. Por ejemplo, si cada uno de los bloques de flujo de datos se configuran como destino del programador exclusivo del mismo objeto ConcurrentExclusiveSchedulerPair, todo el trabajo que se ejecuta a través de estos bloques se serializa. De igual forma, si estos bloques se configuran como destino del programador simultáneo del mismo objeto ConcurrentExclusiveSchedulerPair, y el programador se configura para tener un nivel de simultaneidad máximo, todo el trabajo de estos bloques se limita a ese número de operaciones simultáneas. Para obtener un ejemplo en donde se usa la clase ConcurrentExclusiveSchedulerPair para permitir que las operaciones de lectura se produzcan en paralelo, pero las operaciones de escritura sean exclusivas del resto de operaciones, vea Procedimiento: Especificación de un Programador de tareas en un bloque de flujo de datos. Para obtener más información sobre los programadores de tareas en la biblioteca TPL, consulte el tema sobre la clase TaskScheduler.

Especificar el grado de Paralelismo

De forma predeterminada, los tres tipos de bloques de ejecución que la biblioteca de flujos de datos TPL proporciona, ActionBlock<TInput>, TransformBlock<TInput,TOutput> y TransformManyBlock<TInput,TOutput>, procesan un mensaje al mismo tiempo. Estos tipos de bloques de flujo de datos también procesan mensajes en el orden en que se reciben. Para permitir que estos bloques de flujo de datos procesen mensajes simultáneamente, establezca la propiedad ExecutionDataflowBlockOptions.MaxDegreeOfParallelism cuando construya el objeto de bloques de flujo de datos.

El valor predeterminado de MaxDegreeOfParallelism es 1, que garantiza que el bloque de flujo de datos procesa un mensaje al mismo tiempo. Al establecer esta propiedad en un valor mayor de 1 se permite que el bloque de flujo de datos procese varios mensajes simultáneamente. Al establecer esta propiedad en DataflowBlockOptions.Unbounded se permite que el programador de tareas subyacente administre el grado máximo de simultaneidad.

Importante

Cuando se especifica un grado máximo de paralelismo mayor que 1, varios mensajes se procesan simultáneamente y, por tanto, los mensajes no se pueden procesar en el orden en que se reciben. Pero los mensajes salen del bloque en el mismo orden en que se reciben.

Dado que la propiedad MaxDegreeOfParallelism representa el grado máximo de paralelismo, el bloque de flujo de datos puede ejecutarse con un menor grado de paralelismo que el especificado. El bloque de flujo de datos puede utilizar un menor grado de paralelismo para cumplir los requisitos funcionales o porque hay una falta de recursos del sistema disponibles. Un flujo de datos bloqueado nunca elige más paralelismo que el especificado.

El valor de la propiedad MaxDegreeOfParallelism es exclusivo para cada objeto de bloque de flujo de datos. Por ejemplo, si cuatro objetos de bloques de flujo de datos especifican 1 como el grado máximo de paralelismo, los cuatro objetos de bloques de flujo de datos podrían ejecutarse en paralelo.

Para obtener un ejemplo en donde se establece el grado máximo de paralelismo que permite que se produzcan operaciones largas en paralelo, vea Procedimiento: Especificación del grado de paralelismo en un bloque de flujo de datos.

Especificar el número de mensajes por tarea

Los tipos predefinidos de bloques de flujo de datos utilizan tareas para procesar varios elementos de entrada. Esto ayuda a minimizar el número de objetos de tarea necesarios para procesar datos, lo que permite que las aplicaciones se ejecuten más eficazmente. Sin embargo, cuando las tareas de un conjunto de bloques de flujo de datos están procesando datos, es posible que las tareas de otros bloques de flujo de datos tengan que esperar el tiempo de procesamiento en la cola mensajes. Para permitir una mejor equidad entre tareas de flujo de datos, establezca la propiedad MaxMessagesPerTask. Cuando MaxMessagesPerTask se establece en DataflowBlockOptions.Unbounded, que es el valor predeterminado, la tarea utilizada por un bloque de flujo de datos procesa tantos mensajes como están disponibles. Cuando MaxMessagesPerTask se establece en un valor distinto de Unbounded, el bloque de flujo de datos procesa como máximo este número de mensajes por objeto Task. Aunque al establecer la propiedad MaxMessagesPerTask se puede aumentar la equidad entre tareas, puede provocar que el sistema cree más tareas que las necesarias, lo que puede reducir el rendimiento.

Habilitar la cancelación

La biblioteca TPL proporciona un mecanismo que habilita las tareas para coordinar la cancelación de manera cooperativa. Para permitir que los bloques de flujo de datos puedan participar en este mecanismo de cancelación, establezca la propiedad CancellationToken. Cuando este objeto CancellationToken se establece en el estado cancelado, todos los bloques de flujo de datos que controlan este token finalizan la ejecución de su elemento actual pero no comienzan a procesar los elementos siguientes. Estos bloques de flujo de datos también borran los mensajes almacenados en búfer, conexiones de inicio para los bloques de origen y de destino, y la transición al estado cancelado. Al realizar la transición al estado cancelado, la propiedad Completion tiene la propiedad Status establecida en Canceled, a menos que se produzca una excepción durante el procesamiento. En ese caso, Status se establece en Faulted.

Para obtener un ejemplo en donde se muestra cómo usar la cancelación en una aplicación de Windows Forms, vea Procedimiento: Cómo: Cancelar un bloque de flujos de datos. Para más información sobre la cancelación en la biblioteca TPL, consulte Task Cancellation (Cancelación de tareas).

Especificar el comportamiento expansivo frente al no expansivo

Varios tipos de bloques de flujo de datos de agrupación pueden trabajar en modo expansivo o no expansivo. De forma predeterminada, los tipos de bloques de flujo de datos predefinidos funcionan en modo expansivo.

Para los tipos de bloques de combinación como JoinBlock<T1,T2>, el modo expansivo significa que el bloque acepta datos inmediatamente aunque los correspondientes datos con los que se combinará aún no estén disponibles. El modo no expansivo significa que el bloque pospone todos los mensajes entrantes hasta que uno esté disponible para cada uno de sus destinos para completar la combinación. Si los mensajes pospuestos ya no están disponibles, el bloque de combinación libera todos los mensajes pospuestos y reinicia el proceso. Para la clase BatchBlock<T>, el comportamiento expansivo y no expansivo es similar, salvo que en modo no expansivo, un objeto BatchBlock<T> pospone todos los mensajes entrantes hasta que haya suficientes mensajes disponibles de orígenes distintos para completar un lote.

Para especificar el modo no expansivo para un bloque de flujo de datos, establezca Greedy en False. Para obtener un ejemplo en donde se muestra cómo usar el modo no expansivo para permitir que varios bloques de combinación compartan un origen de datos con mayor eficacia, vea Procedimiento: Uso de JoinBlock para leer datos de varios orígenes.

Bloques de flujo de datos personalizados

Aunque la biblioteca de flujos de datos TPL proporciona muchos tipos de bloques predefinidos, puede crear tipos de bloques adicionales que tengan un comportamiento personalizado. Implemente las interfaces ISourceBlock<TOutput> o ITargetBlock<TInput> directamente o use el método Encapsulate para compilar un bloque complejo que encapsule el comportamiento de los tipos de bloques existentes. Para obtener ejemplos que muestran cómo implementar funcionalidad en bloques de flujo de datos personalizados, vea Tutorial: Creación de tipos de bloques de flujo de datos personalizados.

Title Descripción
Cómo: Escritura y lectura de mensajes en un bloque de flujo de datos Muestra cómo escribir y leer los mensajes de un objeto BufferBlock<T>.
Cómo: Implementación de un modelo de flujo de datos productor-consumidor Describe cómo utilizar el modelo de flujo de datos para implementar un patrón consumidor-productor, cuando el productor envía mensajes a un bloque de flujo de datos y el consumidor lee mensajes de ese bloque.
Cómo: Ejecución de una acción cuando un bloque de flujo de datos recibe datos Describe cómo proporcionar delegados a los tipos de bloques de flujo de datos de ejecución, ActionBlock<TInput>, TransformBlock<TInput,TOutput> y TransformManyBlock<TInput,TOutput>.
Tutorial: Creación de una canalización de flujos de datos Describe cómo crear una canalización de flujo de datos que descarga texto desde Internet y realiza operaciones en ese texto.
Cómo: Desvinculación de bloques de flujo de datos Muestra cómo utilizar el método LinkTo para desvincular un bloque de destino de su origen después de que el origen proporciona un mensaje al destino.
Tutorial: Uso de flujos de datos en aplicaciones de Windows Forms Muestra cómo crear una red de bloques de flujo de datos que realizan procesamiento de imágenes en una aplicación de Windows Forms.
Cómo: Cancelación de un bloque de flujo de datos Muestra cómo se usa la cancelación en una aplicación de Windows Forms.
Cómo: Uso de JoinBlock para leer datos de diferentes orígenes Explica cómo utilizar la clase JoinBlock<T1,T2> para realizar una operación cuando los datos están disponibles a partir de varios orígenes, y cómo utilizar el modo no expansivo para permitir que varios bloques de combinación puedan compartir un origen de datos más eficazmente.
Cómo: Especificación del grado de paralelismo en un bloque de flujo de datos Describe cómo establecer la propiedad MaxDegreeOfParallelism para permitir que un bloque de flujo de datos de ejecución pueda procesar varios mensajes al mismo tiempo.
Cómo: Especificación de un Programador de tareas en un bloque de flujo de datos Muestra cómo asociar un programador de tareas específico cuando se usa flujo de datos en la aplicación.
Tutorial: Uso de BatchBlock y BatchedJoinBlock para mejorar la eficacia Describe cómo utilizar la clase BatchBlock<T> para mejorar la eficacia de las operaciones de inserción de la base de datos y cómo utilizar la clase BatchedJoinBlock<T1,T2> para capturar los resultados y cualquier excepción que se produzca mientras el programa lee de una base de datos.
Tutorial: Creación de tipos de bloques de flujo de datos personalizados Muestra dos maneras de crear un tipo de bloque de flujo de datos que implementa un comportamiento personalizado.
Biblioteca TPL Presenta la biblioteca TPL, una biblioteca que simplifica la programación paralela y simultánea en aplicaciones de .NET Framework.