异步文件 I/O

同步 I/O 意味着在 I/O 操作完成之前,方法被阻塞,I/O 操作完成后,方法返回其数据。 使用异步 I/O,用户可以调用 BeginRead。 主线程可以继续进行其他工作,稍后,用户将能够处理数据。 另外,多个 I/O 请求可以被同时挂起。

要在此数据可用时得到通知,您可以调用 EndRead 或 EndWrite,传入与您发出的 I/O 请求对应的 IAsyncResult。 您还可以提供回调方法,该回调方法应调用 EndRead 或 EndWrite 以计算读取或写入了多少字节。 当许多 I/O 请求被同时挂起时,异步 I/O 可以提供较好的性能,但通常要求对您的应用程序进行一些重要的调整以使其正常工作。

Stream 类支持对同一个流混合执行同步读写和异步读写,而不论操作系统是否允许这样做。 Stream 将按照同步实现的方式提供异步读写操作的默认实现,并按照异步实现的方式提供同步读写操作的默认实现。

当实现 Stream 的派生类时,需要为同步或异步的 Read 和 Write 方法提供实现。 尽管允许重写 Read 和 Write,并且异步方法(BeginRead、EndRead、BeginWrite 和 EndWrite)的默认实现可配合同步方法的实现使用,但这样做不能提供最佳性能。 同样,如果您提供了一个异步方法的实现,同步的 Read 和 Write 方法也将正常工作;但如果您专门实现同步方法,性能通常会更好。 ReadByte 和 WriteByte 的默认实现调用带有一个元素字节数组的同步 Read 和 Write 方法。 当从 Stream 派生类时,如果有内部字节缓冲区,强烈建议重写这些方法以访问内部缓冲区,这样性能将得到提高。

连接到后备存储器的流重写同步或异步的 Read 和 Write 方法,以获取默认情况下另一种方法的功能。 如果流不支持异步或同步操作,实施者只需让适当的方法引发异常即可。

下面的示例是一个假设的批量图像处理器的异步实现,其后是同步实现的示例。 本代码用于在目录中的每个文件上执行耗费 CPU 资源的操作。 有关更多信息,请参见主题“异步编程设计模式”。

Imports System
Imports System.IO
Imports System.Threading
Imports System.Runtime.InteropServices
Imports System.Runtime.Remoting.Messaging
Imports System.Security.Permissions
Imports Microsoft.Win32.SafeHandles


' Asynchronous version of a hypothetical bulk image processor.
' It writes numImages temporary "image" files, reads each one back with
' FileStream.BeginRead, runs a CPU-bound loop over the bytes in the read
' callback, writes the result to a ".done" file, and finally deletes all
' temporary files.
Module BulkImageProcAsync
    Dim ImageBaseName As String = "tmpImage-"
    Dim numImages As Integer = 200
    Dim numPixels As Integer = 512 * 512

    ' ProcessImage has a simple O(N) loop, and you can vary the number
    ' of times you repeat that loop to make the application more CPU-
    ' bound or more IO-bound.
    Dim processImageRepeats As Integer = 20

    ' Threads must decrement NumImagesToFinish, and protect
    ' their access to it through NumImagesMutex.
    Dim NumImagesToFinish As Integer = numImages
    Dim NumImagesMutex(-1) As [Object]
    ' WaitObject is pulsed when all image processing is done.
    Dim WaitObject(-1) As [Object]

    ' Per-request state handed to BeginRead and recovered in the callback.
    ' This is a reference type so that clearing "pixels" in the callback
    ' affects the one shared instance rather than an unboxed copy.
    Class ImageStateObject
        Public pixels() As Byte
        Public imageNum As Integer
        Public fs As FileStream
    End Class


    ' Creates numImages files named <ImageBaseName><i>.tmp, each holding
    ' exactly numPixels bytes of value 255, and flushes each one to disk.
    <SecurityPermissionAttribute(SecurityAction.Demand, Flags:=SecurityPermissionFlag.UnmanagedCode)> _
    Sub MakeImageFiles()
        Dim sides As Integer = CInt(Fix(Math.Sqrt(numPixels)))
        Console.Write("Making {0} {1}x{1} images... ", numImages, sides)
        ' A VB array bound is the upper index, so numPixels - 1 yields
        ' exactly numPixels elements.
        Dim pixels(numPixels - 1) As Byte
        For i As Integer = 0 To numPixels - 1
            pixels(i) = 255
        Next i
        For i As Integer = 0 To numImages - 1
            ' Using guarantees the file is closed even if Write throws.
            Using fs As New FileStream(ImageBaseName + i.ToString() + ".tmp", FileMode.Create, FileAccess.Write, FileShare.None, 8192, False)
                fs.Write(pixels, 0, pixels.Length)
                FlushFileBuffers(fs.SafeFileHandle)
            End Using
        Next i
        Console.WriteLine("Done.")

    End Sub


    ' Completion callback for the BeginRead issued in ProcessImagesInBulk.
    ' Finishes the read, processes the image, writes the ".done" file and
    ' records that one more image is finished.
    ' Throws IOException when the read returned other than numPixels bytes.
    Sub ReadInImageCallback(ByVal asyncResult As IAsyncResult)
        Dim state As ImageStateObject = CType(asyncResult.AsyncState, ImageStateObject)
        Dim bytesRead As Integer
        Try
            bytesRead = state.fs.EndRead(asyncResult)
        Finally
            ' The input file is no longer needed once the read completed
            ' (or failed); close it on every path.
            state.fs.Close()
        End Try
        If bytesRead <> numPixels Then
            Throw New IOException(String.Format("In ReadInImageCallback, got the wrong number of " + "bytes from the image: {0}.", bytesRead))
        End If
        ProcessImage(state.pixels, state.imageNum)

        ' Now write out the image.  
        ' Using asynchronous I/O here appears not to be best practice.
        ' It ends up swamping the threadpool, because the threadpool
        ' threads are blocked on I/O requests that were just queued to
        ' the threadpool. 
        Using fs As New FileStream(ImageBaseName + state.imageNum.ToString() + ".done", FileMode.Create, FileAccess.Write, FileShare.None, 4096, False)
            fs.Write(state.pixels, 0, numPixels)
        End Using

        ' This application model uses too much memory.
        ' Releasing memory as soon as possible is a good idea, 
        ' especially global state.
        state.pixels = Nothing

        ' Record that an image is finished now.
        Dim lastImage As Boolean
        SyncLock NumImagesMutex
            NumImagesToFinish -= 1
            lastImage = (NumImagesToFinish = 0)
        End SyncLock
        If lastImage Then
            ' The waiter re-checks the counter while holding WaitObject, so
            ' this pulse cannot slip in between its check and its wait.
            SyncLock WaitObject
                Monitor.PulseAll(WaitObject)
            End SyncLock
        End If

    End Sub


    ' Simulates CPU-intensive work: sets the first numPixels bytes of
    ' "pixels" to 1, repeating the pass processImageRepeats times.
    Sub ProcessImage(ByVal pixels() As Byte, ByVal imageNum As Integer)
        Console.WriteLine("ProcessImage {0}", imageNum)
        ' Perform some CPU-intensive operation on the image.
        For x As Integer = 0 To processImageRepeats - 1
            For y As Integer = 0 To numPixels - 1
                pixels(y) = 1
            Next y
        Next x
        Console.WriteLine("ProcessImage {0} done.", imageNum)

    End Sub


    ' Returns the number of images still outstanding, read under the
    ' same lock the callbacks use to decrement it.
    Private Function ImagesRemaining() As Integer
        SyncLock NumImagesMutex
            Return NumImagesToFinish
        End SyncLock
    End Function


    ' Starts one asynchronous read per image, then blocks until every
    ' callback has reported completion, and prints the elapsed time.
    Sub ProcessImagesInBulk()
        Console.WriteLine("Processing images...  ")
        Dim t0 As Long = Environment.TickCount
        NumImagesToFinish = numImages
        Dim readImageCallback As New AsyncCallback(AddressOf ReadInImageCallback)
        For i As Integer = 0 To numImages - 1
            Dim state As New ImageStateObject()
            state.pixels = New Byte(numPixels - 1) {}
            state.imageNum = i
            ' Very large items are read only once, so you can make the 
            ' buffer on the FileStream very small to save memory.
            Dim fs As New FileStream(ImageBaseName + i.ToString() + ".tmp", FileMode.Open, FileAccess.Read, FileShare.Read, 1, True)
            state.fs = fs
            Try
                fs.BeginRead(state.pixels, 0, numPixels, readImageCallback, state)
            Catch
                ' No callback will run for this stream, so close it here.
                fs.Close()
                Throw
            End Try
        Next i

        ' Determine whether all images are done being processed.  
        ' If not, block until all are finished. The check and the wait
        ' both happen while holding WaitObject, and the wait is re-checked
        ' in a loop, so a pulse sent before we start waiting is not lost.
        SyncLock WaitObject
            Dim remaining As Integer = ImagesRemaining()
            If remaining > 0 Then
                Console.WriteLine("All worker threads are queued. " + " Blocking until they complete. numLeft: {0}", remaining)
                Do While ImagesRemaining() > 0
                    Monitor.Wait(WaitObject)
                Loop
            End If
        End SyncLock
        Dim t1 As Long = Environment.TickCount
        Console.WriteLine("Total time processing images: {0}ms", t1 - t0)

    End Sub


    ' Deletes the ".tmp" and ".done" file of every image.
    Sub Cleanup()
        For i As Integer = 0 To numImages - 1
            File.Delete(ImageBaseName + i.ToString() + ".tmp")
            File.Delete(ImageBaseName + i.ToString() + ".done")
        Next i

    End Sub


    Sub TryToClearDiskCache()
        ' Try to force all pending writes to disk, and clear the
        ' disk cache of any data.
        Dim bytes(100 * (1 << 20) - 1) As Byte
        For i As Integer = 0 To bytes.Length - 1
            bytes(i) = 0
        Next i
        bytes = Nothing
        GC.Collect()
        Thread.Sleep(2000)

    End Sub


    ' Entry point. An optional single argument overrides
    ' processImageRepeats.
    Sub Main(ByVal args() As String)
        Console.WriteLine("Bulk image processing sample application," + " using asynchronous IO")
        Console.WriteLine("Simulates applying a simple " + "transformation to {0} ""images""", numImages)
        Console.WriteLine("(Async FileStream & Threadpool benchmark)")
        Console.WriteLine("Warning - this test requires {0} " + "bytes of temporary space", numPixels * numImages * 2)

        If args.Length = 1 Then
            processImageRepeats = Int32.Parse(args(0))
            Console.WriteLine("ProcessImage inner loop - {0}.", processImageRepeats)
        End If
        MakeImageFiles()
        TryToClearDiskCache()
        ProcessImagesInBulk()
        Cleanup()

    End Sub

    ' P/Invoke declaration for the Win32 FlushFileBuffers function.
    <DllImport("KERNEL32", SetLastError:=True)> _
    Sub FlushFileBuffers(ByVal handle As SafeFileHandle)
    End Sub
End Module
using System;
using System.IO;
using System.Threading;
using System.Runtime.InteropServices;
using System.Runtime.Remoting.Messaging;
using System.Security.Permissions;
using Microsoft.Win32.SafeHandles;

/// <summary>
/// Asynchronous version of a hypothetical bulk image processor. It writes
/// <see cref="numImages"/> temporary "image" files, reads each one back with
/// <c>FileStream.BeginRead</c>, runs a CPU-bound loop over the bytes in the
/// read callback, writes the result to a ".done" file, and finally deletes
/// all temporary files.
/// </summary>
public class BulkImageProcAsync
{
    public const String ImageBaseName = "tmpImage-";
    public const int numImages = 200;
    public const int numPixels = 512 * 512;

    // ProcessImage has a simple O(N) loop, and you can vary the number
    // of times you repeat that loop to make the application more CPU-
    // bound or more IO-bound.
    public static int processImageRepeats = 20;

    // Threads must decrement NumImagesToFinish, and protect
    // their access to it through NumImagesMutex.
    public static int NumImagesToFinish = numImages;
    public static Object[] NumImagesMutex = new Object[0];
    // WaitObject is pulsed when all image processing is done.
    public static Object[] WaitObject = new Object[0];

    /// <summary>
    /// Per-request state handed to BeginRead and recovered in the callback.
    /// </summary>
    public class ImageStateObject
    {
        public byte[] pixels;
        public int imageNum;
        public FileStream fs;
    }

    /// <summary>
    /// Creates <see cref="numImages"/> files named
    /// <c>&lt;ImageBaseName&gt;&lt;i&gt;.tmp</c>, each holding
    /// <see cref="numPixels"/> bytes, and flushes each one to disk.
    /// </summary>
    [SecurityPermissionAttribute(SecurityAction.Demand, Flags=SecurityPermissionFlag.UnmanagedCode)]
    public static void MakeImageFiles()
    {
        int sides = (int)Math.Sqrt(numPixels);
        Console.Write("Making {0} {1}x{1} images... ", numImages,
            sides);
        byte[] pixels = new byte[numPixels];
        for (int p = 0; p < numPixels; p++)
        {
            pixels[p] = (byte)p;
        }
        for (int i = 0; i < numImages; i++)
        {
            // using guarantees the file is closed even if Write throws.
            using (FileStream fs = new FileStream(ImageBaseName + i + ".tmp",
                FileMode.Create, FileAccess.Write, FileShare.None,
                8192, false))
            {
                fs.Write(pixels, 0, pixels.Length);
                FlushFileBuffers(fs.SafeFileHandle);
            }
        }
        Console.WriteLine("Done.");
    }

    /// <summary>
    /// Completion callback for the BeginRead issued in
    /// <see cref="ProcessImagesInBulk"/>. Finishes the read, processes the
    /// image, writes the ".done" file and records that one more image is
    /// finished.
    /// </summary>
    /// <param name="asyncResult">
    /// Result whose AsyncState is the <see cref="ImageStateObject"/> passed
    /// to BeginRead.
    /// </param>
    /// <exception cref="IOException">
    /// The read returned a byte count other than <see cref="numPixels"/>.
    /// </exception>
    public static void ReadInImageCallback(IAsyncResult asyncResult)
    {
        ImageStateObject state = (ImageStateObject)asyncResult.AsyncState;
        int bytesRead;
        try
        {
            bytesRead = state.fs.EndRead(asyncResult);
        }
        finally
        {
            // The input file is no longer needed once the read completed
            // (or failed); close it on every path.
            state.fs.Close();
        }
        if (bytesRead != numPixels)
        {
            throw new IOException(String.Format
                ("In ReadInImageCallback, got the wrong number of " +
                "bytes from the image: {0}.", bytesRead));
        }
        ProcessImage(state.pixels, state.imageNum);

        // Now write out the image.  
        // Using asynchronous I/O here appears not to be best practice.
        // It ends up swamping the threadpool, because the threadpool
        // threads are blocked on I/O requests that were just queued to
        // the threadpool. 
        using (FileStream fs = new FileStream(ImageBaseName + state.imageNum +
            ".done", FileMode.Create, FileAccess.Write, FileShare.None,
            4096, false))
        {
            fs.Write(state.pixels, 0, numPixels);
        }

        // This application model uses too much memory.
        // Releasing memory as soon as possible is a good idea, 
        // especially global state.
        state.pixels = null;

        // Record that an image is finished now.
        bool lastImage;
        lock (NumImagesMutex)
        {
            NumImagesToFinish--;
            lastImage = NumImagesToFinish == 0;
        }
        if (lastImage)
        {
            // The waiter re-checks the counter while holding WaitObject, so
            // this pulse cannot slip in between its check and its wait.
            lock (WaitObject)
            {
                Monitor.PulseAll(WaitObject);
            }
        }
    }

    /// <summary>
    /// Simulates CPU-intensive work: increments each of the first
    /// <see cref="numPixels"/> bytes once per pass, for
    /// <see cref="processImageRepeats"/> passes.
    /// </summary>
    /// <param name="pixels">Image bytes, modified in place.</param>
    /// <param name="imageNum">Image index, used only for console output.</param>
    public static void ProcessImage(byte[] pixels, int imageNum)
    {
        Console.WriteLine("ProcessImage {0}", imageNum);
        // Perform some CPU-intensive operation on the image.
        for (int pass = 0; pass < processImageRepeats; pass++)
        {
            for (int y = 0; y < numPixels; y++)
            {
                pixels[y] += 1;
            }
        }
        Console.WriteLine("ProcessImage {0} done.", imageNum);
    }

    // Returns the number of images still outstanding, read under the same
    // lock the callbacks use to decrement it.
    private static int ImagesRemaining()
    {
        lock (NumImagesMutex)
        {
            return NumImagesToFinish;
        }
    }

    /// <summary>
    /// Starts one asynchronous read per image, then blocks until every
    /// callback has reported completion, and prints the elapsed time.
    /// </summary>
    public static void ProcessImagesInBulk()
    {
        Console.WriteLine("Processing images...  ");
        long t0 = Environment.TickCount;
        NumImagesToFinish = numImages;
        AsyncCallback readImageCallback = new
            AsyncCallback(ReadInImageCallback);
        for (int i = 0; i < numImages; i++)
        {
            ImageStateObject state = new ImageStateObject();
            state.pixels = new byte[numPixels];
            state.imageNum = i;
            // Very large items are read only once, so you can make the 
            // buffer on the FileStream very small to save memory.
            FileStream fs = new FileStream(ImageBaseName + i + ".tmp",
                FileMode.Open, FileAccess.Read, FileShare.Read, 1, true);
            state.fs = fs;
            try
            {
                fs.BeginRead(state.pixels, 0, numPixels, readImageCallback,
                    state);
            }
            catch
            {
                // No callback will run for this stream, so close it here.
                fs.Close();
                throw;
            }
        }

        // Determine whether all images are done being processed.  
        // If not, block until all are finished. The check and the wait both
        // happen while holding WaitObject, and the wait is re-checked in a
        // loop, so a pulse sent before we start waiting is not lost.
        lock (WaitObject)
        {
            int remaining = ImagesRemaining();
            if (remaining > 0)
            {
                Console.WriteLine("All worker threads are queued. " +
                    " Blocking until they complete. numLeft: {0}",
                    remaining);
                while (ImagesRemaining() > 0)
                {
                    Monitor.Wait(WaitObject);
                }
            }
        }
        long t1 = Environment.TickCount;
        Console.WriteLine("Total time processing images: {0}ms",
            (t1 - t0));
    }

    /// <summary>
    /// Deletes the ".tmp" and ".done" file of every image.
    /// </summary>
    public static void Cleanup()
    {
        for (int i = 0; i < numImages; i++)
        {
            File.Delete(ImageBaseName + i + ".tmp");
            File.Delete(ImageBaseName + i + ".done");
        }
    }

    public static void TryToClearDiskCache()
    {
        // Try to force all pending writes to disk, and clear the
        // disk cache of any data.
        byte[] bytes = new byte[100 * (1 << 20)];
        for (int i = 0; i < bytes.Length; i++)
        {
            bytes[i] = 0;
        }
        bytes = null;
        GC.Collect();
        Thread.Sleep(2000);
    }

    /// <summary>
    /// Entry point. An optional single argument overrides
    /// <see cref="processImageRepeats"/>.
    /// </summary>
    public static void Main(String[] args)
    {
        Console.WriteLine("Bulk image processing sample application," +
            " using asynchronous IO");
        Console.WriteLine("Simulates applying a simple " +
            "transformation to {0} \"images\"", numImages);
        Console.WriteLine("(Async FileStream & Threadpool benchmark)");
        Console.WriteLine("Warning - this test requires {0} " +
            "bytes of temporary space", (numPixels * numImages * 2));

        if (args.Length == 1)
        {
            processImageRepeats = Int32.Parse(args[0]);
            Console.WriteLine("ProcessImage inner loop - {0}.",
                processImageRepeats);
        }
        MakeImageFiles();
        TryToClearDiskCache();
        ProcessImagesInBulk();
        Cleanup();
    }

    // P/Invoke declaration for the Win32 FlushFileBuffers function.
    [DllImport("KERNEL32", SetLastError = true)]
    private static extern void FlushFileBuffers(SafeFileHandle handle);
}

以下是同一假设场景的同步实现示例。

Imports System
Imports System.IO
Imports System.Threading
Imports System.Runtime.InteropServices
Imports System.Runtime.Remoting.Messaging
Imports System.Security.Permissions
Imports Microsoft.Win32.SafeHandles


' Synchronous version of a hypothetical bulk image processor.
' It writes numImages temporary "image" files, then for each one reads it
' with blocking I/O, runs a CPU-bound loop over the bytes, writes the
' result to a ".done" file, and finally deletes all temporary files.
Module BulkImageProcSync
    Dim ImageBaseName As String = "tmpImage-"
    Dim numImages As Integer = 200
    Dim numPixels As Integer = 512 * 512

    ' ProcessImage has a simple O(N) loop, and you can vary the number
    ' of times you repeat that loop to make the application more CPU-
    ' bound or more IO-bound.
    Dim processImageRepeats As Integer = 20

    ' Creates numImages files named <ImageBaseName><i>.tmp, each holding
    ' exactly numPixels bytes of value 255, and flushes each one to disk.
    <SecurityPermissionAttribute(SecurityAction.Demand, Flags:=SecurityPermissionFlag.UnmanagedCode)> _
    Sub MakeImageFiles()
        Dim sides As Integer = CInt(Fix(Math.Sqrt(numPixels)))
        Console.Write("Making {0} {1}x{1} images... ", numImages, sides)
        ' A VB array bound is the upper index, so numPixels - 1 yields
        ' exactly numPixels elements.
        Dim pixels(numPixels - 1) As Byte
        For i As Integer = 0 To numPixels - 1
            pixels(i) = 255
        Next i
        For i As Integer = 0 To numImages - 1
            ' Using guarantees the file is closed even if Write throws.
            Using fs As New FileStream(ImageBaseName + i.ToString() + ".tmp", FileMode.Create, FileAccess.Write, FileShare.None, 8192, False)
                fs.Write(pixels, 0, pixels.Length)
                FlushFileBuffers(fs.SafeFileHandle)
            End Using
        Next i
        Console.WriteLine("Done.")

    End Sub


    ' Simulates CPU-intensive work: sets the first numPixels bytes of
    ' "pixels" to 1, repeating the pass processImageRepeats times.
    Sub ProcessImage(ByVal pixels() As Byte, ByVal imageNum As Integer)
        Console.WriteLine("ProcessImage {0}", imageNum)
        ' Perform some CPU-intensive operation on the image.
        For x As Integer = 0 To processImageRepeats - 1
            For y As Integer = 0 To numPixels - 1
                pixels(y) = 1
            Next y
        Next x
        Console.WriteLine("ProcessImage {0} done.", imageNum)

    End Sub


    ' Reads, processes and rewrites every image in turn using blocking
    ' I/O, then prints the elapsed time.
    ' Throws EndOfStreamException when an input file is shorter than
    ' numPixels bytes.
    Sub ProcessImagesInBulk()
        Console.WriteLine("Processing images... ")
        Dim t0 As Long = Environment.TickCount
        Dim pixels(numPixels - 1) As Byte
        For i As Integer = 0 To numImages - 1
            Using inStream As New FileStream(ImageBaseName + i.ToString() + ".tmp", FileMode.Open, FileAccess.Read, FileShare.Read, 4096, False)
                ' Stream.Read may return fewer bytes than requested, so
                ' keep reading until the whole image is in the buffer.
                Dim offset As Integer = 0
                Do While offset < numPixels
                    Dim count As Integer = inStream.Read(pixels, offset, numPixels - offset)
                    If count = 0 Then
                        Throw New EndOfStreamException(String.Format("In ProcessImagesInBulk, image {0} ended after {1} bytes.", i, offset))
                    End If
                    offset += count
                Loop
            End Using
            ProcessImage(pixels, i)
            Using outStream As New FileStream(ImageBaseName + i.ToString() + ".done", FileMode.Create, FileAccess.Write, FileShare.None, 4096, False)
                outStream.Write(pixels, 0, numPixels)
            End Using
        Next i
        Dim t1 As Long = Environment.TickCount
        Console.WriteLine("Total time processing images: {0}ms", t1 - t0)

    End Sub


    ' Deletes the ".tmp" and ".done" file of every image.
    Sub Cleanup()
        For i As Integer = 0 To numImages - 1
            File.Delete(ImageBaseName + i.ToString() + ".tmp")
            File.Delete(ImageBaseName + i.ToString() + ".done")
        Next i

    End Sub


    ' Allocates and touches a 100 MB buffer, forces a garbage collection
    ' and sleeps for two seconds before the timed run starts.
    Sub TryToClearDiskCache()
        Dim bytes(100 * (1 << 20) - 1) As Byte
        For i As Integer = 0 To bytes.Length - 1
            bytes(i) = 0
        Next i
        bytes = Nothing
        GC.Collect()
        Thread.Sleep(2000)

    End Sub


    ' Entry point. An optional single argument overrides
    ' processImageRepeats.
    Sub Main(ByVal args() As String)
        Console.WriteLine("Bulk image processing sample application," + " using synchronous I/O.")
        Console.WriteLine("Simulates applying a simple " + "transformation to {0} ""images.""", numImages)
        Console.WriteLine("(ie, Sync FileStream benchmark).")
        Console.WriteLine("Warning - this test requires {0} " + "bytes of temporary space", numPixels * numImages * 2)

        If args.Length = 1 Then
            processImageRepeats = Int32.Parse(args(0))
            Console.WriteLine("ProcessImage inner loop - {0}", processImageRepeats)
        End If

        MakeImageFiles()
        TryToClearDiskCache()
        ProcessImagesInBulk()
        Cleanup()

    End Sub


    ' P/Invoke declaration for the Win32 FlushFileBuffers function.
    <DllImport("KERNEL32", SetLastError:=True)> _
    Sub FlushFileBuffers(ByVal handle As SafeFileHandle)
    End Sub
End Module
using System;
using System.IO;
using System.Threading;
using System.Runtime.InteropServices;
using System.Runtime.Remoting.Messaging;
using System.Security.Permissions;
using Microsoft.Win32.SafeHandles;

/// <summary>
/// Synchronous version of a hypothetical bulk image processor. It writes
/// <see cref="numImages"/> temporary "image" files, then for each one reads
/// it with blocking I/O, runs a CPU-bound loop over the bytes, writes the
/// result to a ".done" file, and finally deletes all temporary files.
/// </summary>
public class BulkImageProcSync
{
    public const String ImageBaseName = "tmpImage-";
    public const int numImages = 200;
    public const int numPixels = 512 * 512;

    // ProcessImage has a simple O(N) loop, and you can vary the number
    // of times you repeat that loop to make the application more CPU-
    // bound or more IO-bound.
    public static int processImageRepeats = 20;

    /// <summary>
    /// Creates <see cref="numImages"/> files named
    /// <c>&lt;ImageBaseName&gt;&lt;i&gt;.tmp</c>, each holding
    /// <see cref="numPixels"/> bytes, and flushes each one to disk.
    /// </summary>
    [SecurityPermissionAttribute(SecurityAction.Demand, Flags=SecurityPermissionFlag.UnmanagedCode)]
    public static void MakeImageFiles()
    {
        int sides = (int)Math.Sqrt(numPixels);
        Console.Write("Making {0} {1}x{1} images... ", numImages,
            sides);
        byte[] pixels = new byte[numPixels];
        for (int p = 0; p < numPixels; p++)
        {
            pixels[p] = (byte)p;
        }
        for (int i = 0; i < numImages; i++)
        {
            // using guarantees the file is closed even if Write throws.
            using (FileStream fs = new FileStream(ImageBaseName + i + ".tmp",
                FileMode.Create, FileAccess.Write, FileShare.None,
                8192, false))
            {
                fs.Write(pixels, 0, pixels.Length);
                FlushFileBuffers(fs.SafeFileHandle);
            }
        }
        Console.WriteLine("Done.");
    }

    /// <summary>
    /// Simulates CPU-intensive work: increments each of the first
    /// <see cref="numPixels"/> bytes once per pass, for
    /// <see cref="processImageRepeats"/> passes.
    /// </summary>
    /// <param name="pixels">Image bytes, modified in place.</param>
    /// <param name="imageNum">Image index, used only for console output.</param>
    public static void ProcessImage(byte[] pixels, int imageNum)
    {
        Console.WriteLine("ProcessImage {0}", imageNum);
        // Perform some CPU-intensive operation on the image.
        for (int pass = 0; pass < processImageRepeats; pass++)
        {
            for (int y = 0; y < numPixels; y++)
            {
                pixels[y] += 1;
            }
        }
        Console.WriteLine("ProcessImage {0} done.", imageNum);
    }

    /// <summary>
    /// Reads, processes and rewrites every image in turn using blocking
    /// I/O, then prints the elapsed time.
    /// </summary>
    /// <exception cref="EndOfStreamException">
    /// An input file is shorter than <see cref="numPixels"/> bytes.
    /// </exception>
    public static void ProcessImagesInBulk()
    {
        Console.WriteLine("Processing images... ");
        long t0 = Environment.TickCount;
        byte[] pixels = new byte[numPixels];
        for (int i = 0; i < numImages; i++)
        {
            using (FileStream input = new FileStream(ImageBaseName + i + ".tmp",
                FileMode.Open, FileAccess.Read, FileShare.Read,
                4096, false))
            {
                // Stream.Read may return fewer bytes than requested, so
                // keep reading until the whole image is in the buffer.
                int offset = 0;
                while (offset < numPixels)
                {
                    int count = input.Read(pixels, offset, numPixels - offset);
                    if (count == 0)
                    {
                        throw new EndOfStreamException(String.Format(
                            "In ProcessImagesInBulk, image {0} ended after {1} bytes.",
                            i, offset));
                    }
                    offset += count;
                }
            }
            ProcessImage(pixels, i);
            using (FileStream output = new FileStream(ImageBaseName + i + ".done",
                FileMode.Create, FileAccess.Write, FileShare.None,
                4096, false))
            {
                output.Write(pixels, 0, numPixels);
            }
        }
        long t1 = Environment.TickCount;
        Console.WriteLine("Total time processing images: {0}ms",
            (t1 - t0));
    }

    /// <summary>
    /// Deletes the ".tmp" and ".done" file of every image.
    /// </summary>
    public static void Cleanup()
    {
        for (int i = 0; i < numImages; i++)
        {
            File.Delete(ImageBaseName + i + ".tmp");
            File.Delete(ImageBaseName + i + ".done");
        }
    }

    /// <summary>
    /// Allocates and touches a 100 MB buffer, forces a garbage collection
    /// and sleeps for two seconds before the timed run starts.
    /// </summary>
    public static void TryToClearDiskCache()
    {
        byte[] bytes = new byte[100 * (1 << 20)];
        for (int i = 0; i < bytes.Length; i++)
        {
            bytes[i] = 0;
        }
        bytes = null;
        GC.Collect();
        Thread.Sleep(2000);
    }

    /// <summary>
    /// Entry point. An optional single argument overrides
    /// <see cref="processImageRepeats"/>.
    /// </summary>
    public static void Main(String[] args)
    {
        Console.WriteLine("Bulk image processing sample application," +
            " using synchronous I/O.");
        Console.WriteLine("Simulates applying a simple " +
            "transformation to {0} \"images.\"", numImages);
        Console.WriteLine("(ie, Sync FileStream benchmark).");
        Console.WriteLine("Warning - this test requires {0} " +
            "bytes of temporary space", (numPixels * numImages * 2));

        if (args.Length == 1)
        {
            processImageRepeats = Int32.Parse(args[0]);
            // The separator here was a mis-encoded dash in the original.
            Console.WriteLine("ProcessImage inner loop - {0}",
                processImageRepeats);
        }

        MakeImageFiles();
        TryToClearDiskCache();
        ProcessImagesInBulk();
        Cleanup();
    }

    // P/Invoke declaration for the Win32 FlushFileBuffers function.
    [DllImport("KERNEL32", SetLastError = true)]
    private static extern void FlushFileBuffers(SafeFileHandle handle);
}

请参见

参考

Stream

Stream.Read

Stream.Write

Stream.BeginRead

Stream.BeginWrite

Stream.EndRead

Stream.EndWrite

IAsyncResult

Mutex

其他资源

文件和流 I/O