我需要将数据缓冲区逐个缓冲区写入来自不同线程的文件。为了避免锁定,我正在写入不同的文件,比如'file_1','file_2',最后将所有这些文件合并到'file'。这种方法好吗?有没有更好的建议?
有些文件非常大,包含数千个缓冲区。因此,创建了数千个临时文件,然后合并和清理。
我需要将数据缓冲区逐个缓冲区写入来自不同线程的文件。为了避免锁定,我正在写入不同的文件,比如'file_1','file_2',最后将所有这些文件合并到'file'。这种方法好吗?有没有更好的建议?
有些文件非常大,包含数千个缓冲区。因此,创建了数千个临时文件,然后合并和清理。
我的直觉是按摩文件会很昂贵,管理数千个文件听起来很复杂且容易出错。
不如让一个专门的线程来做写作。其他线程只是将它们的消息添加到等待写入的队列中。虽然会有一些同步开销,但在锁中所做的实际工作非常少,只需将“指针”复制到消息到队列中即可。由于打开文件并写入文件可能比使用互斥锁更昂贵,因此您实际上可能会提高性能。
这是一个示例方法(没有错误处理!),展示了如何使用 aBlockingCollection
来管理缓冲区队列以写入文件。
这个想法是你创建一个ParallelFileWriter
然后在所有想要写入文件的线程中使用它。完成后,只需释放它(但请确保在所有线程都完成写入之前不要释放它!)。
这只是一个让您入门的简单示例 - 您需要添加参数检查和错误处理:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
public sealed class ParallelFileWriter: IDisposable
{
// maxQueueSize is the maximum number of buffers you want in the queue at once.
// If this value is reached, any threads calling Write() will block until there's
// room in the queue.
public ParallelFileWriter(string filename, int maxQueueSize)
{
_stream = new FileStream(filename, FileMode.Create);
_queue = new BlockingCollection<byte[]>(maxQueueSize);
_writerTask = Task.Run(() => writerTask());
}
public void Write(byte[] data)
{
_queue.Add(data);
}
public void Dispose()
{
_queue.CompleteAdding();
_writerTask.Wait();
_stream.Close();
}
private void writerTask()
{
foreach (var data in _queue.GetConsumingEnumerable())
{
Debug.WriteLine("Queue size = {0}", _queue.Count);
_stream.Write(data, 0, data.Length);
}
}
private readonly Task _writerTask;
private readonly BlockingCollection<byte[]> _queue;
private readonly FileStream _stream;
}
class Program
{
private void run()
{
// For demo purposes, cancel after a couple of seconds.
using (var fileWriter = new ParallelFileWriter(@"C:\TEST\TEST.DATA", 100))
using (var cancellationSource = new CancellationTokenSource(2000))
{
const int NUM_THREADS = 8;
Action[] actions = new Action[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; ++i)
{
int id = i;
actions[i] = () => writer(cancellationSource.Token, fileWriter, id);
}
Parallel.Invoke(actions);
}
}
private void writer(CancellationToken cancellation, ParallelFileWriter fileWriter, int id)
{
int index = 0;
while (!cancellation.IsCancellationRequested)
{
string text = string.Format("{0}:{1}\n", id, index++);
byte[] data = Encoding.UTF8.GetBytes(text);
fileWriter.Write(data);
}
}
static void Main(string[] args)
{
new Program().run();
}
}
}