方法 : Producer と (C# および Visual Basic) のコンシューマー スレッドを同期させる

[このドキュメントはプレビュー版であり、後のリリースで変更されることがあります。 空白のトピックは、プレースホルダーとして挿入されています。]

次の例の lock (C#) または SyncLock (Visual Basic の場合) のキーワード、および AutoResetEventManualResetEvent クラスを使用して、プライマリ スレッドと 2 つのワーカー スレッドのスレッドの同期に示します。 詳細については、「lock Statement (C# Reference)」または「SyncLock ステートメント」を参照してください。

この例は 2 つの補助、またはワーカー スレッドを作成します。 1 つのスレッドの要素を生成し、スレッドセーフでない汎用キューに格納します。 詳細については、Queue<T> を参照してください。 他のスレッドは、このキューから項目を消費します。 さらに、プライマリ スレッドは、定期的に内容を表示、キューの 3 つのスレッドがキューにアクセスできるようにします。 lock SyncLock キーワードを使用、キューの状態が壊れていないことを確認する、キューへのアクセスを同期します。

lock SyncLock キーワードを使用する同時アクセスを防止だけで、他に追加の同期は 2 つのイベント オブジェクトによって提供されます。 1 つのワーカー スレッドを終了するシグナルを使用され、新しい項目がキューに追加されたときのコンシューマー スレッドに通知するプロデューサー スレッドによってに使用、されます。 これらの 2 つのイベント オブジェクトは、SyncEvents という名前のクラスにカプセル化します。 これにより、コンシューマーとプロデューサー スレッドを簡単に表現するオブジェクトに渡すイベント。 SyncEvents クラスように定義されます。

                        Public
                        Class SyncEvents
    SubNew()
        _newItemEvent = New AutoResetEvent(False)
        _exitThreadEvent = New ManualResetEvent(False)
        _eventArray(0) = _newItemEvent
        _eventArray(1) = _exitThreadEvent
    EndSubPublicReadOnlyProperty ExitThreadEvent() As EventWaitHandle
        GetReturn _exitThreadEvent
        EndGetEndPropertyPublicReadOnlyProperty NewItemEvent() As EventWaitHandle
        GetReturn _newItemEvent
        EndGetEndPropertyPublicReadOnlyProperty EventArray() As WaitHandle()
        GetReturn _eventArray
        EndGetEndPropertyPrivate _newItemEvent As EventWaitHandle
    Private _exitThreadEvent As EventWaitHandle
    Private _eventArray(0 To 1) As WaitHandle
EndClass
                        public
                        class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}

AutoResetEvent クラスは「新しい項目」の使用します。イベントこのイベントをコンシューマー スレッドがこのイベントに応答するたびに自動的にリセットするためです。 また、ManualResetEvent クラスは、「終了」の使用されます。イベントこのイベントがシグナル ステートに応答する複数のスレッドをするためです。 AutoResetEvent を代わりに、使用する場合、1 つのスレッドが、イベントに応答後、非シグナル状態には、イベントは戻ります。 他のスレッドが応答しないされ、ここを終了に失敗はします。

SyncEvents クラスは、2 つのイベントを作成し、2 つの異なる形式で格納します。EventWaitHandle、としては AutoResetEventManualResetEvent、および WaitHandle に基づいて配列では、基本クラスです。 コンシューマー スレッド ディスカッションにわかりますが、この配列は consumer スレッドがいずれかのイベントに応答できるように必要です。

コンシューマーおよびプロデューサー スレッドは、クラスという ConsumerProducer 名前表されます。 これらいずれは ThreadRun と呼ばれるメソッドを定義します。 これらのメソッドは、Main メソッドを作成するワーカー スレッドのエントリ ポイントとして使用されます。

ThreadRun クラスによって定義されている Producer メソッドのようにこのなります。

                        ' Producer.ThreadRun
                        Public
                        Sub ThreadRun()
    Dim count AsInteger = 0
    Dim r AsNew Random()

    DoWhileNot _syncEvents.ExitThreadEvent.WaitOne(0, False)
        SyncLockCType(_queue, ICollection).SyncRoot
            DoWhile _queue.Count < 20
                _queue.Enqueue(r.Next(0, 100))
                _syncEvents.NewItemEvent.Set()
                count += 1
            LoopEndSyncLockLoop
    Console.WriteLine("Producer thread: produced {0} items", count)
EndSub
                        // Producer.ThreadRun
                        public
                        void ThreadRun()
{
    int count = 0;
    Random r = new Random();
    while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            while (_queue.Count < 20)
            {
                _queue.Enqueue(r.Next(0,100));
                _syncEvents.NewItemEvent.Set();
                count++;
            }
        }
    }
    Console.WriteLine("Producer thread: produced {0} items", count);
}

このメソッドでは終了スレッドまでループ処理します。イベントがシグナル状態です。 このイベントの状態は、WaitOne クラスで定義された ExitThreadEvent プロパティを使用 SyncEvents メソッドをテストします。 ここでは、イベントの状態が WaitOne を併用する最初の引数がゼロをメソッドはすぐに返すことを示すために、現在のスレッドをブロックせずチェックします。 WaitOne true を返す対象のイベントが現在通知されます。 ThreadRun メソッドが返すため場合に、このメソッドを実行するワーカー スレッドの終了の効果があります。

終了スレッドまでイベントが通知 Producer.ThreadStart メソッドは、20 のアイテムをキューに保持しようとしました。 項目は 0 ~ 100 整数だけです。 プライマリ スレッドとコンシューマーが、同じ時にコレクションにアクセスするを防ぐの新しいアイテムを追加する前に、コレクションをロックする必要があります。 これは、lock (C#) または (Visual Basic) SyncLock キーワードを使用して行われます。 lock SyncLock に渡された引数は、 SyncRoot フィールド ICollection インターフェイスを通じて公開されます。 このフィールドは、スレッド アクセスを同期するため特にに用意されています。 コレクションへの排他アクセスが、コードに含まれるすべての命令与えられている次の lock または SyncLock ブロックします。 各新規項目、プロデューサーは、[新しい項目"の Set メソッドの呼び出し、キューに追加します。イベントが行われます。 これは、通知、コンシューマー スレッドは、新しいアイテムを処理する中断状態から出現します。

Consumer オブジェクトも ThreadRun と呼ばれるメソッドを定義します。 ThreadRunのプロデューサーのバージョンと同様にこのメソッドは、 Main メソッドで作成されたワーカー スレッドによって実行されます。 ただし、コンシューマー バージョン ThreadRun の 2 つのイベントに応答する必要があります。 Consumer.ThreadRun メソッドのようにこのなります。

                        ' Consumer.ThreadRun
                        Public
                        Sub ThreadRun()
    Dim count AsInteger = 0

    DoWhile WaitHandle.WaitAny(_syncEvents.EventArray) <> 1
        SyncLockCType(_queue, ICollection).SyncRoot
            Dim item AsInteger = _queue.Dequeue()
        EndSyncLock
        count += 1
    Loop
    Console.WriteLine("Consumer Thread: consumed {0} items", count)
EndSub
                        // Consumer.ThreadRun
                        public
                        void ThreadRun()
{
    int count = 0;
    while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            int item = _queue.Dequeue();
        }
        count++;
    } 
    Console.WriteLine("Consumer Thread: consumed {0} items", count);
}

このメソッドはシグナル状態になる、指定された配列内の待機ハンドルのいずれかになるまでのコンシューマー スレッドをブロックする WaitAny を使用します。 ここでは、これには、配列のワーカー スレッドを終了して、新しい項目がコレクションに追加されたことを示す 2 つのハンドルがあります。 WaitAny はシグナル状態になりましたが、イベントのインデックスを返します。 "新しいアイテム]イベントが、配列内の最初のゼロのインデックスを新しい項目を示します。 この場合は、終了スレッドを示す 1 のインデックスの確認します。イベントとこれが、このメソッドは品目の消費を継続するかどうかを判断する使用されます。 場合は、"新しいアイテム]イベントがシグナル状態、lock または SyncLock を使用して、コレクションに排他的にアクセスして、新しいアイテムを消費することができます。 各項目を表示しない次の使用例を生成し、大量のアイテムを消費、ので消費します。 代わりに Main を使用してが示すように、キューの内容を定期的に表示するようにします。

Main メソッドは、キューの内容を生成および消費されると SyncEvents、以前に検索をするインスタンスを作成して開始します。

                        Dim queue AsNew Queue(OfInteger)
Dim syncEvents AsNew SyncEvents()
Queue<int> queue = new Queue<int>();
SyncEvents syncEvents = new SyncEvents();

次に、Main と構成します ProducerConsumer オブジェクトを使用してワーカー スレッドで。 この手順は、ただし、作成または実際のワーカー スレッドを開始します。

                        Dim producer AsNew Producer(queue, syncEvents)
Dim consumer AsNew Consumer(queue, syncEvents)
Dim producerThread AsNew Thread(AddressOf producer.ThreadRun)
Dim consumerThread AsNew Thread(AddressOf consumer.ThreadRun)
Producer producer = new Producer(queue, syncEvents);
Consumer consumer = new Consumer(queue, syncEvents);
Thread producerThread = new Thread(producer.ThreadRun);
Thread consumerThread = new Thread(consumer.ThreadRun);

キューとの同期イベント オブジェクトを Consumer とコンストラクター引数として Producer スレッドに渡されることを確認します。 これにより、両方をそれぞれの作業に必要な共有リソースを持つオブジェクト。 Thread メソッドをオブジェクトごとに引数としてを使用して 2 つの新しい ThreadRun オブジェクトが [作成されました。 各ワーカー スレッドを起動したときは、スレッドのこの引数のエントリ ポイントとしてに使用するがします。

このような Main メソッドの呼び出しで、2 つのワーカー スレッドを起動する次の Start

producerThread.Start()
consumerThread.Start()
producerThread.Start();
consumerThread.Start();

この時点で、2 つの新しいワーカー スレッドは作成され、Main メソッドを現在実行しているプライマリ スレッドから独立を非同期実行を開始します。 実際は、次のもの MainSleep メソッドの呼び出しで、プライマリ スレッドを中断するには。 メソッドを中断、現在実行中のスレッドのミリ秒数を指定します。 再アクティブこの間隔が経過したら、いったん Main 化ではどの時点でキューの内容が表示されます。 Main はこのような 4 つのイテレーションのこれを繰り返します。

                        For i AsInteger = 0 To 3
    Thread.Sleep(2500)
    ShowQueueContents(queue)
Next
                        for (int i=0; i<4; i++)
{
    Thread.Sleep(2500);
    ShowQueueContents(queue);
}

Main 信号のワーカー スレッドの終了スレッドの Set メソッドを呼び出すことによって終了の最後に、イベント、および [呼び出しまで各ワーカー スレッドはプライマリ スレッドをブロックするには各ワーカー スレッドで Join メソッドは、イベントに応答し、終了) します。

スレッドの同期の最終的な例を 1 つがあります。ShowQueueContents メソッドです。 コンシューマーとプロデューサー スレッドと同様にこの方法では、lock (C#) または SyncLock (Visual Basic の場合) を使用して、キューに排他アクセスします。 ここでは、ただし、排他アクセスは非常に重要で、コレクションのすべての ShowQueueContents に列挙するため。 コレクションを列挙するのにはコレクション全体の内容を走査するために特に発生しやすい非同期操作によってデータの破損が操作です。

その ShowQueueContentsは Main、によって呼び出されるためによって実行される、プライマリ スレッドを確認します。 つまり、項目のキューへの排他アクセスを保ちますそのときに、このメソッドでプロデューサーとコンシューマーの両方アクセスからするスレッド、キューをブロックすることです。 ShowQueueContents は、キューをロックし、内容を列挙します。

                        Private
                        Sub ShowQueueContents(ByVal q As Queue(OfInteger))
    SyncLockCType(q, ICollection).SyncRoot
        ForEach item AsIntegerIn q
            Console.Write("{0} ", item)
        NextEndSyncLock
    Console.WriteLine()
EndSub
                        private
                        static
                        void ShowQueueContents(Queue<int> q)
{
    lock (((ICollection)q).SyncRoot)
    {
        foreach (int item in q)
        {
            Console.Write("{0} ", item);
        }
    }
    Console.WriteLine();
}

完全な例に従います。

使用例

                        Imports System.Threading

Module Module1
    PublicClass SyncEvents
        SubNew()
            _newItemEvent = New AutoResetEvent(False)
            _exitThreadEvent = New ManualResetEvent(False)
            _eventArray(0) = _newItemEvent
            _eventArray(1) = _exitThreadEvent
        EndSubPublicReadOnlyProperty ExitThreadEvent() As EventWaitHandle
            GetReturn _exitThreadEvent
            EndGetEndPropertyPublicReadOnlyProperty NewItemEvent() As EventWaitHandle
            GetReturn _newItemEvent
            EndGetEndPropertyPublicReadOnlyProperty EventArray() As WaitHandle()
            GetReturn _eventArray
            EndGetEndPropertyPrivate _newItemEvent As EventWaitHandle
        Private _exitThreadEvent As EventWaitHandle
        Private _eventArray(0 To 1) As WaitHandle
    EndClassPublicClass Producer
        SubNew(ByVal q As Queue(OfInteger), ByVal e As SyncEvents)
            _queue = q
            _syncEvents = e
        EndSub
        ' Producer.ThreadRunPublicSub ThreadRun()
            Dim count AsInteger = 0
            Dim r AsNew Random()

            DoWhileNot _syncEvents.ExitThreadEvent.WaitOne(0, False)
                SyncLockCType(_queue, ICollection).SyncRoot
                    DoWhile _queue.Count < 20
                        _queue.Enqueue(r.Next(0, 100))
                        _syncEvents.NewItemEvent.Set()
                        count += 1
                    LoopEndSyncLockLoop
            Console.WriteLine("Producer thread: produced {0} items", count)
        EndSubPrivate _queue As Queue(OfInteger)
        Private _syncEvents As SyncEvents
    EndClassPublicClass Consumer
        SubNew(ByVal q As Queue(OfInteger), ByVal e As SyncEvents)
            _queue = q
            _syncEvents = e
        EndSub
        ' Consumer.ThreadRunPublicSub ThreadRun()
            Dim count AsInteger = 0

            DoWhile WaitHandle.WaitAny(_syncEvents.EventArray) <> 1
                SyncLockCType(_queue, ICollection).SyncRoot
                    Dim item AsInteger = _queue.Dequeue()
                EndSyncLock
                count += 1
            Loop
            Console.WriteLine("Consumer Thread: consumed {0} items", count)
        EndSubPrivate _queue As Queue(OfInteger)
        Private _syncEvents As SyncEvents
    EndClassPrivateSub ShowQueueContents(ByVal q As Queue(OfInteger))
        SyncLockCType(q, ICollection).SyncRoot
            ForEach item AsIntegerIn q
                Console.Write("{0} ", item)
            NextEndSyncLock
        Console.WriteLine()
    EndSubSub Main()
        Dim queue AsNew Queue(OfInteger)
        Dim syncEvents AsNew SyncEvents()

        Console.WriteLine("Configuring worker threads...")
        Dim producer AsNew Producer(queue, syncEvents)
        Dim consumer AsNew Consumer(queue, syncEvents)
        Dim producerThread AsNew Thread(AddressOf producer.ThreadRun)
        Dim consumerThread AsNew Thread(AddressOf consumer.ThreadRun)

        Console.WriteLine("Launching producer and consumer threads...")
        producerThread.Start()
        consumerThread.Start()

        For i AsInteger = 0 To 3
            Thread.Sleep(2500)
            ShowQueueContents(queue)
        Next

        Console.WriteLine("Signaling threads to terminate...")
        syncEvents.ExitThreadEvent.Set()

        producerThread.Join()
        consumerThread.Join()
    EndSubEndModule
                        using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;

publicclass SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}
publicclass Producer 
{
    public Producer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Producer.ThreadRunpublicvoid ThreadRun()
    {
        int count = 0;
        Random r = new Random();
        while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                while (_queue.Count < 20)
                {
                    _queue.Enqueue(r.Next(0,100));
                    _syncEvents.NewItemEvent.Set();
                    count++;
                }
            }
        }
        Console.WriteLine("Producer thread: produced {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

publicclass Consumer
{
    public Consumer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Consumer.ThreadRunpublicvoid ThreadRun()
    {
        int count = 0;
        while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                int item = _queue.Dequeue();
            }
            count++;
        } 
        Console.WriteLine("Consumer Thread: consumed {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

publicclass ThreadSyncSample
{
    privatestaticvoid ShowQueueContents(Queue<int> q)
    {
        lock (((ICollection)q).SyncRoot)
        {
            foreach (int item in q)
            {
                Console.Write("{0} ", item);
            }
        }
        Console.WriteLine();
    }

    staticvoid Main()
    {
        Queue<int> queue = new Queue<int>();
        SyncEvents syncEvents = new SyncEvents();

        Console.WriteLine("Configuring worker threads...");
        Producer producer = new Producer(queue, syncEvents);
        Consumer consumer = new Consumer(queue, syncEvents);
        Thread producerThread = new Thread(producer.ThreadRun);
        Thread consumerThread = new Thread(consumer.ThreadRun);

        Console.WriteLine("Launching producer and consumer threads...");        
        producerThread.Start();
        consumerThread.Start();

        for (int i=0; i<4; i++)
        {
            Thread.Sleep(2500);
            ShowQueueContents(queue);
        }

        Console.WriteLine("Signaling threads to terminate...");
        syncEvents.ExitThreadEvent.Set();

        producerThread.Join();
        consumerThread.Join();
    }

}
Configuring worker threads... Launching producer and consumer threads... 22 92 64 70 13 59 9 2 43 52 91 98 50 96 46 22 40 94 24 87 79 54 5 39 21 29 77 77 1 68 69 81 4 75 43 70 87 72 59 0 69 98 54 92 16 84 61 30 45 50 17 86 16 59 20 73 43 21 38 46 84 59 11 87 77 5 53 65 7 16 66 26 79 74 26 37 56 92 Signalling threads to terminate... Consumer Thread: consumed 1053771 items Producer thread: produced 1053791 items 

参照

参照

スレッドの同期 (C# および Visual Basic)

Thread

SyncLock ステートメント

lock Statement (C# Reference)

AutoResetEvent

ManualResetEvent

Set

Join

WaitOne

WaitAll

Queue

ICollection

Start

Sleep

WaitHandle

EventWaitHandle