Cómo: Sincronizar un subproceso productor y un subproceso consumidor (Guía de programación de C#)

Actualización: noviembre 2007

En el ejemplo siguiente se muestra la sincronización de subprocesos entre el subproceso primario y dos subprocesos de trabajo utilizando la palabra clave lock y las clases AutoResetEvent y ManualResetEvent. Para obtener más información, vea lock (Instrucción, Referencia de C#).

El ejemplo crea dos subprocesos auxiliares o de trabajo. Un subproceso genera los elementos y los almacena en una cola genérica que no es segura para subprocesos. Para obtener más información, vea Queue<T>. El otro subproceso utiliza los elementos de esa cola. Además, el subproceso primario muestra periódicamente el contenido de la cola, de modo que tres subprocesos tienen acceso a la cola. La palabra clave lock se utiliza para sincronizar el acceso a la cola para asegurarse de que no se daña el estado de la misma.

Además de simplemente evitar el acceso simultáneo con la palabra clave lock, dos objetos de evento proporcionan sincronización adicional. Uno se utiliza para indicar a los subprocesos de trabajo que finalicen y el otro lo utiliza el subproceso productor para indicar al subproceso consumidor que se ha agregado un nuevo elemento a la cola. Estos dos objetos de evento se encapsulan en una clase denominada SyncEvents. Esto permite que los eventos pasen fácilmente a los objetos que representan el subproceso consumidor y productor. La clase SyncEvents se define de la siguiente forma:

public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}

La clase AutoResetEvent se utiliza para el evento "nuevo elemento" porque es conveniente que este evento se restablezca automáticamente cada vez que el subproceso consumidor responde a este evento. Alternativamente, la clase ManualResetEvent se utiliza para el evento "salir" porque se desea que varios subprocesos respondan cuando se señala este evento. Si se utilizara en su lugar AutoResetEvent, el evento revertiría a un estado no señalado después de que sólo un subproceso respondiera al evento. El otro subproceso no respondería y, en ese caso, no podría finalizar.

La clase SyncEvents crea los dos eventos y los almacena de dos formas diferentes: como EventWaitHandle, que es la clase base de AutoResetEvent y ManualResetEvent, y en una matriz basada en WaitHandle. Como verá en la descripción del subproceso consumidor, esta matriz es necesaria para que el subproceso consumidor pueda responder a cualquier evento.

Los subprocesos consumidor y productor se representan con las clases denominadas Consumer y Producer. Ambas definen un método denominado ThreadRun. Estos métodos se utilizan como puntos de entrada para los subprocesos de trabajo que el método Main crea.

El método ThreadRun definido por la clase Producer tiene la apariencia siguiente:

// Producer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    Random r = new Random();
    while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            while (_queue.Count < 20)
            {
                _queue.Enqueue(r.Next(0,100));
                _syncEvents.NewItemEvent.Set();
                count++;
            }
        }
    }
    Console.WriteLine("Producer thread: produced {0} items", count);
}

Este método recorre el código hasta que el evento "salir del subproceso" queda señalado. El estado de este evento se prueba con el método WaitOne utilizando la propiedad ExitThreadEvent definida por la clase SyncEvents. En este caso, el estado del evento se comprueba sin bloquear el subproceso actual porque el primer argumento utilizado con WaitOne es cero, lo que indica que el método debe devolver un valor inmediatamente. Si WaitOne devuelve true, el evento en cuestión se señala actualmente. En ese caso, se devuelve el método ThreadRun, que tiene el efecto de finalizar el subproceso de trabajo que ejecuta este método.

Hasta que se señala el evento "salir del subproceso", el método Producer.ThreadStart intenta mantener 20 elementos en la cola. Un elemento es simplemente un entero entre cero y 100. La colección debe bloquearse antes de agregar nuevos elementos para evitar que los subprocesos consumidor y primario tengan acceso a la colección al mismo tiempo. Para ello, utilice la palabra clave lock. El argumento que se pasa a lock es el campo SyncRoot expuesto a través de la interfaz ICollection. Este campo se proporciona específicamente para sincronizar el acceso del subproceso. Se concede acceso exclusivo a la colección a cualquier instrucción incluida en el bloque de código que sigue a lock. Para cada nuevo elemento que el productor agrega a la cola, se realiza una llamada al método Set en el evento "nuevo elemento". Esto indica al subproceso consumidor que emerja de su estado suspendido para procesar el nuevo elemento.

El objeto Consumer también define un método denominado ThreadRun. Al igual que en la versión del productor de ThreadRun, un subproceso de trabajo creado por el método Main ejecuta este método. Sin embargo, la versión del consumidor de ThreadStart debe responder a dos eventos. El método Consumer.ThreadRun tiene la apariencia siguiente:

// Consumer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            int item = _queue.Dequeue();
        }
        count++;
    } 
    Console.WriteLine("Consumer Thread: consumed {0} items", count);
}

Este método utiliza WaitAny para bloquear el subproceso consumidor hasta que se señale cualquiera de los identificadores de espera en la matriz suministrada. En este caso, hay dos identificadores en la matriz: uno que termina los subprocesos de trabajo y otro que indica que se ha agregado un nuevo elemento a la colección. WaitAny devuelve el índice del evento que se ha señalado. El evento "nuevo elemento" es el primero en la matriz, por lo que un índice de cero indica un nuevo elemento. En este caso, compruebe si hay un índice de 1, que indica el evento "salir del subproceso". Se utiliza para determinar si el método sigue utilizando elementos. Si el evento "nuevo elemento" se ha señalado, se obtiene acceso exclusivo a la colección con lock y se utiliza el nuevo elemento. Dado que este ejemplo genera y utiliza miles de elementos, no muestra cada elemento utilizado. En su lugar se utiliza Main para mostrar periódicamente el contenido de la cola, como se mostrará.

El método Main empieza por crear la cola cuyo contenido se generará y utilizará, y una instancia de SyncEvents, que se vio anteriormente:

Queue<int> queue = new Queue<int>();
SyncEvents syncEvents = new SyncEvents();

A continuación, Main configura los objetos Producer y Consumer para el uso con subprocesos de trabajo. Sin embargo, este paso no crea o inicia los subprocesos de trabajo reales:

Producer producer = new Producer(queue, syncEvents);
Consumer consumer = new Consumer(queue, syncEvents);
Thread producerThread = new Thread(producer.ThreadRun);
Thread consumerThread = new Thread(consumer.ThreadRun);

Observe que la cola y el objeto de evento de sincronización se pasan a los subprocesos Consumer y Producer como argumentos de constructor. Esto proporciona a ambos objetos los recursos compartidos que necesitan para realizar sus tareas respectivas. A continuación, se crean dos nuevos objetos Thread utilizando el método ThreadRun de cada objeto como argumento. Cada subproceso de trabajo, cuando se inicie, utilizará este argumento como punto de entrada para el subproceso.

A continuación, Main inicia los dos subprocesos de trabajo con una llamada al método Start, de la siguiente forma:

producerThread.Start();
consumerThread.Start();

En este momento, se crean los dos subprocesos de trabajo y se inicia la ejecución asincrónica, independientemente del subproceso primario que está ejecutando actualmente el método Main. De hecho, lo siguiente que Main hace es suspender el subproceso primario con una llamada al método Sleep. El método suspende el subproceso que está actualmente en ejecución durante unos milisegundos. Una vez transcurrido este intervalo, se reactiva Main y, en ese momento, se muestra el contenido de la cola. Main repite esto cuatro veces, de la siguiente forma:

for (int i=0; i<4; i++)
{
    Thread.Sleep(2500);
    ShowQueueContents(queue);
}

Por último, Main indica a los subprocesos de trabajo que finalicen invocando al método Set del evento "salir del subproceso" y, después, llama al método Join en cada subproceso de trabajo para bloquear el subproceso primario hasta que cada subproceso de trabajo responde al evento y finaliza.

Hay un último ejemplo de sincronización de subprocesos: el método ShowQueueContents. Este método, como los subprocesos consumidor y productor, utiliza lock para obtener acceso exclusivo a la cola. Sin embargo, en este caso el acceso exclusivo es muy importante, porque ShowQueueContents enumera la colección completa. La enumeración de una colección es una operación especialmente propensa a dañar los datos con operaciones asincrónicas porque implica recorrer el contenido de toda la colección.

Observe que el subproceso primario ejecuta ShowQueueContents porque Main lo llama. Esto significa que este método, cuando obtiene acceso exclusivo a la cola de elementos, bloquea los subprocesos consumidor y productor para evitar que tengan acceso a la cola. ShowQueueContents bloquea la cola y enumera el contenido:

private static void ShowQueueContents(Queue<int> q)
{
    lock (((ICollection)q).SyncRoot)
    {
        foreach (int item in q)
        {
            Console.Write("{0} ", item);
        }
    }
    Console.WriteLine();
}

A continuación se muestra el ejemplo completo.

Ejemplo

using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;

public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}
public class Producer 
{
    public Producer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Producer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        Random r = new Random();
        while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                while (_queue.Count < 20)
                {
                    _queue.Enqueue(r.Next(0,100));
                    _syncEvents.NewItemEvent.Set();
                    count++;
                }
            }
        }
        Console.WriteLine("Producer thread: produced {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class Consumer
{
    public Consumer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Consumer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                int item = _queue.Dequeue();
            }
            count++;
        } 
        Console.WriteLine("Consumer Thread: consumed {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class ThreadSyncSample
{
    private static void ShowQueueContents(Queue<int> q)
    {
        lock (((ICollection)q).SyncRoot)
        {
            foreach (int item in q)
            {
                Console.Write("{0} ", item);
            }
        }
        Console.WriteLine();
    }

    static void Main()
    {
        Queue<int> queue = new Queue<int>();
        SyncEvents syncEvents = new SyncEvents();

        Console.WriteLine("Configuring worker threads...");
        Producer producer = new Producer(queue, syncEvents);
        Consumer consumer = new Consumer(queue, syncEvents);
        Thread producerThread = new Thread(producer.ThreadRun);
        Thread consumerThread = new Thread(consumer.ThreadRun);

        Console.WriteLine("Launching producer and consumer threads...");        
        producerThread.Start();
        consumerThread.Start();

        for (int i=0; i<4; i++)
        {
            Thread.Sleep(2500);
            ShowQueueContents(queue);
        }

        Console.WriteLine("Signaling threads to terminate...");
        syncEvents.ExitThreadEvent.Set();

        producerThread.Join();
        consumerThread.Join();
    }

}
Configuring worker threads...
Launching producer and consumer threads...
22 92 64 70 13 59 9 2 43 52 91 98 50 96 46 22 40 94 24 87
79 54 5 39 21 29 77 77 1 68 69 81 4 75 43 70 87 72 59
0 69 98 54 92 16 84 61 30 45 50 17 86 16 59 20 73 43 21
38 46 84 59 11 87 77 5 53 65 7 16 66 26 79 74 26 37 56 92
Signalling threads to terminate...
Consumer Thread: consumed 1053771 items
Producer thread: produced 1053791 items

Vea también

Tareas

Ejemplo Monitor Synchronization Technology

Ejemplo Wait Synchronization Technology

Conceptos

Guía de programación de C#

Referencia

Sincronización de subprocesos (Guía de programación de C#)

Thread

lock (Instrucción, Referencia de C#)

AutoResetEvent

ManualResetEvent

Set

Join

WaitOne

WaitAll

Queue

ICollection

Start

Sleep

WaitHandle

EventWaitHandle