System. IO. Pipelines v .NET

System.IO.Pipelines je nová knihovna, která je navržena tak, aby usnadnila vysoce výkonné vstupně-výstupní operace v rozhraní .NET. Jedná se o knihovnu, která cílí na .NET Standard, která funguje na všech implementacích rozhraní .NET.

Jaký problém řeší System. IO. Pipelines

Aplikace, které analyzují streamovaná data, se skládají ze standardizovaného kódu, který má mnoho toků specializovaného a neobvyklého kódu Často používaný kód a zvláštní případ je složitý a obtížně udržovatelný.

System.IO.Pipelines bylo navrženo pro:

  • Analýza dat streamování je vysoce výkonná.
  • Snižte složitost kódu.

Následující kód je typický pro server TCP, který přijímá zprávy oddělené řádky (oddělený od '\n' ) od klienta:

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

Předchozí kód má několik problémů:

  • V jednom volání metody nemusí být přijata celá zpráva (konce řádku) ReadAsync .
  • Ignoruje výsledek stream.ReadAsync . stream.ReadAsync Vrátí, kolik dat bylo přečteno.
  • Nezpracovává případ, kdy je více řádků čteno v jednom ReadAsync volání.
  • Přiděluje byte pole s každým čtením.

Chcete-li opravit předchozí problémy, jsou vyžadovány následující změny:

  • Ukládat příchozí data do vyrovnávací paměti, dokud se nenajde nový řádek.

  • Analyzuje všechny řádky vrácené ve vyrovnávací paměti.

  • Je možné, že je řádek větší než 1 KB (1024 bajtů). Kód musí změnit velikost vstupní vyrovnávací paměti, dokud není nalezen oddělovač, aby odpovídal celému řádku uvnitř vyrovnávací paměti.

    • Pokud se změní velikost vyrovnávací paměti, ve vstupu se zobrazí další kopie vyrovnávací paměti, ve kterých se objeví delší řádky.
    • Chcete-li snížit množství nevyužitého místa, Zkomprimujte vyrovnávací paměť použitou pro čtení řádků.
  • Zvažte použití sdružování vyrovnávací paměti, abyste se vyhnuli opakovanému přidělování paměti.

  • Následující kód řeší některé z těchto problémů:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

Předchozí kód je složitý a neřeší všechny zjištěné problémy. Vysoce výkonné sítě obvykle znamenají psaní velmi složitého kódu pro maximalizaci výkonu. System.IO.Pipelines bylo navrženo tak, aby byl tento typ kódu snazší.

Pokud chcete zobrazit komentáře kódu přeložené do jiných jazyků než angličtiny, dejte nám v tomto problému diskuze na GitHubuinformace.

Příkazem

PipeTřídu lze použít k vytvoření PipeWriter/PipeReader páru. Všechna data zapsaná do nástroje PipeWriter jsou k dispozici v PipeReader :

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Základní použití kanálu

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

Existují dvě smyčky:

  • FillPipeAsync čte z Socket a zápisy do PipeWriter .
  • ReadPipeAsync čte z PipeReader a analyzuje příchozí řádky.

Nejsou přiděleny žádné explicitní vyrovnávací paměti. Veškerá správa vyrovnávací paměti je delegována na PipeReader PipeWriter implementace a. Delegování správy vyrovnávací paměti usnadňuje využívání kódu pro zaměření pouze na obchodní logiku.

V první smyčce:

Ve druhé smyčce PipeReader využívá vyrovnávací paměti zapsané PipeWriter . Vyrovnávací paměti přicházejí ze soketu. Volání na PipeReader.ReadAsync :

  • Vrátí ReadResult , který obsahuje dvě důležité informace:

    • Data, která byla načtena ve formě ReadOnlySequence<byte> .
    • Logická hodnota IsCompleted , která označuje, zda bylo dosaženo konce dat (EOF).

Po nalezení oddělovače na konci řádku (konce řádku) a při analýze řádku:

  • Logika zpracuje vyrovnávací paměť a přeskočí, co je již zpracováno.
  • PipeReader.AdvanceTo je volána, aby informovala, PipeReader kolik dat bylo spotřebováno a zkontrolováno.

Smyčka čtečky a zapisovače končí voláním Complete . Complete umožňuje původnímu kanálu uvolnit přidělenou paměť.

Řízení zatížení a tok

V ideálním případě se jedná o společné čtení a analýzu:

  • Vlákno zápisu spotřebovává data ze sítě a umístí je do vyrovnávacích pamětí.
  • Podproces analýzy zodpovídá za vytváření vhodných datových struktur.

Analýza obvykle trvá více času, než stačí kopírovat bloky dat ze sítě:

  • Vlákno čtení získá před vláknem analýzy.
  • Vlákno čtení musí buď zpomalit, nebo přidělit více paměti pro uložení dat pro vlákno analýzy.

Pro zajištění optimálního výkonu existuje zůstatek mezi častými pozastavením a přidělením více paměti.

Chcete-li vyřešit předchozí problém, Pipe má dvě nastavení pro řízení toku dat:

  • PauseWriterThreshold: Určuje, kolik dat by mělo být uloženo do vyrovnávací paměti před voláními pro FlushAsync pozastavení.
  • ResumeWriterThreshold: Určuje, kolik dat musí čtenář sledovat před PipeWriter.FlushAsync pokračováním volání.

Diagram s ResumeWriterThreshold a PauseWriterThreshold

PipeWriter.FlushAsync:

  • Vrátí nedokončenou ValueTask<FlushResult> , pokud je množství dat v Pipe křížení PauseWriterThreshold .
  • Dokončí, ValueTask<FlushResult> když bude menší než ResumeWriterThreshold .

Pomocí dvou hodnot se vyhnete rychlému cyklování, ke kterému může dojít, když se použije jedna hodnota.

Příklady

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeScheduler

Obvykle při použití async a await se asynchronní kód obnovuje buď na, TaskScheduler nebo v aktuální SynchronizationContext .

Při provádění vstupně-výstupních operací je důležité mít detailní kontrolu nad tím, kde se vstupně-výstupní operace provádí. Tento ovládací prvek umožňuje efektivní využití mezipamětí CPU. Efektivní ukládání do mezipaměti je klíčové pro vysoce výkonné aplikace, jako jsou webové servery. PipeScheduler poskytuje kontrolu nad tím, kde jsou spouštěna asynchronní zpětná volání. Ve výchozím nastavení:

  • Používá se aktuální SynchronizationContext .
  • Pokud je k dispozici SynchronizationContext , používá fond vláken ke spouštění zpětných volání.
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object> action, object state)
    {
        _queue.Add((action, state));
    }
}

PipeScheduler. Apartment je PipeScheduler implementace, která zařadí zpětná volání do fondu vláken. PipeScheduler.ThreadPool je výchozí a obecně nejlepší volbou. PipeScheduler. inline může způsobit nezamýšlené důsledky, jako je zablokování.

Resetování kanálu

Je často efektivní použít Pipe objekt. Chcete-li obnovit kanál, zavolejte, PipeReader Reset Když PipeReader PipeWriter jsou i dokončeny.

PipeReader

PipeReader spravuje paměť jménem volajícího. Always PipeReader.AdvanceTo Po volání vždy volejte PipeReader.ReadAsync . To vám umožní PipeReader zjistit, kdy se volající provede s pamětí, aby se mohl sledovat. ReadOnlySequence<byte>Vrácená z PipeReader.ReadAsync je platná pouze do volání metody PipeReader.AdvanceTo . Použití po volání není povoleno ReadOnlySequence<byte> PipeReader.AdvanceTo .

PipeReader.AdvanceTo přebírá dva SequencePosition argumenty:

  • První argument určuje, kolik paměti bylo spotřebováno.
  • Druhý argument určuje, kolik paměti bylo pozorováno.

Označení dat jako spotřebované znamená, že kanál může vrátit paměť do podkladového fondu vyrovnávací paměti. Označení dat jako pozorovaných řídí, co dělá další volání PipeReader.ReadAsync . Označení všeho jako pozorovaného znamená, že další volání PipeReader.ReadAsync nebude vráceno, dokud nebudou do kanálu zapsána další data. Jakákoli jiná hodnota provede další volání PipeReader.ReadAsync ihned se zjištěnými a nepozorovanými daty, ale ne data, která již byla spotřebována.

Čtení scénářů streamování dat

Při pokusu o čtení dat streamování je k dispozici několik typických vzorů:

  • Pokud má datový proud data, analyzujte jednu zprávu.
  • Pokud má datový proud data, analyzujte všechny dostupné zprávy.

Následující příklady používají TryParseMessage metodu pro analýzu zpráv z ReadOnlySequence<byte> . TryParseMessage analyzuje jednu zprávu a aktualizuje vstupní vyrovnávací paměť pro oříznutí analyzované zprávy z vyrovnávací paměti. TryParseMessage není součástí .NET, je metoda psaná uživatelem, která se používá v následujících oddílech.

bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out Message message);

Číst jednu zprávu

Následující kód přečte jednu zprávu z PipeReader a a vrátí ji volajícímu.

async ValueTask<Message> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseMessage(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseMessage trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

Předcházející kód:

  • Analyzuje jednu zprávu.
  • Aktualizuje spotřebované SequencePosition a zkoumané, SequencePosition aby odkazoval na začátek oříznuté vstupní vyrovnávací paměti.

Tyto dva SequencePosition argumenty jsou aktualizovány, protože TryParseMessage odebere analyzovanou zprávu ze vstupní vyrovnávací paměti. Obecně platí, že při analýze jedné zprávy z vyrovnávací paměti by měla být prověřená poloha jedna z následujících:

  • Konec zprávy
  • Konec přijaté vyrovnávací paměti, pokud nebyla nalezena žádná zpráva.

Jeden případ zprávy má nejvíce potenciál pro chyby. Předání špatných hodnot do prověření může mít za následek výjimku z důvodu nedostatku paměti nebo nekonečnou smyčku. Další informace najdete v části běžné problémy PipeReader v tomto článku.

Čtení více zpráv

Následující kód přečte všechny zprávy z PipeReader volání a a ProcessMessageAsync každý z nich.

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseMessage(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Zrušení

PipeReader.ReadAsync:

  • Podporuje předávání CancellationToken .
  • Vyvolá výjimku OperationCanceledException , pokud CancellationToken je zrušena, když dojde k nedokončenému čtení.
  • Podporuje způsob, jak zrušit aktuální operaci čtení prostřednictvím PipeReader.CancelPendingRead , která zabraňuje vyvolání výjimky. Volání PipeReader.CancelPendingRead způsobí, že aktuální nebo další volání PipeReader.ReadAsync vrátí hodnotu ReadResult s IsCanceled nastavenou na true . To může být užitečné pro zastavení stávající smyčky čtení v nedestruktivním a nevýjimečném způsobu.
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseMessage(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

PipeReader běžné problémy

  • Předání nesprávných hodnot do consumed nebo examined může mít za následek čtení dat již přečtených.

  • buffer.EndPři předávání jako prověření může dojít k následujícím akcím:

    • Data zastavena
    • Možnou výjimku z paměti (OOM), pokud nejsou data spotřebována. Například PipeReader.AdvanceTo(position, buffer.End) při zpracování jedné zprávy v čase z vyrovnávací paměti.
  • Předání špatných hodnot do consumed nebo examined může způsobit nekonečnou smyčku. Například pokud se PipeReader.AdvanceTo(buffer.Start) buffer.Start nezměnila, způsobí to, že se další volání PipeReader.ReadAsync vrátí hned před přijetím nových dat.

  • Předání špatných hodnot do consumed nebo examined může mít za následek nekonečné ukládání do vyrovnávací paměti (OOM).

  • Použití ReadOnlySequence<byte> metody po volání PipeReader.AdvanceTo může mít za následek poškození paměti (použijte po uvolnění).

  • Selhání volání PipeReader.Complete/CompleteAsync může způsobit nevracení paměti.

  • Kontrola ReadResult.IsCompleted a ukončení logiky čtení před zpracováním vyrovnávací paměti má za následek ztrátu dat. Stav ukončení smyčky musí být založen na ReadResult.Buffer.IsEmpty a ReadResult.IsCompleted . Tato nesprávně by mohla způsobit nekonečnou smyčku.

Problematický kód

Ztráta dat

ReadResultMůže vrátit konečný segment dat, když IsCompleted je nastavena na true . Tato data nebudou čtena před ukončením smyčky pro čtení. výsledkem bude ztráta dat.

Upozornění

Nepoužívejte následující kód. Použití této ukázky bude mít za následek ztrátu dat, zablokování, problémy se zabezpečením a neměly by být zkopírovány. Následující ukázka je k dispozici vysvětlit PipeReader běžné problémy.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
    {
        break;
    }

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

Upozornění

Nepoužívejte předchozí kód. Použití této ukázky bude mít za následek ztrátu dat, zablokování, problémy se zabezpečením a neměly by být zkopírovány. Předchozí ukázka je k dispozici vysvětlit PipeReader běžné problémy.

Nekonečná smyčka

Následující logika může mít za následek nekonečnou smyčku, pokud Result.IsCompleted není, true ale ve vyrovnávací paměti nikdy není úplná zpráva.

Upozornění

Nepoužívejte následující kód. Použití této ukázky bude mít za následek ztrátu dat, zablokování, problémy se zabezpečením a neměly by být zkopírovány. Následující ukázka je k dispozici vysvětlit PipeReader běžné problémy.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
    {
        break;
    }

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Upozornění

Nepoužívejte předchozí kód. Použití této ukázky bude mít za následek ztrátu dat, zablokování, problémy se zabezpečením a neměly by být zkopírovány. Předchozí ukázka je k dispozici vysvětlit PipeReader běžné problémy.

Tady je další část kódu se stejným problémem. Před kontrolou se kontroluje, jestli není prázdná vyrovnávací paměť ReadResult.IsCompleted . Vzhledem k tomu, že je v else if , bude tato smyčka nepřetržitě v případě, že ve vyrovnávací paměti stále není úplná zpráva.

Upozornění

Nepoužívejte následující kód. Použití této ukázky bude mít za následek ztrátu dat, zablokování, problémy se zabezpečením a neměly by být zkopírovány. Následující ukázka je k dispozici vysvětlit PipeReader běžné problémy.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
    {
        Process(ref infiniteLoopBuffer, out Message message);
    }
    else if (result.IsCompleted)
    {
        break;
    }

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Upozornění

Nepoužívejte předchozí kód. Použití této ukázky bude mít za následek ztrátu dat, zablokování, problémy se zabezpečením a neměly by být zkopírovány. Předchozí ukázka je k dispozici vysvětlit PipeReader běžné problémy.

Neočekávané zablokování

Nepodmíněné volání PipeReader.AdvanceTo s buffer.End na examined pozici může vést k zablokování při analýze jedné zprávy. Následující volání PipeReader.AdvanceTo nebude vráceno do:

  • Do kanálu se zapisují další data.
  • A nová data se předtím nezkoumala.

Upozornění

Nepoužívejte následující kód. Použití této ukázky bude mít za následek ztrátu dat, zablokování, problémy se zabezpečením a neměly by být zkopírovány. Následující ukázka je k dispozici vysvětlit PipeReader běžné problémy.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
    {
        break;
    }

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
    {
        return message;
    }
}

Upozornění

Nepoužívejte předchozí kód. Použití této ukázky bude mít za následek ztrátu dat, zablokování, problémy se zabezpečením a neměly by být zkopírovány. Předchozí ukázka je k dispozici vysvětlit PipeReader běžné problémy.

Nedostatek paměti (OOM)

V následujících podmínkách uchovává následující kód ukládání do vyrovnávací paměti, dokud OutOfMemoryException nedojde k:

  • Není k dispozici žádná maximální velikost zprávy.
  • Data vrácená z PipeReader nevytvářejí úplnou zprávu. Například neprovádí úplnou zprávu, protože druhá strana zapisuje velkou zprávu (například zpráva o velikosti 4 GB).

Upozornění

Nepoužívejte následující kód. Použití této ukázky bude mít za následek ztrátu dat, zablokování, problémy se zabezpečením a neměly by být zkopírovány. Následující ukázka je k dispozici vysvětlit PipeReader běžné problémy.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
    {
        break;
    }

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
    {
        return message;
    }
}

Upozornění

Nepoužívejte předchozí kód. Použití této ukázky bude mít za následek ztrátu dat, zablokování, problémy se zabezpečením a neměly by být zkopírovány. Předchozí ukázka je k dispozici vysvětlit PipeReader běžné problémy.

Poškození paměti

Při psaní pomocníků, které čtou vyrovnávací paměť, je nutné před voláním zkopírovat všechny vrácené datové části Advance . Následující příklad vrátí paměť, která Pipe byla zahozena a může ji znovu použít pro další operaci (čtení/zápis).

Upozornění

Nepoužívejte následující kód. Použití této ukázky bude mít za následek ztrátu dat, zablokování, problémy se zabezpečením a neměly by být zkopírovány. Následující ukázka je k dispozici vysvětlit PipeReader běžné problémy.

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
        {
            break;
        }

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

Upozornění

Nepoužívejte předchozí kód. Použití této ukázky bude mít za následek ztrátu dat, zablokování, problémy se zabezpečením a neměly by být zkopírovány. Předchozí ukázka je k dispozici vysvětlit PipeReader běžné problémy.

PipeWriter

PipeWriterSpravuje vyrovnávací paměti pro psaní jménem volajícího. PipeWriter implementuje IBufferWriter<byte> . IBufferWriter<byte> umožňuje získat přístup k vyrovnávací paměti, aby bylo možné provádět zápisy bez dalších kopií vyrovnávací paměti.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

Předchozí kód:

  • Vyžádá vyrovnávací paměť o velikosti alespoň 5 bajtů z s PipeWriter použitím GetMemory .
  • Zapíše bajty pro řetězec ASCII "Hello" na vrácenou Memory<byte> .
  • Volání Advance indikující, kolik bajtů bylo zapsáno do vyrovnávací paměti.
  • Vyprázdní PipeWriter , čímž pošle bajty na základní zařízení.

Předchozí metoda zápisu používá vyrovnávací paměti, které poskytuje PipeWriter . Alternativně PipeWriter.WriteAsync :

  • Zkopíruje existující vyrovnávací paměť do PipeWriter .
  • Volání GetSpan Advance podle potřeby a volání FlushAsync .
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

Zrušení

FlushAsync podporuje předávání CancellationToken . Předání CancellationToken výsledků v OperationCanceledException případě zrušení tokenu v době, kdy dojde k vyprázdnění. PipeWriter.FlushAsync podporuje způsob, jak zrušit aktuální operaci vyprázdnění prostřednictvím PipeWriter.CancelPendingFlush bez vyvolání výjimky. Volání PipeWriter.CancelPendingFlush způsobí, že aktuální nebo další volání PipeWriter.FlushAsync nebo PipeWriter.WriteAsync vrátí hodnotu FlushResult s IsCanceled nastavenou na true . To může být užitečné, pokud chcete zablokovat vyprázdnit vyřazení v nedestruktivním a nevýjimečném způsobu.

PipeWriter běžné problémy

  • GetSpan a GetMemory vrátí vyrovnávací paměť s minimální požadovanou velikostí paměti. Neberete přesnou velikost vyrovnávací paměti.
  • Není nijak zaručeno, že po sobě jdoucí volání budou vracet stejnou vyrovnávací paměť nebo vyrovnávací paměť se stejnou velikostí.
  • Po volání musí být vyžádána nová vyrovnávací paměť, aby bylo možné Advance pokračovat v zápisu více dat. Dřív získaná vyrovnávací paměť se nedá zapsat do.
  • Volání GetMemory nebo GetSpan i v případě, že je neúplné volání FlushAsync není bezpečné.
  • Volání Complete nebo CompleteAsync i v případě nevyprázdněných dat může mít za následek poškození paměti.

IDuplexPipe

IDuplexPipeJe kontrakt pro typy, které podporují čtení i zápis. Například síťové připojení by představovalo IDuplexPipe .

Na rozdíl od Pipe , který obsahuje PipeReader a a PipeWriter , IDuplexPipe představuje jednu stranu úplného duplexního připojení. To znamená, že obsah, který se zapisuje do, nebude PipeWriter číst z PipeReader .

Streamy

Při čtení nebo zápisu dat datového proudu obvykle čtete data pomocí rušení serializátoru a zapisují data pomocí serializátoru. Většina těchto rozhraní API pro čtení a zápis streamování má Stream parametr. Aby bylo snazší je integrovat s těmito stávajícími rozhraními API, PipeReader a PipeWriter vystavit AsStream metodu. AsStream vrací Stream implementaci kolem PipeReader nebo PipeWriter .

Příklad streamu

PipeReader``PipeWriterinstance a lze vytvořit pomocí statických Create metod daného Stream objektu a volitelných odpovídajících možností vytváření.

StreamPipeReaderOptionsUmožňuje řídit vytvoření PipeReader instance s následujícími parametry:

StreamPipeWriterOptionsUmožňuje řídit vytvoření PipeWriter instance s následujícími parametry:

Důležité

Při vytváření PipeReader PipeWriter instancí a instance pomocí Create metod je nutné vzít v úvahu Stream dobu života objektu. Pokud potřebujete přístup ke streamu i po tom, co se čtenář nebo zapisovač provede, budete muset nastavit LeaveOpen příznak true na možnosti vytváření. V opačném případě bude datový proud zavřen.

Následující kód demonstruje vytvoření PipeReader a PipeWriter instance pomocí Create metod z datového proudu.

using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseMessage(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseMessage(
        ref ReadOnlySequence<byte> buffer,
        out string message) => 
        (message = Encoding.ASCII.GetString(buffer)) != null;

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

Aplikace používá StreamReader ke čtení lorem-ipsum.txt souboru jako datového proudu. FileStreamJe předán do PipeReader.Create , který vytvoří instanci PipeReader objektu. Aplikace konzoly pak předá svůj standardní výstupní datový proud PipeWriter.Create pomocí Console.OpenStandardOutput() . V příkladu je podporována zrušení.