
Question

I need to be able to save and read a very large data structure using C#. The structure itself is fairly simple: it's a very long array of simple structs of a constant size.

An example just for clarity:

struct st {
    UInt32 a;
    UInt16 b;
    // etc.
}

st[] data = new st[1024*1024*100];

I want to be able to save and load this data to and from a file as quickly and efficiently as possible.

General direction

My idea so far is to cut the data into segments (conceptually, of course), assign those segments to tasks, and write them to the file asynchronously. FileStream.WriteAsync seems to be a perfect fit for this.

My problem is with the reading. From the FileStream.ReadAsync API it seems entirely possible for a read to be cut in the middle of a struct, indeed halfway through a primitive. Of course I can work around this, but I'm not sure of the best way to do it, or how much I would interfere with the operating system's buffering mechanism.

Eventually, I plan to create a MemoryStream from each buffer with MemoryStream(byte[]) and read each buffer into the structs with a BinaryReader.
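A minimal sketch of that plan (the FillBuffer helper, chunk size, and class names are illustrative, not from the question): requesting reads in whole multiples of the struct size, and refilling each buffer until it is full or the stream ends, guarantees that no struct is ever split across chunks.

```csharp
using System;
using System.IO;

struct st
{
    public UInt32 a;
    public UInt16 b;
}

static class ChunkedReader
{
    // Serialized size of one st: 4 bytes (UInt32) + 2 bytes (UInt16).
    const int StructSize = 6;

    // Keep reading until the buffer is full or the stream ends, so that a
    // single read ending mid-struct can never split a record across chunks.
    static int FillBuffer(Stream stream, byte[] buffer)
    {
        int total = 0, read;
        while (total < buffer.Length &&
               (read = stream.Read(buffer, total, buffer.Length - total)) > 0)
            total += read;
        return total;
    }

    public static st[] ReadChunk(Stream stream, int structsPerChunk)
    {
        var buffer = new byte[structsPerChunk * StructSize];
        int bytes = FillBuffer(stream, buffer);

        var result = new st[bytes / StructSize];
        using (var reader = new BinaryReader(new MemoryStream(buffer, 0, bytes)))
        {
            for (int i = 0; i < result.Length; i++)
            {
                result[i].a = reader.ReadUInt32();
                result[i].b = reader.ReadUInt16();
            }
        }
        return result;
    }
}
```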

Question

So what is the best way to approach this? Is my direction sound? Are there better solutions? Code samples and links would be appreciated...

Conclusion

After doing performance testing, I found that reading the file with a BinaryReader, or with multiple readers using FileStream.ReadAsync, gives roughly the same performance.

Soo... this question is moot.


3 Answers


[Edit] I've updated this post to include a complete compilable sample, and to address the issues raised by @Daniel in the comments below. As a result, this code no longer uses any "dangerous" methods and has no Code Analysis warnings. [/Edit]

There is a way to speed things up if your struct contains only blittable types.

You can use marshalling to read the data directly into an array without making any additional copies, like this (complete compilable sample):

using System;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using Microsoft.Win32.SafeHandles;

namespace ConsoleApplication1
{
    internal class Program
    {
        struct TestStruct // Mutable for brevity; real structs should be immutable.
        {
            public byte   ByteValue;
            public short  ShortValue;
            public int    IntValue;
            public long   LongValue;
            public float  FloatValue;
            public double DoubleValue;
        }

        static void Main()
        {
            var array = new TestStruct[10];

            for (byte i = 0; i < array.Length; ++i)
            {
                array[i].ByteValue   = i;
                array[i].ShortValue  = i;
                array[i].IntValue    = i;
                array[i].LongValue   = i;
                array[i].FloatValue  = i;
                array[i].DoubleValue = i;
            }

            Directory.CreateDirectory("C:\\TEST");

            using (var output = new FileStream(@"C:\TEST\TEST.BIN", FileMode.Create))
                FastWrite(output, array, 0, array.Length);

            using (var input = new FileStream(@"C:\TEST\TEST.BIN", FileMode.Open))
                array = FastRead<TestStruct>(input, array.Length);

            for (byte i = 0; i < array.Length; ++i)
            {
                Trace.Assert(array[i].ByteValue   == i);
                Trace.Assert(array[i].ShortValue  == i);
                Trace.Assert(array[i].IntValue    == i);
                Trace.Assert(array[i].LongValue   == i);
                Trace.Assert(array[i].FloatValue  == i);
                Trace.Assert(array[i].DoubleValue == i);
            }
        }

        /// <summary>
        /// Writes a part of an array to a file stream as quickly as possible,
        /// without making any additional copies of the data.
        /// </summary>
        /// <typeparam name="T">The type of the array elements.</typeparam>
        /// <param name="fs">The file stream to which to write.</param>
        /// <param name="array">The array containing the data to write.</param>
        /// <param name="offset">The offset of the start of the data in the array to write.</param>
        /// <param name="count">The number of array elements to write.</param>
        /// <exception cref="IOException">Thrown on error. See inner exception for <see cref="Win32Exception"/></exception>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2004:RemoveCallsToGCKeepAlive")]

        public static void FastWrite<T>(FileStream fs, T[] array, int offset, int count) where T: struct
        {
            int sizeOfT = Marshal.SizeOf(typeof(T));
            GCHandle gcHandle = GCHandle.Alloc(array, GCHandleType.Pinned);

            try
            {
                uint bytesWritten;
                uint bytesToWrite = (uint)(count * sizeOfT);

                if
                (
                    !WriteFile
                    (
                        fs.SafeFileHandle,
                        new IntPtr(gcHandle.AddrOfPinnedObject().ToInt64() + (offset*sizeOfT)),
                        bytesToWrite,
                        out bytesWritten,
                        IntPtr.Zero
                    )
                )
                {
                    throw new IOException("Unable to write file.", new Win32Exception(Marshal.GetLastWin32Error()));
                }

                Debug.Assert(bytesWritten == bytesToWrite);
            }

            finally
            {
                gcHandle.Free();
            }
        }

        /// <summary>
        /// Reads array data from a file stream as quickly as possible,
        /// without making any additional copies of the data.
        /// </summary>
        /// <typeparam name="T">The type of the array elements.</typeparam>
        /// <param name="fs">The file stream from which to read.</param>
        /// <param name="count">The number of elements to read.</param>
        /// <returns>
        /// The array of elements that was read. This may be less than the number that was
        /// requested if the end of the file was reached. It may even be empty.
        /// NOTE: There may still be data left in the file, even if not all the requested
        /// elements were returned - this happens if the number of bytes remaining in the
        /// file is less than the size of the array elements.
        /// </returns>
        /// <exception cref="IOException">Thrown on error. See inner exception for <see cref="Win32Exception"/></exception>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2004:RemoveCallsToGCKeepAlive")]

        public static T[] FastRead<T>(FileStream fs, int count) where T: struct
        {
            int sizeOfT = Marshal.SizeOf(typeof(T));

            long bytesRemaining  = fs.Length - fs.Position;
            long wantedBytes     = count * sizeOfT;
            long bytesAvailable  = Math.Min(bytesRemaining, wantedBytes);
            long availableValues = bytesAvailable / sizeOfT;
            long bytesToRead     = (availableValues * sizeOfT);

            if ((bytesRemaining < wantedBytes) && ((bytesRemaining - bytesToRead) > 0))
            {
                Debug.WriteLine("Requested data exceeds available data and partial data remains in the file.", "Dmr.Common.IO.Arrays.FastRead(fs,count)");
            }

            T[] result = new T[availableValues];

            if (availableValues == 0)
                return result;

            GCHandle gcHandle = GCHandle.Alloc(result, GCHandleType.Pinned);

            try
            {
                uint bytesRead;

                if
                (
                    !ReadFile
                    (
                        fs.SafeFileHandle,
                        gcHandle.AddrOfPinnedObject(),
                        (uint)bytesToRead,
                        out bytesRead,
                        IntPtr.Zero
                    )
                )
                {
                    throw new IOException("Unable to read file.", new Win32Exception(Marshal.GetLastWin32Error()));
                }

                Debug.Assert(bytesRead == bytesToRead);
            }

            finally
            {
                gcHandle.Free();
            }

            return result;
        }

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Interoperability", "CA1415:DeclarePInvokesCorrectly")]
        [DllImport("kernel32.dll", SetLastError=true)]
        [return: MarshalAs(UnmanagedType.Bool)]

        private static extern bool WriteFile
        (
            SafeFileHandle       hFile,
            IntPtr               lpBuffer,
            uint                 nNumberOfBytesToWrite,
            out uint             lpNumberOfBytesWritten,
            IntPtr               lpOverlapped
        );

        /// <summary>See the Windows API documentation for details.</summary>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Interoperability", "CA1415:DeclarePInvokesCorrectly")]
        [DllImport("kernel32.dll", SetLastError=true)]
        [return: MarshalAs(UnmanagedType.Bool)]

        private static extern bool ReadFile
        (
            SafeFileHandle       hFile,
            IntPtr               lpBuffer,
            uint                 nNumberOfBytesToRead,
            out uint             lpNumberOfBytesRead,
            IntPtr               lpOverlapped
        );
    }
}
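Not part of the original answer: on newer runtimes (.NET Core 2.1 and later), the same zero-copy idea can be sketched without pinning or P/Invoke by viewing the array as raw bytes with MemoryMarshal.AsBytes. This is an alternative sketch under that assumption, not a drop-in replacement for the code above:

```csharp
using System;
using System.IO;
using System.Runtime.InteropServices;

static class SpanIo
{
    // Write the array's raw bytes in one call; T must contain only blittable fields.
    public static void FastWrite<T>(FileStream fs, T[] array) where T : unmanaged
    {
        fs.Write(MemoryMarshal.AsBytes(array.AsSpan()));
    }

    // Read raw bytes straight into a newly allocated array of T.
    public static T[] FastRead<T>(FileStream fs, int count) where T : unmanaged
    {
        var result = new T[count];
        if (count == 0)
            return result;

        Span<byte> bytes = MemoryMarshal.AsBytes(result.AsSpan());
        int total = 0, read;
        while (total < bytes.Length && (read = fs.Read(bytes.Slice(total))) > 0)
            total += read;

        if (total == bytes.Length)
            return result;

        // The file ended early: keep only the complete structs that were read.
        int sizeOfT = bytes.Length / count;
        return result.AsSpan(0, total / sizeOfT).ToArray();
    }
}
```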

You could then create a BlockingCollection to hold the incoming data, with one thread filling it and a separate thread consuming it.

The thread that reads data into the queue could look like this:

public void ReadIntoQueue<T>(FileStream fs, BlockingCollection<T[]> queue, int blockSize) where T: struct
{
    while (true)
    {
        var data = FastRead<T>(fs, blockSize);

        if (data.Length == 0)
        {
            queue.CompleteAdding();
            break;
        }

        queue.Add(data);
    }
}

And the consuming thread would take items off the queue like this:

public void ProcessDataFromQueue<T>(BlockingCollection<T[]> queue) where T : struct
{
    foreach (var array in queue.GetConsumingEnumerable())
    {
        // Do something with 'array'
    }
}
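A usage sketch wiring the two methods together (the file name, block size, and bounded capacity are arbitrary for illustration, and this assumes it runs inside the Program class above so both methods are in scope):

```csharp
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;

// Bound the queue so the reader cannot run arbitrarily far ahead of the consumer.
var queue = new BlockingCollection<TestStruct[]>(boundedCapacity: 16);

using (var input = new FileStream(@"C:\TEST\TEST.BIN", FileMode.Open))
{
    var producer = Task.Run(() => ReadIntoQueue(input, queue, blockSize: 4096));
    var consumer = Task.Run(() => ProcessDataFromQueue(queue));
    Task.WaitAll(producer, consumer);
}
```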
Answered 2013-04-26T09:42:01.670

Your biggest bottleneck is going to be IO, which has to be performed with exclusive access to the file. The actual byte-crunching will be fast - you are much better off writing directly to the file (noting that FileStream itself has a buffer, or you can add an extra layer with BufferedStream) than serializing different parts in memory and then copying each memory chunk to the stream separately.

My advice: just write the data from a single thread. Frankly, I'm not sure I would even bother with async (hint: async code adds overhead), especially if the buffer is keeping up. I also wouldn't use BinaryWriter / BinaryReader - I'd just write it raw. One tricksy thing you can do is use some unsafe code to copy the data in blocks, to avoid even having to look at individual objects, but that is at the harder end of things... I'll try to put together an example.

Here's a read/write example; performance first:

Write: 2012ms
Read: 1089ms
File: 838,860,804 bytes

Code:

[DllImport("msvcrt.dll", EntryPoint = "memcpy", CallingConvention = CallingConvention.Cdecl, SetLastError = false)]
public static extern IntPtr memcpy(IntPtr dest, IntPtr src, UIntPtr count);

unsafe static st[] Read(string path)
{
    using (var file = File.OpenRead(path))
    {
        int size = sizeof(st);
        const int BLOCK_SIZE = 512; // process at a time
        byte[] buffer = new byte[BLOCK_SIZE * size];

        UIntPtr bufferLen = new UIntPtr((uint)buffer.Length);
        fixed (byte* bufferPtr = buffer)
        {
            Fill(file, buffer, 0, 4);
            int len = ((int*)bufferPtr)[0];

            st[] result = new st[len];
            fixed (st* dataPtr = result)
            {
                st* rawPtr = dataPtr;
                IntPtr source = new IntPtr(bufferPtr);
                while (len >= BLOCK_SIZE)
                {
                    Fill(file, buffer, 0, buffer.Length);
                    memcpy(new IntPtr(rawPtr), source, bufferLen);
                    len -= BLOCK_SIZE;
                    rawPtr += BLOCK_SIZE;
                }
                if (len > 0)
                {
                    Fill(file, buffer, 0, len * size);
                    memcpy(new IntPtr(rawPtr), source, new UIntPtr((uint)(len * size)));
                }
            }
            return result;
        }
    }
}
static void Fill(Stream source, byte[] buffer, int offset, int count)
{
    int read;
    while (count > 0 && (read = source.Read(buffer, offset, count)) > 0)
    {
        offset += read;
        count -= read;
    }
    if (count > 0) throw new EndOfStreamException();
}

unsafe static void Write(st[] data, string path)
{
    using (var file = File.Create(path))
    {
        int size = sizeof(st);
        const int BLOCK_SIZE = 512; // process at a time
        byte[] buffer = new byte[BLOCK_SIZE * size];

        int len = data.Length;
        UIntPtr bufferLen = new UIntPtr((uint)buffer.Length);
        fixed (st* dataPtr = data)
        fixed (byte* bufferPtr = buffer)
        {
            // write the number of elements
            ((int*)bufferPtr)[0] = data.Length;
            file.Write(buffer, 0, 4);

            st* rawPtr = dataPtr;
            IntPtr destination = new IntPtr(bufferPtr);
            // write complete blocks of BLOCK_SIZE
            while (len >= BLOCK_SIZE)
            {
                memcpy(destination, new IntPtr(rawPtr), bufferLen);
                len -= BLOCK_SIZE;
                rawPtr += BLOCK_SIZE;
                file.Write(buffer, 0, buffer.Length);
            }
            if (len > 0)
            {   // write an incomplete block, if necessary
                memcpy(destination, new IntPtr(rawPtr), new UIntPtr((uint)(len * size)));
                file.Write(buffer, 0, len * size);
            }
        }
    }
}
Answered 2013-04-26T09:34:24.540

As far as I know, the fastest way to read or write a file is a single forward-only pass. Otherwise, on top of the actual reads/writes, the disk has to seek back and forth within the file.

Of course, that doesn't mean you can't process the data in multiple concurrent threads.

If the segments are large enough, the overhead of the disk seeks may not even be noticeable.

Answered 2013-04-26T09:34:48.057