Flujo de datos (biblioteca TPL)Dataflow (Task Parallel Library)

La biblioteca TPL (Task Parallel Library, biblioteca de procesamiento paralelo basado en tareas) proporciona componentes de flujo de datos que ayudan a aumentar la solidez de aplicaciones habilitadas para simultaneidad.The Task Parallel Library (TPL) provides dataflow components to help increase the robustness of concurrency-enabled applications. Se conoce colectivamente a estos componentes de flujo de datos como biblioteca TDF (biblioteca de TPL Dataflow) pero nos referiremos descriptivamente a ella como "biblioteca de flujos de datos TPL".These dataflow components are collectively referred to as the TPL Dataflow Library. Este modelo de flujo de datos promueve una programación basada en actores mediante el paso de mensajes en proceso para tareas de canalización y de flujo de datos de grano grueso.This dataflow model promotes actor-based programming by providing in-process message passing for coarse-grained dataflow and pipelining tasks. Los componentes de flujo de datos se basan en los tipos y la infraestructura de programación de la biblioteca TPL y se integran con la compatibilidad de los lenguajes C#, Visual Basic y F# para proporcionar programación asincrónica.The dataflow components build on the types and scheduling infrastructure of the TPL and integrate with the C#, Visual Basic, and F# language support for asynchronous programming. Estos componentes de flujo de datos son útiles cuando se tienen varias operaciones que deben comunicarse entre sí de forma asincrónica, o cuando se desea procesar datos a medida que estén disponibles.These dataflow components are useful when you have multiple operations that must communicate with one another asynchronously or when you want to process data as it becomes available. Por ejemplo, piense en una aplicación que procesa datos de imagen de una cámara web.For example, consider an application that processes image data from a web camera. Con el modelo de flujo de datos, la aplicación puede procesar fotogramas de imagen a medida que estén disponibles.By using the dataflow model, the application can process image frames as they become available. Si la aplicación mejora fotogramas de imagen, por ejemplo, corrigiendo la luz o reduciendo ojos rojos, puede crear una canalización de los componentes de flujo de datos.If the application enhances image frames, for example, by performing light correction or red-eye reduction, you can create a pipeline of dataflow components. Cada fase de la canalización puede utilizar más funcionalidad de paralelismo de grano grueso, como la funcionalidad proporcionada por la biblioteca TPL, para transformar la imagen.Each stage of the pipeline might use more coarse-grained parallelism functionality, such as the functionality that is provided by the TPL, to transform the image.

En este documento se proporciona información general sobre la biblioteca de flujos de datos TPL.This document provides an overview of the TPL Dataflow Library. Se describe el modelo de programación, los tipos de bloques de flujo de datos predefinidos y cómo configurar bloques de flujo de datos para satisfacer las necesidades específicas de las aplicaciones.It describes the programming model, the predefined dataflow block types, and how to configure dataflow blocks to meet the specific requirements of your applications.

Nota

La biblioteca de flujos de datos TPL (el espacio de nombres System.Threading.Tasks.Dataflow) no se distribuye con .NET.The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. Para instalar el espacio de nombres System.Threading.Tasks.Dataflow en Visual Studio, abra el proyecto, seleccione Administrar paquetes NuGet en el menú Proyecto y busque en línea el paquete System.Threading.Tasks.Dataflow.To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Como alternativa, para realizar la instalación con la CLI de .Net Core, ejecute dotnet add package System.Threading.Tasks.Dataflow.Alternatively, to install it using the .Net Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.

Este documento contiene las siguientes secciones:This document contains the following sections:

Modelo de programaciónProgramming Model

La biblioteca de flujos de datos TPL proporciona una base para el paso de mensajes y para paralelizar aplicaciones que consumen mucha CPU, así como aplicaciones intensivas de entrada y salida con alto rendimiento y latencia baja.The TPL Dataflow Library provides a foundation for message passing and parallelizing CPU-intensive and I/O-intensive applications that have high throughput and low latency. También ofrece el control explícito sobre cómo almacenar los datos en búfer y desplazarlos alrededor del sistema.It also gives you explicit control over how data is buffered and moves around the system. Para entender mejor el modelo de programación de flujo de datos, piense en una aplicación que de forma asincrónica carga imágenes desde el disco y crea un compuesto de esas imágenes.To better understand the dataflow programming model, consider an application that asynchronously loads images from disk and creates a composite of those images. Los modelos de programación tradicionales suelen usar devoluciones de llamada y objetos de sincronización, como bloqueos, para coordinar tareas y tener acceso a datos compartidos.Traditional programming models typically require that you use callbacks and synchronization objects, such as locks, to coordinate tasks and access to shared data. Con el modelo de programación de flujo de datos, puede crear objetos de flujo de datos que procesan las imágenes como se leen desde el disco.By using the dataflow programming model, you can create dataflow objects that process images as they are read from disk. Bajo el modelo de flujo de datos, se declara cómo se controlan los datos cuando están disponibles, así como las dependencias entre datos.Under the dataflow model, you declare how data is handled when it becomes available, and also any dependencies between data. Dado que el runtime administra las dependencias entre datos, se puede evitar a menudo la necesidad de sincronizar el acceso a los datos compartidos.Because the runtime manages dependencies between data, you can often avoid the requirement to synchronize access to shared data. Además, dado que el runtime programa el trabajo según la llegada asincrónica de datos, el flujo de datos puede mejorar la capacidad de respuesta y el nivel de rendimiento administrando de forma eficaz los subprocesos subyacentes.In addition, because the runtime schedules work based on the asynchronous arrival of data, dataflow can improve responsiveness and throughput by efficiently managing the underlying threads. Para ver un ejemplo en donde se usa el modelo de programación de flujo de datos para implementar procesamiento de imágenes en una aplicación de Windows Forms, vea Tutorial: Uso de flujos de datos en aplicaciones de Windows Forms.For an example that uses the dataflow programming model to implement image processing in a Windows Forms application, see Walkthrough: Using Dataflow in a Windows Forms Application.

Orígenes y destinosSources and Targets

La biblioteca de flujos de datos TPL consta de bloques de flujo de datos, que son estructuras de datos que almacenan datos en búfer y procesan datos.The TPL Dataflow Library consists of dataflow blocks, which are data structures that buffer and process data. La biblioteca TPL define tres tipos de bloques de flujos de datos: bloques de origen, bloques de destino y bloques propagadores.The TPL defines three kinds of dataflow blocks: source blocks, target blocks, and propagator blocks. Un bloque de origen actúa como un origen de datos y se puede leer desde él.A source block acts as a source of data and can be read from. Un bloque de destino actúa como un receptor de datos y se puede escribir en él.A target block acts as a receiver of data and can be written to. Un bloque propagador actúa como un bloque de origen y un bloque de destino, y se puede leer desde él y escribir en él.A propagator block acts as both a source block and a target block, and can be read from and written to. La biblioteca TPL define la interfaz System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> para representar orígenes, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> para representar destinos y System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> para representar propagadores.The TPL defines the System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> interface to represent sources, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> to represent targets, and System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> to represent propagators. IPropagatorBlock<TInput,TOutput> hereda de ISourceBlock<TOutput> y de ITargetBlock<TInput>.IPropagatorBlock<TInput,TOutput> inherits from both ISourceBlock<TOutput>, and ITargetBlock<TInput>.

La biblioteca de flujos de datos TPL proporciona varios tipos de bloques de flujo de datos predefinidos que implementan las interfaces ISourceBlock<TOutput>, ITargetBlock<TInput> y IPropagatorBlock<TInput,TOutput>.The TPL Dataflow Library provides several predefined dataflow block types that implement the ISourceBlock<TOutput>, ITargetBlock<TInput>, and IPropagatorBlock<TInput,TOutput> interfaces. Estos tipos de bloques de flujo de datos se describen en este documento en la sección Tipos de bloques de flujo de datos predefinidos.These dataflow block types are described in this document in the section Predefined Dataflow Block Types.

Conectar bloquesConnecting Blocks

Puede conectar los bloques de flujo de datos para establecer canalizaciones, que son secuencias lineales de bloques de flujo de datos, o redes, que son gráficos de bloques de flujo de datos.You can connect dataflow blocks to form pipelines, which are linear sequences of dataflow blocks, or networks, which are graphs of dataflow blocks. Una canalización es una forma de red.A pipeline is one form of network. En una canalización o red, los orígenes propagan datos de forma asincrónica en destinos a medida que esos datos están disponibles.In a pipeline or network, sources asynchronously propagate data to targets as that data becomes available. El método ISourceBlock<TOutput>.LinkTo vincula un bloque de flujo de datos de origen a un bloque de destino.The ISourceBlock<TOutput>.LinkTo method links a source dataflow block to a target block. Un origen puede vincularse a cero o más destinos; los destinos se pueden vincular a partir de cero o más orígenes.A source can be linked to zero or more targets; targets can be linked from zero or more sources. Puede agregar o quitar bloques de flujo de datos hacia o desde una canalización o red simultáneamente.You can add or remove dataflow blocks to or from a pipeline or network concurrently. Los tipos de bloques de flujo de datos predefinidos controlan todos los aspectos de la seguridad para subprocesos de vinculación y desvinculación.The predefined dataflow block types handle all thread-safety aspects of linking and unlinking.

Para ver un ejemplo en donde se conectan bloques de flujo de datos para formar una canalización básica, vea Tutorial: Creación de una canalización de flujos de datos.For an example that connects dataflow blocks to form a basic pipeline, see Walkthrough: Creating a Dataflow Pipeline. Para ver un ejemplo en donde se conectan bloques de flujo de datos para formar una red más compleja, vea Tutorial: Uso de flujos de datos en aplicaciones de Windows Forms.For an example that connects dataflow blocks to form a more complex network, see Walkthrough: Using Dataflow in a Windows Forms Application. Para obtener un ejemplo en donde un destino se desvincula de un origen después de que el origen le ofrezca un mensaje, vea Procedimiento: Desvinculación de bloques de flujo de datos.For an example that unlinks a target from a source after the source offers the target a message, see How to: Unlink Dataflow Blocks.

FiltradoFiltering

Cuando se llama al método ISourceBlock<TOutput>.LinkTo para vincular un origen a un destino, puede proporcionar un delegado que determina si el bloque de destino acepta o rechaza un mensaje basado en el valor de ese mensaje.When you call the ISourceBlock<TOutput>.LinkTo method to link a source to a target, you can supply a delegate that determines whether the target block accepts or rejects a message based on the value of that message. Este mecanismo de filtrado resulta útil para garantizar que un bloque de flujo de datos recibe solo ciertos valores.This filtering mechanism is a useful way to guarantee that a dataflow block receives only certain values. Para la mayoría de los tipos de bloques de flujo de datos predefinidos, si un bloque de origen está conectado a varios bloques de destino, cuando un bloque de destino rechaza un mensaje, el origen proporciona ese mensaje al destino siguiente.For most of the predefined dataflow block types, if a source block is connected to multiple target blocks, when a target block rejects a message, the source offers that message to the next target. El orden en el que un origen proporciona mensajes a los destinos se define mediante el origen y puede variar según el tipo de origen.The order in which a source offers messages to targets is defined by the source and can vary according to the type of the source. La mayoría de los tipos de bloques de origen dejan de proporcionar un mensaje después de que un destino acepta ese mensaje.Most source block types stop offering a message after one target accepts that message. Una excepción a esta regla es la clase BroadcastBlock<T>, que proporciona cada mensaje a todos los destinos, aunque algunos destinos rechacen el mensaje.One exception to this rule is the BroadcastBlock<T> class, which offers each message to all targets, even if some targets reject the message. Para ver un ejemplo en donde se usa el filtrado para procesar únicamente determinados mensajes, consulte Tutorial: Uso de flujos de datos en aplicaciones de Windows Forms.For an example that uses filtering to process only certain messages, see Walkthrough: Using Dataflow in a Windows Forms Application.

Importante

Dado que cada tipo de bloque de flujo de datos de origen predefinido garantiza que los mensajes se propaguen en el orden en que se reciben, se debe leer cada mensaje desde el bloque de origen antes de que el bloque de origen pueda procesar el mensaje siguiente.Because each predefined source dataflow block type guarantees that messages are propagated out in the order in which they are received, every message must be read from the source block before the source block can process the next message. Por consiguiente, si usa el filtrado para conectarse varios destinos a un origen, asegúrese de que al menos un bloque de destino recibe cada mensaje.Therefore, when you use filtering to connect multiple targets to a source, make sure that at least one target block receives each message. De lo contrario, la aplicación podría generar un interbloqueo.Otherwise, your application might deadlock.

Paso de mensajesMessage Passing

El modelo de programación basado en el flujo de datos está relacionado con el concepto paso de mensajes, donde los componentes independientes de un programa comunican entre sí enviándose mensajes.The dataflow programming model is related to the concept of message passing, where independent components of a program communicate with one another by sending messages. Una manera de propagar mensajes entre los componentes de la aplicación es llamar a los métodos Post y DataflowBlock.SendAsync para enviar mensajes a los bloques de flujo de datos de destino (Post actúa de forma sincrónica; SendAsync actúa de forma asincrónica) y a los métodos Receive, ReceiveAsync y TryReceive para recibir los mensajes desde los bloques de origen.One way to propagate messages among application components is to call the Post and DataflowBlock.SendAsync methods to send messages to target dataflow blocks post (Post acts synchronously; SendAsync acts asynchronously) and the Receive, ReceiveAsync, and TryReceive methods to receive messages from source blocks. Puede combinar estos métodos con las canalizaciones o redes de flujo de datos enviando datos de entrada al nodo principal (un bloque de destino) y recibiendo datos de salida del nodo terminal de la canalización o de los nodos terminales de la red (uno o varios bloques de origen).You can combine these methods with dataflow pipelines or networks by sending input data to the head node (a target block), and receiving output data from the terminal node of the pipeline or the terminal nodes of the network (one or more source blocks). También puede utilizar el método Choose para leer desde el primero de los orígenes proporcionados siempre que tenga datos disponibles y realice acciones sobre esos datos.You can also use the Choose method to read from the first of the provided sources that has data available and perform action on that data.

Los bloques de origen proporcionan datos a los bloques de destino llamando al método ITargetBlock<TInput>.OfferMessage.Source blocks offer data to target blocks by calling the ITargetBlock<TInput>.OfferMessage method. El bloque de destino responde a un mensaje proporcionado de una de estas tres maneras: puede aceptar el mensaje, rechazar el mensaje o posponer el mensaje.The target block responds to an offered message in one of three ways: it can accept the message, decline the message, or postpone the message. Cuando el destino acepta el mensaje, el método OfferMessage devuelve Accepted.When the target accepts the message, the OfferMessage method returns Accepted. Cuando el destino rechaza el mensaje, el método OfferMessage devuelve Declined.When the target declines the message, the OfferMessage method returns Declined. Cuando el destino requiere que ya no recibe ningún mensaje del origen, OfferMessage devuelve DecliningPermanently.When the target requires that it no longer receives any messages from the source, OfferMessage returns DecliningPermanently. Los tipos de bloques de origen predefinidos no proporcionan mensajes a los destinos vinculados después de recibir este valor devuelto, y automáticamente se desvinculan de estos destinos.The predefined source block types do not offer messages to linked targets after such a return value is received, and they automatically unlink from such targets.

Cuando un bloque de destino pospone el mensaje para su uso posterior, el método OfferMessage devuelve Postponed.When a target block postpones the message for later use, the OfferMessage method returns Postponed. Un bloque de destino que pospone un mensaje puede llamar posteriormente al método ISourceBlock<TOutput>.ReserveMessage para tratar de reservar el mensaje proporcionado.A target block that postpones a message can later call the ISourceBlock<TOutput>.ReserveMessage method to try to reserve the offered message. En este punto, el mensaje todavía permanece disponible y lo puede usar el bloque de destino, o puede que otro destino haya tomado el mensaje.At this point, the message is either still available and can be used by the target block, or the message has been taken by another target. Cuando el bloque de destino requiere el mensaje posteriormente o cuando ya no necesita el mensaje, llama al método ISourceBlock<TOutput>.ConsumeMessage o ReleaseReservation, respectivamente.When the target block later requires the message or no longer needs the message, it calls the ISourceBlock<TOutput>.ConsumeMessage or ReleaseReservation method, respectively. La reserva de mensajes la utilizan normalmente los tipos de bloques de flujo de datos que trabajan en modo no expansivo.Message reservation is typically used by the dataflow block types that operate in non-greedy mode. El modo no expansivo se explica más adelante en este documento.Non-greedy mode is explained later in this document. En lugar de reservar un mensaje pospuesto, un bloque de destino puede utilizar el método ISourceBlock<TOutput>.ConsumeMessage para intentar utilizar directamente el mensaje pospuesto.Instead of reserving a postponed message, a target block can also use the ISourceBlock<TOutput>.ConsumeMessage method to attempt to directly consume the postponed message.

Finalización del bloque de flujo de datosDataflow Block Completion

Los bloques de flujo de datos también admiten el concepto de finalización.Dataflow blocks also support the concept of completion. Un bloque de flujo de datos que está en estado completado no realiza ningún trabajo posterior.A dataflow block that is in the completed state does not perform any further work. Cada bloque de flujo de datos tiene un objeto System.Threading.Tasks.Task asociado, conocido como tarea de finalización, que representa el estado de finalización del bloque.Each dataflow block has an associated System.Threading.Tasks.Task object, known as a completion task, that represents the completion status of the block. Dado que para finalizar se puede esperar un objeto Task, mediante tareas de finalización, para finalizar se pueden esperar uno o más nodos terminales de una red de flujo de datos.Because you can wait for a Task object to finish, by using completion tasks, you can wait for one or more terminal nodes of a dataflow network to finish. La interfaz IDataflowBlock define el método Complete, que informa al bloque de flujo de datos de una solicitud para que se complete, y la propiedad Completion, que devuelve la tarea de finalización para el bloque de flujo de datos.The IDataflowBlock interface defines the Complete method, which informs the dataflow block of a request for it to complete, and the Completion property, which returns the completion task for the dataflow block. Tanto ISourceBlock<TOutput> como ITargetBlock<TInput> heredan de la interfaz IDataflowBlock.Both ISourceBlock<TOutput> and ITargetBlock<TInput> inherit the IDataflowBlock interface.

Hay dos maneras de determinar si un bloque de flujo de datos se completó sin error, encontró uno o más errores, o se canceló.There are two ways to determine whether a dataflow block completed without error, encountered one or more errors, or was canceled. La primera manera consiste en llamar al método Task.Wait en la tarea de finalización en un bloque try-catch (Try-Catch en Visual Basic).The first way is to call the Task.Wait method on the completion task in a try-catch block (Try-Catch in Visual Basic). En el siguiente ejemplo se crea un objeto ActionBlock<TInput> que produce ArgumentOutOfRangeException si su valor de entrada es menor que cero.The following example creates an ActionBlock<TInput> object that throws ArgumentOutOfRangeException if its input value is less than zero. AggregateException se produce cuando este ejemplo llama a Wait en la tarea de finalización.AggregateException is thrown when this example calls Wait on the completion task. Se obtiene acceso a ArgumentOutOfRangeException mediante la propiedad InnerExceptions del objeto AggregateException.The ArgumentOutOfRangeException is accessed through the InnerExceptions property of the AggregateException object.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}", 
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
         ' Create an ActionBlock<int> object that prints its input
         ' and throws ArgumentOutOfRangeException if the input
         ' is less than zero.
         Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
             Console.WriteLine("n = {0}", n)
             If n < 0 Then
                 Throw New ArgumentOutOfRangeException()
             End If
         End Sub)

         ' Post values to the block.
         throwIfNegative.Post(0)
         throwIfNegative.Post(-1)
         throwIfNegative.Post(1)
         throwIfNegative.Post(-2)
         throwIfNegative.Complete()

         ' Wait for completion in a try/catch block.
         Try
            throwIfNegative.Completion.Wait()
         Catch ae As AggregateException
            ' If an unhandled exception occurs during dataflow processing, all
            ' exceptions are propagated through an AggregateException object.
            ae.Handle(Function(e)
                Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                Return True
            End Function)
         End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

En este ejemplo se muestra el caso en el que una excepción no está controlada en el delegado de un bloque de flujo de datos de ejecución.This example demonstrates the case in which an exception goes unhandled in the delegate of an execution dataflow block. Se recomienda controlar las excepciones en los cuerpos de estos bloques.We recommend that you handle exceptions in the bodies of such blocks. Sin embargo, si no puede hacerlo, el bloque se comporta como si estuviera cancelado y no procesa los mensajes entrantes.However, if you are unable to do so, the block behaves as though it was canceled and does not process incoming messages.

Cuando un bloque de flujo de datos se cancela explícitamente, el objeto AggregateException contiene OperationCanceledException en la propiedad InnerExceptions.When a dataflow block is canceled explicitly, the AggregateException object contains OperationCanceledException in the InnerExceptions property. Para obtener más información sobre la cancelación del flujo de datos, vea la sección Habilitar la cancelación.For more information about dataflow cancellation, see Enabling Cancellation section.

La segunda manera de determinar el estado de finalización de un bloque de flujo de datos es usar una continuación de la tarea de finalización o utilizar las características de lenguaje asincrónicas de C# y Visual Basic para esperar a la tarea de finalización de forma asincrónica.The second way to determine the completion status of a dataflow block is to use a continuation of the completion task, or to use the asynchronous language features of C# and Visual Basic to asynchronously wait for the completion task. El delegado que se proporciona al método Task.ContinueWith toma un objeto Task que representa la tarea anterior.The delegate that you provide to the Task.ContinueWith method takes a Task object that represents the antecedent task. En el caso de la propiedad Completion, el delegado de continuación toma la propia tarea de finalización.In the case of the Completion property, the delegate for the continuation takes the completion task itself. El siguiente ejemplo se parece el anterior, con la salvedad de que también utiliza el método ContinueWith para crear una tarea de continuación que imprime el estado de la operación total de flujo de datos.The following example resembles the previous one, except that it also uses the ContinueWith method to create a continuation task that prints the status of the overall dataflow operation.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall 
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine("The status of the completion task is '{0}'.", 
      task.Status);
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
         ' Create an ActionBlock<int> object that prints its input
         ' and throws ArgumentOutOfRangeException if the input
         ' is less than zero.
         Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
             Console.WriteLine("n = {0}", n)
             If n < 0 Then
                 Throw New ArgumentOutOfRangeException()
             End If
         End Sub)

         ' Create a continuation task that prints the overall 
         ' task status to the console when the block finishes.
         throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

         ' Post values to the block.
         throwIfNegative.Post(0)
         throwIfNegative.Post(-1)
         throwIfNegative.Post(1)
         throwIfNegative.Post(-2)
         throwIfNegative.Complete()

         ' Wait for completion in a try/catch block.
         Try
            throwIfNegative.Completion.Wait()
         Catch ae As AggregateException
            ' If an unhandled exception occurs during dataflow processing, all
            ' exceptions are propagated through an AggregateException object.
            ae.Handle(Function(e)
                Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                Return True
            End Function)
         End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

También puede usar propiedades como IsCanceled en el cuerpo de la tarea de continuación para determinar información adicional sobre el estado de finalización de un bloque de flujo de datos.You can also use properties such as IsCanceled in the body of the continuation task to determine additional information about the completion status of a dataflow block. Para obtener más información sobre las tareas de continuación y cómo se relacionan con la cancelación y el control de errores, vea Encadenar tareas mediante tareas de continuación, Cancelación de tareas y Control de excepciones.For more information about continuation tasks and how they relate to cancellation and error handling, see Chaining Tasks by Using Continuation Tasks, Task Cancellation, and Exception Handling.

[ir al principio][go to top]

Tipos de bloques de flujo de datos predefinidosPredefined Dataflow Block Types

La biblioteca de flujos de datos TPL proporciona varios tipos de bloques de flujo de datos predefinidos.The TPL Dataflow Library provides several predefined dataflow block types. Estos tipos se dividen en tres categorías: bloques de almacenamiento en búfer, bloques de ejecución y bloques de agrupación.These types are divided into three categories: buffering blocks, execution blocks, and grouping blocks. En las secciones siguientes se describen los tipos de bloques que componen estas categorías.The following sections describe the block types that make up these categories.

Bloques de almacenamiento en búferBuffering Blocks

Los bloques de almacenamiento en búfer contiene datos para su uso por los consumidores de datos.Buffering blocks hold data for use by data consumers. La biblioteca de flujos de datos TPL proporciona tres tipos de bloques de almacenamiento en búfer: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T> y System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.The TPL Dataflow Library provides three buffering block types: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>, and System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock(T)BufferBlock(T)

La clase BufferBlock<T> representa una estructura de mensajería asincrónica de uso general.The BufferBlock<T> class represents a general-purpose asynchronous messaging structure. Esta clase almacena una cola FIFO (primero en entrar, primero en salir) de mensajes donde varios orígenes pueden escribir o de los que varios destinos pueden leer.This class stores a first in, first out (FIFO) queue of messages that can be written to by multiple sources or read from by multiple targets. Cuando un destino recibe un mensaje de un objeto BufferBlock<T>, ese mensaje se quita de la cola de mensajes.When a target receives a message from a BufferBlock<T> object, that message is removed from the message queue. Por tanto, aunque un objeto BufferBlock<T> puede tener varios destinos, solo uno recibirá cada mensaje.Therefore, although a BufferBlock<T> object can have multiple targets, only one target will receive each message. La clase BufferBlock<T> resulta útil si desea pasar varios mensajes a otro componente, y ese componente debe recibir cada mensaje.The BufferBlock<T> class is useful when you want to pass multiple messages to another component, and that component must receive each message.

En el siguiente ejemplo básico se exponen varios valores Int32 a un objeto BufferBlock<T> y después se leen esos valores desde ese objeto.The following basic example posts several Int32 values to a BufferBlock<T> object and then reads those values back from that object.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
         ' Create a BufferBlock<int> object.
         Dim bufferBlock = New BufferBlock(Of Integer)()

         ' Post several messages to the block.
         For i As Integer = 0 To 2
            bufferBlock.Post(i)
         Next i

         ' Receive the messages back from the block.
         For i As Integer = 0 To 2
            Console.WriteLine(bufferBlock.Receive())
         Next i

'          Output:
'            0
'            1
'            2
'          

Para obtener un ejemplo completo en donde se muestra cómo escribir y leer mensajes desde un objeto BufferBlock<T>, vea Procedimiento: Escritura y lectura de mensajes en un bloque de flujo de datos.For a complete example that demonstrates how to write messages to and read messages from a BufferBlock<T> object, see How to: Write Messages to and Read Messages from a Dataflow Block.

BroadcastBlock(T)BroadcastBlock(T)

La clase BroadcastBlock<T> resulta útil si debe pasar varios mensajes a otro componente, pero este componente solo necesita el valor más reciente.The BroadcastBlock<T> class is useful when you must pass multiple messages to another component, but that component needs only the most recent value. Esta clase también resulta útil si desea difundir un mensaje a varios componentes.This class is also useful when you want to broadcast a message to multiple components.

En el siguiente ejemplo básico se publica un valor Double a un objeto BroadcastBlock<T> y después se lee ese valor desde el objeto varias veces.The following basic example posts a Double value to a BroadcastBlock<T> object and then reads that value back from that object several times. Dado que los valores no se quitan de los objetos BroadcastBlock<T> después de leerlos, el mismo valor está disponible cada vez.Because values are not removed from BroadcastBlock<T> objects after they are read, the same value is available every time.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
         ' Create a BroadcastBlock<double> object.
         Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

         ' Post a message to the block.
         broadcastBlock.Post(Math.PI)

         ' Receive the messages back from the block several times.
         For i As Integer = 0 To 2
            Console.WriteLine(broadcastBlock.Receive())
         Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Para obtener un ejemplo completo en donde se muestra cómo utilizar BroadcastBlock<T> para difundir un mensaje a varios bloques de destino, vea Procedimiento: Especificación de un Programador de tareas en un bloque de flujo de datos.For a complete example that demonstrates how to use BroadcastBlock<T> to broadcast a message to multiple target blocks, see How to: Specify a Task Scheduler in a Dataflow Block.

WriteOnceBlock(T)WriteOnceBlock(T)

La clase WriteOnceBlock<T> se asemeja a la clase BroadcastBlock<T>, salvo que un objeto WriteOnceBlock<T> se puede escribir una sola una vez.The WriteOnceBlock<T> class resembles the BroadcastBlock<T> class, except that a WriteOnceBlock<T> object can be written to one time only. Puede pensar que WriteOnceBlock<T> es similar a la palabra clave de C# readonly (Readonly en Visual Basic), salvo que un objeto WriteOnceBlock<T> se vuelve inalterable después de recibir un valor en lugar de una construcción.You can think of WriteOnceBlock<T> as being similar to the C# readonly (ReadOnly in Visual Basic) keyword, except that a WriteOnceBlock<T> object becomes immutable after it receives a value instead of at construction. Al igual que la clase BroadcastBlock<T>, cuando un destino recibe un mensaje de un objeto WriteOnceBlock<T>, el mensaje no se quita de dicho objeto.Like the BroadcastBlock<T> class, when a target receives a message from a WriteOnceBlock<T> object, that message is not removed from that object. Por tanto, varios destinos reciben una copia del mensaje.Therefore, multiple targets receive a copy of the message. La clase WriteOnceBlock<T> es útil si desea difundir solamente el primero de varios de mensajes.The WriteOnceBlock<T> class is useful when you want to propagate only the first of multiple messages.

En el siguiente ejemplo básico se exponen varios valores String a un objeto WriteOnceBlock<T> y después se lee ese valor desde el objeto.The following basic example posts multiple String values to a WriteOnceBlock<T> object and then reads the value back from that object. Dado que un objeto WriteOnceBlock<T> solo se puede escribir una vez, a partir de que un objeto WriteOnceBlock<T> recibe un mensaje, ese objeto descarta los mensajes subsiguientes.Because a WriteOnceBlock<T> object can be written to one time only, after a WriteOnceBlock<T> object receives a message, it discards subsequent messages.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first 
// message to be received is written to the block. 
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
         ' Create a WriteOnceBlock<string> object.
         Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

         ' Post several messages to the block in parallel. The first 
         ' message to be received is written to the block. 
         ' Subsequent messages are discarded.
         Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

         ' Receive the message from the block.
         Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Para ver un ejemplo completo en donde se muestra cómo utilizar WriteOnceBlock<T> para recibir el valor de la primera operación que finaliza, consulte Procedimiento: Desvinculación de bloques de flujo de datos.For a complete example that demonstrates how to use WriteOnceBlock<T> to receive the value of the first operation that finishes, see How to: Unlink Dataflow Blocks.

Bloques de ejecuciónExecution Blocks

Los bloques de ejecución llaman a un delegado proporcionado por el usuario para cada fragmento de datos recibidos.Execution blocks call a user-provided delegate for each piece of received data. La biblioteca de flujos de datos TPL proporciona tres tipos de bloques de ejecución: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput> y System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.The TPL Dataflow Library provides three execution block types: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>, and System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock(T)ActionBlock(T)

La clase ActionBlock<TInput> es un bloque de destino que llama a un delegado cuando recibe datos.The ActionBlock<TInput> class is a target block that calls a delegate when it receives data. Piense en un objeto ActionBlock<TInput> como un delegado que se ejecuta de forma asincrónica cuando los datos están disponibles.Think of a ActionBlock<TInput> object as a delegate that runs asynchronously when data becomes available. El delegado que se proporciona a un objeto ActionBlock<TInput> puede ser de tipo Action<T> o tipo System.Func<TInput, Task>.The delegate that you provide to an ActionBlock<TInput> object can be of type Action<T> or type System.Func<TInput, Task>. Si se utiliza un objeto ActionBlock<TInput> con Action<T>, se considera que el procesamiento de cada elemento de entrada se ha completado cuando devuelve el delegado.When you use an ActionBlock<TInput> object with Action<T>, processing of each input element is considered completed when the delegate returns. Si se utiliza un objeto ActionBlock<TInput> con System.Func<TInput, Task>, se considera que el procesamiento de cada elemento de entrada se ha completado solamente cuando el objeto devuelto Task está completo.When you use an ActionBlock<TInput> object with System.Func<TInput, Task>, processing of each input element is considered completed only when the returned Task object is completed. Mediante estos dos mecanismos, se puede utilizar ActionBlock<TInput> para el procesamiento sincrónico y asincrónico de cada elemento de entrada.By using these two mechanisms, you can use ActionBlock<TInput> for both synchronous and asynchronous processing of each input element.

En el siguiente ejemplo básico se exponen varios valores Int32 a un objeto ActionBlock<TInput>.The following basic example posts multiple Int32 values to an ActionBlock<TInput> object. El objeto ActionBlock<TInput> imprime esos valores en la consola.The ActionBlock<TInput> object prints those values to the console. Después, en este ejemplo se establece el bloque en estado completado y se espera hasta que finalicen todas las tareas de flujo de datos.This example then sets the block to the completed state and waits for all dataflow tasks to finish.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all 
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
         ' Create an ActionBlock<int> object that prints values
         ' to the console.
         Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

         ' Post several messages to the block.
         For i As Integer = 0 To 2
            actionBlock.Post(i * 10)
         Next i

         ' Set the block to the completed state and wait for all 
         ' tasks to finish.
         actionBlock.Complete()
         actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Para obtener ejemplos completos en donde se muestra cómo usar delegados con la clase ActionBlock<TInput>, vea Procedimiento: Ejecución de una acción cuando un bloque de flujo de datos recibe datos.For complete examples that demonstrate how to use delegates with the ActionBlock<TInput> class, see How to: Perform Action When a Dataflow Block Receives Data.

TransformBlock(TInput, TOutput)TransformBlock(TInput, TOutput)

La clase TransformBlock<TInput,TOutput> es similar a la clase ActionBlock<TInput>, salvo que actúa como origen y como destino.The TransformBlock<TInput,TOutput> class resembles the ActionBlock<TInput> class, except that it acts as both a source and as a target. El delegado que pasa a un objeto TransformBlock<TInput,TOutput> devuelve un valor de tipo TOutput.The delegate that you pass to a TransformBlock<TInput,TOutput> object returns a value of type TOutput. El delegado que se proporciona a un objeto TransformBlock<TInput,TOutput> puede ser de tipo System.Func<TInput, TOutput> o tipo System.Func<TInput, Task<TOutput>>.The delegate that you provide to a TransformBlock<TInput,TOutput> object can be of type System.Func<TInput, TOutput> or type System.Func<TInput, Task<TOutput>>. Si se utiliza un objeto TransformBlock<TInput,TOutput> con System.Func<TInput, TOutput>, se considera que el procesamiento de cada elemento de entrada se ha completado cuando devuelve el delegado.When you use a TransformBlock<TInput,TOutput> object with System.Func<TInput, TOutput>, processing of each input element is considered completed when the delegate returns. Si se utiliza un objeto TransformBlock<TInput,TOutput> que se usa con System.Func<TInput, Task<TOutput>>, se considera que el procesamiento de cada elemento de entrada se ha completado solamente cuando el objeto devuelto Task<TResult> está completo.When you use a TransformBlock<TInput,TOutput> object used with System.Func<TInput, Task<TOutput>>, processing of each input element is considered completed only when the returned Task<TResult> object is completed. Al igual que sucede con ActionBlock<TInput>, mediante estos dos mecanismos, se puede utilizar TransformBlock<TInput,TOutput> para el procesamiento sincrónico y asincrónico de cada elemento de entrada.As with ActionBlock<TInput>, by using these two mechanisms, you can use TransformBlock<TInput,TOutput> for both synchronous and asynchronous processing of each input element.

En el siguiente ejemplo básico se crea un objeto TransformBlock<TInput,TOutput> que calcula la raíz cuadrada de la entrada.The following basic example creates a TransformBlock<TInput,TOutput> object that computes the square root of its input. El objeto TransformBlock<TInput,TOutput> toma valores Int32 como entrada y genera valores Double como salida.The TransformBlock<TInput,TOutput> object takes Int32 values as input and produces Double values as output.

// Create a TransformBlock<int, double> object that 
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
         ' Create a TransformBlock<int, double> object that 
         ' computes the square root of its input.
         Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

         ' Post several messages to the block.
         transformBlock.Post(10)
         transformBlock.Post(20)
         transformBlock.Post(30)

         ' Read the output messages from the block.
         For i As Integer = 0 To 2
            Console.WriteLine(transformBlock.Receive())
         Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Para obtener ejemplos completos que utilizan TransformBlock<TInput,TOutput> en una red de bloques de flujo de datos que realiza procesamiento de imágenes en una aplicación de Windows Forms, vea Tutorial: Uso de flujos de datos en aplicaciones de Windows Forms.For complete examples that uses TransformBlock<TInput,TOutput> in a network of dataflow blocks that performs image processing in a Windows Forms application, see Walkthrough: Using Dataflow in a Windows Forms Application.

TransformManyBlock(TInput, TOutput)TransformManyBlock(TInput, TOutput)

La clase TransformManyBlock<TInput,TOutput> es similar a la clase TransformBlock<TInput,TOutput>, salvo que TransformManyBlock<TInput,TOutput> genere cero o más valores de salida por cada valor de entrada, en lugar de generar un solo valor de salida por cada valor de entrada.The TransformManyBlock<TInput,TOutput> class resembles the TransformBlock<TInput,TOutput> class, except that TransformManyBlock<TInput,TOutput> produces zero or more output values for each input value, instead of only one output value for each input value. El delegado que se proporciona a un objeto TransformManyBlock<TInput,TOutput> puede ser de tipo System.Func<TInput, IEnumerable<TOutput>> o tipo System.Func<TInput, Task<IEnumerable<TOutput>>>.The delegate that you provide to a TransformManyBlock<TInput,TOutput> object can be of type System.Func<TInput, IEnumerable<TOutput>> or type System.Func<TInput, Task<IEnumerable<TOutput>>>. Si se utiliza un objeto TransformManyBlock<TInput,TOutput> con System.Func<TInput, IEnumerable<TOutput>>, se considera que el procesamiento de cada elemento de entrada se ha completado cuando devuelve el delegado.When you use a TransformManyBlock<TInput,TOutput> object with System.Func<TInput, IEnumerable<TOutput>>, processing of each input element is considered completed when the delegate returns. Si se utiliza un objeto TransformManyBlock<TInput,TOutput> con System.Func<TInput, Task<IEnumerable<TOutput>>>, se considera que el procesamiento de cada elemento de entrada se ha completado solo cuando el objeto devuelto System.Threading.Tasks.Task<IEnumerable<TOutput>> está completo.When you use a TransformManyBlock<TInput,TOutput> object with System.Func<TInput, Task<IEnumerable<TOutput>>>, processing of each input element is considered complete only when the returned System.Threading.Tasks.Task<IEnumerable<TOutput>> object is completed.

En el siguiente ejemplo básico se crea un objeto TransformManyBlock<TInput,TOutput> que divide las cadenas en sus secuencias de caracteres individuales.The following basic example creates a TransformManyBlock<TInput,TOutput> object that splits strings into their individual character sequences. El objeto TransformManyBlock<TInput,TOutput> toma valores String como entrada y genera valores Char como salida.The TransformManyBlock<TInput,TOutput> object takes String values as input and produces Char values as output.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
         ' Create a TransformManyBlock<string, char> object that splits
         ' a string into its individual characters.
         Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

         ' Post two messages to the first block.
         transformManyBlock.Post("Hello")
         transformManyBlock.Post("World")

         ' Receive all output values from the block.
         For i As Integer = 0 To ("Hello" & "World").Length - 1
            Console.WriteLine(transformManyBlock.Receive())
         Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Para obtener ejemplos completos que usan TransformManyBlock<TInput,TOutput> para generar varios resultados independientes para cada entrada en una canalización de flujo de datos, vea Tutorial: Creación de una canalización de flujos de datos.For complete examples that use TransformManyBlock<TInput,TOutput> to produce multiple independent outputs for each input in a dataflow pipeline, see Walkthrough: Creating a Dataflow Pipeline.

Grado de paralelismoDegree of Parallelism

Cada uno de los objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput> y TransformManyBlock<TInput,TOutput> almacena en búfer los mensajes de entrada hasta que el bloque está listo para procesarlos.Every ActionBlock<TInput>, TransformBlock<TInput,TOutput>, and TransformManyBlock<TInput,TOutput> object buffers input messages until the block is ready to process them. De forma predeterminada, estas clases procesan los mensajes en el orden en el que se recibieron, un mensaje cada vez.By default, these classes process messages in the order in which they are received, one message at a time. También puede especificar el grado de paralelismo para permitir que los objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput> y TransformManyBlock<TInput,TOutput> puedan procesar varios mensajes simultáneamente.You can also specify the degree of parallelism to enable ActionBlock<TInput>, TransformBlock<TInput,TOutput> and TransformManyBlock<TInput,TOutput> objects to process multiple messages concurrently. Para obtener más información sobre la ejecución simultánea, vea la sección Especificar el grado de paralelismo más adelante en este documento.For more information about concurrent execution, see the section Specifying the Degree of Parallelism later in this document. Para obtener un ejemplo en donde se establece el grado de paralelismo que permite que un bloque de flujo de datos de ejecución pueda procesar varios mensajes al mismo tiempo, vea Procedimiento: Especificación del grado de paralelismo en un bloque de flujo de datos.For an example that sets the degree of parallelism to enable an execution dataflow block to process more than one message at a time, see How to: Specify the Degree of Parallelism in a Dataflow Block.

Resumen de tipos de delegadoSummary of Delegate Types

En la tabla siguiente se resumen los tipos de delegado que puede proporcionar a los objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput> y TransformManyBlock<TInput,TOutput>.The following table summarizes the delegate types that you can provide to ActionBlock<TInput>, TransformBlock<TInput,TOutput>, and TransformManyBlock<TInput,TOutput> objects. En esta tabla también se especifica si el tipo de delegado funciona de forma sincrónica o asincrónica.This table also specifies whether the delegate type operates synchronously or asynchronously.

TipoType Tipo de delegado sincrónicoSynchronous Delegate Type Tipo de delegado asincrónicoAsynchronous Delegate Type
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

También puede utilizar expresiones lambda cuando trabaja con tipos de bloque de ejecución.You can also use lambda expressions when you work with execution block types. Para obtener un ejemplo en donde se muestra cómo usar una expresión lambda con un bloque de ejecución, vea Procedimiento: Ejecución de una acción cuando un bloque de flujo de datos recibe datos.For an example that shows how to use a lambda expression with an execution block, see How to: Perform Action When a Dataflow Block Receives Data.

Bloques de agrupaciónGrouping Blocks

Los bloques de agrupación combinan datos de uno o más orígenes y con distintas restricciones.Grouping blocks combine data from one or more sources and under various constraints. La biblioteca de flujos de datos TPL proporciona tres tipos de bloques de combinación: BatchBlock<T>, JoinBlock<T1,T2> y BatchedJoinBlock<T1,T2>.The TPL Dataflow Library provides three join block types: BatchBlock<T>, JoinBlock<T1,T2>, and BatchedJoinBlock<T1,T2>.

BatchBlock(T)BatchBlock(T)

La clase BatchBlock<T> combina conjuntos de datos de entrada, que se conocen como lotes, en las matrices de datos de salida.The BatchBlock<T> class combines sets of input data, which are known as batches, into arrays of output data. Especifique el tamaño de cada lote cuando crea un objeto BatchBlock<T>.You specify the size of each batch when you create a BatchBlock<T> object. Cuando el objeto BatchBlock<T> recibe el número especificado de elementos de entrada, propaga de forma asincrónica una matriz que contiene esos elementos.When the BatchBlock<T> object receives the specified count of input elements, it asynchronously propagates out an array that contains those elements. Si un objeto BatchBlock<T> se establece en el estado completado pero no contiene elementos suficientes para formar un lote, propaga una matriz final que contiene los elementos de entrada restantes.If a BatchBlock<T> object is set to the completed state but does not contain enough elements to form a batch, it propagates out a final array that contains the remaining input elements.

La clase BatchBlock<T> funciona en modo expansivo o no expansivo.The BatchBlock<T> class operates in either greedy or non-greedy mode. En modo expansivo, que es el valor predeterminado, un objeto BatchBlock<T> acepta cada mensaje que se proporciona y propaga una matriz después de recibir el número especificado de elementos.In greedy mode, which is the default, a BatchBlock<T> object accepts every message that it is offered and propagates out an array after it receives the specified count of elements. En modo no expansivo, un objeto BatchBlock<T> pospone todos los mensajes entrantes hasta que haya suficientes orígenes que proporcionen mensajes al bloque para formar un lote.In non-greedy mode, a BatchBlock<T> object postpones all incoming messages until enough sources have offered messages to the block to form a batch. Normalmente, el modo expansivo se comporta mejor que el modo no expansivo porque requiere menos sobrecarga de procesamiento.Greedy mode typically performs better than non-greedy mode because it requires less processing overhead. Sin embargo, se puede usar el modo no expansivo cuando se debe coordinar el consumo de varios orígenes en modo atómico.However, you can use non-greedy mode when you must coordinate consumption from multiple sources in an atomic fashion. Especifique el modo no expansivo estableciendo Greedy en False en el parámetro dataflowBlockOptions del constructor BatchBlock<T>.Specify non-greedy mode by setting Greedy to False in the dataflowBlockOptions parameter in the BatchBlock<T> constructor.

En el siguiente ejemplo básico se exponen varios valores Int32 a un objeto BatchBlock<T> que contiene diez elementos en un lote.The following basic example posts several Int32 values to a BatchBlock<T> object that holds ten elements in a batch. Para garantizar que todos los valores se propagan fuera de BatchBlock<T>, este ejemplo llama al método Complete.To guarantee that all values propagate out of the BatchBlock<T>, this example calls the Complete method. El método Complete establece el objeto BatchBlock<T> en el estado completado y, por consiguiente, el objeto BatchBlock<T> propaga cualquier elemento restante como un lote final.The Complete method sets the BatchBlock<T> object to the completed state, and therefore, the BatchBlock<T> object propagates out any remaining elements as a final batch.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
   batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
   batchBlock.Receive().Sum());

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
         ' Create a BatchBlock<int> object that holds ten
         ' elements per batch.
         Dim batchBlock = New BatchBlock(Of Integer)(10)

         ' Post several values to the block.
         For i As Integer = 0 To 12
            batchBlock.Post(i)
         Next i
         ' Set the block to the completed state. This causes
         ' the block to propagate out any any remaining
         ' values as a final batch.
         batchBlock.Complete()

         ' Print the sum of both batches.

         Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

         Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Para obtener un ejemplo completo que usa BatchBlock<T> para mejorar la eficacia de las operaciones de inserción de la base de datos, vea Tutorial: Uso de BatchBlock y BatchedJoinBlock para mejorar la eficacia.For a complete example that uses BatchBlock<T> to improve the efficiency of database insert operations, see Walkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency.

JoinBlock(T1, T2, ...)JoinBlock(T1, T2, ...)

Las clases JoinBlock<T1,T2> y JoinBlock<T1,T2,T3> obtienen elementos de entrada y propagan objetos System.Tuple<T1,T2> o System.Tuple<T1,T2,T3> que contienen esos elementos.The JoinBlock<T1,T2> and JoinBlock<T1,T2,T3> classes collect input elements and propagate out System.Tuple<T1,T2> or System.Tuple<T1,T2,T3> objects that contain those elements. Las clases JoinBlock<T1,T2> y JoinBlock<T1,T2,T3> no heredan de ITargetBlock<TInput>.The JoinBlock<T1,T2> and JoinBlock<T1,T2,T3> classes do not inherit from ITargetBlock<TInput>. En su lugar, proporcionan propiedades, Target1, Target2 y Target3, que implementan ITargetBlock<TInput>.Instead, they provide properties, Target1, Target2, and Target3, that implement ITargetBlock<TInput>.

Al igual que BatchBlock<T>, JoinBlock<T1,T2> y JoinBlock<T1,T2,T3> funcionan en modo expansivo o no expansivo.Like BatchBlock<T>, JoinBlock<T1,T2> and JoinBlock<T1,T2,T3> operate in either greedy or non-greedy mode. En modo expansivo, que es el valor predeterminado, un objeto JoinBlock<T1,T2> o JoinBlock<T1,T2,T3> acepta cada mensaje que se proporciona y propaga una tupla después de que cada uno de sus destinos reciba por lo menos un mensaje.In greedy mode, which is the default, a JoinBlock<T1,T2> or JoinBlock<T1,T2,T3> object accepts every message that it is offered and propagates out a tuple after each of its targets receives at least one message. En modo no expansivo, un objeto JoinBlock<T1,T2> o JoinBlock<T1,T2,T3> pospone todos los mensajes entrantes hasta que todos los destinos han proporcionado los datos necesarios para crear una tupla.In non-greedy mode, a JoinBlock<T1,T2> or JoinBlock<T1,T2,T3> object postpones all incoming messages until all targets have been offered the data that is required to create a tuple. En este punto, el bloque se involucra en un protocolo de confirmación en dos fases para recuperar atómicamente todos los elementos necesarios de los orígenes.At this point, the block engages in a two-phase commit protocol to atomically retrieve all required items from the sources. Este aplazamiento permite que, mientras tanto, otra entidad consuma datos, para permitir que el sistema global progrese.This postponement makes it possible for another entity to consume the data in the meantime, to allow the overall system to make forward progress.

En el siguiente ejemplo básico se muestra un caso en el que un objeto JoinBlock<T1,T2,T3> requiere varios datos para calcular un valor.The following basic example demonstrates a case in which a JoinBlock<T1,T2,T3> object requires multiple data to compute a value. En este ejemplo se crea un objeto JoinBlock<T1,T2,T3> que requiere dos valores Int32 y un valor Char para realizar una operación aritmética.This example creates a JoinBlock<T1,T2,T3> object that requires two Int32 values and a Char value to perform an arithmetic operation.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine("{0} + {1} = {2}",
            data.Item1, data.Item2, data.Item1 + data.Item2);
         break;
      case '-':
         Console.WriteLine("{0} - {1} = {2}",
            data.Item1, data.Item2, data.Item1 - data.Item2);
         break;
      default:
         Console.WriteLine("Unknown operator '{0}'.", data.Item3);
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
         ' Create a JoinBlock<int, int, char> object that requires
         ' two numbers and an operator.
         Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

         ' Post two values to each target of the join.

         joinBlock.Target1.Post(3)
         joinBlock.Target1.Post(6)

         joinBlock.Target2.Post(5)
         joinBlock.Target2.Post(4)

         joinBlock.Target3.Post("+"c)
         joinBlock.Target3.Post("-"c)

         ' Receive each group of values and apply the operator part
         ' to the number parts.

         For i As Integer = 0 To 1
            Dim data = joinBlock.Receive()
            Select Case data.Item3
               Case "+"c
                  Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
               Case "-"c
                  Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
               Case Else
                  Console.WriteLine("Unknown operator '{0}'.", data.Item3)
            End Select
         Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Para obtener un ejemplo completo en donde se usan objetos JoinBlock<T1,T2> en modo no expansivo para compartir conjuntamente un recurso, vea Procedimiento: Uso de JoinBlock para leer datos de varios orígenes.For a complete example that uses JoinBlock<T1,T2> objects in non-greedy mode to cooperatively share a resource, see How to: Use JoinBlock to Read Data From Multiple Sources.

BatchedJoinBlock(T1, T2, ...)BatchedJoinBlock(T1, T2, ...)

Las clases BatchedJoinBlock<T1,T2> y BatchedJoinBlock<T1,T2,T3> obtienen lotes de elementos de entrada y propagan objetos System.Tuple(IList(T1), IList(T2)) o System.Tuple(IList(T1), IList(T2), IList(T3)) que contienen esos elementos.The BatchedJoinBlock<T1,T2> and BatchedJoinBlock<T1,T2,T3> classes collect batches of input elements and propagate out System.Tuple(IList(T1), IList(T2)) or System.Tuple(IList(T1), IList(T2), IList(T3)) objects that contain those elements. Piense en BatchedJoinBlock<T1,T2> como una combinación de BatchBlock<T> y JoinBlock<T1,T2>.Think of BatchedJoinBlock<T1,T2> as a combination of BatchBlock<T> and JoinBlock<T1,T2>. Especifique el tamaño de cada lote cuando crea un objeto BatchedJoinBlock<T1,T2>.Specify the size of each batch when you create a BatchedJoinBlock<T1,T2> object. BatchedJoinBlock<T1,T2> también proporciona propiedades, Target1 y Target2, que implementan ITargetBlock<TInput>.BatchedJoinBlock<T1,T2> also provides properties, Target1 and Target2, that implement ITargetBlock<TInput>. Cuando el número especificado de elementos de entrada se recibe a través de todos los destinos, el objeto BatchedJoinBlock<T1,T2> propaga de forma asincrónica un objeto System.Tuple(IList(T1), IList(T2)) que contiene esos elementos.When the specified count of input elements are received from across all targets, the BatchedJoinBlock<T1,T2> object asynchronously propagates out a System.Tuple(IList(T1), IList(T2)) object that contains those elements.

En el siguiente ejemplo básico se crea un objeto BatchedJoinBlock<T1,T2> que contiene resultados, valores Int32 y errores que son objetos Exception.The following basic example creates a BatchedJoinBlock<T1,T2> object that holds results, Int32 values, and errors that are Exception objects. En este ejemplo se realizan varias operaciones y se escriben los resultados en la propiedad Target1, y los errores en la propiedad Target2, del objeto BatchedJoinBlock<T1,T2>.This example performs multiple operations and writes results to the Target1 property, and errors to the Target2 property, of the BatchedJoinBlock<T1,T2> object. Dado que el número de operaciones correctas y las que dieron error no se conoce de antemano, los objetos IList<T> permiten que cada destino reciba cero o más valores.Because the count of successful and failed operations is unknown in advance, the IList<T> objects enable each target to receive zero or more values.

 // For demonstration, create a Func<int, int> that 
 // returns its argument, or throws ArgumentOutOfRangeException
 // if the argument is less than zero.
 Func<int, int> DoWork = n =>
 {
    if (n < 0)
       throw new ArgumentOutOfRangeException();
    return n;
 };

 // Create a BatchedJoinBlock<int, Exception> object that holds 
 // seven elements per batch.
 var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

 // Post several items to the block.
 foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
 {
    try
    {
       // Post the result of the worker to the 
       // first target of the block.
       batchedJoinBlock.Target1.Post(DoWork(i));
    }
    catch (ArgumentOutOfRangeException e) 
    {
       // If an error occurred, post the Exception to the 
       // second target of the block.
       batchedJoinBlock.Target2.Post(e); 
    }
 }

 // Read the results from the block.
 var results = batchedJoinBlock.Receive();

 // Print the results to the console.

 // Print the results.
 foreach (int n in results.Item1)
 {
    Console.WriteLine(n);
 }
 // Print failures.
 foreach (Exception e in results.Item2)
 {
    Console.WriteLine(e.Message);
 }

 /* Output:
    5
    6
    13
    55
    0
    Specified argument was out of the range of valid values.
    Specified argument was out of the range of valid values.
  */
         ' For demonstration, create a Func<int, int> that 
         ' returns its argument, or throws ArgumentOutOfRangeException
         ' if the argument is less than zero.
         Dim DoWork As Func(Of Integer, Integer) = Function(n)
            If n < 0 Then
               Throw New ArgumentOutOfRangeException()
            End If
            Return n
         End Function

         ' Create a BatchedJoinBlock<int, Exception> object that holds 
         ' seven elements per batch.
         Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

         ' Post several items to the block.
         For Each i As Integer In New Integer() { 5, 6, -7, -22, 13, 55, 0 }
            Try
               ' Post the result of the worker to the 
               ' first target of the block.
               batchedJoinBlock.Target1.Post(DoWork(i))
            Catch e As ArgumentOutOfRangeException
               ' If an error occurred, post the Exception to the 
               ' second target of the block.
               batchedJoinBlock.Target2.Post(e)
            End Try
         Next i

         ' Read the results from the block.
         Dim results = batchedJoinBlock.Receive()

         ' Print the results to the console.

         ' Print the results.
         For Each n As Integer In results.Item1
            Console.WriteLine(n)
         Next n
         ' Print failures.
         For Each e As Exception In results.Item2
            Console.WriteLine(e.Message)
         Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Para obtener un ejemplo completo que usa BatchedJoinBlock<T1,T2> para capturar resultados y cualquier excepción que se produzca mientras el programa lee de una base de datos, vea Tutorial: Uso de BatchBlock y BatchedJoinBlock para mejorar la eficacia.For a complete example that uses BatchedJoinBlock<T1,T2> to capture both the results and any exceptions that occur while the program reads from a database, see Walkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency.

[ir al principio][go to top]

Configurar el comportamiento del bloque de flujo de datosConfiguring Dataflow Block Behavior

Puede habilitar opciones adicionales si proporciona un objeto System.Threading.Tasks.Dataflow.DataflowBlockOptions al constructor de los tipos de bloques de flujo de datos.You can enable additional options by providing a System.Threading.Tasks.Dataflow.DataflowBlockOptions object to the constructor of dataflow block types. Estas opciones controlan el comportamiento, como el del programador que administra la tarea subyacente, y el grado de paralelismo.These options control behavior such the scheduler that manages the underlying task and the degree of parallelism. El objeto DataflowBlockOptions también tiene tipos derivados que especifican el comportamiento específico de ciertos tipos de bloques de flujo de datos.The DataflowBlockOptions also has derived types that specify behavior that is specific to certain dataflow block types. En la tabla siguiente se resumen los tipo de opciones que se asocian a cada tipo de bloques de flujo de datos.The following table summarizes which options type is associated with each dataflow block type.

Tipo de bloques de flujo de datosDataflow Block Type Tipo de DataflowBlockOptionsDataflowBlockOptions type
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

Las secciones siguientes proporcionan información adicional sobre las clases importantes de opciones de bloques de flujo de datos que están disponibles a través de las clases System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions y System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions.The following sections provide additional information about the important kinds of dataflow block options that are available through the System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions, and System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions classes.

Especificar el programador de tareasSpecifying the Task Scheduler

Cada bloque de flujo de datos predefinido utiliza el mecanismo de programación de tareas de la biblioteca TPL para realizar actividades como propagar datos a un destino, recibir datos de un origen y ejecutar delegados definido por el usuario si los datos están disponibles.Every predefined dataflow block uses the TPL task scheduling mechanism to perform activities such as propagating data to a target, receiving data from a source, and running user-defined delegates when data becomes available. TaskScheduler es una clase abstracta que representa un programador de tareas que pone en cola las tareas en los subprocesos.TaskScheduler is an abstract class that represents a task scheduler that queues tasks onto threads. El programador de tareas predeterminado, Default, utiliza la clase ThreadPool para poner en cola y ejecutar el trabajo.The default task scheduler, Default, uses the ThreadPool class to queue and execute work. Puede reemplazar el programador de tareas predeterminado estableciendo la propiedad TaskScheduler al crear un objeto de bloque de flujo de datos.You can override the default task scheduler by setting the TaskScheduler property when you construct a dataflow block object.

Cuando el mismo programador de tareas administra varios bloques de flujo de datos, puede aplicar directivas entre ellos.When the same task scheduler manages multiple dataflow blocks, it can enforce policies across them. Por ejemplo, si cada uno de los bloques de flujo de datos se configuran como destino del programador exclusivo del mismo objeto ConcurrentExclusiveSchedulerPair, todo el trabajo que se ejecuta a través de estos bloques se serializa.For example, if multiple dataflow blocks are each configured to target the exclusive scheduler of the same ConcurrentExclusiveSchedulerPair object, all work that runs across these blocks is serialized. De igual forma, si estos bloques se configuran como destino del programador simultáneo del mismo objeto ConcurrentExclusiveSchedulerPair, y el programador se configura para tener un nivel de simultaneidad máximo, todo el trabajo de estos bloques se limita a ese número de operaciones simultáneas.Similarly, if these blocks are configured to target the concurrent scheduler of the same ConcurrentExclusiveSchedulerPair object, and that scheduler is configured to have a maximum concurrency level, all work from these blocks is limited to that number of concurrent operations. Para obtener un ejemplo en donde se usa la clase ConcurrentExclusiveSchedulerPair para permitir que las operaciones de lectura se produzcan en paralelo, pero las operaciones de escritura sean exclusivas del resto de operaciones, vea Procedimiento: Especificación de un Programador de tareas en un bloque de flujo de datos.For an example that uses the ConcurrentExclusiveSchedulerPair class to enable read operations to occur in parallel, but write operations to occur exclusively of all other operations, see How to: Specify a Task Scheduler in a Dataflow Block. Para obtener más información sobre los programadores de tareas en la biblioteca TPL, consulte el tema sobre la clase TaskScheduler.For more information about task schedulers in the TPL, see the TaskScheduler class topic.

Especificar el grado de ParalelismoSpecifying the Degree of Parallelism

De forma predeterminada, los tres tipos de bloques de ejecución que la biblioteca de flujos de datos TPL proporciona, ActionBlock<TInput>, TransformBlock<TInput,TOutput> y TransformManyBlock<TInput,TOutput>, procesan un mensaje al mismo tiempo.By default, the three execution block types that the TPL Dataflow Library provides, ActionBlock<TInput>, TransformBlock<TInput,TOutput>, and TransformManyBlock<TInput,TOutput>, process one message at a time. Estos tipos de bloques de flujo de datos también procesan mensajes en el orden en que se reciben.These dataflow block types also process messages in the order in which they are received. Para permitir que estos bloques de flujo de datos procesen mensajes simultáneamente, establezca la propiedad ExecutionDataflowBlockOptions.MaxDegreeOfParallelism cuando construya el objeto de bloques de flujo de datos.To enable these dataflow blocks to process messages concurrently, set the ExecutionDataflowBlockOptions.MaxDegreeOfParallelism property when you construct the dataflow block object.

El valor predeterminado de MaxDegreeOfParallelism es 1, que garantiza que el bloque de flujo de datos procesa un mensaje al mismo tiempo.The default value of MaxDegreeOfParallelism is 1, which guarantees that the dataflow block processes one message at a time. Al establecer esta propiedad en un valor mayor de 1 se permite que el bloque de flujo de datos procese varios mensajes simultáneamente.Setting this property to a value that is larger than 1 enables the dataflow block to process multiple messages concurrently. Al establecer esta propiedad en DataflowBlockOptions.Unbounded se permite que el programador de tareas subyacente administre el grado máximo de simultaneidad.Setting this property to DataflowBlockOptions.Unbounded enables the underlying task scheduler to manage the maximum degree of concurrency.

Importante

Cuando se especifica un grado máximo de paralelismo mayor que 1, varios mensajes se procesan simultáneamente y, por tanto, los mensajes no se pueden procesar en el orden en que se reciben.When you specify a maximum degree of parallelism that is larger than 1, multiple messages are processed simultaneously, and therefore messages might not be processed in the order in which they are received. Pero los mensajes salen del bloque en el mismo orden en que se reciben.The order in which the messages are output from the block is, however, the same one in which they are received.

Dado que la propiedad MaxDegreeOfParallelism representa el grado máximo de paralelismo, el bloque de flujo de datos puede ejecutarse con un menor grado de paralelismo que el especificado.Because the MaxDegreeOfParallelism property represents the maximum degree of parallelism, the dataflow block might execute with a lesser degree of parallelism than you specify. El bloque de flujo de datos puede utilizar un menor grado de paralelismo para cumplir los requisitos funcionales o porque hay una falta de recursos del sistema disponibles.The dataflow block might use a lesser degree of parallelism to meet its functional requirements or because there is a lack of available system resources. Un flujo de datos bloqueado nunca elige más paralelismo que el especificado.A dataflow block never chooses more parallelism than you specify.

El valor de la propiedad MaxDegreeOfParallelism es exclusivo para cada objeto de bloque de flujo de datos.The value of the MaxDegreeOfParallelism property is exclusive to each dataflow block object. Por ejemplo, si cuatro objetos de bloques de flujo de datos especifican 1 como el grado máximo de paralelismo, los cuatro objetos de bloques de flujo de datos podrían ejecutarse en paralelo.For example, if four dataflow block objects each specify 1 for the maximum degree of parallelism, all four dataflow block objects can potentially run in parallel.

Para obtener un ejemplo en donde se establece el grado máximo de paralelismo que permite que se produzcan operaciones largas en paralelo, vea Procedimiento: Especificación del grado de paralelismo en un bloque de flujo de datos.For an example that sets the maximum degree of parallelism to enable lengthy operations to occur in parallel, see How to: Specify the Degree of Parallelism in a Dataflow Block.

Especificar el número de mensajes por tareaSpecifying the Number of Messages per Task

Los tipos predefinidos de bloques de flujo de datos utilizan tareas para procesar varios elementos de entrada.The predefined dataflow block types use tasks to process multiple input elements. Esto ayuda a minimizar el número de objetos de tarea necesarios para procesar datos, lo que permite que las aplicaciones se ejecuten más eficazmente.This helps minimize the number of task objects that are required to process data, which enables applications to run more efficiently. Sin embargo, cuando las tareas de un conjunto de bloques de flujo de datos están procesando datos, es posible que las tareas de otros bloques de flujo de datos tengan que esperar el tiempo de procesamiento en la cola mensajes.However, when the tasks from one set of dataflow blocks are processing data, the tasks from other dataflow blocks might need to wait for processing time by queuing messages. Para permitir una mejor equidad entre tareas de flujo de datos, establezca la propiedad MaxMessagesPerTask.To enable better fairness among dataflow tasks, set the MaxMessagesPerTask property. Cuando MaxMessagesPerTask se establece en DataflowBlockOptions.Unbounded, que es el valor predeterminado, la tarea utilizada por un bloque de flujo de datos procesa tantos mensajes como están disponibles.When MaxMessagesPerTask is set to DataflowBlockOptions.Unbounded, which is the default, the task used by a dataflow block processes as many messages as are available. Cuando MaxMessagesPerTask se establece en un valor distinto de Unbounded, el bloque de flujo de datos procesa como máximo este número de mensajes por objeto Task.When MaxMessagesPerTask is set to a value other than Unbounded, the dataflow block processes at most this number of messages per Task object. Aunque al establecer la propiedad MaxMessagesPerTask se puede aumentar la equidad entre tareas, puede provocar que el sistema cree más tareas que las necesarias, lo que puede reducir el rendimiento.Although setting the MaxMessagesPerTask property can increase fairness among tasks, it can cause the system to create more tasks than are necessary, which can decrease performance.

Habilitar la cancelaciónEnabling Cancellation

La biblioteca TPL proporciona un mecanismo que habilita las tareas para coordinar la cancelación de manera cooperativa.The TPL provides a mechanism that enables tasks to coordinate cancellation in a cooperative manner. Para permitir que los bloques de flujo de datos puedan participar en este mecanismo de cancelación, establezca la propiedad CancellationToken.To enable dataflow blocks to participate in this cancellation mechanism, set the CancellationToken property. Cuando este objeto CancellationToken se establece en el estado cancelado, todos los bloques de flujo de datos que controlan este token finalizan la ejecución de su elemento actual pero no comienzan a procesar los elementos siguientes.When this CancellationToken object is set to the canceled state, all dataflow blocks that monitor this token finish execution of their current item but do not start processing subsequent items. Estos bloques de flujo de datos también borran los mensajes almacenados en búfer, conexiones de inicio para los bloques de origen y de destino, y la transición al estado cancelado.These dataflow blocks also clear any buffered messages, release connections to any source and target blocks, and transition to the canceled state. Al realizar la transición al estado cancelado, la propiedad Completion tiene la propiedad Status establecida en Canceled, a menos que se produzca una excepción durante el procesamiento.By transitioning to the canceled state, the Completion property has the Status property set to Canceled, unless an exception occurred during processing. En ese caso, Status se establece en Faulted.In that case, Status is set to Faulted.

Para obtener un ejemplo en donde se muestra cómo usar la cancelación en una aplicación de Windows Forms, vea Procedimiento: Cómo: Cancelar un bloque de flujos de datos.For an example that demonstrates how to use cancellation in a Windows Forms application, see How to: Cancel a Dataflow Block. Para más información sobre la cancelación en la biblioteca TPL, consulte Task Cancellation (Cancelación de tareas).For more information about cancellation in the TPL, see Task Cancellation.

Especificar el comportamiento expansivo frente al no expansivoSpecifying Greedy Versus Non-Greedy Behavior

Varios tipos de bloques de flujo de datos de agrupación pueden trabajar en modo expansivo o no expansivo.Several grouping dataflow block types can operate in either greedy or non-greedy mode. De forma predeterminada, los tipos de bloques de flujo de datos predefinidos funcionan en modo expansivo.By default, the predefined dataflow block types operate in greedy mode.

Para los tipos de bloques de combinación como JoinBlock<T1,T2>, el modo expansivo significa que el bloque acepta datos inmediatamente aunque los correspondientes datos con los que se combinará aún no estén disponibles.For join block types such as JoinBlock<T1,T2>, greedy mode means that the block immediately accepts data even if the corresponding data with which to join is not yet available. El modo no expansivo significa que el bloque pospone todos los mensajes entrantes hasta que uno esté disponible para cada uno de sus destinos para completar la combinación.Non-greedy mode means that the block postpones all incoming messages until one is available on each of its targets to complete the join. Si los mensajes pospuestos ya no están disponibles, el bloque de combinación libera todos los mensajes pospuestos y reinicia el proceso.If any of the postponed messages are no longer available, the join block releases all postponed messages and restarts the process. Para la clase BatchBlock<T>, el comportamiento expansivo y no expansivo es similar, salvo que en modo no expansivo, un objeto BatchBlock<T> pospone todos los mensajes entrantes hasta que haya suficientes mensajes disponibles de orígenes distintos para completar un lote.For the BatchBlock<T> class, greedy and non-greedy behavior is similar, except that under non-greedy mode, a BatchBlock<T> object postpones all incoming messages until enough are available from distinct sources to complete a batch.

Para especificar el modo no expansivo para un bloque de flujo de datos, establezca Greedy en False.To specify non-greedy mode for a dataflow block, set Greedy to False. Para obtener un ejemplo en donde se muestra cómo usar el modo no expansivo para permitir que varios bloques de combinación compartan un origen de datos con mayor eficacia, vea Procedimiento: Uso de JoinBlock para leer datos de varios orígenes.For an example that demonstrates how to use non-greedy mode to enable multiple join blocks to share a data source more efficiently, see How to: Use JoinBlock to Read Data From Multiple Sources.

[ir al principio][go to top]

Bloques de flujo de datos personalizadosCustom Dataflow Blocks

Aunque la biblioteca de flujos de datos TPL proporciona muchos tipos de bloques predefinidos, puede crear tipos de bloques adicionales que tengan un comportamiento personalizado.Although the TPL Dataflow Library provides many predefined block types, you can create additional block types that perform custom behavior. Implemente las interfaces ISourceBlock<TOutput> o ITargetBlock<TInput> directamente, o utilice el método Encapsulate para compilar un bloque complejo que encapsule el comportamiento de los tipos de bloques existentes.Implement the ISourceBlock<TOutput> or ITargetBlock<TInput> interfaces directly or use the Encapsulate method to build a complex block that encapsulates the behavior of existing block types. Para obtener ejemplos que muestran cómo implementar funcionalidad en bloques de flujo de datos personalizados, vea Tutorial: Creación de tipos de bloques de flujo de datos personalizados.For examples that show how to implement custom dataflow block functionality, see Walkthrough: Creating a Custom Dataflow Block Type.

[ir al principio][go to top]

TitleTitle DescripciónDescription
Cómo: Escritura y lectura de mensajes en un bloque de flujo de datosHow to: Write Messages to and Read Messages from a Dataflow Block Muestra cómo escribir y leer los mensajes de un objeto BufferBlock<T>.Demonstrates how to write messages to and read messages from a BufferBlock<T> object.
Cómo: Implementación de un modelo de flujo de datos productor-consumidorHow to: Implement a Producer-Consumer Dataflow Pattern Describe cómo utilizar el modelo de flujo de datos para implementar un patrón consumidor-productor, cuando el productor envía mensajes a un bloque de flujo de datos y el consumidor lee mensajes de ese bloque.Describes how to use the dataflow model to implement a producer-consumer pattern, where the producer sends messages to a dataflow block, and the consumer reads messages from that block.
Cómo: Ejecución de una acción cuando un bloque de flujo de datos recibe datosHow to: Perform Action When a Dataflow Block Receives Data Describe cómo proporcionar delegados a los tipos de bloques de flujo de datos de ejecución, ActionBlock<TInput>, TransformBlock<TInput,TOutput> y TransformManyBlock<TInput,TOutput>.Describes how to provide delegates to the execution dataflow block types, ActionBlock<TInput>, TransformBlock<TInput,TOutput>, and TransformManyBlock<TInput,TOutput>.
Tutorial: Creación de una canalización de flujos de datosWalkthrough: Creating a Dataflow Pipeline Describe cómo crear una canalización de flujo de datos que descarga texto desde Internet y realiza operaciones en ese texto.Describes how to create a dataflow pipeline that downloads text from the web and performs operations on that text.
Cómo: Desvinculación de bloques de flujo de datosHow to: Unlink Dataflow Blocks Muestra cómo utilizar el método LinkTo para desvincular un bloque de destino de su origen después de que el origen proporciona un mensaje al destino.Demonstrates how to use the LinkTo method to unlink a target block from its source after the source offers a message to the target.
Tutorial: Uso de flujos de datos en aplicaciones de Windows FormsWalkthrough: Using Dataflow in a Windows Forms Application Muestra cómo crear una red de bloques de flujo de datos que realizan procesamiento de imágenes en una aplicación de Windows Forms.Demonstrates how to create a network of dataflow blocks that perform image processing in a Windows Forms application.
Cómo: Cancelación de un bloque de flujo de datosHow to: Cancel a Dataflow Block Muestra cómo se usa la cancelación en una aplicación de Windows Forms.Demonstrates how to use cancellation in a Windows Forms application.
Cómo: Uso de JoinBlock para leer datos de diferentes orígenesHow to: Use JoinBlock to Read Data From Multiple Sources Explica cómo utilizar la clase JoinBlock<T1,T2> para realizar una operación cuando los datos están disponibles a partir de varios orígenes, y cómo utilizar el modo no expansivo para permitir que varios bloques de combinación puedan compartir un origen de datos más eficazmente.Explains how to use the JoinBlock<T1,T2> class to perform an operation when data is available from multiple sources, and how to use non-greedy mode to enable multiple join blocks to share a data source more efficiently.
Cómo: Especificación del grado de paralelismo en un bloque de flujo de datosHow to: Specify the Degree of Parallelism in a Dataflow Block Describe cómo establecer la propiedad MaxDegreeOfParallelism para permitir que un bloque de flujo de datos de ejecución pueda procesar varios mensajes al mismo tiempo.Describes how to set the MaxDegreeOfParallelism property to enable an execution dataflow block to process more than one message at a time.
Cómo: Especificación de un Programador de tareas en un bloque de flujo de datosHow to: Specify a Task Scheduler in a Dataflow Block Muestra cómo asociar un programador de tareas específico cuando se usa flujo de datos en la aplicación.Demonstrates how to associate a specific task scheduler when you use dataflow in your application.
Tutorial: Uso de BatchBlock y BatchedJoinBlock para mejorar la eficaciaWalkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency Describe cómo utilizar la clase BatchBlock<T> para mejorar la eficacia de las operaciones de inserción de la base de datos y cómo utilizar la clase BatchedJoinBlock<T1,T2> para capturar los resultados y cualquier excepción que se produzca mientras el programa lee de una base de datos.Describes how to use the BatchBlock<T> class to improve the efficiency of database insert operations, and how to use the BatchedJoinBlock<T1,T2> class to capture both the results and any exceptions that occur while the program reads from a database.
Tutorial: Creación de tipos de bloques de flujo de datos personalizadosWalkthrough: Creating a Custom Dataflow Block Type Muestra dos maneras de crear un tipo de bloque de flujo de datos que implementa un comportamiento personalizado.Demonstrates two ways to create a dataflow block type that implements custom behavior.
Biblioteca TPLTask Parallel Library (TPL) Presenta la biblioteca TPL, una biblioteca que simplifica la programación paralela y simultánea en aplicaciones de .NET Framework.Introduces the TPL, a library that simplifies parallel and concurrent programming in .NET Framework applications.