Поток данных (библиотека параллельных задач)Dataflow (Task Parallel Library)

Библиотека параллельных задач (TPL) предоставляет компоненты потока данных, что позволяет повысить надежность приложений с включенным параллелизмом.The Task Parallel Library (TPL) provides dataflow components to help increase the robustness of concurrency-enabled applications. Эти компоненты потока данных вместе называются Библиотекой потоков данных TPL.These dataflow components are collectively referred to as the TPL Dataflow Library. Эта модель потоков данных поддерживает программирование на основе субъектов путем обеспечения внутрипроцессной передачи сообщений для недетализированного потока данных и задач по конвейеризации.This dataflow model promotes actor-based programming by providing in-process message passing for coarse-grained dataflow and pipelining tasks. Компоненты потоков данных строятся на типах и инфраструктуре планирования TPL и интегрированы с языковой поддержкой асинхронного программирования на C#, Visual Basic и F#.The dataflow components build on the types and scheduling infrastructure of the TPL and integrate with the C#, Visual Basic, and F# language support for asynchronous programming. Эти компоненты потоков данных полезны при наличии нескольких операций, которые должны асинхронно взаимодействовать друг с другом, или при необходимости обрабатывать данные по мере того, как они становятся доступными.These dataflow components are useful when you have multiple operations that must communicate with one another asynchronously or when you want to process data as it becomes available. Например, рассмотрим приложение, которое обрабатывает данные, поступающие с веб-камеры.For example, consider an application that processes image data from a web camera. С помощью модели потока данных приложение может обрабатывать кадры, как только они становятся доступными.By using the dataflow model, the application can process image frames as they become available. Если приложение повышает качество изображений на кадрах, например, выполняя коррекцию освещенности или удаление "красных глаз", можно создать конвейер компонентов потока данных.If the application enhances image frames, for example, by performing light correction or red-eye reduction, you can create a pipeline of dataflow components. Каждый этап конвейера может использовать функциональность с более грубым параллелизмом, например функцию, предоставляемую библиотекой TPL, для преобразования изображения.Each stage of the pipeline might use more coarse-grained parallelism functionality, such as the functionality that is provided by the TPL, to transform the image.

В этом документе содержатся общие сведения о библиотеке потоков данных TPL.This document provides an overview of the TPL Dataflow Library. Здесь описывается модель программирования, предопределенные типы блоков потоков данных и способы настройки блоков потоков данных для соответствия требованиям вашего приложения.It describes the programming model, the predefined dataflow block types, and how to configure dataflow blocks to meet the specific requirements of your applications.

Примечание

Библиотека потоков данных TPL (пространство имен System.Threading.Tasks.Dataflow) не поставляется с .NET.The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. Чтобы установить пространство имен System.Threading.Tasks.Dataflow в Visual Studio, откройте проект, выберите Управление пакетами NuGet в меню Проект и выполните поиск пакета System.Threading.Tasks.Dataflow в Интернете.To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Кроме того, можно установить его с помощью .Net Core CLI, запустив dotnet add package System.Threading.Tasks.Dataflow.Alternatively, to install it using the .Net Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.

Этот документ содержит следующие разделы.This document contains the following sections:

Модель программированияProgramming Model

Библиотека потоков данных TPL обеспечивает основу для передачи сообщений и параллелизации приложений, создающих большую нагрузку на ЦПУ и ввод-вывод, которые имеют высокую пропускную способность и низкую задержку.The TPL Dataflow Library provides a foundation for message passing and parallelizing CPU-intensive and I/O-intensive applications that have high throughput and low latency. Она также предоставляет явный контроль над тем, как данные буферизуются и перемещаются по системе.It also gives you explicit control over how data is buffered and moves around the system. Чтобы лучше понять модель программирования потоков данных, рассмотрим приложение, асинхронно загружающее изображения с диска и создающее из них композитное изображение.To better understand the dataflow programming model, consider an application that asynchronously loads images from disk and creates a composite of those images. Традиционные модели программирования обычно требуют использования обратных вызовов и объектов синхронизации, например блокировок, для координации задач и доступа к общим данным.Traditional programming models typically require that you use callbacks and synchronization objects, such as locks, to coordinate tasks and access to shared data. С помощью модели программирования потоков данных можно создавать объекты потоков данных, которые обрабатывают изображения по мере считывания их с диска.By using the dataflow programming model, you can create dataflow objects that process images as they are read from disk. В модели потоков данных объявляется, как обрабатываются данные, когда они становятся доступными, а также объявляются любые зависимости между данными.Under the dataflow model, you declare how data is handled when it becomes available, and also any dependencies between data. Поскольку среда выполнения управляет зависимостями между данными, часто можно избежать необходимости синхронизировать доступ к общим данным.Because the runtime manages dependencies between data, you can often avoid the requirement to synchronize access to shared data. Кроме того, поскольку планирование в среде выполнения основано на асинхронном прибытии данных, поток данных может увеличить пропускную способность и ускорить время ответа, эффективно управляя лежащими в основе потоками.In addition, because the runtime schedules work based on the asynchronous arrival of data, dataflow can improve responsiveness and throughput by efficiently managing the underlying threads. Пример, использующий модель программирования потоков данных для реализации обработки изображений в приложении Windows Forms, см. в статье Пошаговое руководство. Использование потока данных в приложении Windows FormsFor an example that uses the dataflow programming model to implement image processing in a Windows Forms application, see Walkthrough: Using Dataflow in a Windows Forms Application.

Источники и целевые блокиSources and Targets

Библиотека потоков данных TPL состоит из блоков потоков данных, которые представляют собой структуры данных, буферизующие и обрабатывающие данные.The TPL Dataflow Library consists of dataflow blocks, which are data structures that buffer and process data. В TPL определено три типа блоков потоков данных: блоки источника, целевые блоки и блоки передачи.The TPL defines three kinds of dataflow blocks: source blocks, target blocks, and propagator blocks. Блок источника выступает в качестве источника данных, из которого можно считать данные.A source block acts as a source of data and can be read from. Целевой блок выступает в качестве получателя данных, в который можно писать.A target block acts as a receiver of data and can be written to. Блок передачи действует и как блок источника, и как целевой блок: из него можно читать и в него можно писать.A propagator block acts as both a source block and a target block, and can be read from and written to. TPL определяет интерфейс System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> для представления источников, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> для представления целевых объектов и System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> для представления передающих.The TPL defines the System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> interface to represent sources, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> to represent targets, and System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> to represent propagators. IPropagatorBlock<TInput,TOutput> наследуется и от ISourceBlock<TOutput>, и от ITargetBlock<TInput>.IPropagatorBlock<TInput,TOutput> inherits from both ISourceBlock<TOutput>, and ITargetBlock<TInput>.

Библиотека потоков данных TPL предоставляет несколько предопределенных типов блоков потоков данных, которые реализуют интерфейсы ISourceBlock<TOutput>, ITargetBlock<TInput> и IPropagatorBlock<TInput,TOutput>.The TPL Dataflow Library provides several predefined dataflow block types that implement the ISourceBlock<TOutput>, ITargetBlock<TInput>, and IPropagatorBlock<TInput,TOutput> interfaces. Эти типы блоков потоков данных описаны в настоящем документе в разделе Предопределенные типы блоков потоков данных.These dataflow block types are described in this document in the section Predefined Dataflow Block Types.

Соединение блоковConnecting Blocks

Можно также соединять блоки потоков данных для создания конвейеров, которые являются линейными последовательностями блоков потоков данных, или сетей, являющихся графами блоков потоков данных.You can connect dataflow blocks to form pipelines, which are linear sequences of dataflow blocks, or networks, which are graphs of dataflow blocks. Конвейер является одним из видов сетей.A pipeline is one form of network. Конвейеры или сети асинхронно распространяют исходные данные целевым объектам, когда данные становятся доступны.In a pipeline or network, sources asynchronously propagate data to targets as that data becomes available. Метод ISourceBlock<TOutput>.LinkTo связывает блок потока данных источника и целевой блок.The ISourceBlock<TOutput>.LinkTo method links a source dataflow block to a target block. Источник может быть связан с несколькими целевыми объектами или не связан ни с одним; целевые объекты могут иметь связь с несколькими источниками или не иметь связей.A source can be linked to zero or more targets; targets can be linked from zero or more sources. Можно добавлять или удалять блоки потока данных из конвейера или сети одновременно.You can add or remove dataflow blocks to or from a pipeline or network concurrently. Предопределенные типы блоков потоков данных отвечают за все аспекты потокобезопасности установки и удаления связей.The predefined dataflow block types handle all thread-safety aspects of linking and unlinking.

Пример, в котором блоки потоков данных соединяются в простой конвейер, см. в статье Пошаговое руководство. Создание конвейера потока данных.For an example that connects dataflow blocks to form a basic pipeline, see Walkthrough: Creating a Dataflow Pipeline. Пример, в котором блоки потоков данных формируют более сложную сеть, см. в статье Пошаговое руководство. Использование потока данных в приложении Windows FormsFor an example that connects dataflow blocks to form a more complex network, see Walkthrough: Using Dataflow in a Windows Forms Application. Пример, в котором происходит удаление связи между источником и целевым объектом после того, как источник отправляет целевому объекту сообщение, см. в статье Руководство. Удаление связей с блоками потоков данных.For an example that unlinks a target from a source after the source offers the target a message, see How to: Unlink Dataflow Blocks.

ФильтрацияFiltering

При вызове метода ISourceBlock<TOutput>.LinkTo для связывания источника с целевым объектом, можно указать делегат, который определяет, принимает или отклоняет целевой блок сообщение в зависимости от содержания этого сообщения.When you call the ISourceBlock<TOutput>.LinkTo method to link a source to a target, you can supply a delegate that determines whether the target block accepts or rejects a message based on the value of that message. Механизм фильтрации позволяет гарантировать, что блок потока данных будет получать только определенные значения.This filtering mechanism is a useful way to guarantee that a dataflow block receives only certain values. Для большинства стандартных типов блока потока данных, если блок источника подключен к нескольким целевым блокам, когда один из целевых блоков отвергает сообщение, это сообщение отправляется следующему целевому объекту.For most of the predefined dataflow block types, if a source block is connected to multiple target blocks, when a target block rejects a message, the source offers that message to the next target. Порядок, в котором источник отправляет сообщения целевым объектам, определяется источником и может различаться в зависимости от типа источника.The order in which a source offers messages to targets is defined by the source and can vary according to the type of the source. Большинство типов блоков источников перестают отправлять сообщение после того, как один из целевых объектов его принимает.Most source block types stop offering a message after one target accepts that message. Единственным исключением из этого правила является класс BroadcastBlock<T>, который предлагает каждое сообщение всем целевым объектам, даже если некоторые из целевых объектов отклоняют это сообщение.One exception to this rule is the BroadcastBlock<T> class, which offers each message to all targets, even if some targets reject the message. Пример, в котором при помощи фильтрации обрабатываются только определенные сообщения, см. в статье Пошаговое руководство. Использование потока данных в приложении Windows FormsFor an example that uses filtering to process only certain messages, see Walkthrough: Using Dataflow in a Windows Forms Application.

Важно!

Поскольку все стандартные типы блоков потоков данных источника гарантируют, что сообщения передаются в том порядке, в каком они поступают, каждое сообщение должно быть считано из блока источника, чтобы блок источника мог обработать следующее сообщение.Because each predefined source dataflow block type guarantees that messages are propagated out in the order in which they are received, every message must be read from the source block before the source block can process the next message. Поэтому при использовании фильтрации для подключения нескольких целевых объектов к источнику, убедитесь, что хотя бы один целевой блок получит все сообщения.Therefore, when you use filtering to connect multiple targets to a source, make sure that at least one target block receives each message. В противном случае в приложении может возникнуть взаимоблокировка.Otherwise, your application might deadlock.

Передача сообщенийMessage Passing

Модель программирования на основе потоков данных связана с понятием передача сообщений, так как в этой модели независимые компоненты программы взаимодействуют друг с другом посредством отправки сообщений.The dataflow programming model is related to the concept of message passing, where independent components of a program communicate with one another by sending messages. Один из способов передавать сообщения между компонентами приложения — вызвать методы Post и DataflowBlock.SendAsync для отправки сообщений целевым блокам потоков данных (Post работает синхронно, а SendAsync — асинхронно) и методы Receive, ReceiveAsync и TryReceive для получения сообщений от блоков источника.One way to propagate messages among application components is to call the Post and DataflowBlock.SendAsync methods to send messages to target dataflow blocks post (Post acts synchronously; SendAsync acts asynchronously) and the Receive, ReceiveAsync, and TryReceive methods to receive messages from source blocks. Можно объединять эти методы с конвейерами потоков данных или сетями, отправляя входные данные в ведущий узел (целевой блок) и принимая выходные данные из терминального узла конвейера или терминальных узлов сети (один или несколько блоков источника).You can combine these methods with dataflow pipelines or networks by sending input data to the head node (a target block), and receiving output data from the terminal node of the pipeline or the terminal nodes of the network (one or more source blocks). Можно также использовать метод Choose для чтения данных из первого из имеющихся источников, где доступны данные, и выполнения действий с этими данными.You can also use the Choose method to read from the first of the provided sources that has data available and perform action on that data.

Блоки источника предлагают данные целевым блокам, вызывая метод ITargetBlock<TInput>.OfferMessage.Source blocks offer data to target blocks by calling the ITargetBlock<TInput>.OfferMessage method. Целевой блок отвечает на предложенное сообщение одним из трех способов: он может принять сообщение, отклонить сообщение или отложить сообщение.The target block responds to an offered message in one of three ways: it can accept the message, decline the message, or postpone the message. Если целевой объект принимает сообщение, метод OfferMessage возвращает Accepted.When the target accepts the message, the OfferMessage method returns Accepted. Если целевой объект отклоняет сообщение, метод OfferMessage возвращает Declined.When the target declines the message, the OfferMessage method returns Declined. Если целевой объект сообщает, что больше не будет получать сообщений от этого источника, OfferMessage возвращает DecliningPermanently.When the target requires that it no longer receives any messages from the source, OfferMessage returns DecliningPermanently. Стандартные типы блоков источника не предлагают связанным целевым объектам сообщения после получения такого значения, и они автоматически удаляют связи с этим целевым объектом.The predefined source block types do not offer messages to linked targets after such a return value is received, and they automatically unlink from such targets.

Когда целевой блок откладывает сообщение для последующего использования, метод OfferMessage возвращает Postponed.When a target block postpones the message for later use, the OfferMessage method returns Postponed. Целевой блок, откладывающий сообщение, может позднее вызвать метод ISourceBlock<TOutput>.ReserveMessage, чтобы попытаться зарезервировать предложенное сообщение.A target block that postpones a message can later call the ISourceBlock<TOutput>.ReserveMessage method to try to reserve the offered message. На этом этапе сообщение либо по-прежнему доступно и может быть использовано целевым блоком, либо было принято другим целевым объектом.At this point, the message is either still available and can be used by the target block, or the message has been taken by another target. Если целевой блок пытается получить сообщение позже, он вызывает метод ISourceBlock<TOutput>.ConsumeMessage, а когда сообщение больше не нужно — метод ReleaseReservation.When the target block later requires the message or no longer needs the message, it calls the ISourceBlock<TOutput>.ConsumeMessage or ReleaseReservation method, respectively. Резервирование сообщений обычно используется типами блоков потоков данных, которые работают в нежадном режиме.Message reservation is typically used by the dataflow block types that operate in non-greedy mode. Нежадный режим описан далее в этом документе.Non-greedy mode is explained later in this document. Вместо резервирования отложенного сообщения целевой блок может также использовать метод ISourceBlock<TOutput>.ConsumeMessage, чтобы попытаться напрямую использовать отложенное сообщение.Instead of reserving a postponed message, a target block can also use the ISourceBlock<TOutput>.ConsumeMessage method to attempt to directly consume the postponed message.

Завершение блока потока данныхDataflow Block Completion

Блоки потоков данных также поддерживают понятие завершения.Dataflow blocks also support the concept of completion. Блок потока данных, находящийся в состоянии завершения, не будет далее выполнять никакой работы.A dataflow block that is in the completed state does not perform any further work. С каждым блоком потока данных связан объект System.Threading.Tasks.Task, который называется задачей завершения и представляет состояние завершенности блока.Each dataflow block has an associated System.Threading.Tasks.Task object, known as a completion task, that represents the completion status of the block. Поскольку можно дождаться завершения для объекта Task, с помощью задачи завершения, можно дождаться завершения одного или нескольких терминальных узлов сети потоков данных.Because you can wait for a Task object to finish, by using completion tasks, you can wait for one or more terminal nodes of a dataflow network to finish. Интерфейс IDataflowBlock определяет метод Complete, который уведомляет блок потока данных о запросе завершения, и свойство Completion, возвращающее задачу завершения для блока потока данных.The IDataflowBlock interface defines the Complete method, which informs the dataflow block of a request for it to complete, and the Completion property, which returns the completion task for the dataflow block. Интерфейсы ISourceBlock<TOutput> и ITargetBlock<TInput> наследуются от IDataflowBlock.Both ISourceBlock<TOutput> and ITargetBlock<TInput> inherit the IDataflowBlock interface.

Существует два способа определить, завершился ли блок потока данных без ошибки, с одной или несколькими ошибками или был отменен.There are two ways to determine whether a dataflow block completed without error, encountered one or more errors, or was canceled. Первый способ — вызвать метод Task.Wait для задачи завершения в блоке try-catch (Try-Catch в Visual Basic).The first way is to call the Task.Wait method on the completion task in a try-catch block (Try-Catch in Visual Basic). В следующем примере создается объект ActionBlock<TInput>, создающий исключение ArgumentOutOfRangeException, если его входное значение меньше нуля.The following example creates an ActionBlock<TInput> object that throws ArgumentOutOfRangeException if its input value is less than zero. AggregateException возникает, когда в данном примере вызывается Wait для задачи завершения.AggregateException is thrown when this example calls Wait on the completion task. Доступ к объекту ArgumentOutOfRangeException обеспечивается с помощью свойства InnerExceptions объекта AggregateException.The ArgumentOutOfRangeException is accessed through the InnerExceptions property of the AggregateException object.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}", 
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
         ' Create an ActionBlock<int> object that prints its input
         ' and throws ArgumentOutOfRangeException if the input
         ' is less than zero.
         Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
             Console.WriteLine("n = {0}", n)
             If n < 0 Then
                 Throw New ArgumentOutOfRangeException()
             End If
         End Sub)

         ' Post values to the block.
         throwIfNegative.Post(0)
         throwIfNegative.Post(-1)
         throwIfNegative.Post(1)
         throwIfNegative.Post(-2)
         throwIfNegative.Complete()

         ' Wait for completion in a try/catch block.
         Try
            throwIfNegative.Completion.Wait()
         Catch ae As AggregateException
            ' If an unhandled exception occurs during dataflow processing, all
            ' exceptions are propagated through an AggregateException object.
            ae.Handle(Function(e)
                Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                Return True
            End Function)
         End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

В этом примере показан случай, в котором необработанное исключение передается в делегат блока выполнения потока данных.This example demonstrates the case in which an exception goes unhandled in the delegate of an execution dataflow block. Рекомендуется обрабатывать исключения в телах таких блоков.We recommend that you handle exceptions in the bodies of such blocks. Однако, если возможности сделать это нет, блок ведет себя, как если бы он был отменен, и не обрабатывает входящие сообщения.However, if you are unable to do so, the block behaves as though it was canceled and does not process incoming messages.

Если блок потока данных отменяется явно, объект AggregateException содержит OperationCanceledException в свойстве InnerExceptions.When a dataflow block is canceled explicitly, the AggregateException object contains OperationCanceledException in the InnerExceptions property. Дополнительные сведения об отмене потока данных см. в разделе Выполнение отмены.For more information about dataflow cancellation, see Enabling Cancellation section.

Второй способ определить состояние завершения блока потока данных — использовать продолжение задачи завершения или использовать асинхронные функции языка C# и Visual Basic, чтобы асинхронно ожидать завершения задачи.The second way to determine the completion status of a dataflow block is to use a continuation of the completion task, or to use the asynchronous language features of C# and Visual Basic to asynchronously wait for the completion task. Делегат, который предоставляется методу Task.ContinueWith, принимает объект Task, представляющий предшествующую задачу.The delegate that you provide to the Task.ContinueWith method takes a Task object that represents the antecedent task. В случае со свойством Completion делегат для продолжения принимает саму задачу завершения.In the case of the Completion property, the delegate for the continuation takes the completion task itself. Следующий пример похож на предыдущий, но в нем используется метод ContinueWith для создания задачи завершения, которая выводит на печать общее состояние операции потока данных.The following example resembles the previous one, except that it also uses the ContinueWith method to create a continuation task that prints the status of the overall dataflow operation.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall 
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine("The status of the completion task is '{0}'.", 
      task.Status);
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
         ' Create an ActionBlock<int> object that prints its input
         ' and throws ArgumentOutOfRangeException if the input
         ' is less than zero.
         Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
             Console.WriteLine("n = {0}", n)
             If n < 0 Then
                 Throw New ArgumentOutOfRangeException()
             End If
         End Sub)

         ' Create a continuation task that prints the overall 
         ' task status to the console when the block finishes.
         throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

         ' Post values to the block.
         throwIfNegative.Post(0)
         throwIfNegative.Post(-1)
         throwIfNegative.Post(1)
         throwIfNegative.Post(-2)
         throwIfNegative.Complete()

         ' Wait for completion in a try/catch block.
         Try
            throwIfNegative.Completion.Wait()
         Catch ae As AggregateException
            ' If an unhandled exception occurs during dataflow processing, all
            ' exceptions are propagated through an AggregateException object.
            ae.Handle(Function(e)
                Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                Return True
            End Function)
         End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Можно также использовать такие свойства, как IsCanceled, в теле задачи продолжения, чтобы определить дополнительные сведения о состоянии выполнения блока потока данных.You can also use properties such as IsCanceled in the body of the continuation task to determine additional information about the completion status of a dataflow block. Дополнительные сведения о задачах продолжения и о том, как они связаны с отменой и обработкой ошибок, см. в разделах Создание цепочки задач с помощью задач продолжения, Отмена задач и Обработка исключений.For more information about continuation tasks and how they relate to cancellation and error handling, see Chaining Tasks by Using Continuation Tasks, Task Cancellation, and Exception Handling.

[в начало][go to top]

Предопределенные типы блоков потоков данныхPredefined Dataflow Block Types

Библиотека потоков данных TPL предоставляет несколько предопределенных типов блоков потоков данных.The TPL Dataflow Library provides several predefined dataflow block types. Эти типы делятся на три категории: блоки буферизации, блоки выполнения и блоки группировки.These types are divided into three categories: buffering blocks, execution blocks, and grouping blocks. В следующих подразделах описаны типы блоков, составляющие эти категории.The following sections describe the block types that make up these categories.

Блоки буферизацииBuffering Blocks

Блоки буферизации хранят данные для их использования объектами-потребителями данных.Buffering blocks hold data for use by data consumers. Библиотека потоков данных TPL предоставляет три типа блоков буферизации: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T> и System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.The TPL Dataflow Library provides three buffering block types: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>, and System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock(T)BufferBlock(T)

Класс BufferBlock<T> представляет структуру общего назначения для асинхронного обмена сообщениями.The BufferBlock<T> class represents a general-purpose asynchronous messaging structure. В этом классе хранится очередь сообщений типа «первым вошел — первым вышел» (FIFO), в которую могут записывать данные несколько источников и из которой могут читать данные несколько целевых объектов.This class stores a first in, first out (FIFO) queue of messages that can be written to by multiple sources or read from by multiple targets. Если целевой объект получает сообщение от объекта BufferBlock<T>, это сообщение удаляется из очереди сообщений.When a target receives a message from a BufferBlock<T> object, that message is removed from the message queue. Поэтому, хотя объект BufferBlock<T> может иметь несколько целевых объектов, каждое сообщение может быть получено только одним из них.Therefore, although a BufferBlock<T> object can have multiple targets, only one target will receive each message. Класс BufferBlock<T> удобен, если нужно передать несколько сообщений другому компоненту и этот компонент должен принять каждое сообщение.The BufferBlock<T> class is useful when you want to pass multiple messages to another component, and that component must receive each message.

Следующий пример отправляет несколько значений Int32 объекту BufferBlock<T>, а затем эти значения считываются из этого объекта.The following basic example posts several Int32 values to a BufferBlock<T> object and then reads those values back from that object.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
         ' Create a BufferBlock<int> object.
         Dim bufferBlock = New BufferBlock(Of Integer)()

         ' Post several messages to the block.
         For i As Integer = 0 To 2
            bufferBlock.Post(i)
         Next i

         ' Receive the messages back from the block.
         For i As Integer = 0 To 2
            Console.WriteLine(bufferBlock.Receive())
         Next i

'          Output:
'            0
'            1
'            2
'          

Полный пример, в котором демонстрируется способ записи сообщений в объект BufferBlock<T> и их чтения из него, см. в статье Руководство. Запись и чтение сообщений в блоке потока данных.For a complete example that demonstrates how to write messages to and read messages from a BufferBlock<T> object, see How to: Write Messages to and Read Messages from a Dataflow Block.

BroadcastBlock(T)BroadcastBlock(T)

Класс BroadcastBlock<T> удобен, если необходимо передать несколько сообщений другому компоненту, но этому компоненту нужно только самое последнее значение.The BroadcastBlock<T> class is useful when you must pass multiple messages to another component, but that component needs only the most recent value. Этот класс также может оказаться полезным при необходимости широковещательной передачи сообщения нескольким компонентам.This class is also useful when you want to broadcast a message to multiple components.

В следующем примере значение Double отправляется объекту BroadcastBlock<T>, а затем это значение считывается обратно из объекта несколько раз.The following basic example posts a Double value to a BroadcastBlock<T> object and then reads that value back from that object several times. Поскольку значения не удаляются из объектов BroadcastBlock<T> после их прочтения, одно и то же значение доступно каждый раз.Because values are not removed from BroadcastBlock<T> objects after they are read, the same value is available every time.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
         ' Create a BroadcastBlock<double> object.
         Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

         ' Post a message to the block.
         broadcastBlock.Post(Math.PI)

         ' Receive the messages back from the block several times.
         For i As Integer = 0 To 2
            Console.WriteLine(broadcastBlock.Receive())
         Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Полный пример, демонстрирующий способ использования BroadcastBlock<T> для передачи сообщения нескольким целевым блокам, см. в статье Руководство. Указание планировщика задач в блоке потока данных.For a complete example that demonstrates how to use BroadcastBlock<T> to broadcast a message to multiple target blocks, see How to: Specify a Task Scheduler in a Dataflow Block.

WriteOnceBlock(T)WriteOnceBlock(T)

Класс WriteOnceBlock<T> похож на класс BroadcastBlock<T>, но объект WriteOnceBlock<T> допускает только однократную запись.The WriteOnceBlock<T> class resembles the BroadcastBlock<T> class, except that a WriteOnceBlock<T> object can be written to one time only. Можно рассматривать WriteOnceBlock<T> как аналог ключевого слова readonly в C# (ReadOnly в Visual Basic), но объект WriteOnceBlock<T> становится неизменяемым после того, как он получает значение, а не в момент создания.You can think of WriteOnceBlock<T> as being similar to the C# readonly (ReadOnly in Visual Basic) keyword, except that a WriteOnceBlock<T> object becomes immutable after it receives a value instead of at construction. Как и в случае с классом BroadcastBlock<T>, когда целевой объект получает сообщение от объекта WriteOnceBlock<T>, это сообщение не удаляется.Like the BroadcastBlock<T> class, when a target receives a message from a WriteOnceBlock<T> object, that message is not removed from that object. Поэтому копию сообщения могут получить несколько целевых объектов.Therefore, multiple targets receive a copy of the message. Класс WriteOnceBlock<T> полезен, если требуется передать только первое из нескольких сообщений.The WriteOnceBlock<T> class is useful when you want to propagate only the first of multiple messages.

В следующем примере несколько значений String отправляется объекту WriteOnceBlock<T>, а затем это значение считывается обратно из этого объекта.The following basic example posts multiple String values to a WriteOnceBlock<T> object and then reads the value back from that object. Поскольку в объект WriteOnceBlock<T> можно сделать запись только один раз, после того, как объект WriteOnceBlock<T> получает сообщение, он отклоняет последующие сообщения.Because a WriteOnceBlock<T> object can be written to one time only, after a WriteOnceBlock<T> object receives a message, it discards subsequent messages.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first 
// message to be received is written to the block. 
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
         ' Create a WriteOnceBlock<string> object.
         Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

         ' Post several messages to the block in parallel. The first 
         ' message to be received is written to the block. 
         ' Subsequent messages are discarded.
         Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

         ' Receive the message from the block.
         Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Полный пример, демонстрирующий способ использования WriteOnceBlock<T> для получения значения первой завершенной операции, см. в статье Руководство. Удаление связей с блоками потоков данных.For a complete example that demonstrates how to use WriteOnceBlock<T> to receive the value of the first operation that finishes, see How to: Unlink Dataflow Blocks.

Блоки выполненияExecution Blocks

Блоки выполнения вызывают предоставленный пользователем делегат для каждого элемента полученных данных.Execution blocks call a user-provided delegate for each piece of received data. Библиотека потоков данных TPL предоставляет три типа блоков выполнения: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput> и System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.The TPL Dataflow Library provides three execution block types: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>, and System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock(T)ActionBlock(T)

Класс ActionBlock<TInput> — целевой блок, который вызывает делегат при получении данных.The ActionBlock<TInput> class is a target block that calls a delegate when it receives data. Можно рассматривать объект ActionBlock<TInput> как делегат, который выполняется асинхронно, когда данные становятся доступными.Think of a ActionBlock<TInput> object as a delegate that runs asynchronously when data becomes available. Делегат, который предоставляется объекту ActionBlock<TInput>, может быть типа Action<T> или типа System.Func<TInput, Task>.The delegate that you provide to an ActionBlock<TInput> object can be of type Action<T> or type System.Func<TInput, Task>. При использовании объекта ActionBlock<TInput> с Action<T> обработка каждого входного элемента считается завершенной, когда возвращается делегат.When you use an ActionBlock<TInput> object with Action<T>, processing of each input element is considered completed when the delegate returns. При использовании объекта ActionBlock<TInput> с System.Func<TInput, Task> обработка каждого входного элемента считается завершенной, только если возвращенный объект Task завершен.When you use an ActionBlock<TInput> object with System.Func<TInput, Task>, processing of each input element is considered completed only when the returned Task object is completed. С помощью двух этих механизмов можно использовать ActionBlock<TInput> как для синхронной, так и для асинхронной обработки каждого входного элемента.By using these two mechanisms, you can use ActionBlock<TInput> for both synchronous and asynchronous processing of each input element.

В следующем примере несколько значений Int32 отправляется объекту ActionBlock<TInput>.The following basic example posts multiple Int32 values to an ActionBlock<TInput> object. Объект ActionBlock<TInput> выводит эти значения на консоль.The ActionBlock<TInput> object prints those values to the console. Затем в этом примере производится установка блока в завершенное состояние, и происходит ожидание завершения всех задач потока данных.This example then sets the block to the completed state and waits for all dataflow tasks to finish.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all 
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
         ' Create an ActionBlock<int> object that prints values
         ' to the console.
         Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

         ' Post several messages to the block.
         For i As Integer = 0 To 2
            actionBlock.Post(i * 10)
         Next i

         ' Set the block to the completed state and wait for all 
         ' tasks to finish.
         actionBlock.Complete()
         actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Полные примеры, демонстрирующие способы использования делегатов с классом ActionBlock<TInput>, см. в статье Руководство. Выполнение действий при получении данных блоком потоков данных.For complete examples that demonstrate how to use delegates with the ActionBlock<TInput> class, see How to: Perform Action When a Dataflow Block Receives Data.

TransformBlock(TInput, TOutput)TransformBlock(TInput, TOutput)

Класс TransformBlock<TInput,TOutput> похож на класс ActionBlock<TInput> за исключением того, что он работает и как источник, и как целевой объект.The TransformBlock<TInput,TOutput> class resembles the ActionBlock<TInput> class, except that it acts as both a source and as a target. Делегат, который передается объекту TransformBlock<TInput,TOutput>, возвращает значение типа TOutput.The delegate that you pass to a TransformBlock<TInput,TOutput> object returns a value of type TOutput. Делегат, который предоставляется объекту TransformBlock<TInput,TOutput>, может относиться к типу System.Func<TInput, TOutput> или типу System.Func<TInput, Task<TOutput>>.The delegate that you provide to a TransformBlock<TInput,TOutput> object can be of type System.Func<TInput, TOutput> or type System.Func<TInput, Task<TOutput>>. При использовании объекта TransformBlock<TInput,TOutput> с System.Func<TInput, TOutput> обработка каждого входного элемента считается завершенной, когда делегат возвращается.When you use a TransformBlock<TInput,TOutput> object with System.Func<TInput, TOutput>, processing of each input element is considered completed when the delegate returns. При использовании объекта TransformBlock<TInput,TOutput> с System.Func<TInput, Task<TOutput>> обработка каждого входного элемента считается завершенной, только если возвращаемый объект Task<TResult> завершен.When you use a TransformBlock<TInput,TOutput> object used with System.Func<TInput, Task<TOutput>>, processing of each input element is considered completed only when the returned Task<TResult> object is completed. Как и в случае с ActionBlock<TInput>, с помощью двух этих механизмов можно использовать TransformBlock<TInput,TOutput> как для синхронной, так и для асинхронной обработки каждого входного элемента.As with ActionBlock<TInput>, by using these two mechanisms, you can use TransformBlock<TInput,TOutput> for both synchronous and asynchronous processing of each input element.

В следующем примере создается объект TransformBlock<TInput,TOutput>, который вычисляет квадратный корень введенного значения.The following basic example creates a TransformBlock<TInput,TOutput> object that computes the square root of its input. Объект TransformBlock<TInput,TOutput> принимает значения Int32 на входе и создает значения Double на выходе.The TransformBlock<TInput,TOutput> object takes Int32 values as input and produces Double values as output.

// Create a TransformBlock<int, double> object that 
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
         ' Create a TransformBlock<int, double> object that 
         ' computes the square root of its input.
         Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

         ' Post several messages to the block.
         transformBlock.Post(10)
         transformBlock.Post(20)
         transformBlock.Post(30)

         ' Read the output messages from the block.
         For i As Integer = 0 To 2
            Console.WriteLine(transformBlock.Receive())
         Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Полный пример использования TransformBlock<TInput,TOutput> в сети блоков потоков данных, в котором выполняется обработка изображений в приложении Windows Forms, см. в статье Пошаговое руководство. Использование потока данных в приложении Windows FormsFor complete examples that uses TransformBlock<TInput,TOutput> in a network of dataflow blocks that performs image processing in a Windows Forms application, see Walkthrough: Using Dataflow in a Windows Forms Application.

TransformManyBlock(TInput, TOutput)TransformManyBlock(TInput, TOutput)

Класс TransformManyBlock<TInput,TOutput> похож на класс TransformBlock<TInput,TOutput> за исключением того, что для каждого входного значения TransformManyBlock<TInput,TOutput> создает ноль или более выходных значений вместо только одного значения.The TransformManyBlock<TInput,TOutput> class resembles the TransformBlock<TInput,TOutput> class, except that TransformManyBlock<TInput,TOutput> produces zero or more output values for each input value, instead of only one output value for each input value. Делегат, который предоставляется объекту TransformManyBlock<TInput,TOutput>, может относиться к типу System.Func<TInput, IEnumerable<TOutput>> или типу System.Func<TInput, Task<IEnumerable<TOutput>>>.The delegate that you provide to a TransformManyBlock<TInput,TOutput> object can be of type System.Func<TInput, IEnumerable<TOutput>> or type System.Func<TInput, Task<IEnumerable<TOutput>>>. При использовании объекта TransformManyBlock<TInput,TOutput> с System.Func<TInput, IEnumerable<TOutput>> обработка каждого входного элемента считается завершенной, когда делегат возвращается.When you use a TransformManyBlock<TInput,TOutput> object with System.Func<TInput, IEnumerable<TOutput>>, processing of each input element is considered completed when the delegate returns. При использовании объекта TransformManyBlock<TInput,TOutput> с System.Func<TInput, Task<IEnumerable<TOutput>>> обработка каждого входного элемента считается завершенной, только если возвращаемый объект System.Threading.Tasks.Task<IEnumerable<TOutput>> завершен.When you use a TransformManyBlock<TInput,TOutput> object with System.Func<TInput, Task<IEnumerable<TOutput>>>, processing of each input element is considered complete only when the returned System.Threading.Tasks.Task<IEnumerable<TOutput>> object is completed.

В следующем примере создается объект TransformManyBlock<TInput,TOutput>, который разделяет строки на отдельные цепочки символов.The following basic example creates a TransformManyBlock<TInput,TOutput> object that splits strings into their individual character sequences. Объект TransformManyBlock<TInput,TOutput> принимает значения String на входе и создает значения Char на выходе.The TransformManyBlock<TInput,TOutput> object takes String values as input and produces Char values as output.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
         ' Create a TransformManyBlock<string, char> object that splits
         ' a string into its individual characters.
         Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

         ' Post two messages to the first block.
         transformManyBlock.Post("Hello")
         transformManyBlock.Post("World")

         ' Receive all output values from the block.
         For i As Integer = 0 To ("Hello" & "World").Length - 1
            Console.WriteLine(transformManyBlock.Receive())
         Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Полные примеры, в которых используется TransformManyBlock<TInput,TOutput> для создания нескольких независимых выходных значений для каждого входного элемента в конвейере потока данных, см. в статье Пошаговое руководство. Создание конвейера потока данных.For complete examples that use TransformManyBlock<TInput,TOutput> to produce multiple independent outputs for each input in a dataflow pipeline, see Walkthrough: Creating a Dataflow Pipeline.

Степень параллелизмаDegree of Parallelism

Каждый из объектов ActionBlock<TInput>, TransformBlock<TInput,TOutput> и TransformManyBlock<TInput,TOutput> буферизует входные сообщения до тех пор, пока блок не будет готов их обработать.Every ActionBlock<TInput>, TransformBlock<TInput,TOutput>, and TransformManyBlock<TInput,TOutput> object buffers input messages until the block is ready to process them. По умолчанию эти классы обрабатывают сообщения в том порядке, в котором они поступают, по одному.By default, these classes process messages in the order in which they are received, one message at a time. Можно также указать степень параллелизма для включения объектов ActionBlock<TInput>, TransformBlock<TInput,TOutput> и TransformManyBlock<TInput,TOutput> для одновременной обработки нескольких сообщений.You can also specify the degree of parallelism to enable ActionBlock<TInput>, TransformBlock<TInput,TOutput> and TransformManyBlock<TInput,TOutput> objects to process multiple messages concurrently. Дополнительные сведения о параллельном выполнении см. в разделе "Определение степени параллелизма" далее в этом документе.For more information about concurrent execution, see the section Specifying the Degree of Parallelism later in this document. Пример, в котором задается степень параллелизма для обеспечения обработки блоком потока данных более одного сообщения одновременно, см. в статье Пошаговое руководство. Указание степени параллелизма в блоке потока данных.For an example that sets the degree of parallelism to enable an execution dataflow block to process more than one message at a time, see How to: Specify the Degree of Parallelism in a Dataflow Block.

Сводка по типам делегатовSummary of Delegate Types

В следующей таблице перечислены типы делегатов, которые можно передать объектам ActionBlock<TInput>, TransformBlock<TInput,TOutput> и TransformManyBlock<TInput,TOutput>.The following table summarizes the delegate types that you can provide to ActionBlock<TInput>, TransformBlock<TInput,TOutput>, and TransformManyBlock<TInput,TOutput> objects. В этой таблице также указано, работает ли делегат данного типа синхронно или асинхронно.This table also specifies whether the delegate type operates synchronously or asynchronously.

ТипType Синхронный тип делегатаSynchronous Delegate Type Асинхронный тип делегатаAsynchronous Delegate Type
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

Можно также использовать лямбда-выражения при работе с типами блоков выполнения.You can also use lambda expressions when you work with execution block types. Пример, демонстрирующий способ использования лямбда-выражения с блоком выполнения, см. в статье Руководство. Выполнение действий при получении данных блоком потоков данных.For an example that shows how to use a lambda expression with an execution block, see How to: Perform Action When a Dataflow Block Receives Data.

Группирующие блокиGrouping Blocks

Группирующие блоки объединяют данные из одного или более источников с различными ограничениями.Grouping blocks combine data from one or more sources and under various constraints. Библиотека потоков данных TPL предоставляет три типа блоков объединения: BatchBlock<T>, JoinBlock<T1,T2> и BatchedJoinBlock<T1,T2>.The TPL Dataflow Library provides three join block types: BatchBlock<T>, JoinBlock<T1,T2>, and BatchedJoinBlock<T1,T2>.

BatchBlock(T)BatchBlock(T)

Класс BatchBlock<T> объединяет наборы входных данных, называемые пакетами, в массивы выходных данных.The BatchBlock<T> class combines sets of input data, which are known as batches, into arrays of output data. Укажите размер каждого пакета при создании объекта BatchBlock<T>.You specify the size of each batch when you create a BatchBlock<T> object. Когда объект BatchBlock<T> получает указанное число входных элементов, он асинхронно передает массив, содержащий эти элементы.When the BatchBlock<T> object receives the specified count of input elements, it asynchronously propagates out an array that contains those elements. Если объект BatchBlock<T> переведен в состояние завершения, но не содержит достаточно элементов для формирования пакета, он формирует и передает конечный массив, который содержит оставшиеся входные элементы.If a BatchBlock<T> object is set to the completed state but does not contain enough elements to form a batch, it propagates out a final array that contains the remaining input elements.

Класс BatchBlock<T> работает либо в жадном, либо в нежадном режиме.The BatchBlock<T> class operates in either greedy or non-greedy mode. В жадном режиме, используемом по умолчанию, объект BatchBlock<T> принимает каждое предлагаемое ему сообщение и передает массив, как только получит указанное количество элементов.In greedy mode, which is the default, a BatchBlock<T> object accepts every message that it is offered and propagates out an array after it receives the specified count of elements. В нежадном режиме объект BatchBlock<T> откладывает все входящие сообщения до тех пор, пока достаточное количество источников не предложит сообщения блоку для формирования пакета.In non-greedy mode, a BatchBlock<T> object postpones all incoming messages until enough sources have offered messages to the block to form a batch. Жадный режим обычно работает быстрее, чем нежадный режим, поскольку он требует меньше дополнительной нагрузки для обработки.Greedy mode typically performs better than non-greedy mode because it requires less processing overhead. Однако можно использовать нежадный режим, если необходимо координировать получение от нескольких источников атомарным образом.However, you can use non-greedy mode when you must coordinate consumption from multiple sources in an atomic fashion. Определите нежадный режим, установив свойству Greedy значение False в параметре dataflowBlockOptions в конструкторе BatchBlock<T>.Specify non-greedy mode by setting Greedy to False in the dataflowBlockOptions parameter in the BatchBlock<T> constructor.

Следующий пример отправляет несколько значений Int32 объекту BatchBlock<T>, содержащему десять элементов в пакете.The following basic example posts several Int32 values to a BatchBlock<T> object that holds ten elements in a batch. Для обеспечения передачи всех значений из BatchBlock<T> в этом примере вызывается метод Complete.To guarantee that all values propagate out of the BatchBlock<T>, this example calls the Complete method. Метод Complete переводит объект BatchBlock<T> в состояние завершения, и поэтому объект BatchBlock<T> передает все оставшиеся элементы в качестве последнего пакета.The Complete method sets the BatchBlock<T> object to the completed state, and therefore, the BatchBlock<T> object propagates out any remaining elements as a final batch.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
   batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
   batchBlock.Receive().Sum());

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
         ' Create a BatchBlock<int> object that holds ten
         ' elements per batch.
         Dim batchBlock = New BatchBlock(Of Integer)(10)

         ' Post several values to the block.
         For i As Integer = 0 To 12
            batchBlock.Post(i)
         Next i
         ' Set the block to the completed state. This causes
         ' the block to propagate out any any remaining
         ' values as a final batch.
         batchBlock.Complete()

         ' Print the sum of both batches.

         Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

         Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Полный пример использования BatchBlock<T> для увеличения эффективности операций вставки в базу данных см. в статье Пошаговое руководство. Повышение эффективности с помощью BatchBlock и BatchedJoinBlock.For a complete example that uses BatchBlock<T> to improve the efficiency of database insert operations, see Walkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency.

JoinBlock(T1, T2, ...)JoinBlock(T1, T2, ...)

Классы JoinBlock<T1,T2> и JoinBlock<T1,T2,T3> собирают входные элементы и распространяют объекты System.Tuple<T1,T2> или System.Tuple<T1,T2,T3>, содержащие эти элементы.The JoinBlock<T1,T2> and JoinBlock<T1,T2,T3> classes collect input elements and propagate out System.Tuple<T1,T2> or System.Tuple<T1,T2,T3> objects that contain those elements. Классы JoinBlock<T1,T2> и JoinBlock<T1,T2,T3> не наследуются от ITargetBlock<TInput>.The JoinBlock<T1,T2> and JoinBlock<T1,T2,T3> classes do not inherit from ITargetBlock<TInput>. Вместо этого они предоставляют свойства Target1, Target2 и Target3, которые реализуют ITargetBlock<TInput>.Instead, they provide properties, Target1, Target2, and Target3, that implement ITargetBlock<TInput>.

Как BatchBlock<T>, JoinBlock<T1,T2> и JoinBlock<T1,T2,T3> работают либо в жадном, либо в нежадном режиме.Like BatchBlock<T>, JoinBlock<T1,T2> and JoinBlock<T1,T2,T3> operate in either greedy or non-greedy mode. В жадном режиме, используемом по умолчанию, объект JoinBlock<T1,T2> или JoinBlock<T1,T2,T3> принимает каждое предлагаемое ему сообщение и распространяет кортеж после того, как каждый из его целевых объектов получает по меньшей мере одно сообщение.In greedy mode, which is the default, a JoinBlock<T1,T2> or JoinBlock<T1,T2,T3> object accepts every message that it is offered and propagates out a tuple after each of its targets receives at least one message. В нежадном режиме объект JoinBlock<T1,T2> или JoinBlock<T1,T2,T3> откладывает все входящие сообщения, пока всем целевым объектам не будут предложены данные, необходимые для создания кортежа.In non-greedy mode, a JoinBlock<T1,T2> or JoinBlock<T1,T2,T3> object postpones all incoming messages until all targets have been offered the data that is required to create a tuple. В этот момент блок использует протокол двухфазной фиксации для атомарного извлечения всех необходимых элементов из источника.At this point, the block engages in a two-phase commit protocol to atomically retrieve all required items from the sources. Эта задержка дает возможность другой сущности в это же время получать эти данные, что позволяет системе в целом продолжать работу.This postponement makes it possible for another entity to consume the data in the meantime, to allow the overall system to make forward progress.

В следующем примере описан случай, в котором объект JoinBlock<T1,T2,T3> требует нескольких элементов данных для вычисления значения.The following basic example demonstrates a case in which a JoinBlock<T1,T2,T3> object requires multiple data to compute a value. В этом примере создается объект JoinBlock<T1,T2,T3>, который требует два значения Int32 и Char для выполнения арифметической операции.This example creates a JoinBlock<T1,T2,T3> object that requires two Int32 values and a Char value to perform an arithmetic operation.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine("{0} + {1} = {2}",
            data.Item1, data.Item2, data.Item1 + data.Item2);
         break;
      case '-':
         Console.WriteLine("{0} - {1} = {2}",
            data.Item1, data.Item2, data.Item1 - data.Item2);
         break;
      default:
         Console.WriteLine("Unknown operator '{0}'.", data.Item3);
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
         ' Create a JoinBlock<int, int, char> object that requires
         ' two numbers and an operator.
         Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

         ' Post two values to each target of the join.

         joinBlock.Target1.Post(3)
         joinBlock.Target1.Post(6)

         joinBlock.Target2.Post(5)
         joinBlock.Target2.Post(4)

         joinBlock.Target3.Post("+"c)
         joinBlock.Target3.Post("-"c)

         ' Receive each group of values and apply the operator part
         ' to the number parts.

         For i As Integer = 0 To 1
            Dim data = joinBlock.Receive()
            Select Case data.Item3
               Case "+"c
                  Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
               Case "-"c
                  Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
               Case Else
                  Console.WriteLine("Unknown operator '{0}'.", data.Item3)
            End Select
         Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Полный пример использования объектов JoinBlock<T1,T2> в нежадном режиме для совместного использования ресурса см. в статье Руководство. Использование JoinBlock для чтения данных из нескольких источников.For a complete example that uses JoinBlock<T1,T2> objects in non-greedy mode to cooperatively share a resource, see How to: Use JoinBlock to Read Data From Multiple Sources.

BatchedJoinBlock(T1, T2, ...)BatchedJoinBlock(T1, T2, ...)

Классы BatchedJoinBlock<T1,T2> и BatchedJoinBlock<T1,T2,T3> собирают пакеты входных элементов и распространяют объекты System.Tuple(IList(T1), IList(T2)) или System.Tuple(IList(T1), IList(T2), IList(T3)), содержащие эти элементы.The BatchedJoinBlock<T1,T2> and BatchedJoinBlock<T1,T2,T3> classes collect batches of input elements and propagate out System.Tuple(IList(T1), IList(T2)) or System.Tuple(IList(T1), IList(T2), IList(T3)) objects that contain those elements. Класс BatchedJoinBlock<T1,T2> можно рассматривать как сочетание BatchBlock<T> и JoinBlock<T1,T2>.Think of BatchedJoinBlock<T1,T2> as a combination of BatchBlock<T> and JoinBlock<T1,T2>. Укажите размер каждого пакета во время создания объекта BatchedJoinBlock<T1,T2>.Specify the size of each batch when you create a BatchedJoinBlock<T1,T2> object. BatchedJoinBlock<T1,T2> также предоставляет свойства, Target1 и Target2, которые реализуют ITargetBlock<TInput>.BatchedJoinBlock<T1,T2> also provides properties, Target1 and Target2, that implement ITargetBlock<TInput>. Если указанное число входных элементов получено от всех целевых объектов, объект BatchedJoinBlock<T1,T2> асинхронно распространяет объект System.Tuple(IList(T1), IList(T2)), содержащий эти элементы.When the specified count of input elements are received from across all targets, the BatchedJoinBlock<T1,T2> object asynchronously propagates out a System.Tuple(IList(T1), IList(T2)) object that contains those elements.

В следующем примере создается объект BatchedJoinBlock<T1,T2>, содержащий результаты, значения Int32 и ошибки, представленные объектами Exception.The following basic example creates a BatchedJoinBlock<T1,T2> object that holds results, Int32 values, and errors that are Exception objects. В этом примере выполняется несколько операций, результаты записываются в свойство Target1, а ошибки — в свойство Target2 объекта BatchedJoinBlock<T1,T2>.This example performs multiple operations and writes results to the Target1 property, and errors to the Target2 property, of the BatchedJoinBlock<T1,T2> object. Поскольку число успешных и неудачных операций не известно заранее, объекты IList<T> позволяют каждому целевому объекту получать ноль и более значений.Because the count of successful and failed operations is unknown in advance, the IList<T> objects enable each target to receive zero or more values.

 // For demonstration, create a Func<int, int> that 
 // returns its argument, or throws ArgumentOutOfRangeException
 // if the argument is less than zero.
 Func<int, int> DoWork = n =>
 {
    if (n < 0)
       throw new ArgumentOutOfRangeException();
    return n;
 };

 // Create a BatchedJoinBlock<int, Exception> object that holds 
 // seven elements per batch.
 var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

 // Post several items to the block.
 foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
 {
    try
    {
       // Post the result of the worker to the 
       // first target of the block.
       batchedJoinBlock.Target1.Post(DoWork(i));
    }
    catch (ArgumentOutOfRangeException e) 
    {
       // If an error occurred, post the Exception to the 
       // second target of the block.
       batchedJoinBlock.Target2.Post(e); 
    }
 }

 // Read the results from the block.
 var results = batchedJoinBlock.Receive();

 // Print the results to the console.

 // Print the results.
 foreach (int n in results.Item1)
 {
    Console.WriteLine(n);
 }
 // Print failures.
 foreach (Exception e in results.Item2)
 {
    Console.WriteLine(e.Message);
 }

 /* Output:
    5
    6
    13
    55
    0
    Specified argument was out of the range of valid values.
    Specified argument was out of the range of valid values.
  */
         ' For demonstration, create a Func<int, int> that 
         ' returns its argument, or throws ArgumentOutOfRangeException
         ' if the argument is less than zero.
         Dim DoWork As Func(Of Integer, Integer) = Function(n)
            If n < 0 Then
               Throw New ArgumentOutOfRangeException()
            End If
            Return n
         End Function

         ' Create a BatchedJoinBlock<int, Exception> object that holds 
         ' seven elements per batch.
         Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

         ' Post several items to the block.
         For Each i As Integer In New Integer() { 5, 6, -7, -22, 13, 55, 0 }
            Try
               ' Post the result of the worker to the 
               ' first target of the block.
               batchedJoinBlock.Target1.Post(DoWork(i))
            Catch e As ArgumentOutOfRangeException
               ' If an error occurred, post the Exception to the 
               ' second target of the block.
               batchedJoinBlock.Target2.Post(e)
            End Try
         Next i

         ' Read the results from the block.
         Dim results = batchedJoinBlock.Receive()

         ' Print the results to the console.

         ' Print the results.
         For Each n As Integer In results.Item1
            Console.WriteLine(n)
         Next n
         ' Print failures.
         For Each e As Exception In results.Item2
            Console.WriteLine(e.Message)
         Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Полный пример использования BatchedJoinBlock<T1,T2> для захвата как результатов, так и всех возможных исключений, которые могут возникнуть при выполнении считывания данных из базы данных, см. в статье Пошаговое руководство. Повышение эффективности с помощью BatchBlock и BatchedJoinBlock.For a complete example that uses BatchedJoinBlock<T1,T2> to capture both the results and any exceptions that occur while the program reads from a database, see Walkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency.

[в начало][go to top]

Настройка поведения блоков потоков данныхConfiguring Dataflow Block Behavior

Можно включить дополнительные параметры, предоставив объект System.Threading.Tasks.Dataflow.DataflowBlockOptions конструктору типов блоков потоков данных.You can enable additional options by providing a System.Threading.Tasks.Dataflow.DataflowBlockOptions object to the constructor of dataflow block types. Эти параметры управляют таким поведением, как у планировщика, который управляет основной задачей и степенью параллелизма.These options control behavior such the scheduler that manages the underlying task and the degree of parallelism. DataflowBlockOptions также содержит производные типы, которые задают поведение, характерное для некоторых типов блоков потоков данных.The DataflowBlockOptions also has derived types that specify behavior that is specific to certain dataflow block types. В следующей таблице перечислено, какие типы параметров связаны с каждым из типов блоков потоков данных.The following table summarizes which options type is associated with each dataflow block type.

Тип блока потока данныхDataflow Block Type Тип DataflowBlockOptionsDataflowBlockOptions type
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

В следующих разделах приведены дополнительные сведения о важных типах параметров блоков потоков данных, доступных с помощью классов System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions и System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions.The following sections provide additional information about the important kinds of dataflow block options that are available through the System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions, and System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions classes.

Определение планировщика заданийSpecifying the Task Scheduler

Все стандартные блоки потоков данных используют механизм планирования задач TPL для выполнения таких действий, как передача данных целевому объекту, получение данных из источника и выполнение определенных пользователем делегатов, когда данные становятся доступны.Every predefined dataflow block uses the TPL task scheduling mechanism to perform activities such as propagating data to a target, receiving data from a source, and running user-defined delegates when data becomes available. TaskScheduler — абстрактный класс, представляющий планировщик заданий, ставящий задачи в очередь потоков.TaskScheduler is an abstract class that represents a task scheduler that queues tasks onto threads. Планировщик заданий по умолчанию, Default, использует класс ThreadPool для постановки в очередь и выполнения работы.The default task scheduler, Default, uses the ThreadPool class to queue and execute work. Можно переопределить планировщик заданий по умолчанию, установив свойство TaskScheduler при создании объекта блока потока данных.You can override the default task scheduler by setting the TaskScheduler property when you construct a dataflow block object.

Если один и тот же планировщик заданий управляет несколькими блоками потока данных, он может применять к ним определенные политики.When the same task scheduler manages multiple dataflow blocks, it can enforce policies across them. Например, если каждый из нескольких блоков потока данных настроен для отдельного планировщика заданий одного объекта ConcurrentExclusiveSchedulerPair, вся работа, выполняемая в этих блоках, сериализуется.For example, if multiple dataflow blocks are each configured to target the exclusive scheduler of the same ConcurrentExclusiveSchedulerPair object, all work that runs across these blocks is serialized. Аналогично, если эти блоки настроены для параллельного планировщика заданий одного объекта ConcurrentExclusiveSchedulerPair и этот планировщик настроен так, чтобы иметь максимальный уровень параллелизма, вся работа из этих блоков ограничивается заданным числом одновременных операций.Similarly, if these blocks are configured to target the concurrent scheduler of the same ConcurrentExclusiveSchedulerPair object, and that scheduler is configured to have a maximum concurrency level, all work from these blocks is limited to that number of concurrent operations. Пример, использующий класс ConcurrentExclusiveSchedulerPair, чтобы обеспечить параллельное выполнение операций чтения, а во время операций записи исключать выполнение любых других операций, см. в статье Руководство. Указание планировщика задач в блоке потока данных.For an example that uses the ConcurrentExclusiveSchedulerPair class to enable read operations to occur in parallel, but write operations to occur exclusively of all other operations, see How to: Specify a Task Scheduler in a Dataflow Block. Дополнительные сведения о планировщиках задач в TPL см. в разделе TaskScheduler.For more information about task schedulers in the TPL, see the TaskScheduler class topic.

Определение степени параллелизмаSpecifying the Degree of Parallelism

По умолчанию библиотека потоков данных TPL предоставляет три типа блоков выполнения: ActionBlock<TInput>, TransformBlock<TInput,TOutput> и TransformManyBlock<TInput,TOutput>, которые обрабатывают одно сообщение за раз.By default, the three execution block types that the TPL Dataflow Library provides, ActionBlock<TInput>, TransformBlock<TInput,TOutput>, and TransformManyBlock<TInput,TOutput>, process one message at a time. Эти типы блоков потоков данных также обрабатывают сообщения в порядке их поступления.These dataflow block types also process messages in the order in which they are received. Чтобы обеспечить возможность обработки одновременно нескольких сообщений этими блоками потоков данных, задайте свойство ExecutionDataflowBlockOptions.MaxDegreeOfParallelism при создании объекта блока потока данных.To enable these dataflow blocks to process messages concurrently, set the ExecutionDataflowBlockOptions.MaxDegreeOfParallelism property when you construct the dataflow block object.

Значение по умолчанию у MaxDegreeOfParallelism — 1, оно гарантирует, что блок потока данных единовременно обрабатывает одно сообщение.The default value of MaxDegreeOfParallelism is 1, which guarantees that the dataflow block processes one message at a time. Присвоение этому свойству значения более 1 активирует обработку блоком потока данных нескольких сообщений одновременно.Setting this property to a value that is larger than 1 enables the dataflow block to process multiple messages concurrently. Присвоение этому свойству значения DataflowBlockOptions.Unbounded позволяет основному планировщику задач управлять максимальной степенью параллелизма.Setting this property to DataflowBlockOptions.Unbounded enables the underlying task scheduler to manage the maximum degree of concurrency.

Важно!

При максимальной степени параллелизма, превышающей 1, несколько сообщений обрабатываются одновременно, поэтому сообщения не могут обрабатываться в порядке их получения.When you specify a maximum degree of parallelism that is larger than 1, multiple messages are processed simultaneously, and therefore messages might not be processed in the order in which they are received. Однако порядок, в котором сообщения выводятся из блока, соответствует порядку, в котором они поступают в блок.The order in which the messages are output from the block is, however, the same one in which they are received.

Поскольку свойство MaxDegreeOfParallelism представляет максимальную степень параллелизма, блок потока данных может выполняться с меньшей степенью параллелизма, чем задано.Because the MaxDegreeOfParallelism property represents the maximum degree of parallelism, the dataflow block might execute with a lesser degree of parallelism than you specify. Блок потока данных может использовать меньшую степень параллелизма для удовлетворения функциональных требований или при нехватке доступных системных ресурсов.The dataflow block might use a lesser degree of parallelism to meet its functional requirements or because there is a lack of available system resources. Блок потока данных никогда не выбирает степень параллелизма больше заданной.A dataflow block never chooses more parallelism than you specify.

Значение свойства MaxDegreeOfParallelism уникально для каждого объекта блока потока данных.The value of the MaxDegreeOfParallelism property is exclusive to each dataflow block object. Например, если для четырех объектов блока потока данных задана 1 как максимальная степень параллелизма, все четыре объекта блока потока данных потенциально могут выполняться параллельно.For example, if four dataflow block objects each specify 1 for the maximum degree of parallelism, all four dataflow block objects can potentially run in parallel.

Пример, который задает максимальную степень параллелизма для обеспечения параллельного выполнения продолжительных операций, см. в статье Руководство. Указание степени параллелизма в блоке потока данных.For an example that sets the maximum degree of parallelism to enable lengthy operations to occur in parallel, see How to: Specify the Degree of Parallelism in a Dataflow Block.

Определение количества сообщений на задачуSpecifying the Number of Messages per Task

Предопределенные типы блоков потока данных используют задачи для обработки нескольких входных элементов.The predefined dataflow block types use tasks to process multiple input elements. Это позволяет минимизировать число объектов задачи, необходимых для обработки данных, из-за чего приложения выполняются более эффективно.This helps minimize the number of task objects that are required to process data, which enables applications to run more efficiently. Однако если задачи из одного набора блоков потока данных обрабатывают данные, задачи из других блоков потока данных быть вынуждены ожидать в течение времени, необходимого для обработки данных, добавляя сообщения в очередь.However, when the tasks from one set of dataflow blocks are processing data, the tasks from other dataflow blocks might need to wait for processing time by queuing messages. Чтобы обеспечить улучшенное распределение ресурсов между задачами потока данных, задайте значение для свойства MaxMessagesPerTask.To enable better fairness among dataflow tasks, set the MaxMessagesPerTask property. Если свойству MaxMessagesPerTask задано значение DataflowBlockOptions.Unbounded, которое используется по умолчанию, задача, используемая блоком потока данных, обрабатывает столько сообщений, сколько их доступно.When MaxMessagesPerTask is set to DataflowBlockOptions.Unbounded, which is the default, the task used by a dataflow block processes as many messages as are available. Если свойству MaxMessagesPerTask задано значение, отличное от Unbounded, блок потока данных обрабатывает не больше этого количества сообщений на объект Task.When MaxMessagesPerTask is set to a value other than Unbounded, the dataflow block processes at most this number of messages per Task object. Хотя настройка свойства MaxMessagesPerTask может улучшить распределение ресурсов между задачами, это может привести к созданию в системе количества задач, превышающего необходимое количество, что может снизить производительность.Although setting the MaxMessagesPerTask property can increase fairness among tasks, it can cause the system to create more tasks than are necessary, which can decrease performance.

Включение отменыEnabling Cancellation

TPL предоставляет механизм, который позволяет задачам координировать отмены согласованным образом.The TPL provides a mechanism that enables tasks to coordinate cancellation in a cooperative manner. Чтобы позволить блокам потока данных участвовать в этом механизме отмены, задайте значение для свойства CancellationToken.To enable dataflow blocks to participate in this cancellation mechanism, set the CancellationToken property. Если объект CancellationToken переводится в состояние отмены, все блоки потока данных, которые отслеживают выполнение этого токена, завершат выполнение текущего элемента, но не начнут обрабатывать последующие.When this CancellationToken object is set to the canceled state, all dataflow blocks that monitor this token finish execution of their current item but do not start processing subsequent items. Эти блоки потока данных также очищают все буферизованные сообщения, прекращают подключения ко всем блокам источников и целевых объектов, и переходят в состояние отмены.These dataflow blocks also clear any buffered messages, release connections to any source and target blocks, and transition to the canceled state. Путем перехода в состояние отмены у свойства Completion свойству Status присваивается значение Canceled, если во время обработки не возникает исключения.By transitioning to the canceled state, the Completion property has the Status property set to Canceled, unless an exception occurred during processing. В таком случае свойству Status присваивается значение Faulted.In that case, Status is set to Faulted.

Пример использования отмены в приложении Windows Forms см. в статье Руководство. Отмена блока потока данных.For an example that demonstrates how to use cancellation in a Windows Forms application, see How to: Cancel a Dataflow Block. Подробнее об отмене в TPL читайте в разделе Отмена задач.For more information about cancellation in the TPL, see Task Cancellation.

Определение жадного и нежадного поведенияSpecifying Greedy Versus Non-Greedy Behavior

Несколько группирующих типов блоков потоков данных могут работать либо в жадном, либо в нежадном режиме.Several grouping dataflow block types can operate in either greedy or non-greedy mode. По умолчанию стандартные типы блоков потоков данных работают в жадном режиме.By default, the predefined dataflow block types operate in greedy mode.

Для типов блоков соединения, таких как JoinBlock<T1,T2>, жадный режим означает, что блок сразу принимает данные, даже если соответствующие данные, с которыми нужно выполнить объединение, еще недоступны.For join block types such as JoinBlock<T1,T2>, greedy mode means that the block immediately accepts data even if the corresponding data with which to join is not yet available. Нежадный режим означает, что блок откладывает все входящие сообщения до тех пор, пока сообщение не будет доступно на каждом из его целевых объектов для завершения объединения.Non-greedy mode means that the block postpones all incoming messages until one is available on each of its targets to complete the join. Если любое из отложенных сообщений больше недоступно, блоки соединения освобождают все отложенные сообщения и перезапускают процесс.If any of the postponed messages are no longer available, the join block releases all postponed messages and restarts the process. Для класса BatchBlock<T> жадный и нежадный режим работают одинаково за исключением того, что в нежадном режиме объект BatchBlock<T> откладывает все входящие сообщения до тех пор, пока достаточное их количество не будет доступно из различных источников для заполнения пакета.For the BatchBlock<T> class, greedy and non-greedy behavior is similar, except that under non-greedy mode, a BatchBlock<T> object postpones all incoming messages until enough are available from distinct sources to complete a batch.

Для определения нежадного режима для блока потока данных задайте свойству Greedy значение False.To specify non-greedy mode for a dataflow block, set Greedy to False. Пример использования нежадного режима для обеспечения более эффективного совместного использования источника данных несколькими блоками соединения см. в статье Руководство. Использование JoinBlock для чтения данных из нескольких источников.For an example that demonstrates how to use non-greedy mode to enable multiple join blocks to share a data source more efficiently, see How to: Use JoinBlock to Read Data From Multiple Sources.

[в начало][go to top]

Пользовательские блоки потоков данныхCustom Dataflow Blocks

Хотя библиотека потоков данных TPL предоставляет множество стандартных типов блоков, можно создавать дополнительные типы блоков, которые выполняют устанавливаемые пользователем функции.Although the TPL Dataflow Library provides many predefined block types, you can create additional block types that perform custom behavior. Реализуйте интерфейсы ISourceBlock<TOutput> или ITargetBlock<TInput> непосредственно или используйте метод Encapsulate для сборки сложных блоков, которые инкапсулируют поведение существующих типов блоков.Implement the ISourceBlock<TOutput> or ITargetBlock<TInput> interfaces directly or use the Encapsulate method to build a complex block that encapsulates the behavior of existing block types. Примеры, в которых демонстрируются способы нестандартной реализации функциональности блоков потока данных, см. в статье Пошаговое руководство. Создание пользовательского типа блока потока данных.For examples that show how to implement custom dataflow block functionality, see Walkthrough: Creating a Custom Dataflow Block Type.

[в начало][go to top]

ЗаголовокTitle ОПИСАНИЕDescription
Практическое руководство. Запись и чтение сообщений в блоке потока данныхHow to: Write Messages to and Read Messages from a Dataflow Block Здесь показано, как писать и считывать сообщения из объекта BufferBlock<T>.Demonstrates how to write messages to and read messages from a BufferBlock<T> object.
Практическое руководство. Реализация шаблона потока данных "производитель — получатель"How to: Implement a Producer-Consumer Dataflow Pattern Здесь приводится описания способов использования модели потока данных для реализации шаблона производитель-потребитель, где производитель отправляет сообщения в блок потока данных, а потребитель считывает сообщения из этого блока.Describes how to use the dataflow model to implement a producer-consumer pattern, where the producer sends messages to a dataflow block, and the consumer reads messages from that block.
Практическое руководство. Выполнение действий при получении данных блоком потоков данныхHow to: Perform Action When a Dataflow Block Receives Data Здесь приводится описание способов предоставления делегатов типам блоков выполнения потока данных, ActionBlock<TInput>, TransformBlock<TInput,TOutput> и TransformManyBlock<TInput,TOutput>.Describes how to provide delegates to the execution dataflow block types, ActionBlock<TInput>, TransformBlock<TInput,TOutput>, and TransformManyBlock<TInput,TOutput>.
Пошаговое руководство: Создание конвейера потока данныхWalkthrough: Creating a Dataflow Pipeline Здесь приводится способ создания конвейера потока данных, который загружает текст из Интернета и выполняет над ним операции.Describes how to create a dataflow pipeline that downloads text from the web and performs operations on that text.
Практическое руководство. Удаление связей с блоками потоков данныхHow to: Unlink Dataflow Blocks Здесь показано, как с помощью метода LinkTo удалить связь между целевым блоком и его источником после того, как источник отправил сообщение целевому объекту.Demonstrates how to use the LinkTo method to unlink a target block from its source after the source offers a message to the target.
Пошаговое руководство: Использование потока данных в приложении Windows FormsWalkthrough: Using Dataflow in a Windows Forms Application Здесь демонстрируется, как создавать сеть блоков потока данных, которые выполняют обработку изображений в приложении Windows Forms.Demonstrates how to create a network of dataflow blocks that perform image processing in a Windows Forms application.
Практическое руководство. Отмена блока потока данныхHow to: Cancel a Dataflow Block Здесь демонстрируется способ использования отмены в приложении Windows Forms.Demonstrates how to use cancellation in a Windows Forms application.
Практическое руководство. Использование JoinBlock для чтения данных из нескольких источниковHow to: Use JoinBlock to Read Data From Multiple Sources Здесь описывается способ использования класса JoinBlock<T1,T2> для выполнения операции, когда данные доступны из нескольких источников, и использования нежадного режима, чтобы позволить нескольким блокам объединения использовать общий источник данных более эффективно.Explains how to use the JoinBlock<T1,T2> class to perform an operation when data is available from multiple sources, and how to use non-greedy mode to enable multiple join blocks to share a data source more efficiently.
Практическое руководство. Указание степени параллелизма в блоке потока данныхHow to: Specify the Degree of Parallelism in a Dataflow Block Здесь описывается, как задать свойство MaxDegreeOfParallelism, чтобы позволить блоку выполнения потока данных обрабатывать более одного сообщения единовременно.Describes how to set the MaxDegreeOfParallelism property to enable an execution dataflow block to process more than one message at a time.
Практическое руководство. Указание планировщика задач в блоке потока данныхHow to: Specify a Task Scheduler in a Dataflow Block Здесь демонстрируется, как связать определенный планировщик задач при использовании потока данных в приложении.Demonstrates how to associate a specific task scheduler when you use dataflow in your application.
Пошаговое руководство: Повышение эффективности с помощью BatchBlock и BatchedJoinBlockWalkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency Здесь описывается, как использовать класс BatchBlock<T> для увеличения эффективности операций вставки в базу данных, а также как использовать класс BatchedJoinBlock<T1,T2> для получения как результата, так и всех исключений, возникающих при чтении данных из базы.Describes how to use the BatchBlock<T> class to improve the efficiency of database insert operations, and how to use the BatchedJoinBlock<T1,T2> class to capture both the results and any exceptions that occur while the program reads from a database.
Пошаговое руководство: Создание пользовательского типа блока потока данныхWalkthrough: Creating a Custom Dataflow Block Type Здесь приводятся два способа создания типа блока потока данных, который реализует пользовательские функции.Demonstrates two ways to create a dataflow block type that implements custom behavior.
Библиотека параллельных задач (TPL)Task Parallel Library (TPL) Здесь приводится описание TPL — библиотеки, упрощающей параллельное и одновременное программирование в приложениях .NET Framework.Introduces the TPL, a library that simplifies parallel and concurrent programming in .NET Framework applications.