Gewusst wie: Synchronisieren eines Producer- und Consumerthreads (C#-Programmierhandbuch)

Aktualisiert: November 2007

Im folgenden Beispiel wird die Threadsynchronisierung zwischen dem primären Thread und zwei Arbeitsthreads mit dem lock-Schlüsselwort sowie der AutoResetEvent-Klasse und der ManualResetEvent-Klasse veranschaulicht. Weitere Informationen hierzu finden Sie unter lock-Anweisung (C#-Referenz).

Im Beispiel werden zwei Hilfs- bzw. Arbeitsthreads erstellt. Ein Thread erstellt Elemente und speichert sie in einer nicht threadsicheren generischen Warteschlange. Weitere Informationen finden Sie unter Queue<T>. Der andere Thread verwendet Elemente aus dieser Warteschlange. Darüber hinaus zeigt der primäre Thread in regelmäßigen Abständen den Inhalt der Warteschlange an, sodass drei Threads auf die Warteschlange zugreifen. Das lock-Schlüsselwort wird zur Synchronisierung des Zugriffs auf die Warteschlange verwendet, um sicherzustellen, dass die Warteschlange nicht beschädigt wird.

Zusätzlich zur Vermeidung des gleichzeitigen Zugriffs mit dem lock-Schlüsselwort sorgen zwei Ereignisobjekte für eine weitere Synchronisierung. Eines wird verwendet, um den Arbeitsthreads zu signalisieren, dass sie beendet werden, und das andere wird vom Producerthread verwendet, um dem Consumerthread zu signalisieren, dass der Warteschlange ein neues Element hinzugefügt wurde. Diese zwei Ereignisobjekte werden in einer Klasse mit dem Namen SyncEvents gekapselt. Auf diese Weise können die Ereignisse problemlos an die Objekte übergeben werden, die den Consumer- und den Producerthread darstellen. Die SyncEvents-Klasse wird folgendermaßen definiert:

public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}

Die AutoResetEvent-Klasse wird für das Ereignis "neues Element" verwendet, da dieses Ereignis jedes Mal automatisch zurückgesetzt werden soll, wenn der Consumerthread darauf reagiert. Im Unterschied hierzu wird für das Ereignis "Thread beenden" die ManualResetEvent-Klasse verwendet, da mehrere Threads reagieren sollen, wenn dieses Ereignis signalisiert wird. Wenn Sie stattdessen AutoResetEvent verwenden, wird das Ereignis in einen nicht signalisierten Zustand zurückgesetzt, sobald ein Thread darauf reagiert hat. Der andere Thread würde nicht reagieren und demzufolge nicht beendet werden.

Die SyncEvents-Klasse erstellt zwei Ereignisse und speichert sie auf zwei verschiedene Arten: als EventWaitHandle, was die Basisklasse für AutoResetEvent und ManualResetEvent darstellt, und in einem Array auf der Grundlage von WaitHandle. Wie Sie im Abschnitt zum Consumerthread sehen werden, ist dieses Array notwendig, damit der Consumerthread auf beide Ereignisse reagieren kann.

Der Consumer- und der Producerthread werden durch Klassen mit den Namen Consumer und Producer dargestellt. Beide definieren eine Methode mit dem Namen ThreadRun. Diese Methoden werden als Einstiegspunkte für die Arbeitsthreads verwendet, die die Main-Methode erstellt.

Die ThreadRun-Methode, die von der Producer-Klasse definiert wird, sieht etwa so aus:

// Producer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    Random r = new Random();
    while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            while (_queue.Count < 20)
            {
                _queue.Enqueue(r.Next(0,100));
                _syncEvents.NewItemEvent.Set();
                count++;
            }
        }
    }
    Console.WriteLine("Producer thread: produced {0} items", count);
}

Diese Methode wird als Schleife ausgeführt, bis das Ereignis "Thread beenden" signalisiert wird. Der Zustand dieses Ereignisses wird mit der WaitOne-Methode überprüft. Dabei wird die durch die SyncEvents-Klasse definierte ExitThreadEvent-Eigenschaft verwendet. In diesem Fall wird der Zustand des Ereignisses überprüft, ohne den aktuellen Thread zu blockieren, da das erste mit WaitOne verwendete Argument 0 (Null) lautet. Dadurch wird angegeben, dass die Methode das Ergebnis sofort zurückgeben muss. Wenn WaitOne den Wert true zurückgibt, wird das betreffende Ereignis gerade signalisiert. In diesem Fall wird die ThreadRun-Methode zurückgegeben. Dies führt dazu, dass der Arbeitsthread durch die Ausführung dieser Methode beendet wird.

Bis das Ereignis "Thread beenden" signalisiert wird, versucht die Producer.ThreadStart-Methode, 20 Elemente in der Warteschlange zu halten. Ein Element ist einfach eine Ganzzahl zwischen 0 und 100. Vor dem Hinzufügen neuer Elemente muss die Auflistung gesperrt werden, um zu verhindern, dass der Consumerthread und der primäre Thread gleichzeitig auf sie zugreifen. Verwenden Sie dazu das lock-Schlüsselwort. Das an lock übergebene Argument ist das mit der ICollection-Schnittstelle verfügbar gemachte SyncRoot-Feld. Dieses Feld wird speziell für die Synchronisierung des Threadzugriffs bereitgestellt. Allen Anweisungen, die im Codeblock hinter lock enthalten sind, wird exklusiver Zugriff auf die Auflistung gewährt. Für jedes neue Element, das der Producer der Warteschlange hinzufügt, wird die Set-Methode für das Ereignis "neues Element" aufgerufen. Dies signalisiert dem Consumerthread, den unterbrochenen Zustand aufzuheben, um das neue Element zu verarbeiten.

Das Consumer-Objekt definiert auch eine Methode mit dem Namen ThreadRun. Wie die ThreadRun-Version des Producers wird diese Methode von einem mit der Main-Methode erstellten Arbeitsthread ausgeführt. Die Consumerversion von ThreadStart muss allerdings auf zwei Ereignisse reagieren. Die Consumer.ThreadRun-Methode sieht in etwa so aus:

// Consumer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            int item = _queue.Dequeue();
        }
        count++;
    } 
    Console.WriteLine("Consumer Thread: consumed {0} items", count);
}

Diese Methode verwendet WaitAny, um den Consumerthread zu blockieren, bis eines der Wait-Handles im bereitgestellten Array signalisiert wird. In diesem Fall gibt es im Array zwei Handles, eines zum Beenden der Arbeitsthreads und eines, um anzuzeigen, dass der Auflistung ein neues Element hinzugefügt wurde. WaitAny gibt den Index des Ereignisses zurück, das signalisiert wurde. Das Ereignis "neues Element" ist das erste im Array, der Index 0 (Null) weist also auf ein neues Element hin. Suchen Sie in diesem Fall nach dem Index 1, der auf das Ereignis "Thread beenden" hinweist. Anhand dessen wird bestimmt, ob diese Methode weiterhin Elemente verwendet. Wenn das Ereignis "neues Element" signalisiert wurde, erhalten Sie mit lock exklusiven Zugriff auf die Auflistung und verwenden das neue Element. Da in diesem Beispiel Tausende von Elementen erstellt und verwendet werden, wird nicht jedes verwendete Element angezeigt. Um den Inhalt der Warteschlange in regelmäßigen Abständen anzuzeigen, verwenden Sie stattdessen Main. Dies wird im Folgenden veranschaulicht.

Die Main-Methode beginnt mit dem Erstellen der Warteschlange, deren Inhalt produziert und verwendet wird, und einer Instanz von SyncEvents (diese Klasse wurde weiter oben behandelt):

Queue<int> queue = new Queue<int>();
SyncEvents syncEvents = new SyncEvents();

Danach konfiguriert Main das Producer-Objekt und das Consumer-Objekt zur Verwendung mit Arbeitsthreads. In diesem Schritt werden jedoch nicht die tatsächlichen Arbeitsthreads erstellt oder gestartet:

Producer producer = new Producer(queue, syncEvents);
Consumer consumer = new Consumer(queue, syncEvents);
Thread producerThread = new Thread(producer.ThreadRun);
Thread consumerThread = new Thread(consumer.ThreadRun);

Beachten Sie, dass die Warteschlange und das Synchronisierungsereignisobjekt als Konstruktorargumente sowohl an den Consumer-Thread als auch an den Producer-Thread übergeben werden. Dadurch werden beiden Objekten die gemeinsam genutzten Ressourcen bereitgestellt, die sie benötigen, um ihre jeweiligen Aufgaben auszuführen. Anschließend werden zwei neue Thread-Objekte erstellt, wobei die ThreadRun-Methode für jedes Objekt als Argument verwendet wird. Beim Starten der einzelnen Arbeitsthreads wird dieses Argument als Einstiegspunkt für den Thread verwendet.

Danach startet Main die beiden Arbeitsthreads mit einem Aufruf der Start-Methode:

producerThread.Start();
consumerThread.Start();

Daraufhin werden die beiden neuen Arbeitsthreads erstellt und beginnen, unabhängig vom primären Thread, der derzeit die Main-Methode ausführt, mit der asynchronen Ausführung. Als Nächstes unterbricht Main den primären Thread mit einem Aufruf der Sleep-Methode. Die Methode unterbricht den derzeit ausführenden Thread für eine bestimmte Anzahl von Millisekunden. Nach Ablauf dieses Intervalls wird Main erneut aktiviert und zeigt den Inhalt der Warteschlange an. Main wiederholt dies für vier Iterationen:

for (int i=0; i<4; i++)
{
    Thread.Sleep(2500);
    ShowQueueContents(queue);
}

Durch Aufruf der Set-Methode des Ereignisses "Thread beenden" signalisiert Main den Arbeitsthreads schließlich, dass sie beendet werden, und ruft anschließend für jeden Arbeitsthread die Join-Methode auf, um den primären Thread zu blockieren, bis der jeweilige Arbeitsthread auf das Ereignis reagiert hat und beendet wurde.

Es gibt ein letztes Beispiel für die Threadsynchronisierung: die ShowQueueContents-Methode. Wie der Consumer- und der Producerthread erlangt diese Methode mithilfe von lock exklusiven Zugriff auf die Warteschlange. In diesem Fall ist der exklusive Zugriff jedoch sehr wichtig, da ShowQueueContents die gesamte Auflistung darstellt. Bei der Darstellung der gesamten Auflistung ist die Gefahr der Beschädigung von Daten durch asynchrone Vorgänge besonders hoch, da der Inhalt der gesamten Auflistung traversiert wird.

Beachten Sie, dass ShowQueueContents vom primären Thread ausgeführt wird, da die Methode von Main aufgerufen wird. Das bedeutet, dass diese Methode, wenn sie exklusiven Zugriff auf das Warteschlangenelement erlangt, den Zugriff auf die Warteschlange für den Producer- und den Consumerthread blockiert. ShowQueueContents sperrt die Warteschlange und listet den Inhalt auf:

private static void ShowQueueContents(Queue<int> q)
{
    lock (((ICollection)q).SyncRoot)
    {
        foreach (int item in q)
        {
            Console.Write("{0} ", item);
        }
    }
    Console.WriteLine();
}

Im Folgenden finden Sie das vollständige Beispiel.

Beispiel

using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;

public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}
public class Producer 
{
    public Producer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Producer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        Random r = new Random();
        while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                while (_queue.Count < 20)
                {
                    _queue.Enqueue(r.Next(0,100));
                    _syncEvents.NewItemEvent.Set();
                    count++;
                }
            }
        }
        Console.WriteLine("Producer thread: produced {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class Consumer
{
    public Consumer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Consumer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                int item = _queue.Dequeue();
            }
            count++;
        } 
        Console.WriteLine("Consumer Thread: consumed {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class ThreadSyncSample
{
    private static void ShowQueueContents(Queue<int> q)
    {
        lock (((ICollection)q).SyncRoot)
        {
            foreach (int item in q)
            {
                Console.Write("{0} ", item);
            }
        }
        Console.WriteLine();
    }

    static void Main()
    {
        Queue<int> queue = new Queue<int>();
        SyncEvents syncEvents = new SyncEvents();

        Console.WriteLine("Configuring worker threads...");
        Producer producer = new Producer(queue, syncEvents);
        Consumer consumer = new Consumer(queue, syncEvents);
        Thread producerThread = new Thread(producer.ThreadRun);
        Thread consumerThread = new Thread(consumer.ThreadRun);

        Console.WriteLine("Launching producer and consumer threads...");        
        producerThread.Start();
        consumerThread.Start();

        for (int i=0; i<4; i++)
        {
            Thread.Sleep(2500);
            ShowQueueContents(queue);
        }

        Console.WriteLine("Signaling threads to terminate...");
        syncEvents.ExitThreadEvent.Set();

        producerThread.Join();
        consumerThread.Join();
    }

}
Configuring worker threads...
Launching producer and consumer threads...
22 92 64 70 13 59 9 2 43 52 91 98 50 96 46 22 40 94 24 87
79 54 5 39 21 29 77 77 1 68 69 81 4 75 43 70 87 72 59
0 69 98 54 92 16 84 61 30 45 50 17 86 16 59 20 73 43 21
38 46 84 59 11 87 77 5 53 65 7 16 66 26 79 74 26 37 56 92
Signalling threads to terminate...
Consumer Thread: consumed 1053771 items
Producer thread: produced 1053791 items

Siehe auch

Aufgaben

Technologiebeispiel für Monitor-Synchronisierung

Technologiebeispiel für Wait-Synchronisierung

Konzepte

C#-Programmierhandbuch

Referenz

Threadsynchronisierung (C#-Programmierhandbuch)

Thread

lock-Anweisung (C#-Referenz)

AutoResetEvent

ManualResetEvent

Set

Join

WaitOne

WaitAll

Queue

ICollection

Start

Sleep

WaitHandle

EventWaitHandle