.NET の System.IO.PipelinesSystem.IO.Pipelines in .NET

System.IO.Pipelines は、.NET でハイ パフォーマンスの I/O をより簡単に行えるように設計された新しいライブラリです。System.IO.Pipelines is a new library that is designed to make it easier to do high-performance I/O in .NET. これは、すべての .NET 実装で動作する .NET Standard を対象とするライブラリです。It’s a library targeting .NET Standard that works on all .NET implementations.

System.IO.Pipelines によって解決される問題What problem does System.IO.Pipelines solve

ストリーミング データを解析するアプリは、多くの特殊な通常とは異なるコード フローを持つ定型コードで構成されます。Apps that parse streaming data are composed of boilerplate code having many specialized and unusual code flows. 定型および特殊なケースのコードは複雑で、保守が困難です。The boilerplate and special case code is complex and difficult to maintain.

System.IO.Pipelines は、以下のようになるように設計されています。System.IO.Pipelines was architected to:

  • ハイ パフォーマンスのストリーミング データ解析を実現する。Have high performance parsing streaming data.
  • コードの複雑さを軽減する。Reduce code complexity.

次のコードは、クライアントから ('\n' で区切られた) 行区切りメッセージを受信する TCP サーバーでは一般的なものです。The following code is typical for a TCP server that receives line-delimited messages (delimited by '\n') from a client:

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

上記のコードには、以下のようないくつかの問題があります。The preceding code has several problems:

  • メッセージ全体 (行の終わり) が、ReadAsync への 1 回の呼び出しで受信されない場合がある。The entire message (end of line) might not be received in a single call to ReadAsync.
  • stream.ReadAsync の結果が無視される。It's ignoring the result of stream.ReadAsync. stream.ReadAsync で、読み取られたデータの量が返される。stream.ReadAsync returns how much data was read.
  • 複数の行が 1 回の ReadAsync 呼び出しで読み取られるケースが処理されない。It doesn't handle the case where multiple lines are read in a single ReadAsync call.
  • 読み取りごとに byte 配列が割り当てられる。It allocates a byte array with each read.

上記の問題を解決するには、次の変更が必要です。To fix the preceding problems, the following changes are required:

  • 改行が検出されるまで、受信データをバッファーに格納します。Buffer the incoming data until a new line is found.

  • バッファーで返されたすべての行を解析します。Parse all the lines returned in the buffer.

  • 行が 1 KB (1024 バイト) より大きい可能性があります。It's possible that the line is bigger than 1 KB (1024 bytes). コードでは、バッファー内の完全な行に収まるように、区切り記号が見つかるまで入力バッファーのサイズを変更する必要があります。The code needs to resize the input buffer until the delimiter is found in order to fit the complete line inside the buffer.

    • バッファーのサイズが変更された場合、入力により長い行が表示されると、より多くのバッファー コピーが作成されます。If the buffer is resized, more buffer copies are made as longer lines appear in the input.
    • 無駄な領域を減らすには、行の読み取りに使用されるバッファーを小さくします。To reduce wasted space, compact the buffer used for reading lines.
  • メモリが繰り返し割り当てられないようにするために、バッファー プーリングの使用を検討します。Consider using buffer pooling to avoid allocating memory repeatedly.

  • 次のコードでは、これらの問題の一部に対処します。The following code addresses some of these problems:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

上記のコードは複雑で、特定されたすべての問題には対処していません。The previous code is complex and doesn't address all the problems identified. ハイ パフォーマンス ネットワークは、通常、パフォーマンスを最大化するために非常に複雑なコードを記述することを意味します。High-performance networking usually means writing very complex code to maximize performance. System.IO.Pipelines は、この種のコードをより簡単に記述できるように設計されています。System.IO.Pipelines was designed to make writing this type of code easier.

パイプPipe

Pipe クラスを使用して、PipeWriter/PipeReader ペアを作成できます。The Pipe class can be used to create a PipeWriter/PipeReader pair. PipeWriter に書き込まれたすべてのデータは、PipeReader で利用できます。All data written into the PipeWriter is available in the PipeReader:

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

パイプの基本的な使用方法Pipe basic usage

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

次の 2 つのループがあります。There are two loops:

  • FillPipeAsync では Socket から読み取り、PipeWriter に書き込みます。FillPipeAsync reads from the Socket and writes to the PipeWriter.
  • ReadPipeAsync では PipeReader から読み取り、受信行を解析します。ReadPipeAsync reads from the PipeReader and parses incoming lines.

明示的なバッファーは割り当てられていません。There are no explicit buffers allocated. すべてのバッファー管理は、PipeReaderPipeWriter の実装に委任されます。All buffer management is delegated to the PipeReader and PipeWriter implementations. バッファー管理を委任することで、ビジネス ロジックのみに焦点を当てるコードがより簡単に消費されるようになります。Delegating buffer management makes it easier for consuming code to focus solely on the business logic.

最初のループでは、次のようになります。In the first loop:

2 番目のループでは、PipeWriter によって書き込まれたバッファーが PipeReader で消費されます。In the second loop, the PipeReader consumes the buffers written by PipeWriter. バッファーはソケットから取得されます。The buffers come from the socket. PipeReader.ReadAsync の呼び出しでは、次のようになります。The call to PipeReader.ReadAsync:

  • 次の 2 つの重要な情報を含む、ReadResult を返します。Returns a ReadResult that contains two important pieces of information:

    • ReadOnlySequence<byte> の形式で読み取られたデータ。The data that was read in the form of ReadOnlySequence<byte>.
    • データの終わり (EOF) に到達したかどうかを示すブール値 IsCompletedA boolean IsCompleted that indicates if the end of data (EOF) has been reached.

行の終わり (EOL) の区切り記号が検出され、行が解析された後は、次のようになります。After finding the end of line (EOL) delimiter and parsing the line:

  • ロジックでバッファーが処理され、既に処理されているものがスキップされます。The logic processes the buffer to skip what's already processed.
  • PipeReader.AdvanceTo は、どのくらいの量のデータが使用され、検査されたかを PipeReader に示すために呼び出されます。PipeReader.AdvanceTo is called to tell the PipeReader how much data has been consumed and examined.

リーダーとライターのループは、Complete を呼び出すことによって終了します。The reader and writer loops end by calling Complete. Complete は、基になるパイプで割り当てられたメモリを解放できるようにします。Complete lets the underlying Pipe release the memory it allocated.

バックプレッシャとフロー制御Backpressure and flow control

読み取りと解析を連動させるのが理想的です。Ideally, reading and parsing work together:

  • 書き込みスレッドでは、ネットワークからのデータを消費し、それをバッファーに格納します。The writing thread consumes data from the network and puts it in buffers.
  • 解析スレッドでは、適切なデータ構造の構築を担当します。The parsing thread is responsible for constructing the appropriate data structures.

通常、解析には、ネットワークからデータのブロックをコピーするだけの場合より、時間がかかります。Typically, parsing takes more time than just copying blocks of data from the network:

  • 読み取りスレッドは解析スレッドより前になります。The reading thread gets ahead of the parsing thread.
  • 読み取りスレッドの速度を落とすか、解析スレッド用のデータを格納するためにより多くのメモリを割り当てる必要があります。The reading thread has to either slow down or allocate more memory to store the data for the parsing thread.

最適なパフォーマンスが得られるように、頻繁な一時停止間のバランスが保たれ、より多くのメモリが割り当てられます。For optimal performance, there's a balance between frequent pauses and allocating more memory.

この問題を解決するために、Pipe には、データのフローを制御するための次の 2 つの設定があります。To solve the preceding problem, the Pipe has two settings to control the flow of data:

  • PauseWriterThreshold:FlushAsync の呼び出しが一時停止する前に、バッファーに格納する必要があるデータ量を判別します。PauseWriterThreshold: Determines how much data should be buffered before calls to FlushAsync pause.
  • ResumeWriterThreshold:PipeWriter.FlushAsync の呼び出しが再開される前に、リーダーが監視する必要があるデータの量を判別します。ResumeWriterThreshold: Determines how much data the reader has to observe before calls to PipeWriter.FlushAsync resume.

ResumeWriterThreshold と PauseWriterThreshold を含む図

PipeWriter.FlushAsync:PipeWriter.FlushAsync:

  • Pipe のデータ量が PauseWriterThreshold を超えたときに、不完全な ValueTask<FlushResult> を返します。Returns an incomplete ValueTask<FlushResult> when the amount of data in the Pipe crosses PauseWriterThreshold.
  • ResumeWriterThreshold より低くなったときに、ValueTask<FlushResult> を完了します。Completes ValueTask<FlushResult> when it becomes lower than ResumeWriterThreshold.

2 つの値は、1 つの値が使用された場合に発生する可能性がある、急速な循環を防ぐために使用されます。Two values are used to prevent rapid cycling, which can occur if one value is used.

使用例Examples

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeSchedulerPipeScheduler

通常、async および await を使用する場合、非同期コードは TaskScheduler または現在の SynchronizationContext で再開されます。Typically when using async and await, asynchronous code resumes on either on a TaskScheduler or on the current SynchronizationContext.

I/O を行う場合は、I/O が実行される場所をきめ細かく制御することが重要です。When doing I/O, it's important to have fine-grained control over where the I/O is performed. この制御により、CPU キャッシュを効果的に利用できます。This control allows taking advantage of CPU caches effectively. 効率的なキャッシュは、Web サーバーなどのハイ パフォーマンス アプリに不可欠です。Efficient caching is critical for high-performance apps like web servers. PipeScheduler では、非同期コールバックが実行される場所を制御できます。PipeScheduler provides control over where asynchronous callbacks run. 既定では:By default:

  • 現在の SynchronizationContext が使用されます。The current SynchronizationContext is used.
  • SynchronizationContext がない場合は、スレッド プールを使用してコールバックを実行します。If there's no SynchronizationContext, it uses the thread pool to run callbacks.
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object> action, object state)
    {
        _queue.Add((action, state));
    }
}

PipeScheduler.ThreadPool は、スレッド プールへのコールバックをキューに登録する PipeScheduler の実装です。PipeScheduler.ThreadPool is the PipeScheduler implementation that queues callbacks to the thread pool. PipeScheduler.ThreadPool は既定値であり、一般的に最適な選択肢です。PipeScheduler.ThreadPool is the default and generally the best choice. PipeScheduler.Inline を使用すると、デッドロックなどの意図しない結果となる可能性があります。PipeScheduler.Inline can cause unintended consequences such as deadlocks.

パイプのリセットPipe reset

多くの場合、Pipe オブジェクトを再利用するのが効率的です。It's frequently efficient to reuse the Pipe object. パイプをリセットするには、PipeReaderPipeWriter の両方が完了したときに PipeReader Reset を呼び出します。To reset the pipe, call PipeReader Reset when both the PipeReader and PipeWriter are complete.

PipeReaderPipeReader

PipeReader では、呼び出し元の代わりにメモリを管理します。PipeReader manages memory on the caller's behalf. PipeReader.ReadAsync を呼び出した後に、常に PipeReader.AdvanceTo を呼び出します。Always call PipeReader.AdvanceTo after calling PipeReader.ReadAsync. これにより、PipeReader では、呼び出し元がメモリを使用して実行されるタイミングを把握し、追跡できるようになります。This lets the PipeReader know when the caller is done with the memory so that it can be tracked. PipeReader.ReadAsync から返された ReadOnlySequence<byte> が有効なのは、PipeReader.AdvanceTo が呼び出されるまでのみとなります。The ReadOnlySequence<byte> returned from PipeReader.ReadAsync is only valid until the call the PipeReader.AdvanceTo. PipeReader.AdvanceTo を呼び出した後、ReadOnlySequence<byte> を使用することはできません。It's illegal to use ReadOnlySequence<byte> after calling PipeReader.AdvanceTo.

PipeReader.AdvanceTo では、次の 2 つの SequencePosition 引数を受け取ります。PipeReader.AdvanceTo takes two SequencePosition arguments:

  • 最初の引数では、消費されたメモリの量を判別します。The first argument determines how much memory was consumed.
  • 2 番目の引数では、監視されたバッファーの量を判別します。The second argument determines how much of the buffer was observed.

データを消費済みとしてマークすることは、パイプでメモリを基になるバッファープ ールに返すことができることを意味します。Marking data as consumed means that the pipe can return the memory to the underlying buffer pool. データを監視済みとしてマークすると、PipeReader.ReadAsync の次の呼び出しで行われる内容が制御されます。Marking data as observed controls what the next call to PipeReader.ReadAsync does. すべてを監視済みとしてマークすることは、パイプにデータがさらに書き込まれるまで、PipeReader.ReadAsync の次の呼び出しでは何も返されないことを意味します。Marking everything as observed means that the next call to PipeReader.ReadAsync won't return until there's more data written to the pipe. それ以外の値を指定すると、PipeReader.ReadAsync の次の呼び出しで、監視されたデータ 監視されていないデータがすぐに返されますが、既に消費されているデータは除きます。Any other value will make the next call to PipeReader.ReadAsync return immediately with the observed and unobserved data, but data that has already been consumed.

ストリーミング データの読み取りシナリオRead streaming data scenarios

ストリーミング データを読み取ろうとすると発生する、一般的なパターンがいくつかあります。There are a couple of typical patterns that emerge when trying to read streaming data:

  • 指定したデータのストリームで、1 つのメッセージを解析する。Given a stream of data, parse a single message.
  • 指定したデータのストリームで、使用可能なメッセージをすべて解析する。Given a stream of data, parse all available messages.

次の例では、ReadOnlySequence<byte> からのメッセージを解析するために TryParseMessage メソッドを使用しています。The following examples use the TryParseMessage method for parsing messages from a ReadOnlySequence<byte>. TryParseMessageでは 1 つのメッセージを解析し、入力バッファーを更新して、解析されたメッセージをバッファーからトリミングします。TryParseMessage parses a single message and update the input buffer to trim the parsed message from the buffer. TryParseMessage は .NET の一部ではありません。これは、次のセクションで使用されるユーザーが記述するメソッドです。TryParseMessage is not part of .NET, it's a user written method used in the following sections.

bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out Message message);

1 つのメッセージを読み取るRead a single message

次のコードでは、PipeReader からの 1 つのメッセージを読み取り、呼び出し元に返します。The following code reads a single message from a PipeReader and returns it to the caller.

async ValueTask<Message> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed 
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseMessage(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start as the 
                // parsed buffer as consumed. TryParseMessage trims the buffer to 
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call 
                // to ReadSingleMessageAsync will process the next message if there's 
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

上記のコードでは次の操作が行われます。The preceding code:

  • 1 つのメッセージを解析します。Parses a single message.
  • 消費された SequencePosition と検査された SequencePosition を更新し、トリミングされた入力バッファーの先頭を指すようにします。Updates the consumed SequencePosition and examined SequencePosition to point to the start of the trimmed input buffer.

2 つの SequencePosition 引数が更新されます。これは、TryParseMessage で解析されたメッセージが入力バッファーから削除されるためです。The two SequencePosition arguments are updated because TryParseMessage removes the parsed message from the input buffer. 一般に、バッファーからの 1 つのメッセージを解析する場合、検査位置は次のいずれかである必要があります。Generally, when parsing a single message from the buffer, the examined position should be one of the following:

  • メッセージの終わり。The end of the message.
  • メッセージが検出されなかった場合は、受信されたバッファーの終わり。The end of the received buffer if no message was found.

1 つのメッセージ ケースでは、エラーが発生する可能性が最も高くなります。The single message case has the most potential for errors. examined に間違った値を渡すと、メモリ不足の例外または無限ループが発生する可能性があります。Passing the wrong values to examined can result in an out of memory exception or an infinite loop. 詳細については、この記事の「PipeReader の一般的な問題」セクションを参照してください。For more information, see the PipeReader common problems section in this article.

複数のメッセージの読み取りReading multiple messages

次のコードでは、PipeReader からのすべてのメッセージを読み取り、それぞれに対して ProcessMessageAsync を呼び出します。The following code reads all messages from a PipeReader and calls ProcessMessageAsync on each.

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each 
                // iteration.
                while (TryParseMessage(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the 
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

キャンセルCancellation

PipeReader.ReadAsync:PipeReader.ReadAsync:

  • CancellationToken を渡すことをサポートします。Supports passing a CancellationToken.
  • 読み取りが保留中のときに CancellationToken が取り消された場合は、OperationCanceledException をスローします。Throws an OperationCanceledException if the CancellationToken is canceled while there's a read pending.
  • PipeReader.CancelPendingRead を使用して現在の読み取り操作を取り消す方法をサポートします。これにより、例外の発生を回避できます。Supports a way to cancel the current read operation via PipeReader.CancelPendingRead, which avoids raising an exception. PipeReader.CancelPendingRead を呼び出すと、PipeReader.ReadAsync の現在の呼び出しまたは次の呼び出しで、IsCanceledtrue に設定された ReadResult が返されます。Calling PipeReader.CancelPendingRead causes the current or next call to PipeReader.ReadAsync to return a ReadResult with IsCanceled set to true. これは、既存の読み取りループを非破壊的で非例外的な方法で停止する際に役立つ場合があります。This can be useful for halting the existing read loop in a non-destructive and non-exceptional way.
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each 
                // iteration.
                while (TryParseMessage(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the 
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

PipeReader の一般的な問題PipeReader common problems

  • consumed または examined に間違った値を渡すと、既に読み取られているデータが読み取られる場合があります。Passing the wrong values to consumed or examined may result in reading already read data.

  • buffer.End を検査済みとして渡すと、次のようになる場合があります。Passing buffer.End as examined may result in:

    • データが停止するStalled data
    • データが消費されていない場合、最終的にメモリ不足 (OOM) 例外が発生する可能性があります。Possibly an eventual Out of Memory (OOM) exception if data isn't consumed. たとえば、バッファーから一度に 1 つのメッセージを処理する場合は、PipeReader.AdvanceTo(position, buffer.End) です。For example, PipeReader.AdvanceTo(position, buffer.End) when processing a single message at a time from the buffer.
  • consumed または examined に間違った値を渡すと、無限ループが発生する可能性があります。Passing the wrong values to consumed or examined may result in an infinite loop. たとえば、buffer.Start が変更されていない場合、PipeReader.AdvanceTo(buffer.Start) では、新しいデータが到着する直前に PipeReader.ReadAsync への次の呼び出しで制御が返されます。For example, PipeReader.AdvanceTo(buffer.Start) if buffer.Start hasn't changed will cause the next call to PipeReader.ReadAsync to return immediately before new data arrives.

  • consumed または examined に間違った値を渡すと、無限バッファーリング (最終的には OOM) が発生する可能性があります。Passing the wrong values to consumed or examined may result in infinite buffering (eventual OOM).

  • PipeReader.AdvanceTo を呼び出した後に ReadOnlySequence<byte> を使用すると、メモリが破損する可能性があります (解放後使用)。Using the ReadOnlySequence<byte> after calling PipeReader.AdvanceTo may result in memory corruption (use after free).

  • PipeReader.Complete/CompleteAsync を呼び出せないと、メモリ リークが発生する可能性があります。Failing to call PipeReader.Complete/CompleteAsync may result in a memory leak.

  • バッファーを処理する前に ReadResult.IsCompleted を確認し、読み取りロジックを終了すると、データが失われます。Checking ReadResult.IsCompleted and exiting the reading logic before processing the buffer results in data loss. ループの終了条件は、ReadResult.Buffer.IsEmpty および ReadResult.IsCompleted に基づいている必要があります。The loop exit condition should be based on ReadResult.Buffer.IsEmpty and ReadResult.IsCompleted. これを誤って実行すると、無限ループになる可能性があります。Doing this incorrectly could result in an infinite loop.

問題のあるコードProblematic code

データ損失Data loss

IsCompletedtrue に設定されている場合、ReadResult でデータの最終セグメントを返すことができます。The ReadResult can return the final segment of data when IsCompleted is set to true. 読み取りループを終了する前にデータを読み取らないと、データが失われます。Not reading that data before exiting the read loop will result in data loss.

警告

次のコードは使用しないでください。Do NOT use the following code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The following sample is provided to explain PipeReader Common problems.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
    {
        break;
    }

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

警告

上記のコードは使用しない でください。Do NOT use the preceding code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The preceding sample is provided to explain PipeReader Common problems.

無限ループInfinite loop

次のロジックでは、Result.IsCompletedtrue でも、バッファー内に完全なメッセージがない場合は、無限ループが発生する可能性があります。The following logic may result in an infinite loop if the Result.IsCompleted is true but there's never a complete message in the buffer.

警告

次のコードは使用しないでください。Do NOT use the following code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The following sample is provided to explain PipeReader Common problems.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
    {
        break;
    }

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

警告

上記のコードは使用しない でください。Do NOT use the preceding code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The preceding sample is provided to explain PipeReader Common problems.

同じ問題があるもう 1 つのコードを次に示します。Here's another piece of code with the same problem. ReadResult.IsCompleted を確認する前に、空でないバッファーがあるかどうかが確認されます。It's checking for a non-empty buffer before checking ReadResult.IsCompleted. これは else if にあるため、バッファー内に完全なメッセージがない場合は、無限にループされます。Because it's in an else if, it will loop forever if there's never a complete message in the buffer.

警告

次のコードは使用しないでください。Do NOT use the following code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The following sample is provided to explain PipeReader Common problems.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
    {
        Process(ref infiniteLoopBuffer, out Message message);
    }
    else if (result.IsCompleted)
    {
        break;
    }

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

警告

上記のコードは使用しない でください。Do NOT use the preceding code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The preceding sample is provided to explain PipeReader Common problems.

予期しないハングUnexpected Hang

examined の位置で buffer.End を使用して PipeReader.AdvanceTo を無条件に呼び出すと、1 つのメッセージを解析するときにハングが発生する可能性があります。Unconditionally calling PipeReader.AdvanceTo with buffer.End in the examined position may result in hangs when parsing a single message. PipeReader.AdvanceTo の次の呼び出しでは、以下のようになるまで何も返されません。The next call to PipeReader.AdvanceTo won't return until:

  • パイプにさらにデータが書き込まれている。There's more data written to the pipe.
  • また、新しいデータが以前に検査されていない。And the new data wasn't previously examined.

警告

次のコードは使用しないでください。Do NOT use the following code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The following sample is provided to explain PipeReader Common problems.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
    {
        break;
    }

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
    {
        return message;
    }
}

警告

上記のコードは使用しない でください。Do NOT use the preceding code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The preceding sample is provided to explain PipeReader Common problems.

メモリ不足 (OOM)Out of Memory (OOM)

次のような状況の場合、以下のコードでは、OutOfMemoryException が発生するまでバッファーが保持されます。With the following conditions, the following code keeps buffering until an OutOfMemoryException occurs:

  • 最大メッセージ サイズがない。There's no maximum message size.
  • PipeReader から返されたデータで、完全なメッセージが作成されない。The data returned from the PipeReader doesn't make a complete message. たとえば、他の側で大きなメッセージ (4 GB のメッセージなど) を書き込んでいるため、完全なメッセージが作成されていない。For example, it doesn't make a complete message because the other side is writing a large message (For example, a 4-GB message).

警告

次のコードは使用しないでください。Do NOT use the following code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The following sample is provided to explain PipeReader Common problems.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
    {
        break;
    }

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
    {
        return message;
    }
}

警告

上記のコードは使用しない でください。Do NOT use the preceding code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The preceding sample is provided to explain PipeReader Common problems.

メモリの破損Memory Corruption

バッファーを読み取るヘルパーを記述するときは、返されたすべてのペイロードをコピーしてから Advance を呼び出す必要があります。When writing helpers that read the buffer, any returned payload should be copied before calling Advance. 次の例では、Pipe で破棄されたメモリを返し、次の操作 (読み取り/書き込み) でそれを再利用する可能性があります。The following example will return memory that the Pipe has discarded and may reuse it for the next operation (read/write).

警告

次のコードは使用しないでください。Do NOT use the following code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The following sample is provided to explain PipeReader Common problems.

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
        {
            break;
        }

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer 
            // was captured.
            break;
        }
    }

    return message;
}

警告

上記のコードは使用しない でください。Do NOT use the preceding code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The preceding sample is provided to explain PipeReader Common problems.

PipeWriterPipeWriter

PipeWriter では、呼び出し元の代わりに書き込むバッファーを管理します。The PipeWriter manages buffers for writing on the caller's behalf. PipeWriter では、IBufferWriter<byte> を実装します。PipeWriter implements IBufferWriter<byte>. IBufferWriter<byte> は、バッファーのコピーを追加せずに、書き込みを実行するためにバッファーにアクセスできるようにします。IBufferWriter<byte> makes it possible to get access to buffers to perform writes without additional buffer copies.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

上記のコードでは、次のようになります。The previous code:

  • GetMemory を使用して、PipeWriter から少なくとも 5 バイトのバッファーを要求します。Requests a buffer of at least 5 bytes from the PipeWriter using GetMemory.
  • ASCII 文字列である "Hello" のバイトを、返された Memory<byte> に書き込みます。Writes bytes for the ASCII string "Hello" to the returned Memory<byte>.
  • Advance を呼び出して、バッファーに書き込まれたバイト数を示します。Calls Advance to indicate how many bytes were written to the buffer.
  • 基になるデバイスにバイトを送信する、PipeWriter をフラッシュします。Flushes the PipeWriter, which sends the bytes to the underlying device.

上記の書き込みメソッドでは、PipeWriter によって提供されるバッファーが使用されています。The previous method of writing uses the buffers provided by the PipeWriter. あるいは、PipeWriter.WriteAsync では次のようになります。Alternatively, PipeWriter.WriteAsync:

  • 既存のバッファーを PipeWriter にコピーします。Copies the existing buffer to the PipeWriter.
  • GetSpan を呼び出し、必要に応じて Advance を呼び出してから、FlushAsync を呼び出します。Calls GetSpan, Advance as appropriate and calls FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here 
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

キャンセルCancellation

FlushAsync では、CancellationToken を渡すことがサポートされます。FlushAsync supports passing a CancellationToken. CancellationToken を渡すと、フラッシュの保留中にトークンが取り消された場合に、OperationCanceledException となります。Passing a CancellationToken results in an OperationCanceledException if the token is canceled while there's a flush pending. PipeWriter.FlushAsync では、例外を発生させることなく、PipeWriter.CancelPendingFlush を使用して、現在のフラッシュ操作を取り消す方法がサポートされます。PipeWriter.FlushAsync supports a way to cancel the current flush operation via PipeWriter.CancelPendingFlush without raising an exception. PipeWriter.CancelPendingFlush を呼び出すと、PipeWriter.FlushAsync または PipeWriter.WriteAsync の現在の呼び出しあるいは次の呼び出しで、IsCanceledtrue に設定された FlushResult が返されます。Calling PipeWriter.CancelPendingFlush causes the current or next call to PipeWriter.FlushAsync or PipeWriter.WriteAsync to return a FlushResult with IsCanceled set to true. これは、非破壊的で非例外的な方法で生成されるフラッシュを停止する際に役立つ場合があります。This can be useful for halting the yielding flush in a non-destructive and non-exceptional way.

PipeWriter の一般的な問題PipeWriter common problems

  • GetSpan および GetMemory では、少なくとも要求された量のメモリを持つバッファーを返します。GetSpan and GetMemory return a buffer with at least the requested amount of memory. 正確なバッファー サイズを想定しないでくださいDon't assume exact buffer sizes.
  • 連続する呼び出しで同じバッファーまたは同じサイズのバッファーが返される保証はありません。There's no guarantee that successive calls will return the same buffer or the same-sized buffer.
  • さらにデータの書き込みを続行するには、Advance を呼び出した後に新しいバッファーを要求する必要があります。A new buffer must be requested after calling Advance to continue writing more data. 以前に取得したバッファーに書き込むことはできません。The previously acquired buffer can't be written to.
  • FlushAsync の呼び出しが不完全な場合に GetMemory または GetSpan を呼び出すのは安全ではありません。Calling GetMemory or GetSpan while there's an incomplete call to FlushAsync isn't safe.
  • フラッシュされていないデータがある場合に Complete または CompleteAsync を呼び出すと、メモリが破損する可能性があります。Calling Complete or CompleteAsync while there's unflushed data can result in memory corruption.

IDuplexPipeIDuplexPipe

IDuplexPipe は、読み取りと書き込みの両方をサポートする種類のコントラクトです。The IDuplexPipe is a contract for types that support both reading and writing. たとえば、ネットワーク接続は IDuplexPipe によって表されます。For example, a network connection would be represented by an IDuplexPipe.

PipeReaderPipeWriter を含む Pipe とは異なり、IDuplexPipe は全二重接続の片側を表します。Unlike Pipe which contains a PipeReader and a PipeWriter, IDuplexPipe represents a single side of a full duplex connection. つまり、PipeWriter に書き込まれるものは、PipeReader からは読み取られません。That means what is written to the PipeWriter will not be read from the PipeReader.

ストリームStreams

ストリーム データの読み取りまたは書き込みを行う場合、通常は、デシリアライザーを使用してデータを読み取り、シリアライザーを使用してデータを書き込みます。When reading or writing stream data, you typically read data using a de-serializer and write data using a serializer. これらの読み取りおよび書き込みストリーム API のほとんどに、Stream パラメーターがあります。Most of these read and write stream APIs have a Stream parameter. これらの既存の API との統合をより容易にするために、PipeReader および PipeWriterAsStream が公開されます。To make it easier to integrate with these existing APIs, PipeReader and PipeWriter expose an AsStream. AsStream では、PipeReader または PipeWriter に関する Stream 実装を返します。AsStream returns a Stream implementation around the PipeReader or PipeWriter.