3

我需要将数据缓冲区逐个缓冲区写入来自不同线程的文件。为了避免锁定,我正在写入不同的文件,比如'file_1','file_2',最后将所有这些文件合并到'file'。这种方法好吗?有没有更好的建议?

有些文件非常大,包含数千个缓冲区。因此,创建了数千个临时文件,然后合并和清理。

4

2 回答 2

9

我的直觉是按摩文件会很昂贵,管理数千个文件听起来很复杂且容易出错。

不如让一个专门的线程来做写作。其他线程只是将它们的消息添加到等待写入的队列中。虽然会有一些同步开销,但在锁中所做的实际工作非常少,只需将“指针”复制到消息到队列中即可。由于打开文件并写入文件可能比使用互斥锁更昂贵,因此您实际上可能会提高性能。

于 2013-11-11T09:08:03.243 回答
5

这是一个示例方法(没有错误处理!),展示了如何使用 aBlockingCollection来管理缓冲区队列以写入文件。

这个想法是你创建一个ParallelFileWriter然后在所有想要写入文件的线程中使用它。完成后,只需释放它(但请确保在所有线程都完成写入之前不要释放它!)。

这只是一个让您入门的简单示例 - 您需要添加参数检查和错误处理:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    public sealed class ParallelFileWriter: IDisposable
    {
        // maxQueueSize is the maximum number of buffers you want in the queue at once.
        // If this value is reached, any threads calling Write() will block until there's
        // room in the queue.

        public ParallelFileWriter(string filename, int maxQueueSize)
        {
            _stream     = new FileStream(filename, FileMode.Create);
            _queue      = new BlockingCollection<byte[]>(maxQueueSize);
            _writerTask = Task.Run(() => writerTask());
        }

        public void Write(byte[] data)
        {
            _queue.Add(data);
        }

        public void Dispose()
        {            
            _queue.CompleteAdding();
            _writerTask.Wait();
            _stream.Close();
        }

        private void writerTask()
        {
            foreach (var data in _queue.GetConsumingEnumerable())
            {
                Debug.WriteLine("Queue size = {0}", _queue.Count);
                _stream.Write(data, 0, data.Length);
            }
        }

        private readonly Task _writerTask;
        private readonly BlockingCollection<byte[]> _queue;
        private readonly FileStream _stream;
    }

    class Program
    {
        private void run()
        {
            // For demo purposes, cancel after a couple of seconds.

            using (var fileWriter = new ParallelFileWriter(@"C:\TEST\TEST.DATA", 100))
            using (var cancellationSource = new CancellationTokenSource(2000))
            {
                const int NUM_THREADS = 8;
                Action[] actions = new Action[NUM_THREADS];

                for (int i = 0; i < NUM_THREADS; ++i)
                {
                    int id = i;
                    actions[i] = () => writer(cancellationSource.Token, fileWriter, id);
                }

                Parallel.Invoke(actions);
            }
        }

        private void writer(CancellationToken cancellation, ParallelFileWriter fileWriter, int id)
        {
            int index = 0;

            while (!cancellation.IsCancellationRequested)
            {
                string text = string.Format("{0}:{1}\n", id, index++);
                byte[] data = Encoding.UTF8.GetBytes(text);
                fileWriter.Write(data);
            }
        }

        static void Main(string[] args)
        {
            new Program().run();
        }
    }
}
于 2013-11-11T09:26:37.333 回答