How to: Use JoinBlock to Read Data From Multiple Sources

This document explains how to use the JoinBlock<T1,T2> class to perform an operation when data is available from multiple sources. It also demonstrates how to use non-greedy mode so that multiple join blocks can share a data source more efficiently.

Note

The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it using the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.

Example

The following example defines three resource types, NetworkResource, FileResource, and MemoryResource, and performs operations when resources become available. The first operation requires a NetworkResource and MemoryResource pair, and the second operation requires a FileResource and MemoryResource pair. To enable these operations to occur when all required resources are available, this example uses the JoinBlock<T1,T2> class. When a JoinBlock<T1,T2> object receives data from all sources, it propagates that data to its target, which in this example is an ActionBlock<TInput> object. Both JoinBlock<T1,T2> objects read from a shared pool of MemoryResource objects.

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to use non-greedy join blocks to distribute
// resources among a dataflow network.
class Program
{
   // Represents a resource. A derived class might represent 
   // a limited resource such as a memory, network, or I/O
   // device.
   abstract class Resource
   {
   }

   // Represents a memory resource. For brevity, the details of 
   // this class are omitted.
   class MemoryResource : Resource
   {
   }

   // Represents a network resource. For brevity, the details of 
   // this class are omitted.
   class NetworkResource : Resource
   {
   }

   // Represents a file resource. For brevity, the details of 
   // this class are omitted.
   class FileResource : Resource
   {
   }

   static void Main(string[] args)
   {
      // Create three BufferBlock<T> objects. Each object holds a different
      // type of resource.
      var networkResources = new BufferBlock<NetworkResource>();
      var fileResources = new BufferBlock<FileResource>();
      var memoryResources = new BufferBlock<MemoryResource>();

      // Create two non-greedy JoinBlock<T1, T2> objects. 
      // The first join works with network and memory resources; 
      // the second join works with file and memory resources.

      var joinNetworkAndMemoryResources =
         new JoinBlock<NetworkResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
               Greedy = false
            });

      var joinFileAndMemoryResources =
         new JoinBlock<FileResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
               Greedy = false
            });

      // Create two ActionBlock<T> objects. 
      // The first block acts on a network resource and a memory resource.
      // The second block acts on a file resource and a memory resource.

      var networkMemoryAction =
         new ActionBlock<Tuple<NetworkResource, MemoryResource>>(
            data =>
            {
               // Perform some action on the resources.

               // Print a message.
               Console.WriteLine("Network worker: using resources...");

               // Simulate a lengthy operation that uses the resources.
               Thread.Sleep(new Random().Next(500, 2000));

               // Print a message.
               Console.WriteLine("Network worker: finished using resources...");

               // Release the resources back to their respective pools.
               networkResources.Post(data.Item1);
               memoryResources.Post(data.Item2);
            });

      var fileMemoryAction =
         new ActionBlock<Tuple<FileResource, MemoryResource>>(
            data =>
            {
               // Perform some action on the resources.

               // Print a message.
               Console.WriteLine("File worker: using resources...");

               // Simulate a lengthy operation that uses the resources.
               Thread.Sleep(new Random().Next(500, 2000));

               // Print a message.
               Console.WriteLine("File worker: finished using resources...");

               // Release the resources back to their respective pools.
               fileResources.Post(data.Item1);
               memoryResources.Post(data.Item2);
            });

      // Link the resource pools to the JoinBlock<T1, T2> objects.
      // Because these join blocks operate in non-greedy mode, they do not
      // take the resource from a pool until all resources are available from
      // all pools.

      networkResources.LinkTo(joinNetworkAndMemoryResources.Target1);
      memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2);

      fileResources.LinkTo(joinFileAndMemoryResources.Target1);
      memoryResources.LinkTo(joinFileAndMemoryResources.Target2);

      // Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.

      joinNetworkAndMemoryResources.LinkTo(networkMemoryAction);
      joinFileAndMemoryResources.LinkTo(fileMemoryAction);

      // Populate the resource pools. In this example, network and 
      // file resources are more abundant than memory resources.

      networkResources.Post(new NetworkResource());
      networkResources.Post(new NetworkResource());
      networkResources.Post(new NetworkResource());

      memoryResources.Post(new MemoryResource());

      fileResources.Post(new FileResource());
      fileResources.Post(new FileResource());
      fileResources.Post(new FileResource());

      // Allow data to flow through the network for several seconds.
      Thread.Sleep(10000);
   }
}

/* Sample output:
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
*/
Imports System
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to use non-greedy join blocks to distribute
' resources among a dataflow network.
Friend Class Program
   ' Represents a resource. A derived class might represent 
   ' a limited resource such as a memory, network, or I/O
   ' device.
   Private MustInherit Class Resource
   End Class

   ' Represents a memory resource. For brevity, the details of 
   ' this class are omitted.
   Private Class MemoryResource
       Inherits Resource
   End Class

   ' Represents a network resource. For brevity, the details of 
   ' this class are omitted.
   Private Class NetworkResource
       Inherits Resource
   End Class

   ' Represents a file resource. For brevity, the details of 
   ' this class are omitted.
   Private Class FileResource
       Inherits Resource
   End Class

   Shared Sub Main(ByVal args() As String)
      ' Create three BufferBlock<T> objects. Each object holds a different
      ' type of resource.
      Dim networkResources = New BufferBlock(Of NetworkResource)()
      Dim fileResources = New BufferBlock(Of FileResource)()
      Dim memoryResources = New BufferBlock(Of MemoryResource)()

      ' Create two non-greedy JoinBlock<T1, T2> objects. 
      ' The first join works with network and memory resources; 
      ' the second join works with file and memory resources.

      Dim joinNetworkAndMemoryResources = New JoinBlock(Of NetworkResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})

      Dim joinFileAndMemoryResources = New JoinBlock(Of FileResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})

      ' Create two ActionBlock<T> objects. 
      ' The first block acts on a network resource and a memory resource.
      ' The second block acts on a file resource and a memory resource.

      Dim networkMemoryAction = New ActionBlock(Of Tuple(Of NetworkResource, MemoryResource))(Sub(data)
          ' Perform some action on the resources.

          ' Print a message.
          Console.WriteLine("Network worker: using resources...")

          ' Simulate a lengthy operation that uses the resources.
          Thread.Sleep(New Random().Next(500, 2000))

          ' Print a message.
          Console.WriteLine("Network worker: finished using resources...")

          ' Release the resources back to their respective pools.
          networkResources.Post(data.Item1)
          memoryResources.Post(data.Item2)
      End Sub)

      Dim fileMemoryAction = New ActionBlock(Of Tuple(Of FileResource, MemoryResource))(Sub(data)
          ' Perform some action on the resources.

          ' Print a message.
          Console.WriteLine("File worker: using resources...")

          ' Simulate a lengthy operation that uses the resources.
          Thread.Sleep(New Random().Next(500, 2000))

          ' Print a message.
          Console.WriteLine("File worker: finished using resources...")

          ' Release the resources back to their respective pools.
          fileResources.Post(data.Item1)
          memoryResources.Post(data.Item2)
      End Sub)

      ' Link the resource pools to the JoinBlock<T1, T2> objects.
      ' Because these join blocks operate in non-greedy mode, they do not
      ' take the resource from a pool until all resources are available from
      ' all pools.

      networkResources.LinkTo(joinNetworkAndMemoryResources.Target1)
      memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2)

      fileResources.LinkTo(joinFileAndMemoryResources.Target1)
      memoryResources.LinkTo(joinFileAndMemoryResources.Target2)

      ' Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.

      joinNetworkAndMemoryResources.LinkTo(networkMemoryAction)
      joinFileAndMemoryResources.LinkTo(fileMemoryAction)

      ' Populate the resource pools. In this example, network and 
      ' file resources are more abundant than memory resources.

      networkResources.Post(New NetworkResource())
      networkResources.Post(New NetworkResource())
      networkResources.Post(New NetworkResource())

      memoryResources.Post(New MemoryResource())

      fileResources.Post(New FileResource())
      fileResources.Post(New FileResource())
      fileResources.Post(New FileResource())

      ' Allow data to flow through the network for several seconds.
      Thread.Sleep(10000)

   End Sub

End Class

' Sample output:
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'

To enable efficient use of the shared pool of MemoryResource objects, this example specifies a GroupingDataflowBlockOptions object that has the Greedy property set to False to create JoinBlock<T1,T2> objects that act in non-greedy mode. A non-greedy join block postpones all incoming messages until one is available from each source. If any of the postponed messages were accepted by another block, the join block restarts the process. Non-greedy mode enables join blocks that share one or more source blocks to make forward progress while the other blocks wait for data. In this example, if a MemoryResource object is added to the memoryResources pool, the first join block to receive its second data source can make forward progress. If this example were to use greedy mode, which is the default, one join block might take the MemoryResource object and wait for the second resource to become available. However, if the other join block has its second data source available, it cannot make forward progress because the MemoryResource object has been taken by the other join block.
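The difference between the two modes can be observed directly by checking whether a message stays in the shared pool. The following is a minimal sketch, not part of the original example: the class name GreedyVersusNonGreedy and the method RemainingInSource are hypothetical, the int and string types stand in for the resource classes, and the half-second pause is an assumed, timing-based way to let the blocks process the offer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

static class GreedyVersusNonGreedy
{
    // Hypothetical helper: returns how many items remain in the shared
    // source after it is linked to a join block whose other input
    // never receives any data.
    public static int RemainingInSource(bool greedy)
    {
        var memory = new BufferBlock<string>();
        var join = new JoinBlock<int, string>(
            new GroupingDataflowBlockOptions { Greedy = greedy });

        // Only the second target is fed; the first target stays empty.
        memory.LinkTo(join.Target2);
        memory.Post("memory");

        // Assumption: half a second is enough for the offer to be handled.
        Thread.Sleep(500);

        // A greedy join has already taken the item; a non-greedy join
        // postponed it, so it is still in the buffer.
        return memory.Count;
    }

    static void Main()
    {
        Console.WriteLine($"Greedy: {RemainingInSource(true)} item(s) left in the pool");
        Console.WriteLine($"Non-greedy: {RemainingInSource(false)} item(s) left in the pool");
    }
}
```

On a typical run, the greedy case should report 0 items left, because the join block accepted the message even though it cannot yet form a pair, and the non-greedy case should report 1, because the message remains available to any other block linked to the same pool.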

Robust Programming

The use of non-greedy joins can also help you prevent deadlock in your application. In a software application, deadlock occurs when two or more processes each hold a resource and mutually wait for another process to release some other resource. Consider an application that defines two JoinBlock<T1,T2> objects. Both objects read data from two shared source blocks. In greedy mode, if one join block reads from the first source and the second join block reads from the second source, the application might deadlock because both join blocks mutually wait for the other to release its resource. In non-greedy mode, each join block reads from its sources only when all data is available, and therefore, the risk of deadlock is eliminated.
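The scenario described above can be reproduced with a small sketch. It is illustrative only: the class name SharedSourcesDemo and the method PairsProduced are hypothetical, the link order is chosen deliberately so that each source offers its message to a different join block first, and the one-second pause is an assumed, timing-based wait.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

static class SharedSourcesDemo
{
    // Hypothetical helper: posts one item to each of two shared sources
    // and returns how many pairs the two join blocks produced.
    public static int PairsProduced(bool greedy)
    {
        var options = new GroupingDataflowBlockOptions { Greedy = greedy };
        var first = new BufferBlock<int>();
        var second = new BufferBlock<string>();
        var joinA = new JoinBlock<int, string>(options);
        var joinB = new JoinBlock<int, string>(options);

        // Count every pair that either join block emits.
        int pairs = 0;
        var counter = new ActionBlock<Tuple<int, string>>(
            _ => Interlocked.Increment(ref pairs));
        joinA.LinkTo(counter);
        joinB.LinkTo(counter);

        // The first source offers to joinA before joinB; the second
        // source offers to joinB before joinA.
        first.LinkTo(joinA.Target1);
        first.LinkTo(joinB.Target1);
        second.LinkTo(joinB.Target2);
        second.LinkTo(joinA.Target2);

        first.Post(1);
        second.Post("one");

        // Assumption: one second is enough for the network to settle.
        Thread.Sleep(1000);
        return Volatile.Read(ref pairs);
    }

    static void Main()
    {
        Console.WriteLine($"Greedy pairs: {PairsProduced(true)}");
        Console.WriteLine($"Non-greedy pairs: {PairsProduced(false)}");
    }
}
```

With this link order, the greedy run is expected to produce no pairs, because each join block holds one half of the only available pair. The non-greedy run is expected to produce one pair, because a join block takes the messages only after it can obtain both.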

See also