9

所以这是我上一个问题的延续 - 所以问题是“什么是构建线程安全程序的最佳方法,它需要将双精度值写入文件。如果通过流写入器保存值的函数被多个线程调用?最好的方法是什么?

我修改了一些在 MSDN 上找到的代码,下面的呢?这个正确地将所有内容写入文件。

namespace SafeThread
{
    class Program
    {
        static void Main()
        {
            Threading threader = new Threading();

            AutoResetEvent autoEvent = new AutoResetEvent(false);

            Thread regularThread =
                new Thread(new ThreadStart(threader.ThreadMethod));
            regularThread.Start();

            ThreadPool.QueueUserWorkItem(new WaitCallback(threader.WorkMethod),
                autoEvent);

            // Wait for foreground thread to end.
            regularThread.Join();

            // Wait for background thread to end.
            autoEvent.WaitOne();
        }
    }


    class Threading
    {
        List<double> Values = new List<double>();
        static readonly Object locker = new Object();
        StreamWriter writer = new StreamWriter("file");
        static int bulkCount = 0;
        static int bulkSize = 100000;

        public void ThreadMethod()
        {
            lock (locker)
            {
                while (bulkCount < bulkSize)
                    Values.Add(bulkCount++);
            }
            bulkCount = 0;
        }

        public void WorkMethod(object stateInfo)
        {
            lock (locker)
            {
                foreach (double V in Values)
                {
                    writer.WriteLine(V);
                    writer.Flush();
                }
            }
            // Signal that this thread is finished.
            ((AutoResetEvent)stateInfo).Set();
        }
    }
}
4

4 回答 4

15

Thread并且QueueUserWorkItem用于线程的最低可用 API。除非我绝对,最后,别无选择,否则我不会使用它们。尝试Task该类以获得更高级别的抽象。有关详细信息,请参阅我最近关于该主题的博客文章

您还可以将BlockingCollection<double>其用作适当的生产者/消费者队列,而不是尝试使用最低可用的同步 API手动构建。

正确地重新发明这些轮子非常困难。我强烈建议使用为这种类型的需求设计的类(Task并且BlockingCollection,具体来说)。它们内置于 .NET 4.0 框架中,可作为 .NET 3.5 的附加组件使用

于 2010-08-28T17:38:18.643 回答
7
  • 该代码将编写器作为实例变量,但使用静态储物柜。如果您有多个实例写入不同的文件,那么它们没有理由需要共享同一个锁
  • 在相关说明中,由于您已经拥有 writer(作为私有实例 var),因此您可以将其用于锁定,而不是在这种情况下使用单独的 locker 对象——这使事情变得更简单一些。

“正确答案”实际上取决于您在锁定/阻塞行为方面要寻找的内容。例如,最简单的方法是跳过中间数据结构,只使用一个 WriteValues 方法,这样每个线程“报告”其结果就会继续并将它们写入文件。就像是:

StreamWriter writer = new StreamWriter("file");
public void WriteValues(IEnumerable<double> values)
{
    lock (writer)
    {
        foreach (var d in values)
        {
            writer.WriteLine(d);
        }
        writer.Flush();
    }
}

当然,这意味着工作线程在其“报告结果”阶段进行序列化 - 根据性能特征,这可能很好(例如,生成 5 分钟,写入 500 毫秒)。

另一方面,您将让工作线程写入数据结构。如果您使用的是 .NET 4,我建议您只使用ConcurrentQueue而不是自己锁定。

此外,您可能希望以比工作线程报告的更大批量执行文件 i/o,因此您可能会选择仅以某种频率在后台线程中写入。频谱的末端看起来像下面这样(您将在实际代码中删除 Console.WriteLine 调用,这些调用就在那里,因此您可以看到它正在运行)

public class ThreadSafeFileBuffer<T> : IDisposable
{
    private readonly StreamWriter m_writer;
    private readonly ConcurrentQueue<T> m_buffer = new ConcurrentQueue<T>();
    private readonly Timer m_timer;

    public ThreadSafeFileBuffer(string filePath, int flushPeriodInSeconds = 5)
    {
        m_writer = new StreamWriter(filePath);
        var flushPeriod = TimeSpan.FromSeconds(flushPeriodInSeconds);
        m_timer = new Timer(FlushBuffer, null, flushPeriod, flushPeriod);
    }

    public void AddResult(T result)
    {
        m_buffer.Enqueue(result);
        Console.WriteLine("Buffer is up to {0} elements", m_buffer.Count);
    }

    public void Dispose()
    {
        Console.WriteLine("Turning off timer");
        m_timer.Dispose();
        Console.WriteLine("Flushing final buffer output");
        FlushBuffer(); // flush anything left over in the buffer
        Console.WriteLine("Closing file");
        m_writer.Dispose();
    }

    /// <summary>
    /// Since this is only done by one thread at a time (almost always the background flush thread, but one time via Dispose), no need to lock
    /// </summary>
    /// <param name="unused"></param>
    private void FlushBuffer(object unused = null)
    {
        T current;
        while (m_buffer.TryDequeue(out current))
        {
            Console.WriteLine("Buffer is down to {0} elements", m_buffer.Count);
            m_writer.WriteLine(current);
        }
        m_writer.Flush();
    }
}

class Program
{
    static void Main(string[] args)
    {
        var tempFile = Path.GetTempFileName();
        using (var resultsBuffer = new ThreadSafeFileBuffer<double>(tempFile))
        {
            Parallel.For(0, 100, i =>
            {
                // simulate some 'real work' by waiting for awhile
                var sleepTime = new Random().Next(10000);
                Console.WriteLine("Thread {0} doing work for {1} ms", Thread.CurrentThread.ManagedThreadId, sleepTime);
                Thread.Sleep(sleepTime);
                resultsBuffer.AddResult(Math.PI*i);
            });
        }
        foreach (var resultLine in File.ReadAllLines(tempFile))
        {
            Console.WriteLine("Line from result: {0}", resultLine);
        }
    }
}
于 2010-08-28T17:39:43.043 回答
4

所以你是说你想要一堆线程使用 StreamWriter 将数据写入单个文件?简单的。只需锁定 StreamWriter 对象。

这里的代码将创建 5 个线程。每个线程将执行 5 个“动作”,在每个动作结束时,它将向名为“file”的文件写入 5 行。

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

namespace ConsoleApplication1 {
    class Program {
        static void Main() {
            StreamWriter Writer = new StreamWriter("file");

            Action<int> ThreadProcedure = (i) => {
                // A thread may perform many actions and write out the result after each action
                // The outer loop here represents the multiple actions this thread will take
                for (int x = 0; x < 5; x++) {
                    // Here is where the thread would generate the data for this action
                    // Well simulate work time using a call to Sleep
                    Thread.Sleep(1000);
                    // After generating the data the thread needs to lock the Writer before using it.
                    lock (Writer) {
                        // Here we'll write a few lines to the Writer
                        for (int y = 0; y < 5; y++) {
                            Writer.WriteLine("Thread id = {0}; Action id = {1}; Line id = {2}", i, x, y);
                        }
                    }
                }
            };

            //Now that we have a delegate for the thread code lets make a few instances

            List<IAsyncResult> AsyncResultList = new List<IAsyncResult>();
            for (int w = 0; w < 5; w++) {
                AsyncResultList.Add(ThreadProcedure.BeginInvoke(w, null, null));
            }

            // Wait for all threads to complete
            foreach (IAsyncResult r in AsyncResultList) {
                r.AsyncWaitHandle.WaitOne();
            }

            // Flush/Close the writer so all data goes to disk
            Writer.Flush();
            Writer.Close();
        }
    }
}

结果应该是一个包含 125 行的文件“文件”,其中所有“动作”同时执行,每个动作的结果同步写入文件。

于 2010-08-28T22:07:56.273 回答
2

您那里的代码被巧妙地破坏了-特别是,如果排队的工作项首先运行,那么它将在终止之前立即刷新(空)值列表,之后您的工作人员会去填充列表(这将最终被忽略)。自动重置事件也不做任何事情,因为没有任何东西查询或等待它的状态。

此外,由于每个线程使用不同的锁,锁没有任何意义!您需要确保在访问 streamwriter 时持有单个共享锁。您不需要在刷新代码和生成代码之间加锁;您只需要确保在生成完成后刷新运行。

不过,您可能走在正确的轨道上-尽管我会使用固定大小的数组而不是列表,并在数组已满时刷新数组中的所有条目。这样可以避免线程长期存在时内存不足的可能性。

于 2010-08-28T15:43:56.847 回答