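You're not leveraging any asynchronous I/O APIs anywhere in your code. Everything you're doing runs synchronously on CPU threads, so every I/O operation blocks a thread and wastes CPU resources while it waits. AsParallel is meant for compute-bound work. If you want to take advantage of async I/O, in .NET 4.0 and earlier you need to use the APIs based on the Asynchronous Programming Model (APM). You do this by looking for BeginXXX/EndXXX methods on the I/O classes you're using and using them wherever they're available.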
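As a minimal sketch of that BeginXXX/EndXXX-to-Task bridging, the snippet below wraps FileStream.BeginRead/EndRead with Task<int>.Factory.FromAsync and decodes the bytes in a continuation. The temp file and its contents are made up for illustration. It is written as a C# 9 top-level program for brevity, but the TPL and FileStream calls it uses are all available in .NET 4.0.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

// Illustrative input: a small temp file with some HTML in it.
string path = Path.GetTempFileName();
File.WriteAllText(path, "<html><body>hello</body></html>");

byte[] buffer = new byte[4096];

// useAsync: true opens the handle for overlapped (asynchronous) I/O.
var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, useAsync: true);

// Wrap the APM pair in a Task<int>; no thread is blocked while the read is pending.
Task<int> readTask = Task<int>.Factory.FromAsync(
    stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);

// The continuation runs once EndRead has produced the number of bytes read.
Task<string> textTask = readTask.ContinueWith(antecedent =>
{
    try
    {
        // .Result rethrows (wrapped in AggregateException) anything EndRead threw.
        return Encoding.UTF8.GetString(buffer, 0, antecedent.Result);
    }
    finally
    {
        stream.Dispose();
        File.Delete(path);
    }
});

string text = textTask.Result;
Console.WriteLine(text); // prints <html><body>hello</body></html>
```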
For starters, read this post: TPL TaskFactory.FromAsync vs Tasks with blocking methods
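Next, you don't want to use AsParallel in this case anyway. AsParallel enables streaming, which results in a new task being scheduled immediately for each item, and you don't need or want that here. Use Parallel.ForEach to partition the work instead.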
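A rough sketch of the Parallel.ForEach shape for compute-bound work follows. Transform is a hypothetical stand-in for the CPU-heavy step (ParseHtml in the question), and the MaxDegreeOfParallelism setting is an optional assumption added here to show how concurrency can be capped. The loop hands chunks of the input to a limited number of worker threads instead of creating one task per item; the index-taking overload lets results be stored by position.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the CPU-bound step (ParseHtml in the question).
static string Transform(string input) => input.ToUpperInvariant();

var inputs = Enumerable.Range(1, 5000).Select(i => "item" + i).ToList();
var results = new ConcurrentDictionary<long, string>();

// Parallel.ForEach partitions `inputs` into chunks processed on a limited number of
// worker threads; MaxDegreeOfParallelism caps how many run at the same time.
Parallel.ForEach(
    inputs,
    new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
    (input, loopState, index) =>
    {
        // index is the item's position in `inputs`, so results can be stored by position.
        results[index] = Transform(input);
    });

Console.WriteLine(results.Count); // 5000
```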
Let's look at how you might apply this to your specific case to achieve maximum concurrency:
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

var refs = GetReferencesFromDB();

// Using Parallel.ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
    refs,
    reference => // "ref" is a C# keyword, so it can't be used as a parameter name
    {
        string filePath = GetFilePath(reference);
        // Hypothetical helper: compute the output path however your existing code does
        string destinationFilePath = GetDestinationFilePath(reference);
        byte[] fileDataBuffer = new byte[1048576];

        // Need to use the FileStream API directly so we can enable async I/O (last argument: useAsync)
        FileStream sourceFileStream = new FileStream(
            filePath,
            FileMode.Open,
            FileAccess.Read,
            FileShare.Read,
            8192,
            true);

        // Use FromAsync to read the data from the file (buffer, offset, count, state)
        Task<int> readSourceFileStreamTask = Task<int>.Factory.FromAsync(
            sourceFileStream.BeginRead,
            sourceFileStream.EndRead,
            fileDataBuffer,
            0,
            fileDataBuffer.Length,
            null);

        // Add a continuation that will fire when the async read is completed
        readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
        {
            int sourceFileStreamBytesRead;

            try
            {
                // Determine exactly how many bytes were read
                // NOTE: this will propagate any potential exception that may have occurred in EndRead
                sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
            }
            finally
            {
                // Always clean up the source stream
                sourceFileStream.Close();
                sourceFileStream = null;
            }

            // This is here to make sure you don't end up trying to read files larger than this sample code can handle
            if (sourceFileStreamBytesRead == fileDataBuffer.Length)
            {
                throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
            }

            // Convert the file data to a string
            string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);

            // Parse the HTML
            string convertedHtml = ParseHtml(html);

            // This is here to make sure you don't end up trying to write files larger than this sample code can handle
            if (Encoding.UTF8.GetByteCount(convertedHtml) > fileDataBuffer.Length)
            {
                throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
            }

            // Convert the file data back to bytes for writing; GetBytes returns how many bytes it produced
            int convertedHtmlByteCount = Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);

            // Need to use the FileStream API directly so we can enable async I/O.
            // FileMode.Create truncates an existing file, so no stale bytes are left behind.
            FileStream destinationFileStream = new FileStream(
                destinationFilePath,
                FileMode.Create,
                FileAccess.Write,
                FileShare.None,
                8192,
                true);

            // Use FromAsync to write the data to the file (only the bytes actually produced)
            Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
                destinationFileStream.BeginWrite,
                destinationFileStream.EndWrite,
                fileDataBuffer,
                0,
                convertedHtmlByteCount,
                null);

            // Add a continuation that will fire when the async write is completed
            destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
            {
                try
                {
                    // NOTE: we call Wait here to observe any potential exceptions that might have occurred in EndWrite
                    destinationFileStreamWriteAntecedent.Wait();
                }
                finally
                {
                    // Always close the destination file stream
                    destinationFileStream.Close();
                    destinationFileStream = null;
                }
            },
            TaskContinuationOptions.AttachedToParent);

            // Send to the external system concurrently with the write to the destination file system above
            SendToWs(reference, convertedHtml);
        },
        TaskContinuationOptions.AttachedToParent);
    });
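The loop body above returns as soon as it has registered its continuations; TaskContinuationOptions.AttachedToParent is what ties that follow-up work to the task that registered it. The minimal sketch below isolates only that behavior and does not model Parallel.ForEach's internals: an outer task (standing in for the worker) registers an AttachedToParent continuation on a FromAsync read, and Wait on the outer task returns only after the continuation has run. The MemoryStream input, the 200 ms sleep and the variable names are illustrative assumptions. Task.Factory.StartNew is used because Task.Run (.NET 4.5+) sets DenyChildAttach.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

bool continuationRan = false;

// The outer task stands in for the worker that kicks off the async work.
Task outer = Task.Factory.StartNew(() =>
{
    var source = new MemoryStream(new byte[] { 1, 2, 3 });
    var buffer = new byte[16];

    Task<int> read = Task<int>.Factory.FromAsync(
        source.BeginRead, source.EndRead, buffer, 0, buffer.Length, null);

    // AttachedToParent: `outer` is not complete until this continuation has finished.
    read.ContinueWith(antecedent =>
    {
        Thread.Sleep(200); // simulate slow follow-up work such as parsing or writing
        continuationRan = antecedent.Result == 3;
        source.Dispose();
    }, TaskContinuationOptions.AttachedToParent);

    // This delegate returns immediately; it does not wait for the read or the continuation.
});

outer.Wait(); // returns only after the delegate AND its attached continuation are done
Console.WriteLine(continuationRan); // True
```

Removing the TaskContinuationOptions.AttachedToParent argument lets outer.Wait() return before the continuation has run, which is exactly the decoupling the caveats below warn about.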
Now, a few caveats:
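- This is sample code, so I'm using a 1MB buffer to read/write the files. That is overkill for HTML files and wastes system resources. You can either lower it to suit your maximum needs or implement chained reads/writes into a StringBuilder, which I leave to you as an exercise, since it would take me about 500 more lines of code to do async chained reads/writes. :P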
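- You'll notice that the continuations off my read/write tasks use TaskContinuationOptions.AttachedToParent. This is very important because it prevents the worker thread that Parallel.ForEach starts the work on from completing until all the underlying async calls have completed. Without it, you would kick off work for all 5000 items at once, which would flood the TPL subsystem with thousands of scheduled tasks and not scale properly at all.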
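- I call SendToWs concurrently with writing the file to the file share. I don't know what SendToWs is built on, but it also sounds like a good candidate for being made async. Right now it's assumed to be pure compute work, so it will tie up a CPU thread while it executes. I leave it to you as an exercise to figure out how best to apply what I've shown you to improve throughput there.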
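- This was all typed free-form, my brain was the only compiler, and SO's syntax highlighting is all I used to check the syntax. So please forgive any syntax errors, and if I messed anything up so badly that you can't make heads or tails of it, let me know and I'll follow up.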
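The chained reads mentioned in the first caveat can be sketched fairly compactly, assuming the whole file fits in memory. ReadAllAsync below is a hypothetical helper, not a library API: each continuation issues the next FromAsync(BeginRead, EndRead) call until a read returns 0 bytes, and the result is exposed through a TaskCompletionSource<byte[]>. It collects bytes in a MemoryStream rather than a StringBuilder, so a multi-byte UTF-8 character split across two chunks is never decoded on its own; decoding happens once at the end.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

// Hypothetical helper: reads `stream` to the end with chained BeginRead/EndRead calls.
// Each continuation issues the next read, so no thread sits blocked waiting on I/O.
static Task<byte[]> ReadAllAsync(Stream stream, int chunkSize)
{
    var completion = new TaskCompletionSource<byte[]>();
    var buffer = new byte[chunkSize];
    var collected = new MemoryStream();

    void ReadNextChunk()
    {
        try
        {
            Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null)
                .ContinueWith(antecedent =>
                {
                    if (antecedent.IsFaulted)
                    {
                        // Surface whatever EndRead threw to the caller of ReadAllAsync
                        completion.TrySetException(antecedent.Exception.InnerExceptions);
                        return;
                    }

                    int bytesRead = antecedent.Result;
                    if (bytesRead == 0)
                    {
                        // End of stream: hand back everything collected so far
                        completion.TrySetResult(collected.ToArray());
                        return;
                    }

                    collected.Write(buffer, 0, bytesRead);
                    ReadNextChunk(); // chain the next read
                });
        }
        catch (Exception ex)
        {
            // BeginRead itself can throw synchronously (e.g. on a disposed stream)
            completion.TrySetException(ex);
        }
    }

    ReadNextChunk();
    return completion.Task;
}

// Usage: round-trip a ~5 KB UTF-8 string through 1 KB chunks
string html = new string('x', 5000) + "\u00e9<p>done</p>";
byte[] bytes = Encoding.UTF8.GetBytes(html);
string roundTripped = Encoding.UTF8.GetString(ReadAllAsync(new MemoryStream(bytes), 1024).Result);
Console.WriteLine(roundTripped == html); // True
```

Writes can be chained the same way with BeginWrite/EndWrite, advancing the offset by the chunk size on each continuation.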