.NET 中的 System.IO.Pipelines

System.IO.Pipelines 是一个库,旨在使在 .NET 中执行高性能 I/O 更加容易。 该库的目标为适用于所有 .NET 实现的 .NET Standard。

该库可作为 System.IO.Pipelines Nuget 包提供。

System.IO.Pipelines 解决什么问题

分析流数据的应用由样板代码组成,后者由许多专门且不寻常的代码流组成。 样板代码和特殊情况代码很复杂且难以进行维护。

System.IO.Pipelines 已构建为:

  • 具有高性能的流数据分析功能。
  • 减少代码复杂性。

下面的代码是典型的 TCP 服务器,它从客户机接收行分隔的消息(由 '\n' 分隔):

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

前面的代码有几个问题:

  • 单次调用 ReadAsync 可能无法接收整条消息(行尾)。
  • 忽略了 stream.ReadAsync 的结果。 stream.ReadAsync 返回读取的数据量。
  • 它不能处理在单个 ReadAsync 调用中读取多行的情况。
  • 它为每次读取分配一个 byte 数组。

要解决上述问题,需要进行以下更改:

  • 缓冲传入的数据,直到找到新行。

  • 分析缓冲区中返回的所有行。

  • 该行可能大于 1KB(1024 字节)。 此代码需要调整输入缓冲区的大小,直到找到分隔符后,才能在缓冲区内容纳完整行。

    • 如果调整缓冲区的大小,当输入中出现较长的行时,将生成更多缓冲区副本。
    • 压缩用于读取行的缓冲区,以减少空余。
  • 请考虑使用缓冲池来避免重复分配内存。

  • 下面的代码解决了其中一些问题:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

前面的代码很复杂,不能解决所识别的所有问题。 高性能网络通常意味着编写复杂的代码以使性能最大化。 System.IO.Pipelines 的设计目的是使编写此类代码更容易。

管道

Pipe 类可用于创建 PipeWriter/PipeReader 对。 写入 PipeWriter 的所有数据都可用于 PipeReader

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

管道基本用法

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

有两个循环:

  • FillPipeAsyncSocket 读取并写入 PipeWriter
  • ReadPipeAsyncPipeReader 读取并分析传入的行。

没有分配显式缓冲区。 所有缓冲区管理都委托给 PipeReaderPipeWriter 实现。 委派缓冲区管理使使用代码更容易集中关注业务逻辑。

在第一个循环中:

在第二个循环中,PipeReader 使用由 PipeWriter 写入的缓冲区。 缓冲区来自套接字。 对 PipeReader.ReadAsync 的调用:

  • 返回包含两条重要信息的 ReadResult

    • ReadOnlySequence<byte> 形式读取的数据。
    • 布尔值 IsCompleted,指示是否已到达数据结尾 (EOF)。

找到行尾 (EOL) 分隔符并分析该行后:

  • 该逻辑处理缓冲区以跳过已处理的内容。
  • 调用 PipeReader.AdvanceTo 以告知 PipeReader 已消耗和检查了多少数据。

读取器和编写器循环通过调用 Complete 结束。 Complete 使基础管道释放其分配的内存。

反压和流量控制

理想情况下,读取和分析可协同工作:

  • 读取线程使用来自网络的数据并将其放入缓冲区。
  • 分析线程负责构造适当的数据结构。

通常,分析所花费的时间比仅从网络复制数据块所用时间更长:

  • 读取线程领先于分析线程。
  • 读取线程必须减缓或分配更多内存来存储用于分析线程的数据。

为了获得最佳性能,需要在频繁暂停和分配更多内存之间取得平衡。

为解决上述问题,Pipe 提供了两个设置来控制数据流:

Diagram with ResumeWriterThreshold and PauseWriterThreshold

PipeWriter.FlushAsync

  • Pipe 中的数据量超过 PauseWriterThreshold 时,返回不完整的 ValueTask<FlushResult>
  • 低于 ResumeWriterThreshold 时,返回完整的 ValueTask<FlushResult>

使用两个值可防止快速循环,如果只使用一个值,则可能发生这种循环。

示例

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeScheduler

通常在使用 asyncawait 时,异步代码会在 TaskScheduler 或当前 SynchronizationContext 上恢复。

在执行 I/O 时,对执行 I/O 的位置进行细粒度控制非常重要。 此控件允许高效利用 CPU 缓存。 高效的缓存对于 Web 服务器等高性能应用至关重要。 PipeScheduler 提供对异步回调运行位置的控制。 默认情况下:

  • 使用当前的 SynchronizationContext
  • 如果没有 SynchronizationContext,它将使用线程池运行回调。
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPoolPipeScheduler 实现,用于对线程池的回调进行排队。 PipeScheduler.ThreadPool 是默认选项,通常也是最佳选项。 PipeScheduler.Inline 可能会导致意外后果,如死锁。

管道重置

通常重用 Pipe 对象即可重置。 要重置管道,请在完成 PipeReaderPipeWriter 时调用 PipeReaderReset

PipeReader

PipeReader 代表调用方管理内存。 在调用 PipeReader.ReadAsync 之后始终调用 PipeReader.AdvanceTo。 这使 PipeReader 知道调用方何时用完内存,以便可以对其进行跟踪。 从 PipeReader.ReadAsync 返回的 ReadOnlySequence<byte> 仅在调用 PipeReader.AdvanceTo 之前有效。 调用 PipeReader.AdvanceTo 后,不能使用 ReadOnlySequence<byte>

PipeReader.AdvanceTo 采用两个 SequencePosition 参数:

  • 第一个参数确定消耗的内存量。
  • 第二个参数确定观察到的缓冲区数。

将数据标记为“已使用”意味着管道可以将内存返回到底层缓冲池。 将数据标记为“已观察”可控制对 PipeReader.ReadAsync 的下一个调用的操作。 将所有内容都标记为“已观察”意味着下次对 PipeReader.ReadAsync 的调用将不会返回,直到有更多数据写入管道。 任何其他值都将使对 PipeReader.ReadAsync 的下一次调用立即返回并包含已观察到的和未观察到的数据,但不是已被使用的数据。

读取流数据方案

尝试读取流数据时会出现以下几种典型模式:

  • 给定数据流时,分析单条消息。
  • 给定数据流时,分析所有可用消息。

以下示例使用 TryParseLines 方法分析来自 ReadOnlySequence<byte> 的消息。 TryParseLines 分析单条消息并更新输入缓冲区,以从缓冲区中剪裁已分析的消息。 TryParseLines 不是 .NET 的一部分,它是在以下部分中使用的用户编写的方法。

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

读取单条消息

下面的代码从 PipeReader 读取一条消息并将其返回给调用方。

async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

前面的代码:

  • 分析单条消息。
  • 更新已使用的 SequencePosition 并检查 SequencePosition 以指向已剪裁的输入缓冲区的开始。

因为 TryParseLines 从输入缓冲区中删除了已分析的消息,所以更新了两个 SequencePosition 参数。 通常,分析来自缓冲区的单条消息时,检查的位置应为以下位置之一:

  • 消息的结尾。
  • 如果未找到消息,则返回接收缓冲区的结尾。

单条消息案例最有可能出现错误。 将错误的值传递给“已检查”可能会导致内存不足异常或无限循环 。 有关详细信息,请参阅本文中的 PipeReader 常见问题部分。

读取多条消息

以下代码从 PipeReader 读取所有消息,并在每条消息上调用 ProcessMessageAsync

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

取消

PipeReader.ReadAsync

  • 支持传递 CancellationToken
  • 如果在读取挂起期间取消了 CancellationToken,则会引发 OperationCanceledException
  • 支持通过 PipeReader.CancelPendingRead 取消当前读取操作的方法,这样可以避免引发异常。 调用 PipeReader.CancelPendingRead 将导致对 PipeReader.ReadAsync 的当前或下次调用返回 ReadResult,并将 IsCanceled 设置为 true。 这对于以非破坏性和非异常的方式停止现有的读取循环非常有用。
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

PipeReader 常见问题

  • 将错误的值传递给 consumedexamined 可能会导致读取已读取的数据。

  • 传递 buffer.End 作为检查对象可能会导致以下问题:

    • 数据停止
    • 如果数据未使用,可能最终会出现内存不足 (OOM) 异常。 例如,当一次处理来自缓冲区的单条消息时,可能会出现 PipeReader.AdvanceTo(position, buffer.End)
  • 将错误的值传递给 consumedexamined 可能会导致无限循环。 例如,如果 buffer.Start 没有更改,则 PipeReader.AdvanceTo(buffer.Start) 将导致在下一个对 PipeReader.ReadAsync 的调用在新数据到来之前立即返回。

  • 将错误的值传递给 consumedexamined 可能会导致无限缓冲(最终导致 OOM)。

  • 在调用 PipeReader.AdvanceTo 之后使用 ReadOnlySequence<byte> 可能会导致内存损坏(在释放之后使用)。

  • 未能调用 PipeReader.Complete/CompleteAsync 可能会导致内存泄漏。

  • 在处理缓冲区之前检查 ReadResult.IsCompleted 并退出读取逻辑会导致数据丢失。 循环退出条件应基于 ReadResult.Buffer.IsEmptyReadResult.IsCompleted。 如果错误执行此操作,可能会导致无限循环。

有问题的代码

数据丢失

IsCompleted 被设置为 true 时,ReadResult 可能会返回最后一段数据。 在退出读循环之前不读取该数据将导致数据丢失。

警告

不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 以下示例用于解释 PipeReader 常见问题

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

警告

不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 前面的示例用于解释 PipeReader 常见问题

无限循环

如果 Result.IsCompletedtrue,则以下逻辑可能会导致无限循环,但缓冲区中永远不会有完整的消息。

警告

不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 以下示例用于解释 PipeReader 常见问题

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

警告

不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 前面的示例用于解释 PipeReader 常见问题

下面是另一段具有相同问题的代码。 该代码在检查 ReadResult.IsCompleted 之前检查非空缓冲区。 由于该代码位于 else if 中,如果缓冲区中没有完整的消息,它将永远循环。

警告

不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 以下示例用于解释 PipeReader 常见问题

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

警告

不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 前面的示例用于解释 PipeReader 常见问题

应用程序无响应

在分析单条消息时,如果无条件调用 PipeReader.AdvanceTobuffer.End 位于 examined 位置,则可能导致应用程序变为无响应。 对 PipeReader.AdvanceTo 的下次调用将在以下情况下返回:

  • 有更多数据写入管道。
  • 以及之前未检查过新数据。

警告

不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 以下示例用于解释 PipeReader 常见问题

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

警告

不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 前面的示例用于解释 PipeReader 常见问题

内存不足 (OOM)

在满足以下条件的情况下,以下代码将保持缓冲,直到发生 OutOfMemoryException

  • 没有最大消息大小。
  • PipeReader 返回的数据不会生成完整的消息。 例如,它不会生成完整的消息,因为另一端正在编写一条大消息(例如,一条为 4GB 的消息)。

警告

不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 以下示例用于解释 PipeReader 常见问题

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

警告

不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 前面的示例用于解释 PipeReader 常见问题

内存损坏

当写入读取缓冲区的帮助程序时,应在调用 Advance 之前复制任何返回的有效负载。 下面的示例将返回 Pipe 已丢弃的内存,并可能将其重新用于下一个操作(读/写)。

警告

不要使用以下代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 以下示例用于解释 PipeReader 常见问题

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

警告

不要使用上述代码。 使用此示例将导致数据丢失、挂起和安全问题,并且不应复制。 前面的示例用于解释 PipeReader 常见问题

PipeWriter

PipeWriter 管理用于代表调用方写入的缓冲区。 PipeWriter 可实现 IBufferWriter<byte>IBufferWriter<byte> 使得无需额外的缓冲区副本就可以访问缓冲区来执行写入操作。

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

之前的代码:

  • 使用 GetMemoryPipeWriter 请求至少 5 个字节的缓冲区。
  • 将 ASCII 字符串 "Hello" 的字节写入返回的 Memory<byte>
  • 调用 Advance 以指示写入缓冲区的字节数。
  • 刷新 PipeWriter,以便将字节发送到基础设备。

以前的写入方法使用 PipeWriter 提供的缓冲区。 它可能还使用了 PipeWriter.WriteAsync,该项执行以下操作:

  • 将现有缓冲区复制到 PipeWriter
  • 根据需要调用 GetSpanAdvance,然后调用 FlushAsync
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

取消

FlushAsync 支持传递 CancellationToken。 如果令牌在刷新挂起时被取消,则传递 CancellationToken 将导致 OperationCanceledExceptionPipeWriter.FlushAsync 支持通过 PipeWriter.CancelPendingFlush 取消当前刷新操作而不引发异常的方法。 调用 PipeWriter.CancelPendingFlush 将导致对 PipeWriter.FlushAsyncPipeWriter.WriteAsync 的当前或下次调用返回 FlushResult,并将 IsCanceled 设置为 true。 这对于以非破坏性和非异常的方式停止暂停刷新非常有用。

PipeWriter 常见问题

  • GetSpanGetMemory 返回至少具有请求内存量的缓冲区。 请勿假设确切的缓冲区大小 。
  • 无法保证连续的调用将返回相同的缓冲区或相同大小的缓冲区。
  • 在调用 Advance 之后,必须请求一个新的缓冲区来继续写入更多数据。 不能写入先前获得的缓冲区。
  • 如果未完成对 FlushAsync 的调用,则调用 GetMemoryGetSpan 将不安全。
  • 如果未刷新数据,则调用 CompleteCompleteAsync 可能导致内存损坏。

有关使用 PipeReader 和 PipeWriter 的提示

以下提示将帮助你成功使用 System.IO.Pipelines 类:

  • 始终完成 PipeReaderPipeWriter(包括适用时的例外情况)。
  • 在调用 PipeReader.ReadAsync 之后始终调用 PipeReader.AdvanceTo
  • 写入时定期 awaitPipeWriter.FlushAsync,并始终检查 FlushResult.IsCompleted。 如果 IsCompletedtrue,则中止写入,因为这表示读取器已完成,不再关心所写入的内容。
  • 在写入希望 PipeReader 有权访问的内容后调用 PipeWriter.FlushAsync
  • 如果读取器在 FlushAsync 完成之前无法启动,请勿调用 FlushAsync,因为这可能会导致死锁。
  • 确保只有一个上下文“拥有”PipeReaderPipeWriter 或访问它们。 这些类型不是线程安全的。
  • 调用 AdvanceTo 或完成 PipeReader 后,切勿访问 ReadResult.Buffer

IDuplexPipe

IDuplexPipe 是支持读写的类型的协定。 例如,网络连接将由 IDuplexPipe 表示。

与包含 PipeReaderPipeWriterPipe 不同,IDuplexPipe 表示全双工连接的一侧。 这意味着写入 PipeWriter 的内容不会从 PipeReader 中读取。

在读取或写入流数据时,通常使用反序列化程序读取数据,并使用序列化程序写入数据。 大多数读取和写入流 API 都有一个 Stream 参数。 为了更轻松地与这些现有 API 集成,PipeReaderPipeWriter 公开了一个 AsStream 方法。 AsStream 返回围绕 PipeReaderPipeWriterStream 实现。

流示例

可使用给定了 Stream 对象和可选的相应创建选项的静态 Create 方法创建 PipeReaderPipeWriter 实例。

StreamPipeReaderOptions 允许使用以下参数控制 PipeReader 实例的创建:

StreamPipeWriterOptions 允许使用以下参数控制 PipeWriter 实例的创建:

重要

使用 Create 方法创建 PipeReaderPipeWriter 实例时,需要考虑 Stream 对象的生存期。 如果在读取器或编写器使用该方法完成操作后,你需要访问流,则需要在创建选项上将 LeaveOpen 标志设置为 true。 否则,流将关闭。

以下代码演示了使用 Create 方法从流中创建 PipeReaderPipeWriter 实例。

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

应用程序使用 StreamReader 以流形式读取 lorem-ipsum.txt 文件,并且必须以空白行结尾。 FileStream 传递给 PipeReader.Create,后者实例化 PipeReader 对象。 然后,控制台应用程序使用 Console.OpenStandardOutput() 将其标准输出流传递到 PipeWriter.Create。 示例支持取消