System.IO. .net ' te Pipelines

System.IO.Pipelines , .NET ortamında yüksek performanslı g/ç 'yi daha kolay hale getirmek için tasarlanan bir kitaplıktır. Bu, tüm .NET uygulamalarında çalışacak .NET Standard hedefleme bir kitaplıktır.

Sorun System.IO. Pipelines çöz

Akış verilerini ayrıştırmaya yönelik uygulamalar, çok sayıda özel ve olağandışı kod akışına sahip ortak koddan oluşur. Ortak ve özel durum kodu karmaşık ve devam etmek zordur.

System.IO.Pipelines Şu şekilde tasarlanmıştır:

  • Akış verilerinin yüksek performans ayrıştırması vardır.
  • Kod karmaşıklığını azaltın.

Aşağıdaki kod, bir istemciden satır sınırlı iletileri (ile ayrılmış) alan bir TCP sunucusu için tipik bir noktadır '\n' :

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

Yukarıdaki kodda birkaç sorun vardır:

  • Tüm ileti (satır sonu), için tek bir çağrıda alınmayabilir ReadAsync .
  • Sonucu yok sayılıyor stream.ReadAsync . stream.ReadAsync ne kadar veri okunduğunu döndürür.
  • Birden çok satırın tek bir çağrıda okunduğu durumu işlemez ReadAsync .
  • byteHer okunan bir diziyi ayırır.

Yukarıdaki sorunları onarmak için aşağıdaki değişiklikler gereklidir:

  • Yeni bir satır bulunana kadar gelen verileri arabelleğe koyun.

  • Arabellekte döndürülen tüm satırları ayrıştırın.

  • Satır 1 KB 'den büyük (1024 bayt) olabilir. Bu kodun, arabelleğin içindeki tamamlanma satırına sığması için sınırlayıcı bulunana kadar giriş arabelleğini yeniden boyutlandırması gerekir.

    • Arabellek yeniden boyutlandırılırsa, girişte daha uzun çizgiler göründüğü sürece daha fazla arabellek kopyası yapılır.
    • Harcanan alanı azaltmak için, satırları okumak için kullanılan arabelleği sıkıştırın.
  • Belleği sürekli ayırmayı önlemek için arabellek havuzu kullanmayı düşünün.

  • Aşağıdaki kod bu sorunlardan bazılarını ele alınmaktadır:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

Önceki kod karmaşıktır ve tanımlanan tüm sorunları gidermez. Yüksek performanslı ağ genellikle performansı en üst düzeye çıkarmak için karmaşık kod yazmak anlamına gelir. System.IO.Pipelines Bu tür bir kodu daha kolay yazmak için tasarlandı.

Ingilizce dışındaki dillere çevrilmiş kod açıklamalarını görmek isterseniz, Bu GitHub tartışma sorununubize tanıyın.

Kapatıldığı

PipeSınıf, bir çift oluşturmak için kullanılabilir PipeWriter/PipeReader . İçine yazılan tüm veriler PipeWriter ' de kullanılabilir PipeReader :

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Kanal temel kullanımı

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

İki döngü vardır:

  • FillPipeAsync öğesinden okur Socket ve öğesine yazar PipeWriter .
  • ReadPipeAsync öğesinden okur PipeReader ve gelen satırları ayrıştırır.

Ayrılmış açık arabellek yok. Tüm arabellek yönetimi, PipeReader ve uygulamalarına Temsilcili PipeWriter . Arabellek yönetimi için temsilci seçme, kodun yalnızca iş mantığına odaklanılmasını kolaylaştırır.

İlk döngüde:

İkinci döngüde, PipeReader tarafından yazılan arabellekleri kullanır PipeWriter . Arabellekler yuvadan gelir. Çağrısı PipeReader.ReadAsync :

  • ReadResultİki önemli bilgi parçasını içeren bir döndürür:

    • Biçiminde okunan veriler ReadOnlySequence<byte> .
    • IsCompletedVerilerin sonuna (EOF) ulaşıldığını gösteren bir Boole değeri.

Satır sonu (EOL) sınırlayıcısı bulduktan ve satırı ayrıştırdıktan sonra:

  • Mantık, zaten işlenmiş olan öğeleri atlamak için arabelleği işler.
  • PipeReader.AdvanceTo ne PipeReader kadar veri tüketiğini ve incelendiğini söylemek için çağırılır.

Okuyucu ve yazıcı döngüleri, çağırarak biter Complete . Complete temeldeki kanalın ayrılan belleği serbest bırakmasına olanak tanır.

Arka basınç ve akış denetimi

İdeal olarak, birlikte iş okuma ve ayrıştırma:

  • Yazma iş parçacığı ağdan verileri tüketir ve arabelleğe koyar.
  • Ayrıştırma iş parçacığı, uygun veri yapılarını oluşturmaktan sorumludur.

Genellikle, ayrıştırma yalnızca ağdan veri bloklarını kopyalamaya kıyasla daha fazla zaman alır:

  • Okuma iş parçacığı ayrıştırma iş parçacığının önüne alınır.
  • Okuma iş parçacığı, ayrıştırma iş parçacığı için verileri depolamak üzere yavaşlıyor veya daha fazla bellek ayırmıştır.

En iyi performans için, sık duraklamalar ve daha fazla bellek ayırma arasında bir denge vardır.

Önceki sorunu çözmek için, Pipe veri akışını denetlemek üzere iki ayarı vardır:

  • PauseWriterThreshold: Duraklatma çağrılarına önce ne kadar veri ara belleğe alınacağını belirler FlushAsync .
  • ResumeWriterThreshold: Okuyucunun sürdürülmeye yönelik çağrılardan önce ne kadar veri gözlemleyecek olduğunu belirler PipeWriter.FlushAsync .

ResumeWriterThreshold ve PauseWriterThreshold ile diyagram

PipeWriter.FlushAsync:

  • ValueTask<FlushResult>Kesiştiği içindeki veri miktarı için bir tamamlanmamış döndürür Pipe PauseWriterThreshold .
  • Daha ValueTask<FlushResult> düşük hale geldiğinde tamamlanır ResumeWriterThreshold .

Tek bir değer kullanılırsa oluşabilecek hızlı döngüyü engellemek için iki değer kullanılır.

Örnekler

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeScheduler

Genellikle ve kullanılırken async await , zaman uyumsuz kod bir veya geçerli olarak devam eder TaskScheduler SynchronizationContext .

G/ç yaparken, g/ç 'nin gerçekleştirildiği noktada ayrıntılı denetime sahip olmak önemlidir. Bu denetim, CPU önbelleklerinin etkin bir şekilde yararlanmasını sağlar. Verimli önbelleğe alma, Web sunucuları gibi yüksek performanslı uygulamalar için kritik öneme sahiptir. PipeScheduler zaman uyumsuz geri çağırmaların çalıştığı konum üzerinde denetim sağlar. Varsayılan olarak:

  • Geçerli, SynchronizationContext kullanılır.
  • Hayır ise SynchronizationContext , geri çağırmaları çalıştırmak için iş parçacığı havuzunu kullanır.
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object> action, object state)
    {
        _queue.Add((action, state));
    }
}

Pipescheduler. ThreadPool , PipeScheduler iş parçacığı havuzuna geri çağırmaları sıraya alan uygulamasıdır. PipeScheduler.ThreadPool Varsayılan ve genellikle en iyi seçenektir. Pipescheduler. Inline , kilitlenme gibi istenmeyen sonuçlara neden olabilir.

Kanal sıfırlama

Nesneyi yeniden kullanmak sıklıkla etkilidir Pipe . Kanalı sıfırlamak için PipeReader Reset hem hem de PipeReader tamamlandığında çağırın PipeWriter .

Piypereader

PipeReader arayan adına belleği yönetir. Çağrıldıktan sonra her zaman çağırın PipeReader.AdvanceTo PipeReader.ReadAsync . Bu, PipeReader çağıranın bellek ile ne zaman yapıldığını, böylece izlenebilmesini sağlar. ReadOnlySequence<byte>Döndürülen öğesinden PipeReader.ReadAsync yalnızca öğesine çağrı yapılıncaya kadar geçerlidir PipeReader.AdvanceTo . Çağrıldıktan sonra kullanım geçersizdir ReadOnlySequence<byte> PipeReader.AdvanceTo .

PipeReader.AdvanceTo iki SequencePosition bağımsız değişken alır:

  • İlk bağımsız değişken, ne kadar bellek tüketildiğini belirler.
  • İkinci bağımsız değişken, arabelleğin ne kadarının gözlemlendiğini belirler.

Verileri tüketilen olarak işaretlemek, kanalın belleği temel alınan arabellek havuzuna döndürebileceği anlamına gelir. Verileri gözlemlenen olarak işaretlemek, sonraki çağrının ne olduğunu denetler PipeReader.ReadAsync . Her şeyi gözlemlenen olarak işaretlemek, bir sonraki çağrının PipeReader.ReadAsync kanala yazılmış daha fazla veri olana kadar dönemeyeceği anlamına gelir. Diğer herhangi bir değer, sonraki çağrının PipeReader.ReadAsync gözlemlenen ve gözlemlenen verilerle hemen döndürülmesini sağlar, ancak daha önce tüketilen verileri değil.

Akış verilerini okuma senaryoları

Akış verilerini okumaya çalışırken ortaya çıktı olan birkaç tipik desen vardır:

  • Veri akışı verildiğinde, tek bir iletiyi ayrıştırır.
  • Veri akışı verildiğinde, tüm kullanılabilir iletileri ayrıştırır.

Aşağıdaki örneklerde, ' TryParseLines dan ileti ayrıştırma yöntemi kullanılmaktadır ReadOnlySequence<byte> . TryParseLines tek bir iletiyi ayrıştırır ve ayrıştırılmış iletiyi arabellekten kırpmak için giriş arabelleğini güncelleştirir. TryParseLines .NET ' in parçası değildir, aşağıdaki bölümlerde kullanılan bir kullanıcı yazılmış yöntemidir.

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

Tek bir iletiyi okuyun

Aşağıdaki kod, bir öğesinden tek bir ileti okur PipeReader ve bunu çağırana döndürür.

async ValueTask<Message> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

Yukarıdaki kod:

  • Tek bir ileti ayrıştırır.
  • Tüketilen SequencePosition ve incelenen, SequencePosition kırpılan giriş arabelleğinin başlangıcını işaret eden şekilde güncelleştirir.

İki SequencePosition bağımsız değişken güncelleştirilir çünkü bu, TryParseLines ayrıştırılmış iletiyi giriş arabelleğinden kaldırır. Genellikle, arabellekteki tek bir ileti ayrıştırılırken, incelenen konum aşağıdakilerden biri olmalıdır:

  • İletinin sonu.
  • İleti bulunmazsa alınan arabelleğin sonu.

Tek ileti durumunun hata için en olası olasılığı vardır. Yanlış değerleri incelenmeye geçirmek, yetersiz bellek özel durumu veya sonsuz bir döngüye neden olabilir. Daha fazla bilgi için bu makaledeki Pıpereader ortak sorunlar bölümüne bakın.

Birden çok ileti okunuyor

Aşağıdaki kod, PipeReader her bir üzerinde ve çağrılarındaki tüm iletileri okur ProcessMessageAsync .

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

İptal

PipeReader.ReadAsync:

  • , Geçişini destekler CancellationToken .
  • Bir okuma beklemede olduğunda, bir iptal edilirse bir oluşturur OperationCanceledException CancellationToken .
  • PipeReader.CancelPendingRead, Bir özel durumun çıkarılmasını engelleyen geçerli okuma işlemini iptal etmek için bir yol destekler. Çağırma PipeReader.CancelPendingRead , geçerli veya sonraki çağrısının ' PipeReader.ReadAsync a ayarlanmış olarak dönüşmesine neden olur ReadResult IsCanceled true . Bu, var olan okuma döngüsünü bozucu olmayan ve olağanüstü olmayan bir şekilde sonlandırmanız için yararlı olabilir.
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Piypereader yaygın sorunlar

  • Yanlış değerleri veya ' a geçirmek, consumed examined zaten okuma verilerinin okunmasına neden olabilir.

  • buffer.Endİncelenen şekilde geçirme şu şekilde olabilir:

    • Durdurulmuş veriler
    • Veriler tüketilmemişse, büyük olasılıkla yetersiz bellek (OOM) özel durumu. Örneğin, PipeReader.AdvanceTo(position, buffer.End) arabellekteki bir anda tek bir ileti işlenirken.
  • Yanlış değerleri ' a geçirme consumed veya examined sonsuz bir döngüye neden olabilir. Örneğin, PipeReader.AdvanceTo(buffer.Start) buffer.Start henüz değiştirilmediyse, PipeReader.ReadAsync Yeni veri ulaşmadan hemen önce geri dönmesi için bir sonraki çağrıya neden olur.

  • Yanlış değerleri ' a geçirme consumed veya examined sonsuz arabelleğe alma (NIHAI OOM) ile sonuçlanabilir.

  • Çağrıldıktan ReadOnlySequence<byte> sonra kullanılması, PipeReader.AdvanceTo bellek bozulmasına yol açabilir (ücretsiz olarak kullanabilirsiniz).

  • Çağrı başarısız PipeReader.Complete/CompleteAsync olabilir ve bellek sızıntısına yol açabilir.

  • ReadResult.IsCompletedVeri kaybına neden olan arabellek sonuçlarını işlemeden önce okuma mantığı denetleniyor ve çıkılıyor. Döngü çıkış koşulunun ve tabanlı olması gerekir ReadResult.Buffer.IsEmpty ReadResult.IsCompleted . Bunu yanlış yapmak sonsuz bir döngüye neden olabilir.

Sorunlu kod

Veri kaybı

, ReadResult Olarak ayarlandığında verilerin son segmentini döndürebilir IsCompleted true . Okuma döngüsünden çıkmadan önce bu verilerin okunmamasından dolayı veri kaybı olur.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Aşağıdaki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanmıştır.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Önceki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanır.

Sonsuz döngü

Aşağıdaki mantık,, Result.IsCompleted true ancak arabellekte hiç tamamlanmamış bir ileti varsa sonsuz döngüye neden olabilir.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Aşağıdaki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanmıştır.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Önceki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanır.

Aynı sorunu içeren başka bir kod parçası aşağıda verilmiştir. Denetlemeden önce boş olmayan bir arabellek denetleniyor ReadResult.IsCompleted . Bir içinde olduğu için else if , arabellekte hiç bir ileti yoksa sonsuza kadar döngüye alınır.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Aşağıdaki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanmıştır.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Önceki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanır.

Yanıt vermeyen uygulama

Konumda Koşullu çağrı PipeReader.AdvanceTo buffer.End , examined tek bir ileti ayrıştırılırken uygulamanın yanıt vermemesine neden olabilir. Sonraki çağrısı şu PipeReader.AdvanceTo kadar geri dönemeyecek:

  • Kanala yazılan daha fazla veri var.
  • Ve yeni veriler daha önce incelendi.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Aşağıdaki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanmıştır.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Önceki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanır.

Yetersiz bellek (OOM)

Aşağıdaki koşullara göre aşağıdaki kod arabelleğe alma işlemi gerçekleşene kadar devam eder OutOfMemoryException :

  • En büyük ileti boyutu yok.
  • Öğesinden döndürülen veriler, PipeReader bir ileti yapmaz. Örneğin, diğer taraf büyük bir ileti (örneğin, 4 GB bir ileti) yazıldığı için, bu tam bir ileti yapmaz.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Aşağıdaki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanmıştır.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Önceki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanır.

Bellek bozulması

Arabelleği okuyan yardımcıları yazarken, çağrılmadan önce döndürülen tüm yükün kopyalanması gerekir Advance . Aşağıdaki örnek, atılan belleği döndürür ve bir Pipe sonraki işlem (okuma/yazma) için onu tekrar kullanabilir.

Uyarı

Aşağıdaki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Aşağıdaki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanmıştır.

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

Uyarı

Önceki kodu KULLANMAYIN. Bu örneğin kullanılması veri kaybına, askıda kalmasına, güvenlik sorunlarına neden olur ve KOPYALANMALIDIR. Önceki örnek PipeReader Ortak sorunlarıaçıklamak için sağlanır.

PipeWriter

PipeWriterArayan adına yazmak için arabellekleri yönetir. PipeWriter uygular IBufferWriter<byte> . IBufferWriter<byte> ek arabellek kopyaları olmadan yazma işlemleri gerçekleştirmek için arabelleklere erişim sağlamak mümkün hale gelir.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

Önceki kod:

  • Kullanarak en az 5 baytlık bir arabellek ister PipeWriter GetMemory .
  • ASCII dizesinin bayt sayısını "Hello" döndürülen öğesine yazar Memory<byte> .
  • AdvanceArabelleğe kaç bayt yazıldığını belirten çağrılar.
  • , PipeWriter Baytları temeldeki cihaza gönderen öğesini temizler.

Önceki yazma yöntemi, tarafından sağlanmış arabellekleri kullanır PipeWriter . Ayrıca, bu da kullanılabilir PipeWriter.WriteAsync :

  • Varolan arabelleği öğesine kopyalar PipeWriter .
  • GetSpan Advance Uygun ve çağrılar için çağrılar FlushAsync .
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

İptal

FlushAsync , geçişini destekler CancellationToken . Bekleyen bir CancellationToken OperationCanceledException Temizleme işlemi varken belirteç iptal edilirse bir sonuçları bir ile geçirme. PipeWriter.FlushAsync , özel durum oluşturmadan geçerli temizleme işlemini iptal etmenin bir yolunu destekler PipeWriter.CancelPendingFlush . Çağırma PipeWriter.CancelPendingFlush , geçerli veya sonraki çağrıya, PipeWriter.FlushAsync veya olarak PipeWriter.WriteAsync ayarlanmış olarak dönüşmesine neden olur FlushResult IsCanceled true . Bu, bozucu olmayan ve olağanüstü olmayan bir şekilde boşaltmayı Temizleme için yararlı olabilir.

PipeWriter ortak sorunları

  • GetSpan ve GetMemory en az istenen bellek miktarına sahip bir arabellek döndürür. Tam arabellek boyutlarını kabul etmeyin .
  • Art arda yapılan çağrıların aynı arabelleğe veya aynı boyutlu arabelleğe Döneceğinin garantisi yoktur.
  • AdvanceDaha fazla veri yazmaya devam etmek için çağrıldıktan sonra yeni bir arabellek istenmesi gerekir. Daha önce edinilen arabelleğin üzerine yazılamaz.
  • GetMemory GetSpan Eksik bir çağrı olduğu için veya çağrısı sırasında FlushAsync güvenli değildir.
  • Complete CompleteAsync Verilerin temizlenme sırasında veya boşaltılmasından sonra bellek bozulmasına yol açabilir.

IDuplexPipe

, IDuplexPipe Hem okumayı hem de yazmayı destekleyen türler için bir sözleşmedir. Örneğin, bir ağ bağlantısı bir tarafından temsil edilir IDuplexPipe .

PipeVe içeren öğelerinden farklı olarak PipeReader PipeWriter , IDuplexPipe tam çift yönlü bağlantının tek tarafını temsil eder. Bu, üzerine yazılan ve ' PipeWriter den okunmayacağı anlamına gelir PipeReader .

Akışlar

Akış verilerini okurken veya yazarken genellikle seri hale getirici kullanarak verileri okur ve serileştirici kullanarak verileri yazın. Bu okuma ve yazma akışı API 'Lerinin çoğu bir Stream parametreye sahiptir. Bu var olan API 'lerle tümleştirmeyi kolaylaştırmak PipeReader ve PipeWriter bir yöntemi ortaya çıkarmak için AsStream . AsStreamStreamveya etrafında bir uygulama döndürür PipeReader PipeWriter .

Akış örneği

PipeReader ve PipeWriter örnekleri, bir nesne ve isteğe Create bağlı olarak karşılık gelen oluşturma seçenekleri verilen statik yöntemler kullanılarak Stream oluşturulabilir.

StreamPipeReaderOptionsAşağıdaki parametrelerle örneğin oluşturulması üzerinde PipeReader denetime olanak sağlar:

  • StreamPipeReaderOptions.BufferSize , havuzdan bellek kiralarken kullanılan bayt cinsinden en düşük arabellek boyutudur ve varsayılan olarak değerine 4096 sahiptir.
  • StreamPipeReaderOptions.LeaveOpen bayrağı, temel alınan akışın tamamlandıktan sonra açık bırak isteyip olmadığını belirler ve varsayılan PipeReader olarak olarak false kullanılır.
  • StreamPipeReaderOptions.MinimumReadSize , yeni bir arabellek ayrılmadan önce arabellekte kalan bayt eşiğini temsil eder ve varsayılan olarak olarak 1024 kullanılır.
  • StreamPipeReaderOptions.Pool , MemoryPool<byte> bellek için kullanılırken kullanılır ve varsayılan olarak null kullanılır.

StreamPipeWriterOptionsAşağıdaki parametrelerle örneğin oluşturulması üzerinde PipeWriter denetime olanak sağlar:

Önemli

yöntemlerini PipeReader kullanarak PipeWriter ve örnekleri oluştururken nesne ömrünü göz önünde Create Stream bulundurabilirsiniz. Okuyucu veya yazıcı ile işlemi tamamladikten sonra akışa erişmeniz gerekirse oluşturma seçenekleri için LeaveOpen true bayrağını olarak ayarlayabilirsiniz. Aksi takdirde akış kapatılır.

Aşağıdaki kod, bir akıştan PipeReader yöntemler PipeWriter kullanılarak ve Create örneklerinin oluşturulmasını gösteriyor.

using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

Uygulama, bir akış olaraklorem-ipsum.txtiçin bir kullanır ve boş StreamReader bir satırla bitebilir. FileStream, nesnesine PipeReader.Create geçirildi. PipeReader Konsol uygulaması daha sonra kullanarak standart çıkış akışını 'a PipeWriter.Create Console.OpenStandardOutput() iletir. Örnek iptali destekler.