0

我写了一个典型的生产者-消费者程序:一个生产者,负责填充队列;一个消费者,负责使队列出队。下面是我的实现。

棘手的部分是还有另一个共享变量vFlagprocess(str)可能会将其更改为true 这将被生产者线程检测到,并导致清空队列。

private class PQueue
{
    private Queue<String> queue;
    PQueue()
    {
        queue = new LinkedList<String>();
    }

    public synchronized void add(String str)
    {
        if(queue.size() > 1)
        {
        wait();
        }
        queue.add(token);
        notify();
    }

    public synchronized String poll()
    {
        if(queue.size() == 0)
        {
            wait();
        }
        String str = queue.poll();
        notify();
        return str;
    }

    public synchronized void clear()
    {
        queue.clear();
    }

}

PQueue queue = new PQueue();
private class Producer implements Runnable
{
    public void run()
    {
        while (true) {
            String str = read();
            queue.add(str);

            if(vFlag.value == false)
            {
                queue.clear();
                vFlag.value = true;
            }

            if (str.equals("end"))
                break;
        }           
        exitFlag = true;
    }
}

private class Consumer implements Runnable
{   
    public void run()
    {
        while(exitFlag == false)
        {
            String str = queue.poll();
            process(str, vFlag);
        }
    }
}

现在,仍然存在一些错误。谁能给我一些一般的线索或建议,比如更好的并发编程理念或机制来避免并发错误?

对于上述实现,程序有时会在process(str, vFlag)中报告错误:

Exception in thread "Thread-3" java.lang.NullPointerException

在其他情况下,程序正常运行。如果我注释掉queue.clear(),效果很好。

4

2 回答 2

2

您需要确保该vFlag.value字段是volatile因为布尔值被多个线程更改。另外,我假设生产者一旦看到value已设置为true,就会排空队列,然后将其设置为false。听起来可能会导致竞争条件。您可能会考虑使用AtomicBoolean,以便您可以使用compareAndSet(...)来避免覆盖。

我不确定这是一个问题,但您应该考虑执行以下操作来删除一些设置/未设置的竞争条件。你不需要AtomicBoolean这个:

while (!vFlag.value) {
    // set to true immediately and then clear to avoid race
    vFlag.value = true;
    queue.clear();
}

您也可以考虑使用 aBlockingQueue而不是同步您自己的Queue. BlockingQueues 为您处理线程安全,并且还有一个为您执行逻辑的take()方法。wait()如果您的队列中有一个项目,您的代码似乎想要阻止,因此您可能希望使用new LinkedBlockingQueue(1)将队列的大小限制为 1 来阻止put(...).

最后,在这些情况下总是建议使用while循环而不是if循环:

// always use while instead of if statements here
while (queue.size() == 0) {
    wait();
}

这解决了意外信号和多个消费者或生产者的问题。

于 2012-07-31T21:59:45.970 回答
2

对于通用线程安全队列,您可以使用ConcurrentLinkedQueue ,或者如果您想一次限制队列中的一个对象SynchronousQueue ,如果一次一个对象,但可以使用ArrayBlockingQueue预先计算下一个对象。

Java 内存模型允许有效地缓存值,以及通过编译器重新排序指令,假设如果只有一个线程正在运行,重新排序不会改变结果。因此,生产者不能保证永远将 vFlag 视为 true,因为它被允许使用缓存的值。因此,消费者线程将 vflag 视为 true,而生产者将其视为 false。将变量声明为 volatile 意味着 JVM 每次都必须查找该值,并在访问的任一侧之间产生一个内存栅栏,以防止重新排序。例如

public class vFlagClass {
  public volatile boolean value;
}

消费者可以在设置 vFlag 为真后继续访问队列,消费者应该自己清除队列,或者需要等待生产者发出队列被清除的信号。即使假设 value 是可变的,在 vFlag.value 设置为 true 之后,在激活生产者并清除队列之前,程序消耗队列中的所有输入仍然是可接受的操作。

此外,您的队列结构假设只有一个线程正在写入和一个线程正在读取,通过让生产者清除队列,生产者有效地从队列中读取并打破您的不变量。

对于您的错误,我想象正在发生的事情是消费者告诉生产者在队列为空时清除队列,然后再次尝试队列,队列仍然是空的,所以它等待。生产者然后醒来,填充队列,这唤醒了消费者,生产者然后清除队列,消费者开始并轮询队列,但是队列是空的。因此返回 NULL。换句话说

Producer : queue.add(str); // queue size is now 1
Producer : goes to end of loop
Consumer : String str = queue.poll(); // queue size is now 0
Consumer : process(str, vFlag); // vFlag.value is set to false 
Consumer : String str = queue.poll(); // waits at
            if(queue.size() == 0)
            {
                wait();
            }
Producer : queue.add(str);
Producer : notify() // wakes up consumer
Producer : queue.clear();
Consumer : String str = queue.poll(); // note queue is empty, so returns NULL

在这种情况下,您需要将 PQueue.poll 更改为

String str = null;
while((str = queue.poll()) == null)
    wait();
return str;

但这不是一个好的解决方案,因为它仍然具有前面提到的所有其他错误。注意:这只允许两个线程从队列中读取,它不能防止多个线程写入队列。

编辑:注意到另一个竞争条件,生产者可能将“结束”放入队列,消费者读取并处理它,然后生产者更新 exitFlag。请注意,更改 exitFlag 并不能解决任何问题,因为它与读取陈旧数据无关。与其使用 exitFlag,不如让 Consumer 在读取“end”时也中断。

有关Java 中内存一致性的更多信息,请参阅文档。

于 2012-07-31T22:19:28.853 回答