我正在尝试在具有 RAID-5 中的八个 SSD 的数据流应用程序中获得顶级 I/O 性能(每个 SSD 宣传并提供 500 MB/秒的读取)。
我使用 64KB 缓冲区创建 FileStream 并以阻塞方式读取许多块(双关语不是故意的)。这是我现在拥有的 80GB 的 20K 文件,没有碎片:传统阻塞读取速度为 1270 MB/秒,单线程,1556 MB/秒,6 个线程。
我注意到单线程是单个内核的 CPU 时间花费在内核中(在具有 12 个内核的 Process Explorer 中,8.3% 为红色)。使用 6 个线程,大约 5 倍 CPU 时间花费在内核中(41% 的红色在具有 12 个内核的 Process Explorer 中)。
我真的很想避免 I/O 绑定场景中多线程应用程序的复杂性。
是否有可能在单线程应用程序中实现这些传输速率?也就是说,什么是减少内核模式时间的好方法?
如果有的话,C# 中的新 Async 特性将如何提供帮助?
作为比较,ATTO 磁盘基准测试显示在此硬件上的这些块大小和低 CPU 利用率下为 2500 MB/秒。但是,ATTO 数据集大小仅为 2GB。
使用LSI 9265-8i RAID 控制器,具有 64k 条带大小,64k 集群大小。
这是正在使用的代码的草图。我不会以这种方式编写生产代码,它只是一个概念证明。
volatile bool _somethingLeftToRead = false;
long _totalReadInSize = 0;
void ProcessReadThread(object obj)
{
TestThreadJob job = obj as TestThreadJob;
var dirInfo = new DirectoryInfo(job.InFilePath);
int chunk = job.DataBatchSize * 1024;
//var tile = new List<byte[]>();
var sw = new Stopwatch();
var allFiles = dirInfo.GetFiles();
var fileStreams = new List<FileStream>();
long totalSize = 0;
_totalReadInSize = 0;
foreach (var fileInfo in allFiles)
{
totalSize += fileInfo.Length;
var fileStream = new FileStream(fileInfo.FullName,
FileMode.Open, FileAccess.Read, FileShare.None, job.FileBufferSize * 1024);
fileStreams.Add(fileStream);
}
var partial = new byte[chunk];
var taskParam = new TaskParam(null, partial);
var tasks = new List<Task>();
int numTasks = (int)Math.Ceiling(fileStreams.Count * 1.0 / job.NumThreads);
sw.Start();
do
{
_somethingLeftToRead = false;
for (int taskIndex = 0; taskIndex < numTasks; taskIndex++)
{
if (_threadCanceled)
break;
tasks.Clear();
for (int thread = 0; thread < job.NumThreads; thread++)
{
if (_threadCanceled)
break;
int fileIndex = taskIndex * job.NumThreads + thread;
if (fileIndex >= fileStreams.Count)
break;
var fileStream = fileStreams[fileIndex];
taskParam.File = fileStream;
if (job.NumThreads == 1)
ProcessFileRead(taskParam);
else
tasks.Add(Task.Factory.StartNew(ProcessFileRead, taskParam));
//tile.Add(partial);
}
if (_threadCanceled)
break;
if (job.NumThreads > 1)
Task.WaitAll(tasks.ToArray());
}
//tile = new List<byte[]>();
}
while (_somethingLeftToRead);
sw.Stop();
foreach (var fileStream in fileStreams)
fileStream.Close();
totalSize = (long)Math.Round(totalSize / 1024.0 / 1024.0);
UpdateUIRead(false, totalSize, sw.Elapsed.TotalSeconds);
}
void ProcessFileRead(object taskParam)
{
TaskParam param = taskParam as TaskParam;
int readInSize;
if ((readInSize = param.File.Read(param.Bytes, 0, param.Bytes.Length)) != 0)
{
_somethingLeftToRead = true;
_totalReadInSize += readInSize;
}
}