Procedimiento Toma de medidas cuando un bloque de flujos de datos recibe datosHow to: Perform Action When a Dataflow Block Receives Data

Los tipos Bloque de flujo de datos de ejecución llaman a un delegado proporcionado por el usuario cuando reciben datos.Execution dataflow block types call a user-provided delegate when they receive data. Las clases System.Threading.Tasks.Dataflow.ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput> y System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput> son tipos de bloques de flujo de datos de ejecución.The System.Threading.Tasks.Dataflow.ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>, and System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput> classes are execution dataflow block types. Puede usar la palabra clave delegate (Sub en Visual Basic), Action<T>, Func<T,TResult> o una expresión lambda cuando proporcione una función de trabajo a un bloque de flujo de datos de ejecución.You can use the delegate keyword (Sub in Visual Basic), Action<T>, Func<T,TResult>, or a lambda expression when you provide a work function to an execution dataflow block. Este documento describe cómo usar Func<T,TResult> y expresiones lambda para realizar la acción en bloques de ejecución.This document describes how to use Func<T,TResult> and lambda expressions to perform action in execution blocks.

Nota

La biblioteca de flujos de datos TPL (el espacio de nombres System.Threading.Tasks.Dataflow) no se distribuye con .NET.The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. Para instalar el espacio de nombres System.Threading.Tasks.Dataflow en Visual Studio, abra el proyecto, seleccione Administrar paquetes NuGet en el menú Proyecto y busque en línea el paquete System.Threading.Tasks.Dataflow.To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Como alternativa, para realizar la instalación con la CLI de .Net Core, ejecute dotnet add package System.Threading.Tasks.Dataflow.Alternatively, to install it using the .Net Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.

EjemploExample

En el ejemplo siguiente se usa el flujo de datos para leer un archivo del disco y se calcula el número de bytes en el archivo que son iguales a cero.The following example uses dataflow to read a file from disk and computes the number of bytes in that file that are equal to zero. Usa TransformBlock<TInput,TOutput> para leer el archivo y calcular el número de cero bytes, y ActionBlock<TInput> para imprimir el número de cero bytes en la consola.It uses TransformBlock<TInput,TOutput> to read the file and compute the number of zero bytes, and ActionBlock<TInput> to print the number of zero bytes to the console. El objeto TransformBlock<TInput,TOutput> especifica un objeto Func<T,TResult> para realizar el trabajo cuando los bloques reciben datos.The TransformBlock<TInput,TOutput> object specifies a Func<T,TResult> object to perform work when the blocks receive data. El objeto ActionBlock<TInput> usa una expresión lambda para imprimir en la consola el número de cero bytes que se lee.The ActionBlock<TInput> object uses a lambda expression to print to the console the number of zero bytes that are read.

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to provide delegates to exectution dataflow blocks.
class DataflowExecutionBlocks
{
   // Computes the number of zero bytes that the provided file
   // contains.
   static int CountBytes(string path)
   {
      byte[] buffer = new byte[1024];
      int totalZeroBytesRead = 0;
      using (var fileStream = File.OpenRead(path))
      {
         int bytesRead = 0;
         do
         {
            bytesRead = fileStream.Read(buffer, 0, buffer.Length);
            totalZeroBytesRead += buffer.Count(b => b == 0);
         } while (bytesRead > 0);
      }

      return totalZeroBytesRead;
   }

   static void Main(string[] args)
   {
      // Create a temporary file on disk.
      string tempFile = Path.GetTempFileName();

      // Write random data to the temporary file.
      using (var fileStream = File.OpenWrite(tempFile))
      {
         Random rand = new Random();
         byte[] buffer = new byte[1024];
         for (int i = 0; i < 512; i++)
         {
            rand.NextBytes(buffer);
            fileStream.Write(buffer, 0, buffer.Length);
         }
      }

      // Create an ActionBlock<int> object that prints to the console 
      // the number of bytes read.
      var printResult = new ActionBlock<int>(zeroBytesRead =>
      {
         Console.WriteLine("{0} contains {1} zero bytes.",
            Path.GetFileName(tempFile), zeroBytesRead);
      });

      // Create a TransformBlock<string, int> object that calls the 
      // CountBytes function and returns its result.
      var countBytes = new TransformBlock<string, int>(
         new Func<string, int>(CountBytes));

      // Link the TransformBlock<string, int> object to the 
      // ActionBlock<int> object.
      countBytes.LinkTo(printResult);

      // Create a continuation task that completes the ActionBlock<int>
      // object when the TransformBlock<string, int> finishes.
      countBytes.Completion.ContinueWith(delegate { printResult.Complete(); });

      // Post the path to the temporary file to the 
      // TransformBlock<string, int> object.
      countBytes.Post(tempFile);

      // Requests completion of the TransformBlock<string, int> object.
      countBytes.Complete();

      // Wait for the ActionBlock<int> object to print the message.
      printResult.Completion.Wait();

      // Delete the temporary file.
      File.Delete(tempFile);
   }
}

/* Sample output:
tmp4FBE.tmp contains 2081 zero bytes.
*/
Imports System.IO
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to provide delegates to exectution dataflow blocks.
Friend Class DataflowExecutionBlocks
   ' Computes the number of zero bytes that the provided file
   ' contains.
   Private Shared Function CountBytes(ByVal path As String) As Integer
      Dim buffer(1023) As Byte
      Dim totalZeroBytesRead As Integer = 0
      Using fileStream = File.OpenRead(path)
         Dim bytesRead As Integer = 0
         Do
            bytesRead = fileStream.Read(buffer, 0, buffer.Length)
            totalZeroBytesRead += buffer.Count(Function(b) b = 0)
         Loop While bytesRead > 0
      End Using

      Return totalZeroBytesRead
   End Function

   Shared Sub Main(ByVal args() As String)
      ' Create a temporary file on disk.
      Dim tempFile As String = Path.GetTempFileName()

      ' Write random data to the temporary file.
      Using fileStream = File.OpenWrite(tempFile)
         Dim rand As New Random()
         Dim buffer(1023) As Byte
         For i As Integer = 0 To 511
            rand.NextBytes(buffer)
            fileStream.Write(buffer, 0, buffer.Length)
         Next i
      End Using

      ' Create an ActionBlock<int> object that prints to the console 
      ' the number of bytes read.
      Dim printResult = New ActionBlock(Of Integer)(Sub(zeroBytesRead) Console.WriteLine("{0} contains {1} zero bytes.", Path.GetFileName(tempFile), zeroBytesRead))

      ' Create a TransformBlock<string, int> object that calls the 
      ' CountBytes function and returns its result.
      Dim countBytes = New TransformBlock(Of String, Integer)(New Func(Of String, Integer)(AddressOf DataflowExecutionBlocks.CountBytes))

      ' Link the TransformBlock<string, int> object to the 
      ' ActionBlock<int> object.
      countBytes.LinkTo(printResult)

      ' Create a continuation task that completes the ActionBlock<int>
      ' object when the TransformBlock<string, int> finishes.
      countBytes.Completion.ContinueWith(Sub() printResult.Complete())

      ' Post the path to the temporary file to the 
      ' TransformBlock<string, int> object.
      countBytes.Post(tempFile)

      ' Requests completion of the TransformBlock<string, int> object.
      countBytes.Complete()

      ' Wait for the ActionBlock<int> object to print the message.
      printResult.Completion.Wait()

      ' Delete the temporary file.
      File.Delete(tempFile)
   End Sub
End Class

' Sample output:
'tmp4FBE.tmp contains 2081 zero bytes.
'

Aunque puede proporcionar una expresión lambda para un objeto TransformBlock<TInput,TOutput>, este ejemplo utiliza Func<T,TResult> para habilitar otro código para usar el método CountBytes.Although you can provide a lambda expression to a TransformBlock<TInput,TOutput> object, this example uses Func<T,TResult> to enable other code to use the CountBytes method. El objeto ActionBlock<TInput> utiliza una expresión lambda porque el trabajo que se debe realizar es específico de esta tarea y es probable que no sea útil desde otro código.The ActionBlock<TInput> object uses a lambda expression because the work to be performed is specific to this task and is not likely to be useful from other code. Para información sobre cómo funcionan las expresiones lambda en la biblioteca TPL, vea Expresiones lambda en PLINQ y TPL.For more information about how lambda expressions work in the Task Parallel Library, see Lambda Expressions in PLINQ and TPL.

En la sección Resumen de tipos de delegado del documento Flujo de datos se resumen los tipos de delegado que puede proporcionar a los objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput> y TransformManyBlock<TInput,TOutput>.The section Summary of Delegate Types in the Dataflow document summarizes the delegate types that you can provide to ActionBlock<TInput>, TransformBlock<TInput,TOutput>, and TransformManyBlock<TInput,TOutput> objects. La tabla también especifica si el tipo de delegado funciona de forma sincrónica o asincrónica.The table also specifies whether the delegate type operates synchronously or asynchronously.

Programación sólidaRobust Programming

En este ejemplo se proporcionada un delegado de tipo Func<T,TResult> para el objeto TransformBlock<TInput,TOutput>, con el fin de realizar la tarea del bloque de flujo de datos de forma sincrónica.This example provides a delegate of type Func<T,TResult> to the TransformBlock<TInput,TOutput> object to perform the task of the dataflow block synchronously. Para que el bloque de flujo de datos se comporte de forma asincrónica, proporcione un delegado de tipo Func<TResult> al bloque de flujo de datos.To enable the dataflow block to behave asynchronously, provide a delegate of type Func<TResult> to the dataflow block. Cuando un bloque de flujo de datos se comporta de forma asincrónica, la tarea del bloque de flujo de datos se completa solo cuando el objeto Task<TResult> devuelto finaliza.When a dataflow block behaves asynchronously, the task of the dataflow block is complete only when the returned Task<TResult> object finishes. En el ejemplo siguiente se modifica el método CountBytes y se usan los operadores async y await (Async y Await en Visual Basic) para calcular de forma asincrónica el número total de bytes que son cero en el archivo proporcionado.The following example modifies the CountBytes method and uses the async and await operators (Async and Await in Visual Basic) to asynchronously compute the total number of bytes that are zero in the provided file. El método ReadAsync realiza las operaciones de lectura de archivo de forma asincrónica.The ReadAsync method performs file read operations asynchronously.

// Asynchronously computes the number of zero bytes that the provided file 
// contains.
static async Task<int> CountBytesAsync(string path)
{
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
   {
      int bytesRead = 0;
      do
      {
         // Asynchronously read from the file stream.
         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
         totalZeroBytesRead += buffer.Count(b => b == 0);
      } while (bytesRead > 0);
   }

   return totalZeroBytesRead;
}
' Asynchronously computes the number of zero bytes that the provided file 
' contains.
Private Shared async Function CountBytesAsync(ByVal path As String) As Task(Of Integer)
   Dim buffer(1023) As Byte
   Dim totalZeroBytesRead As Integer = 0
   Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
      Dim bytesRead As Integer = 0
      Do
         ' Asynchronously read from the file stream.
         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
         totalZeroBytesRead += buffer.Count(Function(b) b = 0)
      Loop While bytesRead > 0
   End Using

   Return totalZeroBytesRead
End Function

También puede utilizar expresiones lambda asincrónicas para realizar la acción en un bloque de flujo de datos de ejecución.You can also use asynchronous lambda expressions to perform action in an execution dataflow block. En el ejemplo siguiente se modifica el objeto TransformBlock<TInput,TOutput> que se utiliza en el ejemplo anterior para que utilice una expresión lambda para realizar el trabajo de forma asincrónica.The following example modifies the TransformBlock<TInput,TOutput> object that is used in the previous example so that it uses a lambda expression to perform the work asynchronously.

// Create a TransformBlock<string, int> object that calls the 
// CountBytes function and returns its result.
var countBytesAsync = new TransformBlock<string, int>(async path =>
{
   byte[] buffer = new byte[1024];
   int totalZeroBytesRead = 0;
   using (var fileStream = new FileStream(
      path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
   {
      int bytesRead = 0;
      do
      {
         // Asynchronously read from the file stream.
         bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
         totalZeroBytesRead += buffer.Count(b => b == 0);
      } while (bytesRead > 0);
   }

   return totalZeroBytesRead;
});
' Create a TransformBlock<string, int> object that calls the 
' CountBytes function and returns its result.
Dim countBytesAsync = New TransformBlock(Of String, Integer)(async Function(path)
         ' Asynchronously read from the file stream.
    Dim buffer(1023) As Byte
    Dim totalZeroBytesRead As Integer = 0
    Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
        Dim bytesRead As Integer = 0
        Do
            bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
            totalZeroBytesRead += buffer.Count(Function(b) b = 0)
        Loop While bytesRead > 0
    End Using
    Return totalZeroBytesRead
End Function)

Vea tambiénSee also