방법: 데이터 흐름 블록의 작업 Scheduler 지정

이 문서에서는 애플리케이션에서 데이터 흐름을 사용하는 경우 특정 작업 스케줄러를 연결하는 방법을 보여 줍니다. 예제에서는 Windows Forms 애플리케이션에서 System.Threading.Tasks.ConcurrentExclusiveSchedulerPair 클래스를 사용하여 판독기 작업이 활성화된 경우와 작성기 작업이 활성화된 경우를 표시합니다. 또한 TaskScheduler.FromCurrentSynchronizationContext 메서드를 사용하여 데이터 흐름 블록이 사용자 인터페이스 스레드에서 실행될 수 있도록 합니다.

참고 항목

TPL 데이터 흐름 라이브러리(System.Threading.Tasks.Dataflow 네임스페이스)는 .NET과 함께 배포되지 않습니다. Visual Studio에서 System.Threading.Tasks.Dataflow 네임스페이스를 설치하려면 프로젝트를 열고, 프로젝트 메뉴에서 NuGet 패키지 관리를 선택한 후, System.Threading.Tasks.Dataflow 패키지를 온라인으로 검색합니다. 또는 .NET Core CLI를 사용하여 설치하려면 dotnet add package System.Threading.Tasks.Dataflow를 실행합니다.

Windows Forms 애플리케이션을 만들려면

  1. Visual C# 또는 Visual Basic Windows Forms 애플리케이션 프로젝트를 만듭니다. 다음 단계에서 프로젝트의 이름은 WriterReadersWinForms입니다.

  2. 기본 폼인 Form1.cs(Visual Basic에서는 Form1.vb)의 폼 디자이너에서 네 개 CheckBox 컨트롤을 추가합니다. Text 속성을 checkBox1의 경우 Reader 1, checkBox2의 경우 Reader 2, checkBox3의 경우 Reader 3, checkBox4의 경우 Writer로 설정합니다. 각 컨트롤의 Enabled 속성을 False로 설정합니다.

  3. 폼에 Timer 컨트롤을 추가합니다. Interval 속성을 2500로 설정합니다.

데이터 흐름 기능 추가

이 단원에서는 애플리케이션에 참여하는 데이터 흐름 블록을 만드는 방법과 각 데이터 흐름 블록을 작업 스케줄러와 연결하는 방법을 설명합니다.

애플리케이션에 데이터 흐름 기능을 추가하려면

  1. 프로젝트에서 System.Threading.Tasks.Dataflow.dll에 대한 참조를 추가합니다.

  2. Form1.cs(Visual Basic에서는 Form1.vb)에 다음 using 명령문(Visual Basic에서는 Imports)이 포함되어 있는지 확인합니다.

    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    using System.Windows.Forms;
    
    Imports System.Threading
    Imports System.Threading.Tasks
    Imports System.Threading.Tasks.Dataflow
    
    
  3. BroadcastBlock<T> 클래스에 Form1 데이터 멤버를 추가합니다.

    // Broadcasts values to an ActionBlock<int> object that is associated
    // with each check box.
    BroadcastBlock<int> broadcaster = new BroadcastBlock<int>(null);
    
    ' Broadcasts values to an ActionBlock<int> object that is associated
    ' with each check box.
    Private broadcaster As New BroadcastBlock(Of Integer)(Nothing)
    
  4. Form1 생성자에서 InitializeComponent를 호출한 후 ActionBlock<TInput> 개체의 상태를 전환하는 CheckBox 개체를 만듭니다.

    // Create an ActionBlock<CheckBox> object that toggles the state
    // of CheckBox objects.
    // Specifying the current synchronization context enables the
    // action to run on the user-interface thread.
    var toggleCheckBox = new ActionBlock<CheckBox>(checkBox =>
    {
       checkBox.Checked = !checkBox.Checked;
    },
    new ExecutionDataflowBlockOptions
    {
       TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
    });
    
    ' Create an ActionBlock<CheckBox> object that toggles the state
    ' of CheckBox objects.
    ' Specifying the current synchronization context enables the 
    ' action to run on the user-interface thread.
    Dim toggleCheckBox = New ActionBlock(Of CheckBox)(Sub(checkBox) checkBox.Checked = Not checkBox.Checked, New ExecutionDataflowBlockOptions With {.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()})
    
  5. Form1 생성자에서 ConcurrentExclusiveSchedulerPair 개체와 4개의 ActionBlock<TInput> 개체를 만듭니다. 이때 각 ActionBlock<TInput> 개체에 대해 CheckBox 개체를 하나씩 만듭니다. 각 ActionBlock<TInput> 개체에 대해 ExecutionDataflowBlockOptions 속성이 판독기의 경우 TaskScheduler 속성으로 설정되고 작성기의 경우 ConcurrentScheduler 속성으로 설정된 ExclusiveScheduler 개체를 지정합니다.

    // Create a ConcurrentExclusiveSchedulerPair object.
    // Readers will run on the concurrent part of the scheduler pair.
    // The writer will run on the exclusive part of the scheduler pair.
    var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
    
    // Create an ActionBlock<int> object for each reader CheckBox object.
    // Each ActionBlock<int> object represents an action that can read
    // from a resource in parallel to other readers.
    // Specifying the concurrent part of the scheduler pair enables the
    // reader to run in parallel to other actions that are managed by
    // that scheduler.
    var readerActions =
       from checkBox in new CheckBox[] {checkBox1, checkBox2, checkBox3}
       select new ActionBlock<int>(milliseconds =>
       {
          // Toggle the check box to the checked state.
          toggleCheckBox.Post(checkBox);
    
          // Perform the read action. For demonstration, suspend the current
          // thread to simulate a lengthy read operation.
          Thread.Sleep(milliseconds);
    
          // Toggle the check box to the unchecked state.
          toggleCheckBox.Post(checkBox);
       },
       new ExecutionDataflowBlockOptions
       {
          TaskScheduler = taskSchedulerPair.ConcurrentScheduler
       });
    
    // Create an ActionBlock<int> object for the writer CheckBox object.
    // This ActionBlock<int> object represents an action that writes to
    // a resource, but cannot run in parallel to readers.
    // Specifying the exclusive part of the scheduler pair enables the
    // writer to run in exclusively with respect to other actions that are
    // managed by the scheduler pair.
    var writerAction = new ActionBlock<int>(milliseconds =>
    {
       // Toggle the check box to the checked state.
       toggleCheckBox.Post(checkBox4);
    
       // Perform the write action. For demonstration, suspend the current
       // thread to simulate a lengthy write operation.
       Thread.Sleep(milliseconds);
    
       // Toggle the check box to the unchecked state.
       toggleCheckBox.Post(checkBox4);
    },
    new ExecutionDataflowBlockOptions
    {
       TaskScheduler = taskSchedulerPair.ExclusiveScheduler
    });
    
    // Link the broadcaster to each reader and writer block.
    // The BroadcastBlock<T> class propagates values that it
    // receives to all connected targets.
    foreach (var readerAction in readerActions)
    {
       broadcaster.LinkTo(readerAction);
    }
    broadcaster.LinkTo(writerAction);
    
    ' Create a ConcurrentExclusiveSchedulerPair object.
    ' Readers will run on the concurrent part of the scheduler pair.
    ' The writer will run on the exclusive part of the scheduler pair.
    Dim taskSchedulerPair = New ConcurrentExclusiveSchedulerPair()
    
    ' Create an ActionBlock<int> object for each reader CheckBox object.
    ' Each ActionBlock<int> object represents an action that can read 
    ' from a resource in parallel to other readers.
    ' Specifying the concurrent part of the scheduler pair enables the 
    ' reader to run in parallel to other actions that are managed by 
    ' that scheduler.
    Dim readerActions = From checkBox In New CheckBox() {checkBox1, checkBox2, checkBox3} _
                        Select New ActionBlock(Of Integer)(Sub(milliseconds)
                                               ' Toggle the check box to the checked state.
                                               ' Perform the read action. For demonstration, suspend the current
                                               ' thread to simulate a lengthy read operation.
                                               ' Toggle the check box to the unchecked state.
                                               toggleCheckBox.Post(checkBox)
                                                               Thread.Sleep(milliseconds)
                                                               toggleCheckBox.Post(checkBox)
                                                           End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ConcurrentScheduler})
    
    ' Create an ActionBlock<int> object for the writer CheckBox object.
    ' This ActionBlock<int> object represents an action that writes to 
    ' a resource, but cannot run in parallel to readers.
    ' Specifying the exclusive part of the scheduler pair enables the 
    ' writer to run in exclusively with respect to other actions that are 
    ' managed by the scheduler pair.
    Dim writerAction = New ActionBlock(Of Integer)(Sub(milliseconds)
                                                       ' Toggle the check box to the checked state.
                                                       ' Perform the write action. For demonstration, suspend the current
                                                       ' thread to simulate a lengthy write operation.
                                                       ' Toggle the check box to the unchecked state.
                                                       toggleCheckBox.Post(checkBox4)
                                                       Thread.Sleep(milliseconds)
                                                       toggleCheckBox.Post(checkBox4)
                                                   End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ExclusiveScheduler})
    
    ' Link the broadcaster to each reader and writer block.
    ' The BroadcastBlock<T> class propagates values that it 
    ' receives to all connected targets.
    For Each readerAction In readerActions
        broadcaster.LinkTo(readerAction)
    Next readerAction
    broadcaster.LinkTo(writerAction)
    
  6. Form1 생성자에서 Timer 개체를 시작합니다.

    // Start the timer.
    timer1.Start();
    
    ' Start the timer.
    timer1.Start()
    
  7. 기본 폼의 폼 디자이너에서 타이머에 대한 Tick 이벤트의 이벤트 처리기를 만듭니다.

  8. 타이머에 대한 Tick 이벤트를 구현합니다.

    // Event handler for the timer.
    private void timer1_Tick(object sender, EventArgs e)
    {
       // Post a value to the broadcaster. The broadcaster
       // sends this message to each target.
       broadcaster.Post(1000);
    }
    
    ' Event handler for the timer.
    Private Sub timer1_Tick(ByVal sender As Object, ByVal e As EventArgs) Handles timer1.Tick
        ' Post a value to the broadcaster. The broadcaster
        ' sends this message to each target. 
        broadcaster.Post(1000)
    End Sub
    

toggleCheckBox 데이터 흐름 블록이 사용자 인터페이스에서 동작하기 때문에 이 작업은 사용자 인터페이스 스레드에서 발생해야 합니다. 이를 위해 생성 중에 이 개체는 ExecutionDataflowBlockOptions 속성이 TaskScheduler로 설정된 TaskScheduler.FromCurrentSynchronizationContext 개체를 제공합니다. FromCurrentSynchronizationContext 메서드는 현재 동기화 컨텍스트에서 작업을 수행하는 TaskScheduler 개체를 만듭니다. Form1 생성자가 사용자 인터페이스 스레드에서 호출되기 때문에 toggleCheckBox 데이터 흐름 블록에 대한 작업도 사용자 인터페이스 스레드에서 실행됩니다.

또한 이 예제에서는 ConcurrentExclusiveSchedulerPair 클래스를 사용하여 일부 데이터 흐름 블록이 동시에 동작할 수 있도록 하고 다른 데이터 흐름 블록이 동일한 ConcurrentExclusiveSchedulerPair 개체에서 실행되는 다른 모든 데이터 흐름 블록과 독립적으로 동작할 수 있도록 합니다. 이 방법은 여러 데이터 흐름 블록이 리소스를 공유할 때 해당 리소스에 대한 액세스를 수동으로 동기화할 필요를 없애기 위해 일부 데이터 흐름 블록에 해당 리소스에 대한 배타적 액세스 권한이 필요한 경우에 유용합니다. 수동 동기화를 제거하면 코드의 효율성이 향상될 수 있습니다.

예시

다음 예제에서는 Form1.cs(Visual Basic에서는 Form1.vb)의 전체 코드를 보여 줍니다.

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;

namespace WriterReadersWinForms
{
   public partial class Form1 : Form
   {
      // Broadcasts values to an ActionBlock<int> object that is associated
      // with each check box.
      BroadcastBlock<int> broadcaster = new BroadcastBlock<int>(null);

      public Form1()
      {
         InitializeComponent();

         // Create an ActionBlock<CheckBox> object that toggles the state
         // of CheckBox objects.
         // Specifying the current synchronization context enables the
         // action to run on the user-interface thread.
         var toggleCheckBox = new ActionBlock<CheckBox>(checkBox =>
         {
            checkBox.Checked = !checkBox.Checked;
         },
         new ExecutionDataflowBlockOptions
         {
            TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
         });

         // Create a ConcurrentExclusiveSchedulerPair object.
         // Readers will run on the concurrent part of the scheduler pair.
         // The writer will run on the exclusive part of the scheduler pair.
         var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();

         // Create an ActionBlock<int> object for each reader CheckBox object.
         // Each ActionBlock<int> object represents an action that can read
         // from a resource in parallel to other readers.
         // Specifying the concurrent part of the scheduler pair enables the
         // reader to run in parallel to other actions that are managed by
         // that scheduler.
         var readerActions =
            from checkBox in new CheckBox[] {checkBox1, checkBox2, checkBox3}
            select new ActionBlock<int>(milliseconds =>
            {
               // Toggle the check box to the checked state.
               toggleCheckBox.Post(checkBox);

               // Perform the read action. For demonstration, suspend the current
               // thread to simulate a lengthy read operation.
               Thread.Sleep(milliseconds);

               // Toggle the check box to the unchecked state.
               toggleCheckBox.Post(checkBox);
            },
            new ExecutionDataflowBlockOptions
            {
               TaskScheduler = taskSchedulerPair.ConcurrentScheduler
            });

         // Create an ActionBlock<int> object for the writer CheckBox object.
         // This ActionBlock<int> object represents an action that writes to
         // a resource, but cannot run in parallel to readers.
         // Specifying the exclusive part of the scheduler pair enables the
         // writer to run in exclusively with respect to other actions that are
         // managed by the scheduler pair.
         var writerAction = new ActionBlock<int>(milliseconds =>
         {
            // Toggle the check box to the checked state.
            toggleCheckBox.Post(checkBox4);

            // Perform the write action. For demonstration, suspend the current
            // thread to simulate a lengthy write operation.
            Thread.Sleep(milliseconds);

            // Toggle the check box to the unchecked state.
            toggleCheckBox.Post(checkBox4);
         },
         new ExecutionDataflowBlockOptions
         {
            TaskScheduler = taskSchedulerPair.ExclusiveScheduler
         });

         // Link the broadcaster to each reader and writer block.
         // The BroadcastBlock<T> class propagates values that it
         // receives to all connected targets.
         foreach (var readerAction in readerActions)
         {
            broadcaster.LinkTo(readerAction);
         }
         broadcaster.LinkTo(writerAction);

         // Start the timer.
         timer1.Start();
      }

      // Event handler for the timer.
      private void timer1_Tick(object sender, EventArgs e)
      {
         // Post a value to the broadcaster. The broadcaster
         // sends this message to each target.
         broadcaster.Post(1000);
      }
   }
}
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow


Namespace WriterReadersWinForms
    Partial Public Class Form1
        Inherits Form
        ' Broadcasts values to an ActionBlock<int> object that is associated
        ' with each check box.
        Private broadcaster As New BroadcastBlock(Of Integer)(Nothing)

        Public Sub New()
            InitializeComponent()

            ' Create an ActionBlock<CheckBox> object that toggles the state
            ' of CheckBox objects.
            ' Specifying the current synchronization context enables the 
            ' action to run on the user-interface thread.
            Dim toggleCheckBox = New ActionBlock(Of CheckBox)(Sub(checkBox) checkBox.Checked = Not checkBox.Checked, New ExecutionDataflowBlockOptions With {.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()})

            ' Create a ConcurrentExclusiveSchedulerPair object.
            ' Readers will run on the concurrent part of the scheduler pair.
            ' The writer will run on the exclusive part of the scheduler pair.
            Dim taskSchedulerPair = New ConcurrentExclusiveSchedulerPair()

            ' Create an ActionBlock<int> object for each reader CheckBox object.
            ' Each ActionBlock<int> object represents an action that can read 
            ' from a resource in parallel to other readers.
            ' Specifying the concurrent part of the scheduler pair enables the 
            ' reader to run in parallel to other actions that are managed by 
            ' that scheduler.
            Dim readerActions = From checkBox In New CheckBox() {checkBox1, checkBox2, checkBox3} _
                                Select New ActionBlock(Of Integer)(Sub(milliseconds)
                                                       ' Toggle the check box to the checked state.
                                                       ' Perform the read action. For demonstration, suspend the current
                                                       ' thread to simulate a lengthy read operation.
                                                       ' Toggle the check box to the unchecked state.
                                                       toggleCheckBox.Post(checkBox)
                                                                       Thread.Sleep(milliseconds)
                                                                       toggleCheckBox.Post(checkBox)
                                                                   End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ConcurrentScheduler})

            ' Create an ActionBlock<int> object for the writer CheckBox object.
            ' This ActionBlock<int> object represents an action that writes to 
            ' a resource, but cannot run in parallel to readers.
            ' Specifying the exclusive part of the scheduler pair enables the 
            ' writer to run in exclusively with respect to other actions that are 
            ' managed by the scheduler pair.
            Dim writerAction = New ActionBlock(Of Integer)(Sub(milliseconds)
                                                               ' Toggle the check box to the checked state.
                                                               ' Perform the write action. For demonstration, suspend the current
                                                               ' thread to simulate a lengthy write operation.
                                                               ' Toggle the check box to the unchecked state.
                                                               toggleCheckBox.Post(checkBox4)
                                                               Thread.Sleep(milliseconds)
                                                               toggleCheckBox.Post(checkBox4)
                                                           End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ExclusiveScheduler})

            ' Link the broadcaster to each reader and writer block.
            ' The BroadcastBlock<T> class propagates values that it 
            ' receives to all connected targets.
            For Each readerAction In readerActions
                broadcaster.LinkTo(readerAction)
            Next readerAction
            broadcaster.LinkTo(writerAction)

            ' Start the timer.
            timer1.Start()
        End Sub

        ' Event handler for the timer.
        Private Sub timer1_Tick(ByVal sender As Object, ByVal e As EventArgs) Handles timer1.Tick
            ' Post a value to the broadcaster. The broadcaster
            ' sends this message to each target. 
            broadcaster.Post(1000)
        End Sub
    End Class
End Namespace

참고 항목