System.IO. .net ' te Pipelines
System.IO.Pipelines , .NET ortamında yüksek performanslı g/ç 'yi daha kolay hale getirmek için tasarlanan bir kitaplıktır. Bu, tüm .NET uygulamalarında çalışacak .NET Standard hedefleme bir kitaplıktır.
Sorun System.IO. Pipelines çöz
Akış verilerini ayrıştırmaya yönelik uygulamalar, çok sayıda özel ve olağandışı kod akışına sahip ortak koddan oluşur. Ortak ve özel durum kodu karmaşık ve devam etmek zordur.
System.IO.Pipelines Şu şekilde tasarlanmıştır:
- Akış verilerinin yüksek performans ayrıştırması vardır.
- Kod karmaşıklığını azaltın.
Aşağıdaki kod, bir istemciden satır sınırlı iletileri (ile ayrılmış) alan bir TCP sunucusu için tipik bir noktadır '\n' :
async Task ProcessLinesAsync(NetworkStream stream)
{
var buffer = new byte[1024];
await stream.ReadAsync(buffer, 0, buffer.Length);
// Process a single line from the buffer
ProcessLine(buffer);
}
Yukarıdaki kodda birkaç sorun vardır:
- Tüm ileti (satır sonu), için tek bir çağrıda alınmayabilir
ReadAsync. - Sonucu yok sayılıyor
stream.ReadAsync.stream.ReadAsyncne kadar veri okunduğunu döndürür. - Birden çok satırın tek bir çağrıda okunduğu durumu işlemez
ReadAsync. byteHer okunan bir diziyi ayırır.
Yukarıdaki sorunları onarmak için aşağıdaki değişiklikler gereklidir:
Yeni bir satır bulunana kadar gelen verileri arabelleğe koyun.
Arabellekte döndürülen tüm satırları ayrıştırın.
Satır 1 KB 'den büyük (1024 bayt) olabilir. Bu kodun, arabelleğin içindeki tamamlanma satırına sığması için sınırlayıcı bulunana kadar giriş arabelleğini yeniden boyutlandırması gerekir.
- Arabellek yeniden boyutlandırılırsa, girişte daha uzun çizgiler göründüğü sürece daha fazla arabellek kopyası yapılır.
- Harcanan alanı azaltmak için, satırları okumak için kullanılan arabelleği sıkıştırın.
Belleği sürekli ayırmayı önlemek için arabellek havuzu kullanmayı düşünün.
Aşağıdaki kod bu sorunlardan bazılarını ele alınmaktadır:
async Task ProcessLinesAsync(NetworkStream stream)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
// Calculate the amount of bytes remaining in the buffer.
var bytesRemaining = buffer.Length - bytesBuffered;
if (bytesRemaining == 0)
{
// Double the buffer size and copy the previously buffered data into the new buffer.
var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
// Return the old buffer to the pool.
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
bytesRemaining = buffer.Length - bytesBuffered;
}
var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
if (bytesRead == 0)
{
// EOF
break;
}
// Keep track of the amount of buffered bytes.
bytesBuffered += bytesRead;
var linePosition = -1;
do
{
// Look for a EOL in the buffered data.
linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
bytesBuffered - bytesConsumed);
if (linePosition >= 0)
{
// Calculate the length of the line based on the offset.
var lineLength = linePosition - bytesConsumed;
// Process the line.
ProcessLine(buffer, bytesConsumed, lineLength);
// Move the bytesConsumed to skip past the line consumed (including \n).
bytesConsumed += lineLength + 1;
}
}
while (linePosition >= 0);
}
}
Önceki kod karmaşıktır ve tanımlanan tüm sorunları gidermez. Yüksek performanslı ağ genellikle performansı en üst düzeye çıkarmak için karmaşık kod yazmak anlamına gelir. System.IO.Pipelines Bu tür bir kodu daha kolay yazmak için tasarlandı.
Ingilizce dışındaki dillere çevrilmiş kod açıklamalarını görmek isterseniz, Bu GitHub tartışma sorununubize tanıyın.
Kapatıldığı
PipeSınıf, bir çift oluşturmak için kullanılabilir PipeWriter/PipeReader . İçine yazılan tüm veriler PipeWriter ' de kullanılabilir PipeReader :
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
Kanal temel kullanımı
async Task ProcessLinesAsync(Socket socket)
{
var pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Allocate at least 512 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
try
{
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket.
writer.Advance(bytesRead);
}
catch (Exception ex)
{
LogError(ex);
break;
}
// Make the data available to the PipeReader.
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// By completing PipeWriter, tell the PipeReader that there's no more data coming.
await writer.CompleteAsync();
}
async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
// Process the line.
ProcessLine(line);
}
// Tell the PipeReader how much of the buffer has been consumed.
reader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
// Mark the PipeReader as complete.
await reader.CompleteAsync();
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
// Look for a EOL in the buffer.
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
line = default;
return false;
}
// Skip the line + the \n.
line = buffer.Slice(0, position.Value);
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
İki döngü vardır:
FillPipeAsyncöğesinden okurSocketve öğesine yazarPipeWriter.ReadPipeAsyncöğesinden okurPipeReaderve gelen satırları ayrıştırır.
Ayrılmış açık arabellek yok. Tüm arabellek yönetimi, PipeReader ve uygulamalarına Temsilcili PipeWriter . Arabellek yönetimi için temsilci seçme, kodun yalnızca iş mantığına odaklanılmasını kolaylaştırır.
İlk döngüde:
- PipeWriter.GetMemory(Int32) , temel alınan yazıcıya bellek almak için çağrılır.
- PipeWriter.Advance(Int32) ,
PipeWriterarabelleğe ne kadar veri yazıldığını bildirmek için çağırılır. - PipeWriter.FlushAsync , verileri için kullanılabilir hale getirmek üzere çağırılır
PipeReader.
İkinci döngüde, PipeReader tarafından yazılan arabellekleri kullanır PipeWriter . Arabellekler yuvadan gelir. Çağrısı PipeReader.ReadAsync :
ReadResultİki önemli bilgi parçasını içeren bir döndürür:
- Biçiminde okunan veriler
ReadOnlySequence<byte>. IsCompletedVerilerin sonuna (EOF) ulaşıldığını gösteren bir Boole değeri.
- Biçiminde okunan veriler
Satır sonu (EOL) sınırlayıcısı bulduktan ve satırı ayrıştırdıktan sonra:
- Mantık, zaten işlenmiş olan öğeleri atlamak için arabelleği işler.
PipeReader.AdvanceTonePipeReaderkadar veri tüketiğini ve incelendiğini söylemek için çağırılır.
Okuyucu ve yazıcı döngüleri, çağırarak biter Complete . Complete temeldeki kanalın ayrılan belleği serbest bırakmasına olanak tanır.
Arka basınç ve akış denetimi
İdeal olarak, birlikte iş okuma ve ayrıştırma:
- Yazma iş parçacığı ağdan verileri tüketir ve arabelleğe koyar.
- Ayrıştırma iş parçacığı, uygun veri yapılarını oluşturmaktan sorumludur.
Genellikle, ayrıştırma yalnızca ağdan veri bloklarını kopyalamaya kıyasla daha fazla zaman alır:
- Okuma iş parçacığı ayrıştırma iş parçacığının önüne alınır.
- Okuma iş parçacığı, ayrıştırma iş parçacığı için verileri depolamak üzere yavaşlıyor veya daha fazla bellek ayırmıştır.
En iyi performans için, sık duraklamalar ve daha fazla bellek ayırma arasında bir denge vardır.
Önceki sorunu çözmek için, Pipe veri akışını denetlemek üzere iki ayarı vardır:
- PauseWriterThreshold: Duraklatma çağrılarına önce ne kadar veri ara belleğe alınacağını belirler FlushAsync .
- ResumeWriterThreshold: Okuyucunun sürdürülmeye yönelik çağrılardan önce ne kadar veri gözlemleyecek olduğunu belirler
PipeWriter.FlushAsync.

ValueTask<FlushResult>Kesiştiği içindeki veri miktarı için bir tamamlanmamış döndürürPipePauseWriterThreshold.- Daha
ValueTask<FlushResult>düşük hale geldiğinde tamamlanırResumeWriterThreshold.
Tek bir değer kullanılırsa oluşabilecek hızlı döngüyü engellemek için iki değer kullanılır.
Örnekler
// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);
PipeScheduler
Genellikle ve kullanılırken async await , zaman uyumsuz kod bir veya geçerli olarak devam eder TaskScheduler SynchronizationContext .
G/ç yaparken, g/ç 'nin gerçekleştirildiği noktada ayrıntılı denetime sahip olmak önemlidir. Bu denetim, CPU önbelleklerinin etkin bir şekilde yararlanmasını sağlar. Verimli önbelleğe alma, Web sunucuları gibi yüksek performanslı uygulamalar için kritik öneme sahiptir. PipeScheduler zaman uyumsuz geri çağırmaların çalıştığı konum üzerinde denetim sağlar. Varsayılan olarak:
- Geçerli, SynchronizationContext kullanılır.
- Hayır ise
SynchronizationContext, geri çağırmaları çalıştırmak için iş parçacığı havuzunu kullanır.
public static void Main(string[] args)
{
var writeScheduler = new SingleThreadPipeScheduler();
var readScheduler = new SingleThreadPipeScheduler();
// Tell the Pipe what schedulers to use and disable the SynchronizationContext.
var options = new PipeOptions(readerScheduler: readScheduler,
writerScheduler: writeScheduler,
useSynchronizationContext: false);
var pipe = new Pipe(options);
}
// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
new BlockingCollection<(Action<object> Action, object State)>();
private readonly Thread _thread;
public SingleThreadPipeScheduler()
{
_thread = new Thread(DoWork);
_thread.Start();
}
private void DoWork()
{
foreach (var item in _queue.GetConsumingEnumerable())
{
item.Action(item.State);
}
}
public override void Schedule(Action<object> action, object state)
{
_queue.Add((action, state));
}
}
Pipescheduler. ThreadPool , PipeScheduler iş parçacığı havuzuna geri çağırmaları sıraya alan uygulamasıdır. PipeScheduler.ThreadPool Varsayılan ve genellikle en iyi seçenektir. Pipescheduler. Inline , kilitlenme gibi istenmeyen sonuçlara neden olabilir.
Kanal sıfırlama
Nesneyi yeniden kullanmak sıklıkla etkilidir Pipe . Kanalı sıfırlamak için PipeReader Reset hem hem de PipeReader tamamlandığında çağırın PipeWriter .
Piypereader
PipeReader arayan adına belleği yönetir. Çağrıldıktan sonra her zaman çağırın PipeReader.AdvanceTo PipeReader.ReadAsync . Bu, PipeReader çağıranın bellek ile ne zaman yapıldığını, böylece izlenebilmesini sağlar. ReadOnlySequence<byte>Döndürülen öğesinden PipeReader.ReadAsync yalnızca öğesine çağrı yapılıncaya kadar geçerlidir PipeReader.AdvanceTo . Çağrıldıktan sonra kullanım geçersizdir ReadOnlySequence<byte> PipeReader.AdvanceTo .
PipeReader.AdvanceTo iki SequencePosition bağımsız değişken alır:
- İlk bağımsız değişken, ne kadar bellek tüketildiğini belirler.
- İkinci bağımsız değişken, arabelleğin ne kadarının gözlemlendiğini belirler.
Verileri tüketilen olarak işaretlemek, kanalın belleği temel alınan arabellek havuzuna döndürebileceği anlamına gelir. Verileri gözlemlenen olarak işaretlemek, sonraki çağrının ne olduğunu denetler PipeReader.ReadAsync . Her şeyi gözlemlenen olarak işaretlemek, bir sonraki çağrının PipeReader.ReadAsync kanala yazılmış daha fazla veri olana kadar dönemeyeceği anlamına gelir. Diğer herhangi bir değer, sonraki çağrının PipeReader.ReadAsync gözlemlenen ve gözlemlenen verilerle hemen döndürülmesini sağlar, ancak daha önce tüketilen verileri değil.
Akış verilerini okuma senaryoları
Akış verilerini okumaya çalışırken ortaya çıktı olan birkaç tipik desen vardır:
- Veri akışı verildiğinde, tek bir iletiyi ayrıştırır.
- Veri akışı verildiğinde, tüm kullanılabilir iletileri ayrıştırır.
Aşağıdaki örneklerde, ' TryParseLines dan ileti ayrıştırma yöntemi kullanılmaktadır ReadOnlySequence<byte> . TryParseLines tek bir iletiyi ayrıştırır ve ayrıştırılmış iletiyi arabellekten kırpmak için giriş arabelleğini güncelleştirir. TryParseLines .NET ' in parçası değildir, aşağıdaki bölümlerde kullanılan bir kullanıcı yazılmış yöntemidir.
bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);
Tek bir iletiyi okuyun
Aşağıdaki kod, bir öğesinden tek bir ileti okur PipeReader ve bunu çağırana döndürür.
async ValueTask<Message> ReadSingleMessageAsync(PipeReader reader,
CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
// In the event that no message is parsed successfully, mark consumed
// as nothing and examined as the entire buffer.
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (TryParseLines(ref buffer, out Message message))
{
// A single message was successfully parsed so mark the start of the
// parsed buffer as consumed. TryParseLines trims the buffer to
// point to the data after the message was parsed.
consumed = buffer.Start;
// Examined is marked the same as consumed here, so the next call
// to ReadSingleMessageAsync will process the next message if there's
// one.
examined = consumed;
return message;
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
return null;
}
Yukarıdaki kod:
- Tek bir ileti ayrıştırır.
- Tüketilen
SequencePositionve incelenen,SequencePositionkırpılan giriş arabelleğinin başlangıcını işaret eden şekilde güncelleştirir.
İki SequencePosition bağımsız değişken güncelleştirilir çünkü bu, TryParseLines ayrıştırılmış iletiyi giriş arabelleğinden kaldırır. Genellikle, arabellekteki tek bir ileti ayrıştırılırken, incelenen konum aşağıdakilerden biri olmalıdır:
- İletinin sonu.
- İleti bulunmazsa alınan arabelleğin sonu.
Tek ileti durumunun hata için en olası olasılığı vardır. Yanlış değerleri incelenmeye geçirmek, yetersiz bellek özel durumu veya sonsuz bir döngüye neden olabilir. Daha fazla bilgi için bu makaledeki Pıpereader ortak sorunlar bölümüne bakın.
Birden çok ileti okunuyor
Aşağıdaki kod, PipeReader her bir üzerinde ve çağrılarındaki tüm iletileri okur ProcessMessageAsync .
async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
if (buffer.Length > 0)
{
// The message is incomplete and there's no more data to process.
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
İptal
PipeReader.ReadAsync:
- , Geçişini destekler CancellationToken .
- Bir okuma beklemede olduğunda, bir iptal edilirse bir oluşturur OperationCanceledException
CancellationToken. - PipeReader.CancelPendingRead, Bir özel durumun çıkarılmasını engelleyen geçerli okuma işlemini iptal etmek için bir yol destekler. Çağırma
PipeReader.CancelPendingRead, geçerli veya sonraki çağrısının 'PipeReader.ReadAsynca ayarlanmış olarak dönüşmesine neden olur ReadResultIsCanceledtrue. Bu, var olan okuma döngüsünü bozucu olmayan ve olağanüstü olmayan bir şekilde sonlandırmanız için yararlı olabilir.
private PipeReader reader;
public MyConnection(PipeReader reader)
{
this.reader = reader;
}
public void Abort()
{
// Cancel the pending read so the process loop ends without an exception.
reader.CancelPendingRead();
}
public async Task ProcessMessagesAsync()
{
try
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
// The read was canceled. You can quit without reading the existing data.
break;
}
// Process all messages from the buffer, modifying the input buffer on each
// iteration.
while (TryParseLines(ref buffer, out Message message))
{
await ProcessMessageAsync(message);
}
// There's no more data to be processed.
if (result.IsCompleted)
{
break;
}
}
finally
{
// Since all messages in the buffer are being processed, you can use the
// remaining buffer's Start and End position to determine consumed and examined.
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
finally
{
await reader.CompleteAsync();
}
}
Piypereader yaygın sorunlar
Yanlış değerleri veya ' a geçirmek,
consumedexaminedzaten okuma verilerinin okunmasına neden olabilir.buffer.Endİncelenen şekilde geçirme şu şekilde olabilir:- Durdurulmuş veriler
- Veriler tüketilmemişse, büyük olasılıkla yetersiz bellek (OOM) özel durumu. Örneğin,
PipeReader.AdvanceTo(position, buffer.End)arabellekteki bir anda tek bir ileti işlenirken.
Yanlış değerleri ' a geçirme
consumedveyaexaminedsonsuz bir döngüye neden olabilir. Örneğin,PipeReader.AdvanceTo(buffer.Start)buffer.Starthenüz değiştirilmediyse,PipeReader.ReadAsyncYeni veri ulaşmadan hemen önce geri dönmesi için bir sonraki çağrıya neden olur.Yanlış değerleri ' a geçirme
consumedveyaexaminedsonsuz arabelleğe alma (NIHAI OOM) ile sonuçlanabilir.Çağrıldıktan
ReadOnlySequence<byte>sonra kullanılması,PipeReader.AdvanceTobellek bozulmasına yol açabilir (ücretsiz olarak kullanabilirsiniz).Çağrı başarısız
PipeReader.Complete/CompleteAsyncolabilir ve bellek sızıntısına yol açabilir.ReadResult.IsCompletedVeri kaybına neden olan arabellek sonuçlarını işlemeden önce okuma mantığı denetleniyor ve çıkılıyor. Döngü çıkış koşulunun ve tabanlı olması gerekir
ReadResult.Buffer.IsEmptyReadResult.IsCompleted. Bunu yanlış yapmak sonsuz bir döngüye neden olabilir.
Sorunlu kod
❌ Veri kaybı
, ReadResult Olarak ayarlandığında verilerin son segmentini döndürebilir IsCompleted true . Okuma döngüsünden çıkmadan önce bu verilerin okunmamasından dolayı veri kaybı olur.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Aşağıdaki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> dataLossBuffer = result.Buffer;
if (result.IsCompleted)
break;
Process(ref dataLossBuffer, out Message message);
reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Önceki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanır.
❌Sonsuz döngü
Aşağıdaki mantık,, Result.IsCompleted true ancak arabellekte hiç tamamlanmamış bir ileti varsa sonsuz döngüye neden olabilir.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Aşağıdaki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
break;
Process(ref infiniteLoopBuffer, out Message message);
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Önceki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanır.
Aynı sorunu içeren başka bir kod parçası aşağıda verilmiştir. Denetlemeden önce boş olmayan bir arabellek denetleniyor ReadResult.IsCompleted . Bir içinde olduğu için else if , arabellekte hiç bir ileti yoksa sonsuza kadar döngüye alınır.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Aşağıdaki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
if (!infiniteLoopBuffer.IsEmpty)
Process(ref infiniteLoopBuffer, out Message message);
else if (result.IsCompleted)
break;
reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Önceki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanır.
❌Yanıt vermeyen uygulama
Konumda Koşullu çağrı PipeReader.AdvanceTo buffer.End , examined tek bir ileti ayrıştırılırken uygulamanın yanıt vermemesine neden olabilir. Sonraki çağrısı şu PipeReader.AdvanceTo kadar geri dönemeyecek:
- Kanala yazılan daha fazla veri var.
- Ve yeni veriler daha önce incelendi.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Aşağıdaki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> hangBuffer = result.Buffer;
Process(ref hangBuffer, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);
if (message != null)
return message;
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Önceki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanır.
❌Yetersiz bellek (OOM)
Aşağıdaki koşullara göre aşağıdaki kod arabelleğe alma işlemi gerçekleşene kadar devam eder OutOfMemoryException :
- En büyük ileti boyutu yok.
- Öğesinden döndürülen veriler,
PipeReaderbir ileti yapmaz. Örneğin, diğer taraf büyük bir ileti (örneğin, 4 GB bir ileti) yazıldığı için, bu tam bir ileti yapmaz.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Aşağıdaki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanmıştır.
Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;
Process(ref thisCouldOutOfMemory, out Message message);
if (result.IsCompleted)
break;
reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);
if (message != null)
return message;
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Önceki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanır.
❌Bellek bozulması
Arabelleği okuyan yardımcıları yazarken, çağrılmadan önce döndürülen tüm yükün kopyalanması gerekir Advance . Aşağıdaki örnek, atılan belleği döndürür ve bir Pipe sonraki işlem (okuma/yazma) için onu tekrar kullanabilir.
Uyarı
Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Aşağıdaki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanmıştır.
public class Message
{
public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
Environment.FailFast("This code is terrible, don't use it!");
Message message = null;
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;
ReadHeader(ref buffer, out int length);
if (length <= buffer.Length)
{
message = new Message
{
// Slice the payload from the existing buffer
CorruptedPayload = buffer.Slice(0, length)
};
buffer = buffer.Slice(length);
}
if (result.IsCompleted)
break;
reader.AdvanceTo(buffer.Start, buffer.End);
if (message != null)
{
// This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
// was captured.
break;
}
}
return message;
}
Uyarı
Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Önceki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanır.
PipeWriter
PipeWriterArayan adına yazmak için arabellekleri yönetir. PipeWriter uygular IBufferWriter<byte> . IBufferWriter<byte> ek arabellek kopyaları olmadan yazma işlemleri gerçekleştirmek için arabelleklere erişim sağlamak mümkün hale gelir.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
// Request at least 5 bytes from the PipeWriter.
Memory<byte> memory = writer.GetMemory(5);
// Write directly into the buffer.
int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);
// Tell the writer how many bytes were written.
writer.Advance(written);
await writer.FlushAsync(cancellationToken);
}
Önceki kod:
- Kullanarak en az 5 baytlık bir arabellek ister
PipeWriterGetMemory . - ASCII dizesinin bayt sayısını
"Hello"döndürülen öğesine yazarMemory<byte>. - AdvanceArabelleğe kaç bayt yazıldığını belirten çağrılar.
- ,
PipeWriterBaytları temeldeki cihaza gönderen öğesini temizler.
Önceki yazma yöntemi, tarafından sağlanmış arabellekleri kullanır PipeWriter . Ayrıca, bu da kullanılabilir PipeWriter.WriteAsync :
- Varolan arabelleği öğesine kopyalar
PipeWriter. GetSpanAdvanceUygun ve çağrılar için çağrılar FlushAsync .
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");
// Write helloBytes to the writer, there's no need to call Advance here
// (Write does that).
await writer.WriteAsync(helloBytes, cancellationToken);
}
İptal
FlushAsync , geçişini destekler CancellationToken . Bekleyen bir CancellationToken OperationCanceledException Temizleme işlemi varken belirteç iptal edilirse bir sonuçları bir ile geçirme. PipeWriter.FlushAsync , özel durum oluşturmadan geçerli temizleme işlemini iptal etmenin bir yolunu destekler PipeWriter.CancelPendingFlush . Çağırma PipeWriter.CancelPendingFlush , geçerli veya sonraki çağrıya, PipeWriter.FlushAsync veya olarak PipeWriter.WriteAsync ayarlanmış olarak dönüşmesine neden olur FlushResult IsCanceled true . Bu, bozucu olmayan ve olağanüstü olmayan bir şekilde boşaltmayı Temizleme için yararlı olabilir.
PipeWriter ortak sorunları
- GetSpan ve GetMemory en az istenen bellek miktarına sahip bir arabellek döndürür. Tam arabellek boyutlarını kabul etmeyin .
- Art arda yapılan çağrıların aynı arabelleğe veya aynı boyutlu arabelleğe Döneceğinin garantisi yoktur.
- AdvanceDaha fazla veri yazmaya devam etmek için çağrıldıktan sonra yeni bir arabellek istenmesi gerekir. Daha önce edinilen arabelleğin üzerine yazılamaz.
GetMemoryGetSpanEksik bir çağrı olduğu için veya çağrısı sırasındaFlushAsyncgüvenli değildir.CompleteCompleteAsyncVerilerin temizlenme sırasında veya boşaltılmasından sonra bellek bozulmasına yol açabilir.
IDuplexPipe
, IDuplexPipe Hem okumayı hem de yazmayı destekleyen türler için bir sözleşmedir. Örneğin, bir ağ bağlantısı bir tarafından temsil edilir IDuplexPipe .
PipeVe içeren öğelerinden farklı olarak PipeReader PipeWriter , IDuplexPipe tam çift yönlü bağlantının tek tarafını temsil eder. Bu, üzerine yazılan ve ' PipeWriter den okunmayacağı anlamına gelir PipeReader .
Akışlar
Akış verilerini okurken veya yazarken genellikle seri hale getirici kullanarak verileri okur ve serileştirici kullanarak verileri yazın. Bu okuma ve yazma akışı API 'Lerinin çoğu bir Stream parametreye sahiptir. Bu var olan API 'lerle tümleştirmeyi kolaylaştırmak PipeReader ve PipeWriter bir yöntemi ortaya çıkarmak için AsStream . AsStreamStreamveya etrafında bir uygulama döndürür PipeReader PipeWriter .
Akış örneği
PipeReader ve PipeWriter örnekleri, bir nesne ve isteğe Create bağlı olarak karşılık gelen oluşturma seçenekleri verilen statik yöntemler kullanılarak Stream oluşturulabilir.
StreamPipeReaderOptionsAşağıdaki parametrelerle örneğin oluşturulması üzerinde PipeReader denetime olanak sağlar:
- StreamPipeReaderOptions.BufferSize , havuzdan bellek kiralarken kullanılan bayt cinsinden en düşük arabellek boyutudur ve varsayılan olarak değerine
4096sahiptir. - StreamPipeReaderOptions.LeaveOpen bayrağı, temel alınan akışın tamamlandıktan sonra açık bırak isteyip olmadığını belirler ve varsayılan
PipeReaderolarak olarakfalsekullanılır. - StreamPipeReaderOptions.MinimumReadSize , yeni bir arabellek ayrılmadan önce arabellekte kalan bayt eşiğini temsil eder ve varsayılan olarak olarak
1024kullanılır. - StreamPipeReaderOptions.Pool ,
MemoryPool<byte>bellek için kullanılırken kullanılır ve varsayılan olaraknullkullanılır.
StreamPipeWriterOptionsAşağıdaki parametrelerle örneğin oluşturulması üzerinde PipeWriter denetime olanak sağlar:
- StreamPipeWriterOptions.LeaveOpen bayrağı, temel alınan akışın tamamlandıktan sonra açık bırak isteyip olmadığını belirler ve varsayılan
PipeWriterolarak olarakfalsekullanılır. - StreamPipeWriterOptions.MinimumBufferSize , 'den bellek kiralarken kullanmak üzere en düşük arabellek boyutunu temsil eder Pool ve varsayılan olarak değerine
4096ayarlar. - StreamPipeWriterOptions.Pool ,
MemoryPool<byte>bellek için kullanılırken kullanılır ve varsayılan olaraknullkullanılır.
Önemli
yöntemlerini PipeReader kullanarak PipeWriter ve örnekleri oluştururken nesne ömrünü göz önünde Create Stream bulundurabilirsiniz. Okuyucu veya yazıcı ile işlemi tamamladikten sonra akışa erişmeniz gerekirse oluşturma seçenekleri için LeaveOpen true bayrağını olarak ayarlayabilirsiniz. Aksi takdirde akış kapatılır.
Aşağıdaki kod, bir akıştan PipeReader yöntemler PipeWriter kullanılarak ve Create örneklerinin oluşturulmasını gösteriyor.
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
using var stream = File.OpenRead("lorem-ipsum.txt");
var reader = PipeReader.Create(stream);
var writer = PipeWriter.Create(
Console.OpenStandardOutput(),
new StreamPipeWriterOptions(leaveOpen: true));
WriteUserCancellationPrompt();
var processMessagesTask = ProcessMessagesAsync(reader, writer);
var userCanceled = false;
var cancelProcessingTask = Task.Run(() =>
{
while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
{
WriteUserCancellationPrompt();
}
userCanceled = true;
// No exceptions thrown
reader.CancelPendingRead();
writer.CancelPendingFlush();
});
await Task.WhenAny(cancelProcessingTask, processMessagesTask);
Console.WriteLine(
$"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
}
static void WriteUserCancellationPrompt() =>
Console.WriteLine("Press 'C' to cancel processing...\n");
static async Task ProcessMessagesAsync(
PipeReader reader,
PipeWriter writer)
{
try
{
while (true)
{
ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
try
{
if (readResult.IsCanceled)
{
break;
}
if (TryParseLines(ref buffer, out string message))
{
FlushResult flushResult =
await WriteMessagesAsync(writer, message);
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
if (readResult.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Incomplete message.");
}
break;
}
}
finally
{
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
static bool TryParseLines(
ref ReadOnlySequence<byte> buffer,
out string message)
{
SequencePosition? position;
StringBuilder outputMessage = new();
while(true)
{
position = buffer.PositionOf((byte)'\n');
if (!position.HasValue)
break;
outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
.AppendLine();
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
};
message = outputMessage.ToString();
return message.Length != 0;
}
static ValueTask<FlushResult> WriteMessagesAsync(
PipeWriter writer,
string message) =>
writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}
Uygulama, bir akış olaraklorem-ipsum.txtiçin bir kullanır ve boş StreamReader bir satırla bitebilir. FileStream, nesnesine PipeReader.Create geçirildi. PipeReader Konsol uygulaması daha sonra kullanarak standart çıkış akışını 'a PipeWriter.Create Console.OpenStandardOutput() iletir. Örnek iptali destekler.