0

n 个线程产生一个BlockingQueue。当队列已满时,消费者排空队列并进行一些处理。

我应该如何在以下两种实施选择之间做出决定?

选择A:消费者定期轮询队列以检查它是否已满,所有写入者都在等待(毕竟这是一个阻塞队列:)。

选择 B:我使用同步的“放置”方法实现了我自己的队列。在放置提供的元素之前,我测试队列是否接近满(满减 1 个元素)。然后我放置元素并通知我的消费者(正在等待)。

第一个解决方案是最简单的,但会轮询;这让我很恼火。在我看来,第二种解决方案更容易出错,并且需要更多的编码。

4

5 回答 5

2

我建议编写您的代理队列,该队列将在内部与 Exchanger 实例一起包装队列实例。您的代理方法将调用委托给您的内部队列。添加时检查内部队列是否已满,当已满时,与消费者线程交换内部队列。消费者线程将交换一个空队列以换取已填充的队列。您的代理队列将继续填充空队列,而消费者可以继续处理已填充的队列。这两个活动可以并行运行。当双方都准备好后,他们可以再次交换。

class MyQueue implements BlockingQueue{
    Queue internalQueue = ...
    Exchanger<Queue> exchanger;

    MyQueue(Exchanger<BlockingQueue> ex){
    this.exchanger = ex;
    }

     .
     .
     .

    boolean add (E e) {
      try{
        internalQueue.add(e);
      }catch(IllegalStateException ise){
        internalQueue = exchanger.exchange(internalQueue);
      }
      internalQueue.add(e);     
    }

}

class Consumer implements Runnable {
    public void run() {
        Queue currentQueue = new empty queue;
        while (...){
           Object o = currentQueue.remove();
           if (o == null){
              currentQueue = exchanger.exchange(currentQueue);
              continue;
           }
           //cast and process the element
        } 
    }
}
于 2012-05-15T11:53:58.640 回答
0

第二种解决方案显然更好。它并没有那么复杂。您可以继承或包装任何其他BlockingQueue方法并覆盖其方法offer(),如下所示:调用“真实”offer()。如果返回true,则退出。否则触发工作线程工作并立即offer()超时调用。

这是几乎伪代码:

public boolean offer(E e) {
    if (queue.offer(e)) {
        return true;
    }
    boolean result = queue.offer(e, timeout, unit); // e.g. 20 sec. - enough for worker to dequeue at least one task from the queue, so the place will be available.
    worker.doYouJob();
    return result; }
于 2012-05-15T10:40:00.237 回答
0

我不知道您是否需要这样的队列实现:消费者在队列变满时等待,并且只有当它完全耗尽并开始处理时。

您应该为消费者阻塞队列,直到它变满为止。认为您需要覆盖 drain() 方法以使其在队列已满时等待。比您的消费者只需调用并等待排水方法。不需要从生产者到消费者的通知。

于 2012-05-15T11:10:27.663 回答
0

我使用了 CountdownLatch,它很简单,效果很好。感谢其他想法:)

于 2012-05-16T08:10:22.713 回答
0

使用观察者模式。让您的消费者向队列通知程序注册。当生产者进行放置时,队列将决定是否通知任何侦听器。

于 2012-05-15T11:17:02.013 回答