7

我对生产者-消费者模式的理解是,它可以使用生产者和消费者之间共享的队列来实现。生产者将工作提交到共享队列,消费者检索并处理它。也可以由生产者直接提交给消费者来实现(生产者线程直接提交给消费者的执行者服务)。 

现在,我一直在研究提供线程池的一些常见实现的 Executors 类。根据规范,newFixedThreadPool 方法“重用在共享无界队列上运行的固定数量的线程”。他们在这里谈论的是哪个队列? 

如果Producer直接向consumer提交任务,是不是ExecutorService的内部队列包含了Runnables的列表?

或者它是中间队列,以防生产者提交到共享队列? 

可能是我错过了重点,但有人可以澄清一下吗?

4

3 回答 3

4

你是对的,ExecutorService它不仅是一个线程池,而且它是一个完整的 Producer-Consumer 实现。这个内部队列实际上是一个线程安全的Runnables 队列(FutureTask准确地说)持有任务 you submit()

池中的所有线程都被阻塞在该队列上,等待执行任务。当您submit()执行一项任务时,只有一个线程会拾取并运行它。当然submit()不是等待池中的线程完成处理。

另一方面,如果您提交大量任务(或长时间运行的任务),您最终可能会导致池中的所有线程都被占用,并且一些任务在队列中等待。一旦任何线程完成其任务,它将立即从队列中选择第一个。

于 2011-08-11T20:16:55.077 回答
1
public class Producer extends Thread {  
    static List<String> list = new ArrayList<String>();  

    public static void main(String[] args) {  
        ScheduledExecutorService executor = Executors  
                .newScheduledThreadPool(12);  
        int initialDelay = 5;  
        int pollingFrequency = 5;  
        Producer producer = new Producer();  
        @SuppressWarnings({ "rawtypes", "unused" })  
        ScheduledFuture schedFutureProducer = executor.scheduleWithFixedDelay(  
                producer, initialDelay, pollingFrequency, TimeUnit.SECONDS);  
        for (int i = 0; i < 3; i++) {  
            Consumer consumer = new Consumer();  
            @SuppressWarnings({ "rawtypes", "unused" })  
            ScheduledFuture schedFutureConsumer = executor  
                    .scheduleWithFixedDelay(consumer, initialDelay,  
                            pollingFrequency, TimeUnit.SECONDS);  
        }  

    }  

    @Override  
    public void run() {  
        list.add("object added to list is " + System.currentTimeMillis());  
                              ///adding in list become slow also because of synchronized behavior  
    }  
}  

class Consumer extends Thread {  

    @Override  
    public void run() {  
        getObjectFromList();  
    }  

    private void getObjectFromList() {  
        synchronized (Producer.list) {  
            if (Producer.list.size() > 0) {  
                System.out.println("Object name removed by "  
                        + Thread.currentThread().getName() + "is "  
                        + Producer.list.get(0));  
                Producer.list.remove(Producer.list.get(0));  
            }  
        }  
    }  
}  
于 2012-06-12T09:59:22.860 回答
0

检查一下:
Java (RabbitMQ) 中的生产者-消费者示例(它是为另一个库编写的,但它是用 Java 编写的,它清楚地展示了这个概念;)
希望它有所帮助!

PS:实际上,它有几个例子,但你明白了;)

于 2011-08-11T22:47:03.363 回答