2

我正在尝试使用生产者消费者模式来处理和保存一些数据。我正在使用 AutoResetEvent 在两个电极之间发出信号,这是我拥有的代码

这是生产者函数

 public Results[] Evaluate()
    {
        processingComplete = false;
        resultQueue.Clear();
        for (int i = 0; i < data.Length; ++i)
            {
                if (saveThread.ThreadState == ThreadState.Unstarted)
                    saveThread.Start();
               //-....
               //Process data 
               // 
                lock (lockobject)
                {
                    resultQueue.Enqueue(result);
                }

                signal.Set();
            }
            processingComplete = true;
        }

这是消费者功能

   private void SaveResults()
    {
        Model dataAccess = new Model();

        while (!processingComplete || resultQueue.Count > 0)
        {
            if (resultQueue.Count == 0)
                signal.WaitOne();
            ModelResults result;
            lock (lockobject)

            {
                result = resultQueue.Dequeue();
            }
            dataAccess.Save(result);
        }
        SaveCompleteSignal.Set();
    }

所以我的问题有时是 resultQueue.Dequeue() 抛出 InvalidOperation 异常,因为队列是空的。我不确定我做错了什么不应该阻止队列为空的 signal.WaitOne() 上面的信号吗?

4

3 回答 3

2

由于缺乏适当的锁定,您遇到了同步问题。

您应该锁定所有队列访问,包括计数检查。

此外,以这种方式使用Thread.ThreadState是一个“坏主意”。来自 ThreadState 的 MSDN 文档:

“线程状态只对调试场景感兴趣。你的代码不应该使用线程状态来同步线程的活动。”

您不能将其作为处理同步的手段。您应该重新设计以确保线程将在使用之前启动。如果它没有启动,就不要初始化它。(您始终可以使用空检查 - 如果线程为空,则创建它并启动它)。

于 2009-12-10T18:27:28.803 回答
1

您在同步上下文之外检查队列的计数。由于 Queue 不是线程安全的,这可能是一个问题(可能在 Enqueue 处于进程中时 Count 返回 1 但没有项目可以出队),如果您要使用多个消费者,则会出现严重错误。

您可能想阅读 Joseph Albahari 撰写的线程文章,他也为您的问题提供了一个很好的示例,以及没有 OS 同步对象的“更好”解决方案

于 2009-12-10T18:24:15.513 回答
0

您必须将 lock() 放在对队列的所有引用周围。您在识别处理完成方面也存在一些问题(在队列结束时您会收到一个信号,但队列将为空)。

public Results[] Evaluate()
{
    processingComplete = false;
    lock(lockobject) 
    {
        resultQueue.Clear();
    }
    for (int i = 0; i < data.Length; ++i)
    {
        if (saveThread.ThreadState == ThreadState.Unstarted)
            saveThread.Start();
       //-....
       //Process data 
       // 
        lock (lockobject)
        {
            resultQueue.Enqueue(result);
        }

        signal.Set();
    }
    processingComplete = true;
}

private void SaveResults()
{
    Model dataAccess = new Model();

    while (true)
    {
        int count;

        lock(lockobject)  
        {
            count = resultQueue.Count;
        }
        if (count == 0)
            signal.WaitOne();

        lock(lockobject)  
        {
            count = resultQueue.Count;
        }
        // we got a signal, but queue is empty, processing is complete
        if (count == 0)
            break;

        ModelResults result;
        lock (lockobject)
        {
            result = resultQueue.Dequeue();
        }
        dataAccess.Save(result);
    }
    SaveCompleteSignal.Set();
}
于 2009-12-10T19:04:01.307 回答