如果您使用 DataFlow ActionBlock,这在 .NET 4.5 中会容易得多。ActionBlock 接受和缓冲传入的消息,并使用一个或多个任务异步处理它们。
你可以这样写:
public static async Task ProcessFile(string sourceFileName,string targetFileName)
{
//Pass the target stream as part of the message to avoid globals
var block = new ActionBlock<Tuple<string, FileStream>>(async tuple =>
{
var line = tuple.Item1;
var stream = tuple.Item2;
await stream.WriteAsync(Encoding.UTF8.GetBytes(line), 0, line.Length);
});
//Post lines to block
using (var targetStream = new FileStream(targetFileName, FileMode.Append,
FileAccess.Write, FileShare.Write, 16392))
{
using (var sourceStream = File.OpenRead(sourceFileName))
{
await PostLines(sourceStream, targetStream, block);
}
//Tell the block we are done
block.Complete();
//And wait fo it to finish
await block.Completion;
}
}
private static async Task PostLines(FileStream sourceStream, FileStream targetStream,
ActionBlock<Tuple<string, FileStream>> block)
{
using (var reader = new StreamReader(sourceStream))
{
while (true)
{
var line = await reader.ReadLineAsync();
if (line == null)
break;
var tuple = Tuple.Create(line, targetStream);
block.Post(tuple);
}
}
}
大多数代码处理读取每一行并将其发布到块中。默认情况下,ActionBlock 一次只使用一个任务来处理一条消息,这在这种情况下很好。如果需要并行处理数据,可以使用更多任务。
一旦读取了所有行,我们通过调用通知块Complete
并等待它完成处理await block.Completion
。
一旦块的Completion
任务完成,我们就可以关闭目标流。
DataFlow 库的美妙之处在于您可以将多个块链接在一起,以创建处理步骤的管道。ActionBlock 通常是此类链中的最后一步。该库负责将数据从一个块传递到下一个块,并沿着链传播完成。
例如,一个步骤可以从日志中读取文件,第二个可以使用正则表达式解析它们以查找特定模式(例如错误消息)并将它们传递,第三个可以接收错误消息并将它们写入另一个文件。每个步骤都将在不同的线程上执行,每个步骤都会缓冲中间消息。