3

(注意:虽然我想了解 .Net 4.0 的未来,但我仅限于这个项目的 .Net 3.5。)

我有一个线程,它从外部设备异步读取数据(在代码示例中由非常有创意的 strSomeData 模拟 :-) 并将其存储在 StringBuilder '缓冲区' (strBuilderBuffer :-)

在“主代码”中,我想“蚕食”这个“缓冲区”。但是,从“操作”的角度来看,我不确定如何以线程安全的方式执行此操作。我理解从“数据”的角度来看它是安全的,因为根据 msdn,“此 ( StringBuilder) 类型的任何公共静态成员都是线程安全的。不保证任何实例成员都是线程安全的。” 但是,我下面的代码说明从“操作”的角度来看它可能不是线程安全的。

关键是我担心两行代码:

string strCurrentBuffer = ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.ToString();
// Thread 'randomly' slept due to 'inconvenient' comp resource scheduling...
ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.Length = 0;

如果计算机操作系统在缓冲区的“读取”和缓冲区的“清除”之间休眠我的线程,我可能会丢失数据(这很糟糕:-(

有没有办法保证“原子性”?这两行并强制计算机不打断它们?

关于弗拉德关于使用的以下建议lock,我尝试过,但它没有用(真的):

    public void BufferAnalyze()
    {
        String strCurrentBuffer;
        lock (ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer)
        {
            strCurrentBuffer = ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.ToString();
            Console.WriteLine("[BufferAnalyze()]  ||<<  Thread 'Randomly' Slept due to comp resource scheduling");
            Thread.Sleep(1000);  //  Simulate poor timing of thread resourcing...
            ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.Length = 0;
        }
        Console.WriteLine("[BufferAnalyze()]\r\nstrCurrentBuffer[{0}] == {1}", strCurrentBuffer.Length.ToString(), strCurrentBuffer);
    }

有没有更好的方法来实现线程安全缓冲区?

这是完整的代码:

namespace ExploringThreads
{
    /// <summary>
    /// Description of BasicThreads_TestThreadSafety_v1a
    /// </summary>
    class ThreadWorker_TestThreadSafety_v1a
    {
        private Thread thread;
        public static StringBuilder strBuilderBuffer = new StringBuilder("", 7500);
        public static StringBuilder strBuilderLog = new StringBuilder("", 7500);

        public bool IsAlive
        {
            get { return thread.IsAlive; }
        }

        public ThreadWorker_TestThreadSafety_v1a(string strThreadName)
        {
            // It is possible to have a thread begin execution as soon as it is created.
            // In the case of MyThread this is done by instantiating a Thread object inside MyThread's constructor.
            thread = new Thread(new ThreadStart(this.threadRunMethod));
            thread.Name = strThreadName;
            thread.Start();
        }

        public ThreadWorker_TestThreadSafety_v1a() : this("")
        {
            //   NOTE:  constructor overloading ^|^
        }

        //Entry point of thread.
        public void threadRunMethod()
        {
            Console.WriteLine("[ThreadWorker_TestThreadSafety_v1a threadRunMethod()]");
            Console.WriteLine(thread.Name + " starting.");
            int intSomeCounter = 0;
            string strSomeData = "";
            do
            {
                Console.WriteLine("[ThreadWorker_TestThreadSafety_v1a threadRunMethod()] running.");
                intSomeCounter++;
                strSomeData = "abcdef" + intSomeCounter.ToString() + "|||";
                strBuilderBuffer.Append(strSomeData);
                strBuilderLog.Append(strSomeData);
                Thread.Sleep(200);
            } while(intSomeCounter < 15);

            Console.WriteLine(thread.Name + " terminating.");
        }
    }
    /// <summary>
    /// Description of BasicThreads_TestThreadSafety_v1a.
    /// </summary>
    public class BasicThreads_TestThreadSafety_v1a
    {
        public BasicThreads_TestThreadSafety_v1a()
        {
        }

        public void BufferAnalyze()
        {
            string strCurrentBuffer = ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.ToString();
            Console.WriteLine("[BufferAnalyze()]  ||<<  Thread 'Randomly' Slept due to comp resource scheduling");
            Thread.Sleep(1000);  //  Simulate poor timing of thread resourcing...
            ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.Length = 0;
            Console.WriteLine("[BufferAnalyze()]\r\nstrCurrentBuffer[{0}] == {1}", strCurrentBuffer.Length.ToString(), strCurrentBuffer);
        }

        public void TestBasicThreads_TestThreadSafety_v1a()
        {
            Console.Write("Starting TestBasicThreads_TestThreadSafety_v1a  >>>  Press any key to continue . . . ");
            Console.Read();

            // First, construct a MyThread object.
            ThreadWorker_TestThreadSafety_v1a threadWorker_TestThreadSafety_v1a = new ThreadWorker_TestThreadSafety_v1a("threadWorker_TestThreadSafety_v1a Child");

            do
            {
                Console.WriteLine("[TestBasicThreads_TestThreadSafety_v1a()]");
                Thread.Sleep(750);
                BufferAnalyze();
                //} while (ThreadWorker_TestThreadSafety_v1a.thread.IsAlive);
            } while (threadWorker_TestThreadSafety_v1a.IsAlive);
            BufferAnalyze();
            Thread.Sleep(1250);
            Console.WriteLine("[TestBasicThreads_TestThreadSafety_v1a()]");
            Console.WriteLine("ThreadWorker_TestThreadSafety_v1a.strBuilderLog[{0}] == {1}", ThreadWorker_TestThreadSafety_v1a.strBuilderLog.Length.ToString(), ThreadWorker_TestThreadSafety_v1a.strBuilderLog);

            Console.Write("Completed TestBasicThreads_TestThreadSafety_v1a  >>>  Press any key to continue . . . ");
            Console.Read();
        }
    }
}
4

3 回答 3

4

在此处下载 3.5 的Reactive Extensions backport 。还有一个NuGet包。下载后,只需在项目中引用 System.Threading.dll。

现在,您也可以在 .NET 3.5 中使用 .NET 4.0 中的所有新并发集合标准。最适合您的情况是BlockingCollection。它基本上是一个缓冲区,允许线程像普通队列一样将项目入队和出队。除了出队操作会阻塞,直到有项目可用。

现在根本不需要使用这个StringBuilder类。以下是我将如何重构您的代码。我试图让我的例子简短,以便更容易理解。

public class Example
{
  private BlockingCollection<string> buffer = new BlockingCollection<string>();

  public Example()
  {
    new Thread(ReadFromExternalDevice).Start();
    new Thread(BufferAnalyze).Start();
  }

  private void ReadFromExteneralDevice()
  {
    while (true)
    {
      string data = GetFromExternalDevice();
      buffer.Add(data);
      Thread.Sleep(200);
    }
  }

  private void BufferAnalyze()
  {
    while (true)
    {
      string data = buffer.Take(); // This blocks if nothing is in the queue.
      Console.WriteLine(data);
    }
  } 
}

为了将来参考,TPL 数据流库中的BufferBlock<T>类将与. 它将在 .NET 4.5 中可用。BlockingCollection

于 2012-05-08T16:11:52.713 回答
3

使用StringBuffer不是线程安全的,但您可以切换到ConcurrentQueue<char>.

如果您需要其他数据结构,.NET 4 中还有其他线程安全集合,请参阅http://msdn.microsoft.com/en-us/library/dd997305.aspx


编辑:在 .NET 3.5 中,同步原语较少。您可以通过Queue<char>ConcurrentQueue. 或者使用相同的StrignBuffer, 再次进行lock读/写操作:

public static StringBuilder strBuilderBuffer = new StringBuilder("", 7500);
private object BufferLock = new object();

...

lock (BufferLock)
    strBuilderBuffer.Append(strSomeData);

...

string strCurrentBuffer;
lock (BufferLock)
{
    strCurrentBuffer = ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.ToString();
    ThreadWorker_TestThreadSafety_v1a.strBuilderBuffer.Clear();
}
Console.WriteLine("[BufferAnalyze()]  ||<<  Thread 'Randomly' Slept ...");
Thread.Sleep(1000);  //  Simulate poor timing of thread resourcing...

编辑:

你不能保证操作系统不会挂起你持有锁的工作线程。但是,锁保证了只要一个线程正在处理缓冲区,其他线程就无法干预和更改缓冲区。

这就是为什么你持有锁的时间应该尽可能短:

  • 获取锁,添加数据,释放锁,-或-
  • 获取锁,复制数据,清空缓冲区,释放锁,开始处理复制的数据。
于 2012-05-08T15:14:34.580 回答
1

如果您正在从缓冲区中读取大量数据,也许这会有所帮助:

http://msdn.microsoft.com/en-us/library/system.threading.readerwriterlock.aspx

多个读者是可能的,但只有一个作家。

它适用于 .NET 1.X 及更高版本...

于 2012-05-08T15:38:38.537 回答