0

我有一个 tcp 监听器,它监听和写入来自服务器的数据。我用 aBlockingCollection来存储数据。这里我不知道文件什么时候结束。所以,我的文件流总是打开的。

我的部分代码是:

private static BlockingCollection<string> Buffer = new   BlockingCollection<string>();

Process()

{
 var consumer = Task.Factory.StartNew(() =>WriteData());
 while()

 {
  string request = await reader.ReadLineAsync();
  Buffer.Add(request);
 }
} 

WriteData()
{
  FileStream fStream = new FileStream(filename,FileMode.Append,FileAccess.Write,FileShare.Write, 16392);

 foreach(var val in Buffer.GetConsumingEnumerable(token))
 {

 fStream.Write(Encoding.UTF8.GetBytes(val), 0, val.Length);
                            fStream.Flush();
 }

}

问题是我不能在循环中处理文件流,否则我必须为每一行创建文件流,循环可能永远不会结束。

4

1 回答 1

0

如果您使用 DataFlow ActionBlock,这在 .NET 4.5 中会容易得多。ActionBlock 接受和缓冲传入的消息,并使用一个或多个任务异步处理它们。

你可以这样写:

public static async Task ProcessFile(string sourceFileName,string targetFileName)
{
    //Pass the target stream as part of the message to avoid globals
    var block = new ActionBlock<Tuple<string, FileStream>>(async tuple =>
    {
        var line = tuple.Item1;
        var stream = tuple.Item2;
        await stream.WriteAsync(Encoding.UTF8.GetBytes(line), 0, line.Length);
    });


    //Post lines to block
    using (var targetStream = new FileStream(targetFileName, FileMode.Append, 
                                   FileAccess.Write, FileShare.Write, 16392))
    {
        using (var sourceStream = File.OpenRead(sourceFileName))
        {
            await PostLines(sourceStream, targetStream, block);
        }
        //Tell the block we are done
        block.Complete();
        //And wait fo it to finish
        await block.Completion;
    }

}

private static async Task PostLines(FileStream sourceStream, FileStream targetStream, 
                                    ActionBlock<Tuple<string, FileStream>> block)
{
    using (var reader = new StreamReader(sourceStream))
    {
        while (true)
        {
            var line = await reader.ReadLineAsync();
            if (line == null)
                break;
            var tuple = Tuple.Create(line, targetStream);
            block.Post(tuple);
        }
    }
}

大多数代码处理读取每一行并将其发布到块中。默认情况下,ActionBlock 一次只使用一个任务来处理一条消息,这在这种情况下很好。如果需要并行处理数据,可以使用更多任务。

一旦读取了所有行,我们通过调用通知块Complete并等待它完成处理await block.Completion

一旦块的Completion任务完成,我们就可以关闭目标流。

DataFlow 库的美妙之处在于您可以将多个块链接在一起,以创建处理步骤的管道。ActionBlock 通常是此类链中的最后一步。该库负责将数据从一个块传递到下一个块,并沿着链传播完成。

例如,一个步骤可以从日志中读取文件,第二个可以使用正则表达式解析它们以查找特定模式(例如错误消息)并将它们传递,第三个可以接收错误消息并将它们写入另一个文件。每个步骤都将在不同的线程上执行,每个步骤都会缓冲中间消息。

于 2015-04-23T15:47:28.207 回答