5

我有一种情况,不同的线程填充一个队列(生产者),一个消费者从这个队列中检索元素。我的问题是,当从队列中检索其中一个元素时,会丢失一些元素(丢失信号?)。生产者代码是:

class Producer implements Runnable {

    private Consumer consumer;

    Producer(Consumer consumer) { this.consumer = consumer; }

    @Override
public void run() {
    consumer.send("message");
  }
}

它们是通过以下方式创建和运行的:

ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) {
  executor.execute(new Producer(consumer));
}

消费者代码是:

class Consumer implements Runnable {

private Queue<String> queue = new ConcurrentLinkedQueue<String>();

void send(String message) {
    synchronized (queue) {
        queue.add(message);
        System.out.println("SIZE: " + queue.size());
        queue.notify();
    }
}

@Override
public void run() {
    int counter = 0;
    synchronized (queue) {
    while(true) {
        try {
            System.out.println("SLEEP");
                queue.wait(10);
        } catch (InterruptedException e) {
                Thread.interrupted();
        }
        System.out.println(counter);
        if (!queue.isEmpty()) {             
            queue.poll();
            counter++;
        }
    }
    }
}

}

运行代码时,有时会添加 20 个元素并检索 20 个元素,但在其他情况下检索的元素少于 20 个。知道如何解决这个问题吗?

4

3 回答 3

10

我建议您使用 BlockingQueue 而不是 Queue。LinkedBlockingDeque 可能是您的理想选择。

您的代码如下所示:

void send(String message) {
    synchronized (queue) {
        queue.put(message);
        System.out.println("SIZE: " + queue.size());
    }
}

然后你需要

queue.take()

在您的消费者线程上

这个想法是 .take() 将阻塞,直到队列中有一个可用的项目,然后正好返回一个(我认为您的实现会受到影响:轮询时缺少通知)。.put() 负责为您处理所有通知。无需等待/通知。

于 2012-04-04T07:37:32.913 回答
2

您的代码中的问题可能是因为您使用notify的是notifyAll. 如果有一个等待锁的线程,前者只会唤醒一个线程。这允许没有线程在等待并且信号丢失的竞争条件。notifyAll 将通过要求所有线程唤醒以检查它们是否可以获得锁来以较小的性能成本强制正确性。

这在Effective Java 第一版(参见第 150 页)中得到了最好的解释。第 2 版删除了这个提示,因为程序员应该使用 java.util.concurrent 来提供更强的正确性保证。

于 2012-04-04T08:27:21.660 回答
2

同时使用 ConcurrentLinkedQueue 和同步似乎是个坏主意。它首先违背了并发数据结构的目的。

ConcurrentLinkedQueue 数据结构没有问题,将其替换为 BlockingQueue 将解决问题,但这不是根本原因。

问题出在 queue.wait(10) 上。这是定时等待方法。一旦 10ms 过去,它将再次获取锁。

  1. 通知 (queue.notify() ) 将丢失,因为如果 10 毫秒已过,则没有消费者线程在等待它。

  2. 生产者将无法添加到队列中,因为他们无法获取锁,因为消费者再次声明了锁。

迁移到 BlockingQueue 解决了您的问题,因为您删除了 wait(10) 代码,并且 BlockingQueue 数据结构处理了等待和通知。

于 2014-09-03T17:16:34.807 回答