.NET の System.IO.PipelinesSystem.IO.Pipelines in .NET
System.IO.Pipelines は、.NET でハイ パフォーマンスの I/O をより簡単に行えるように設計された新しいライブラリです。System.IO.Pipelines is a new library that is designed to make it easier to do high-performance I/O in .NET. これは、すべての .NET 実装で動作する .NET Standard を対象とするライブラリです。It's a library targeting .NET Standard that works on all .NET implementations.
System.IO.Pipelines によって解決される問題What problem does System.IO.Pipelines solve
ストリーミング データを解析するアプリは、多くの特殊な通常とは異なるコード フローを持つ定型コードで構成されます。Apps that parse streaming data are composed of boilerplate code having many specialized and unusual code flows. 定型および特殊なケースのコードは複雑で、保守が困難です。The boilerplate and special case code is complex and difficult to maintain.
System.IO.Pipelines
は、以下のようになるように設計されています。System.IO.Pipelines
was architected to:
- ハイ パフォーマンスのストリーミング データ解析を実現する。Have high performance parsing streaming data.
- コードの複雑さを軽減する。Reduce code complexity.
次のコードは、クライアントから ('\n'
で区切られた) 行区切りメッセージを受信する TCP サーバーでは一般的なものです。The following code is typical for a TCP server that receives line-delimited messages (delimited by '\n'
) from a client:
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
上記のコードには、以下のようないくつかの問題があります。The preceding code has several problems:
- メッセージ全体 (行の終わり) が、
ReadAsync
への 1 回の呼び出しで受信されない場合がある。The entire message (end of line) might not be received in a single call toReadAsync
. stream.ReadAsync
の結果が無視される。It's ignoring the result ofstream.ReadAsync
.stream.ReadAsync
で、読み取られたデータの量が返される。stream.ReadAsync
returns how much data was read.- 複数の行が 1 回の
ReadAsync
呼び出しで読み取られるケースが処理されない。It doesn't handle the case where multiple lines are read in a singleReadAsync
call. - 読み取りごとに
byte
配列が割り当てられる。It allocates abyte
array with each read.
上記の問題を解決するには、次の変更が必要です。To fix the preceding problems, the following changes are required:
改行が検出されるまで、受信データをバッファーに格納します。Buffer the incoming data until a new line is found.
バッファーで返されたすべての行を解析します。Parse all the lines returned in the buffer.
行が 1 KB (1024 バイト) より大きい可能性があります。It's possible that the line is bigger than 1 KB (1024 bytes). コードでは、バッファー内の完全な行に収まるように、区切り記号が見つかるまで入力バッファーのサイズを変更する必要があります。The code needs to resize the input buffer until the delimiter is found in order to fit the complete line inside the buffer.
- バッファーのサイズが変更された場合、入力により長い行が表示されると、より多くのバッファー コピーが作成されます。If the buffer is resized, more buffer copies are made as longer lines appear in the input.
- 無駄な領域を減らすには、行の読み取りに使用されるバッファーを小さくします。To reduce wasted space, compact the buffer used for reading lines.
メモリが繰り返し割り当てられないようにするために、バッファー プーリングの使用を検討します。Consider using buffer pooling to avoid allocating memory repeatedly.
次のコードでは、これらの問題の一部に対処します。The following code addresses some of these problems:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
上記のコードは複雑で、特定されたすべての問題には対処していません。The previous code is complex and doesn't address all the problems identified. ハイ パフォーマンス ネットワークは、通常、パフォーマンスを最大化するために非常に複雑なコードを記述することを意味します。High-performance networking usually means writing very complex code to maximize performance. System.IO.Pipelines
は、この種のコードをより簡単に記述できるように設計されています。System.IO.Pipelines
was designed to make writing this type of code easier.
コードのコメントを英語以外の言語に翻訳し表示したい場合、こちらの GitHub ディスカッション イシューにてお知らせください。If you would like to see code comments translated to languages other than English, let us know in this GitHub discussion issue.
パイプPipe
Pipe クラスを使用して、PipeWriter/PipeReader
ペアを作成できます。The Pipe class can be used to create a PipeWriter/PipeReader
pair. PipeWriter
に書き込まれたすべてのデータは、PipeReader
で利用できます。All data written into the PipeWriter
is available in the PipeReader
:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
パイプの基本的な使用方法Pipe basic usage
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
次の 2 つのループがあります。There are two loops:
FillPipeAsync
ではSocket
から読み取り、PipeWriter
に書き込みます。FillPipeAsync
reads from theSocket
and writes to thePipeWriter
.ReadPipeAsync
ではPipeReader
から読み取り、受信行を解析します。ReadPipeAsync
reads from thePipeReader
and parses incoming lines.
明示的なバッファーは割り当てられていません。There are no explicit buffers allocated. すべてのバッファー管理は、PipeReader
と PipeWriter
の実装に委任されます。All buffer management is delegated to the PipeReader
and PipeWriter
implementations. バッファー管理を委任することで、ビジネス ロジックのみに焦点を当てるコードがより簡単に消費されるようになります。Delegating buffer management makes it easier for consuming code to focus solely on the business logic.
最初のループでは、次のようになります。In the first loop:
- PipeWriter.GetMemory(Int32) は、基になるライターからメモリを取得するために呼び出されます。PipeWriter.GetMemory(Int32) is called to get memory from the underlying writer.
- PipeWriter.Advance(Int32) は、バッファーに書き込まれたデータの量を
PipeWriter
に示すために呼び出されます。PipeWriter.Advance(Int32) is called to tell thePipeWriter
how much data was written to the buffer. - PipeWriter.FlushAsync は、データを
PipeReader
で使用できるようにするために呼び出されます。PipeWriter.FlushAsync is called to make the data available to thePipeReader
.
2 番目のループでは、PipeWriter
によって書き込まれたバッファーが PipeReader
で消費されます。In the second loop, the PipeReader
consumes the buffers written by PipeWriter
. バッファーはソケットから取得されます。The buffers come from the socket. PipeReader.ReadAsync
の呼び出しでは、次のようになります。The call to PipeReader.ReadAsync
:
次の 2 つの重要な情報を含む、ReadResult を返します。Returns a ReadResult that contains two important pieces of information:
ReadOnlySequence<byte>
の形式で読み取られたデータ。The data that was read in the form ofReadOnlySequence<byte>
.- データの終わり (EOF) に到達したかどうかを示すブール値
IsCompleted
。A booleanIsCompleted
that indicates if the end of data (EOF) has been reached.
行の終わり (EOL) の区切り記号が検出され、行が解析された後は、次のようになります。After finding the end of line (EOL) delimiter and parsing the line:
- ロジックでバッファーが処理され、既に処理されているものがスキップされます。The logic processes the buffer to skip what's already processed.
PipeReader.AdvanceTo
は、どのくらいの量のデータが使用され、検査されたかをPipeReader
に示すために呼び出されます。PipeReader.AdvanceTo
is called to tell thePipeReader
how much data has been consumed and examined.
リーダーとライターのループは、Complete
を呼び出すことによって終了します。The reader and writer loops end by calling Complete
. Complete
は、基になるパイプで割り当てられたメモリを解放できるようにします。Complete
lets the underlying Pipe release the memory it allocated.
バックプレッシャとフロー制御Backpressure and flow control
読み取りと解析を連動させるのが理想的です。Ideally, reading and parsing work together:
- 書き込みスレッドでは、ネットワークからのデータを消費し、それをバッファーに格納します。The writing thread consumes data from the network and puts it in buffers.
- 解析スレッドでは、適切なデータ構造の構築を担当します。The parsing thread is responsible for constructing the appropriate data structures.
通常、解析には、ネットワークからデータのブロックをコピーするだけの場合より、時間がかかります。Typically, parsing takes more time than just copying blocks of data from the network:
- 読み取りスレッドは解析スレッドより前になります。The reading thread gets ahead of the parsing thread.
- 読み取りスレッドの速度を落とすか、解析スレッド用のデータを格納するためにより多くのメモリを割り当てる必要があります。The reading thread has to either slow down or allocate more memory to store the data for the parsing thread.
最適なパフォーマンスが得られるように、頻繁な一時停止間のバランスが保たれ、より多くのメモリが割り当てられます。For optimal performance, there's a balance between frequent pauses and allocating more memory.
この問題を解決するために、Pipe
には、データのフローを制御するための次の 2 つの設定があります。To solve the preceding problem, the Pipe
has two settings to control the flow of data:
- PauseWriterThreshold:FlushAsync の呼び出しが一時停止する前に、バッファーに格納する必要があるデータ量を判別します。PauseWriterThreshold: Determines how much data should be buffered before calls to FlushAsync pause.
- ResumeWriterThreshold:
PipeWriter.FlushAsync
の呼び出しが再開される前に、リーダーが監視する必要があるデータの量を判別します。ResumeWriterThreshold: Determines how much data the reader has to observe before calls toPipeWriter.FlushAsync
resume.
PipeWriter.FlushAsync:PipeWriter.FlushAsync:
Pipe
のデータ量がPauseWriterThreshold
を超えたときに、不完全なValueTask<FlushResult>
を返します。Returns an incompleteValueTask<FlushResult>
when the amount of data in thePipe
crossesPauseWriterThreshold
.ResumeWriterThreshold
より低くなったときに、ValueTask<FlushResult>
を完了します。CompletesValueTask<FlushResult>
when it becomes lower thanResumeWriterThreshold
.
2 つの値は、1 つの値が使用された場合に発生する可能性がある、急速な循環を防ぐために使用されます。Two values are used to prevent rapid cycling, which can occur if one value is used.
使用例Examples
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
PipeSchedulerPipeScheduler
通常、async
および await
を使用する場合、非同期コードは TaskScheduler または現在の SynchronizationContext で再開されます。Typically when using async
and await
, asynchronous code resumes on either on a TaskScheduler or on the current SynchronizationContext.
I/O を行う場合は、I/O が実行される場所をきめ細かく制御することが重要です。When doing I/O, it's important to have fine-grained control over where the I/O is performed. この制御により、CPU キャッシュを効果的に利用できます。This control allows taking advantage of CPU caches effectively. 効率的なキャッシュは、Web サーバーなどのハイ パフォーマンス アプリに不可欠です。Efficient caching is critical for high-performance apps like web servers. PipeScheduler では、非同期コールバックが実行される場所を制御できます。PipeScheduler provides control over where asynchronous callbacks run. 既定では:By default:
- 現在の SynchronizationContext が使用されます。The current SynchronizationContext is used.
SynchronizationContext
がない場合は、スレッド プールを使用してコールバックを実行します。If there's noSynchronizationContext
, it uses the thread pool to run callbacks.
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object> action, object state)
{
_queue.Add((action, state));
}
}
PipeScheduler.ThreadPool は、スレッド プールへのコールバックをキューに登録する PipeScheduler の実装です。PipeScheduler.ThreadPool is the PipeScheduler implementation that queues callbacks to the thread pool. PipeScheduler.ThreadPool
は既定値であり、一般的に最適な選択肢です。PipeScheduler.ThreadPool
is the default and generally the best choice. PipeScheduler.Inline を使用すると、デッドロックなどの意図しない結果となる可能性があります。PipeScheduler.Inline can cause unintended consequences such as deadlocks.
パイプのリセットPipe reset
多くの場合、Pipe
オブジェクトを再利用するのが効率的です。It's frequently efficient to reuse the Pipe
object. パイプをリセットするには、PipeReader
と PipeWriter
の両方が完了したときに PipeReader Reset を呼び出します。To reset the pipe, call PipeReader Reset when both the PipeReader
and PipeWriter
are complete.
PipeReaderPipeReader
PipeReader では、呼び出し元の代わりにメモリを管理します。PipeReader manages memory on the caller's behalf. PipeReader.ReadAsync を呼び出した後に、常にPipeReader.AdvanceTo を呼び出します。Always call PipeReader.AdvanceTo after calling PipeReader.ReadAsync. これにより、PipeReader
では、呼び出し元がメモリを使用して実行されるタイミングを把握し、追跡できるようになります。This lets the PipeReader
know when the caller is done with the memory so that it can be tracked. PipeReader.ReadAsync
から返された ReadOnlySequence<byte>
が有効なのは、PipeReader.AdvanceTo
が呼び出されるまでのみとなります。The ReadOnlySequence<byte>
returned from PipeReader.ReadAsync
is only valid until the call the PipeReader.AdvanceTo
. PipeReader.AdvanceTo
を呼び出した後、ReadOnlySequence<byte>
を使用することはできません。It's illegal to use ReadOnlySequence<byte>
after calling PipeReader.AdvanceTo
.
PipeReader.AdvanceTo
では、次の 2 つの SequencePosition 引数を受け取ります。PipeReader.AdvanceTo
takes two SequencePosition arguments:
- 最初の引数では、消費されたメモリの量を判別します。The first argument determines how much memory was consumed.
- 2 番目の引数では、監視されたバッファーの量を判別します。The second argument determines how much of the buffer was observed.
データを消費済みとしてマークすることは、パイプでメモリを基になるバッファープ ールに返すことができることを意味します。Marking data as consumed means that the pipe can return the memory to the underlying buffer pool. データを監視済みとしてマークすると、PipeReader.ReadAsync
の次の呼び出しで行われる内容が制御されます。Marking data as observed controls what the next call to PipeReader.ReadAsync
does. すべてを監視済みとしてマークすることは、パイプにデータがさらに書き込まれるまで、PipeReader.ReadAsync
の次の呼び出しでは何も返されないことを意味します。Marking everything as observed means that the next call to PipeReader.ReadAsync
won't return until there's more data written to the pipe. それ以外の値を指定すると、PipeReader.ReadAsync
の次の呼び出しで、監視されたデータ と 監視されていないデータがすぐに返されますが、既に消費されているデータは除きます。Any other value will make the next call to PipeReader.ReadAsync
return immediately with the observed and unobserved data, but not data that has already been consumed.
ストリーミング データの読み取りシナリオRead streaming data scenarios
ストリーミング データを読み取ろうとすると発生する、一般的なパターンがいくつかあります。There are a couple of typical patterns that emerge when trying to read streaming data:
- 指定したデータのストリームで、1 つのメッセージを解析する。Given a stream of data, parse a single message.
- 指定したデータのストリームで、使用可能なメッセージをすべて解析する。Given a stream of data, parse all available messages.
次の例では、ReadOnlySequence<byte>
からのメッセージを解析するために TryParseMessage
メソッドを使用しています。The following examples use the TryParseMessage
method for parsing messages from a ReadOnlySequence<byte>
. TryParseMessage
では 1 つのメッセージを解析し、入力バッファーを更新して、解析されたメッセージをバッファーからトリミングします。TryParseMessage
parses a single message and update the input buffer to trim the parsed message from the buffer. TryParseMessage
は .NET の一部ではありません。これは、次のセクションで使用されるユーザーが記述するメソッドです。TryParseMessage
is not part of .NET, it's a user written method used in the following sections.
bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out Message message);
1 つのメッセージを読み取るRead a single message
次のコードでは、PipeReader
からの 1 つのメッセージを読み取り、呼び出し元に返します。The following code reads a single message from a PipeReader
and returns it to the caller.
async ValueTask<Message> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseMessage(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start as the
// parsed buffer as consumed. TryParseMessage trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
上記のコードでは次の操作が行われます。The preceding code:
- 1 つのメッセージを解析します。Parses a single message.
- 消費された
SequencePosition
と検査されたSequencePosition
を更新し、トリミングされた入力バッファーの先頭を指すようにします。Updates the consumedSequencePosition
and examinedSequencePosition
to point to the start of the trimmed input buffer.
2 つの SequencePosition
引数が更新されます。これは、TryParseMessage
で解析されたメッセージが入力バッファーから削除されるためです。The two SequencePosition
arguments are updated because TryParseMessage
removes the parsed message from the input buffer. 一般に、バッファーからの 1 つのメッセージを解析する場合、検査位置は次のいずれかである必要があります。Generally, when parsing a single message from the buffer, the examined position should be one of the following:
- メッセージの終わり。The end of the message.
- メッセージが検出されなかった場合は、受信されたバッファーの終わり。The end of the received buffer if no message was found.
1 つのメッセージ ケースでは、エラーが発生する可能性が最も高くなります。The single message case has the most potential for errors. examined に間違った値を渡すと、メモリ不足の例外または無限ループが発生する可能性があります。Passing the wrong values to examined can result in an out of memory exception or an infinite loop. 詳細については、この記事の「PipeReader の一般的な問題」セクションを参照してください。For more information, see the PipeReader common problems section in this article.
複数のメッセージの読み取りReading multiple messages
次のコードでは、PipeReader
からのすべてのメッセージを読み取り、それぞれに対して ProcessMessageAsync
を呼び出します。The following code reads all messages from a PipeReader
and calls ProcessMessageAsync
on each.
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseMessage(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
キャンセルCancellation
PipeReader.ReadAsync
:PipeReader.ReadAsync
:
- CancellationToken を渡すことをサポートします。Supports passing a CancellationToken.
- 読み取りが保留中のときに
CancellationToken
が取り消された場合は、OperationCanceledException をスローします。Throws an OperationCanceledException if theCancellationToken
is canceled while there's a read pending. - PipeReader.CancelPendingRead を使用して現在の読み取り操作を取り消す方法をサポートします。これにより、例外の発生を回避できます。Supports a way to cancel the current read operation via PipeReader.CancelPendingRead, which avoids raising an exception.
PipeReader.CancelPendingRead
を呼び出すと、PipeReader.ReadAsync
の現在の呼び出しまたは次の呼び出しで、IsCanceled
がtrue
に設定された ReadResult が返されます。CallingPipeReader.CancelPendingRead
causes the current or next call toPipeReader.ReadAsync
to return a ReadResult withIsCanceled
set totrue
. これは、既存の読み取りループを非破壊的で非例外的な方法で停止する際に役立つ場合があります。This can be useful for halting the existing read loop in a non-destructive and non-exceptional way.
private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseMessage(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
PipeReader の一般的な問題PipeReader common problems
consumed
またはexamined
に間違った値を渡すと、既に読み取られているデータが読み取られる場合があります。Passing the wrong values toconsumed
orexamined
may result in reading already read data.buffer.End
を検査済みとして渡すと、次のようになる場合があります。Passingbuffer.End
as examined may result in:- データが停止するStalled data
- データが消費されていない場合、最終的にメモリ不足 (OOM) 例外が発生する可能性があります。Possibly an eventual Out of Memory (OOM) exception if data isn't consumed. たとえば、バッファーから一度に 1 つのメッセージを処理する場合は、
PipeReader.AdvanceTo(position, buffer.End)
です。For example,PipeReader.AdvanceTo(position, buffer.End)
when processing a single message at a time from the buffer.
consumed
またはexamined
に間違った値を渡すと、無限ループが発生する可能性があります。Passing the wrong values toconsumed
orexamined
may result in an infinite loop. たとえば、buffer.Start
が変更されていない場合、PipeReader.AdvanceTo(buffer.Start)
では、新しいデータが到着する直前にPipeReader.ReadAsync
への次の呼び出しで制御が返されます。For example,PipeReader.AdvanceTo(buffer.Start)
ifbuffer.Start
hasn't changed will cause the next call toPipeReader.ReadAsync
to return immediately before new data arrives.consumed
またはexamined
に間違った値を渡すと、無限バッファーリング (最終的には OOM) が発生する可能性があります。Passing the wrong values toconsumed
orexamined
may result in infinite buffering (eventual OOM).PipeReader.AdvanceTo
を呼び出した後にReadOnlySequence<byte>
を使用すると、メモリが破損する可能性があります (解放後使用)。Using theReadOnlySequence<byte>
after callingPipeReader.AdvanceTo
may result in memory corruption (use after free).PipeReader.Complete/CompleteAsync
を呼び出せないと、メモリ リークが発生する可能性があります。Failing to callPipeReader.Complete/CompleteAsync
may result in a memory leak.バッファーを処理する前に ReadResult.IsCompleted を確認し、読み取りロジックを終了すると、データが失われます。Checking ReadResult.IsCompleted and exiting the reading logic before processing the buffer results in data loss. ループの終了条件は、
ReadResult.Buffer.IsEmpty
およびReadResult.IsCompleted
に基づいている必要があります。The loop exit condition should be based onReadResult.Buffer.IsEmpty
andReadResult.IsCompleted
. これを誤って実行すると、無限ループになる可能性があります。Doing this incorrectly could result in an infinite loop.
問題のあるコードProblematic code
❌ データ損失❌ Data loss
IsCompleted
が true
に設定されている場合、ReadResult
でデータの最終セグメントを返すことができます。The ReadResult
can return the final segment of data when IsCompleted
is set to true
. 読み取りループを終了する前にデータを読み取らないと、データが失われます。Not reading that data before exiting the read loop will result in data loss.
警告
次のコードは使用しないでください。Do NOT use the following code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The following sample is provided to explain PipeReader Common problems.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
{
break;
}
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
警告
上記のコードは使用しない でください。Do NOT use the preceding code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The preceding sample is provided to explain PipeReader Common problems.
❌ 無限ループ❌ Infinite loop
次のロジックでは、Result.IsCompleted
が true
でも、バッファー内に完全なメッセージがない場合は、無限ループが発生する可能性があります。The following logic may result in an infinite loop if the Result.IsCompleted
is true
but there's never a complete message in the buffer.
警告
次のコードは使用しないでください。Do NOT use the following code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The following sample is provided to explain PipeReader Common problems.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
{
break;
}
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
警告
上記のコードは使用しない でください。Do NOT use the preceding code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The preceding sample is provided to explain PipeReader Common problems.
同じ問題があるもう 1 つのコードを次に示します。Here's another piece of code with the same problem. ReadResult.IsCompleted
を確認する前に、空でないバッファーがあるかどうかが確認されます。It's checking for a non-empty buffer before checking ReadResult.IsCompleted
. これは else if
にあるため、バッファー内に完全なメッセージがない場合は、無限にループされます。Because it's in an else if
, it will loop forever if there's never a complete message in the buffer.
警告
次のコードは使用しないでください。Do NOT use the following code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The following sample is provided to explain PipeReader Common problems.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
{
Process(ref infiniteLoopBuffer, out Message message);
}
else if (result.IsCompleted)
{
break;
}
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
警告
上記のコードは使用しない でください。Do NOT use the preceding code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The preceding sample is provided to explain PipeReader Common problems.
❌ 予期しないハング❌ Unexpected Hang
examined
の位置で buffer.End
を使用して PipeReader.AdvanceTo
を無条件に呼び出すと、1 つのメッセージを解析するときにハングが発生する可能性があります。Unconditionally calling PipeReader.AdvanceTo
with buffer.End
in the examined
position may result in hangs when parsing a single message. PipeReader.AdvanceTo
の次の呼び出しでは、以下のようになるまで何も返されません。The next call to PipeReader.AdvanceTo
won't return until:
- パイプにさらにデータが書き込まれている。There's more data written to the pipe.
- また、新しいデータが以前に検査されていない。And the new data wasn't previously examined.
警告
次のコードは使用しないでください。Do NOT use the following code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The following sample is provided to explain PipeReader Common problems.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
{
break;
}
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
{
return message;
}
}
警告
上記のコードは使用しない でください。Do NOT use the preceding code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The preceding sample is provided to explain PipeReader Common problems.
❌ メモリ不足 (OOM)❌ Out of Memory (OOM)
次のような状況の場合、以下のコードでは、OutOfMemoryException が発生するまでバッファーが保持されます。With the following conditions, the following code keeps buffering until an OutOfMemoryException occurs:
- 最大メッセージ サイズがない。There's no maximum message size.
PipeReader
から返されたデータで、完全なメッセージが作成されない。The data returned from thePipeReader
doesn't make a complete message. たとえば、他の側で大きなメッセージ (4 GB のメッセージなど) を書き込んでいるため、完全なメッセージが作成されていない。For example, it doesn't make a complete message because the other side is writing a large message (For example, a 4-GB message).
警告
次のコードは使用しないでください。Do NOT use the following code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The following sample is provided to explain PipeReader Common problems.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
{
break;
}
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
{
return message;
}
}
警告
上記のコードは使用しない でください。Do NOT use the preceding code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The preceding sample is provided to explain PipeReader Common problems.
❌ メモリの破損❌ Memory Corruption
バッファーを読み取るヘルパーを記述するときは、返されたすべてのペイロードをコピーしてから Advance
を呼び出す必要があります。When writing helpers that read the buffer, any returned payload should be copied before calling Advance
. 次の例では、Pipe
で破棄されたメモリを返し、次の操作 (読み取り/書き込み) でそれを再利用する可能性があります。The following example will return memory that the Pipe
has discarded and may reuse it for the next operation (read/write).
警告
次のコードは使用しないでください。Do NOT use the following code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 以下のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The following sample is provided to explain PipeReader Common problems.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
{
break;
}
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
警告
上記のコードは使用しない でください。Do NOT use the preceding code. このサンプルを使用すると、データの損失、ハング、セキュリティの問題が発生するため、コピーしないでください。Using this sample will result in data loss, hangs, security issues and should NOT be copied. 上記のサンプルは、PipeReader の一般的な問題を説明するために提供されています。The preceding sample is provided to explain PipeReader Common problems.
PipeWriterPipeWriter
PipeWriter では、呼び出し元の代わりに書き込むバッファーを管理します。The PipeWriter manages buffers for writing on the caller's behalf. PipeWriter
では、IBufferWriter<byte>
を実装します。PipeWriter
implements IBufferWriter<byte>
. IBufferWriter<byte>
は、バッファーのコピーを追加せずに、書き込みを実行するためにバッファーにアクセスできるようにします。IBufferWriter<byte>
makes it possible to get access to buffers to perform writes without additional buffer copies.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
上記のコードでは、次のようになります。The previous code:
- GetMemory を使用して、
PipeWriter
から少なくとも 5 バイトのバッファーを要求します。Requests a buffer of at least 5 bytes from thePipeWriter
using GetMemory. - ASCII 文字列である
"Hello"
のバイトを、返されたMemory<byte>
に書き込みます。Writes bytes for the ASCII string"Hello"
to the returnedMemory<byte>
. - Advance を呼び出して、バッファーに書き込まれたバイト数を示します。Calls Advance to indicate how many bytes were written to the buffer.
- 基になるデバイスにバイトを送信する、
PipeWriter
をフラッシュします。Flushes thePipeWriter
, which sends the bytes to the underlying device.
上記の書き込みメソッドでは、PipeWriter
によって提供されるバッファーが使用されています。The previous method of writing uses the buffers provided by the PipeWriter
. あるいは、PipeWriter.WriteAsync では次のようになります。Alternatively, PipeWriter.WriteAsync:
- 既存のバッファーを
PipeWriter
にコピーします。Copies the existing buffer to thePipeWriter
. GetSpan
を呼び出し、必要に応じてAdvance
を呼び出してから、FlushAsync を呼び出します。CallsGetSpan
,Advance
as appropriate and calls FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
キャンセルCancellation
FlushAsync では、CancellationToken を渡すことがサポートされます。FlushAsync supports passing a CancellationToken. CancellationToken
を渡すと、フラッシュの保留中にトークンが取り消された場合に、OperationCanceledException
となります。Passing a CancellationToken
results in an OperationCanceledException
if the token is canceled while there's a flush pending. PipeWriter.FlushAsync
では、例外を発生させることなく、PipeWriter.CancelPendingFlush を使用して、現在のフラッシュ操作を取り消す方法がサポートされます。PipeWriter.FlushAsync
supports a way to cancel the current flush operation via PipeWriter.CancelPendingFlush without raising an exception. PipeWriter.CancelPendingFlush
を呼び出すと、PipeWriter.FlushAsync
または PipeWriter.WriteAsync
の現在の呼び出しあるいは次の呼び出しで、IsCanceled
が true
に設定された FlushResult が返されます。Calling PipeWriter.CancelPendingFlush
causes the current or next call to PipeWriter.FlushAsync
or PipeWriter.WriteAsync
to return a FlushResult with IsCanceled
set to true
. これは、非破壊的で非例外的な方法で生成されるフラッシュを停止する際に役立つ場合があります。This can be useful for halting the yielding flush in a non-destructive and non-exceptional way.
PipeWriter の一般的な問題PipeWriter common problems
- GetSpan および GetMemory では、少なくとも要求された量のメモリを持つバッファーを返します。GetSpan and GetMemory return a buffer with at least the requested amount of memory. 正確なバッファー サイズを想定 しないでください。Don't assume exact buffer sizes.
- 連続する呼び出しで同じバッファーまたは同じサイズのバッファーが返される保証はありません。There's no guarantee that successive calls will return the same buffer or the same-sized buffer.
- さらにデータの書き込みを続行するには、Advance を呼び出した後に新しいバッファーを要求する必要があります。A new buffer must be requested after calling Advance to continue writing more data. 以前に取得したバッファーに書き込むことはできません。The previously acquired buffer can't be written to.
FlushAsync
の呼び出しが不完全な場合にGetMemory
またはGetSpan
を呼び出すのは安全ではありません。CallingGetMemory
orGetSpan
while there's an incomplete call toFlushAsync
isn't safe.- フラッシュされていないデータがある場合に
Complete
またはCompleteAsync
を呼び出すと、メモリが破損する可能性があります。CallingComplete
orCompleteAsync
while there's unflushed data can result in memory corruption.
IDuplexPipeIDuplexPipe
IDuplexPipe は、読み取りと書き込みの両方をサポートする種類のコントラクトです。The IDuplexPipe is a contract for types that support both reading and writing. たとえば、ネットワーク接続は IDuplexPipe
によって表されます。For example, a network connection would be represented by an IDuplexPipe
.
PipeReader
と PipeWriter
を含む Pipe
とは異なり、IDuplexPipe
は全二重接続の片側を表します。Unlike Pipe
, which contains a PipeReader
and a PipeWriter
, IDuplexPipe
represents a single side of a full duplex connection. つまり、PipeWriter
に書き込まれるものは、PipeReader
からは読み取られません。That means what is written to the PipeWriter
will not be read from the PipeReader
.
ストリームStreams
ストリーム データの読み取りまたは書き込みを行う場合、通常は、デシリアライザーを使用してデータを読み取り、シリアライザーを使用してデータを書き込みます。When reading or writing stream data, you typically read data using a de-serializer and write data using a serializer. これらの読み取りおよび書き込みストリーム API のほとんどに、Stream
パラメーターがあります。Most of these read and write stream APIs have a Stream
parameter. これらの既存の API との統合をより容易にするために、PipeReader
および PipeWriter
で AsStream メソッドが公開されます。To make it easier to integrate with these existing APIs, PipeReader
and PipeWriter
expose an AsStream method. AsStream では、PipeReader
または PipeWriter
に関する Stream
実装を返します。AsStream returns a Stream
implementation around the PipeReader
or PipeWriter
.
ストリームの例Stream example
PipeReader
および PipeWriter
インスタンスは、静的な Create
メソッドを使用し、Stream オブジェクトとそれに対応する任意の作成オプションを指定して作成できます。PipeReader
and PipeWriter
instances can be created using the static Create
methods given a Stream object and optional corresponding creation options.
StreamPipeReaderOptions では、次のパラメーターを使用して PipeReader
インスタンスの作成を制御できます。The StreamPipeReaderOptions allow for control over the creation of the PipeReader
instance with the following parameters:
- StreamPipeReaderOptions.BufferSize はプールからメモリを借りるときに使用される最小バッファー サイズ (バイト単位) であり、既定値は
4096
です。StreamPipeReaderOptions.BufferSize is the minimum buffer size in bytes used when renting memory from the pool, and defaults to4096
. - StreamPipeReaderOptions.LeaveOpen フラグによって、
PipeReader
の完了後、基礎となるストリームを開いたままにするかどうかが決定されます。既定値はfalse
です。StreamPipeReaderOptions.LeaveOpen flag determines whether or not the underlying stream is left open after thePipeReader
completes, and defaults tofalse
. - StreamPipeReaderOptions.MinimumReadSize は、新しいバッファーが割り当てられる前のバッファー内に残るバイト数のしきい値を表します。既定値は
1024
です。StreamPipeReaderOptions.MinimumReadSize represents the threshold of remaining bytes in the buffer before a new buffer is allocated, and defaults to1024
. - StreamPipeReaderOptions.Pool はメモリを割り当てるときに使用される
MemoryPool<byte>
であり、既定値はnull
です。StreamPipeReaderOptions.Pool is theMemoryPool<byte>
used when allocating memory, and defaults tonull
.
StreamPipeWriterOptions では、次のパラメーターを使用して PipeWriter
インスタンスの作成を制御できます。The StreamPipeWriterOptions allow for control over the creation of the PipeWriter
instance with the following parameters:
- StreamPipeWriterOptions.LeaveOpen フラグによって、
PipeWriter
の完了後、基礎となるストリームを開いたままにするかどうかが決定されます。既定値はfalse
です。StreamPipeWriterOptions.LeaveOpen flag determines whether or not the underlying stream is left open after thePipeWriter
completes, and defaults tofalse
. - StreamPipeWriterOptions.MinimumBufferSize は、Pool からメモリをレンタルしているときに使用する最小バッファー サイズを表します。既定値は
4096
です。StreamPipeWriterOptions.MinimumBufferSize represents the minimum buffer size to use when renting memory from the Pool, and defaults to4096
. - StreamPipeWriterOptions.Pool はメモリを割り当てるときに使用される
MemoryPool<byte>
であり、既定値はnull
です。StreamPipeWriterOptions.Pool is theMemoryPool<byte>
used when allocating memory, and defaults tonull
.
重要
Create
メソッドを使用して PipeReader
および PipeWriter
インスタンスを作成するとき、Stream
オブジェクトの有効期間を考慮する必要があります。When creating PipeReader
and PipeWriter
instances using the Create
methods, you need to consider the Stream
object lifetime. リーダーまたはライターでストリームの利用が終わった後にストリームにアクセスする必要がある場合、作成オプションで LeaveOpen
フラグを true
に設定する必要があります。If you need access to the stream after the reader or writer is done with it, you'll need to set the LeaveOpen
flag to true
on the creation options. 設定しない場合、ストリームは閉じられます。Otherwise, the stream will be closed.
次のコードでは、ストリームからの Create
メソッドを利用し、PipeReader
および PipeWriter
インスタンスが作成されます。The following code demonstrates the creation of PipeReader
and PipeWriter
instances using the Create
methods from a stream.
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseMessage(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseMessage(
ref ReadOnlySequence<byte> buffer,
out string message) =>
(message = Encoding.ASCII.GetString(buffer)) != null;
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
このアプリケーションによって StreamReader が使用され、ストリームとして lorem-ipsum.txt ファイルが読み取られます。The application uses a StreamReader to read the lorem-ipsum.txt file as a stream. FileStream は、PipeReader
オブジェクトをインスタンス化する PipeReader.Create に渡されます。The FileStream is passed to PipeReader.Create, which instantiates a PipeReader
object. 次に、コンソール アプリケーションから Console.OpenStandardOutput() を使用して PipeWriter.Create にその標準出力ストリームが渡されます。The console application then passes its standard output stream to PipeWriter.Create using Console.OpenStandardOutput(). この例ではキャンセルがサポートされます。The example supports cancellation.