방법: 공급자-소비자 데이터 흐름 패턴 구현

이 문서에서는 TPL 데이터 흐름 라이브러리를 사용하여 생산자-소비자 패턴을 구현하는 방법을 알아봅니다. 이 패턴에서 생산자는 메시지 블록에 메시지를 보내고 소비자는 해당 블록에서 메시지를 읽습니다.

참고 항목

TPL 데이터 흐름 라이브러리(System.Threading.Tasks.Dataflow 네임스페이스)는 .NET과 함께 배포되지 않습니다. Visual Studio에서 System.Threading.Tasks.Dataflow 네임스페이스를 설치하려면 프로젝트를 열고, 프로젝트 메뉴에서 NuGet 패키지 관리를 선택한 후, System.Threading.Tasks.Dataflow 패키지를 온라인으로 검색합니다. 또는 .NET Core CLI를 사용하여 설치하려면 dotnet add package System.Threading.Tasks.Dataflow를 실행합니다.

예시

다음 예제에서는 데이터 흐름을 사용하는 기본적인 생산자-소비자 모델을 보여 줍니다. Produce 메서드는 임의의 데이터 바이트를 포함하는 배열을 System.Threading.Tasks.Dataflow.ITargetBlock<TInput> 개체에 쓰고 Consume 메서드는 System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> 개체에서 바이트를 읽습니다. 파생 형식 대신 ISourceBlock<TOutput>ITargetBlock<TInput> 인터페이스에서 작업함으로써 다양한 데이터 흐름 블록 형식에서 작업할 수 있는 재사용 가능한 코드를 작성할 수 있습니다. 이 예제에서는 BufferBlock<T> 클래스를 사용합니다. BufferBlock<T> 클래스가 소스 블록과 대상 블록 역할을 하고, 생산자와 소비자는 공유 개체를 사용하여 데이터를 전송할 수 있습니다.

Produce 메서드는 루프에서 Post 메서드를 호출하여 대상 블록에 동기적으로 데이터를 씁니다. Produce 메서드는 모든 데이터를 대상 블록에 쓴 후 Complete 메서드를 호출하여 블록에 사용 가능한 추가 데이터가 더 이상 없음을 나타냅니다. Consume 메서드는 asyncawait(Visual Basic에서는 AsyncAwait) 연산자를 사용하여 ISourceBlock<TOutput> 개체에서 받은 총 바이트 수를 비동기적으로 컴퓨팅합니다. 비동기적으로 작동하기 위해 Consume 메서드는 OutputAvailableAsync 메서드를 호출하여 소스 블록에 사용 가능한 데이터가 있을 때와 소스 블록에 사용 가능한 추가 데이터가 더 이상 없을 때 알림을 받습니다.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

강력한 프로그래밍

앞의 예제에서는 소비자를 하나만 사용하여 소스 데이터를 처리합니다. 애플리케이션에 여러 소비자가 있는 경우 다음 예제와 같이 TryReceive 메서드를 사용하여 소스 블록에서 데이터를 읽습니다.

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

TryReceive 메서드는 사용 가능한 데이터가 없을 때 False를 반환합니다. 여러 소비자가 소스 블록에 동시에 액세스해야 하는 경우 이 메커니즘은 OutputAvailableAsync 호출 후에 데이터를 계속 사용할 수 있도록 보장합니다.

참고 항목