2

我有一个生产者-消费者模式。1 个生产者(做一些工作)和 7 个消费者(在 8 核机器上)。生产者意味着一次下载 7 个文件(超过 2000 个)并等待 7 个线程处理它们,然后再继续下一个 7 个。设置大致如下:

            ExecutorService threadPool = Executors.newFixedThreadPool(8);

        int pollWait = 4; 
        int queSize = 7;
        Broker broker = new Broker(queSize,pollWait);

Broker 实现了一个容量为 7 的 LinkedBlockingQueue。

        Producer producer = new Producer(); 
        producer.setBroker(broker);
        Future<Integer> producerStatus = threadPool.submit(producer);

        int numConsumerThreads = 7;

        ArrayList<Future<Integer>> consumers = new ArrayList<Future<Integer>>();

        for(int c=0; c < numConsumerThreads;c++)
        {
            String threadName = "consumer-"+(c+1);
            Consumer consumer = new Consumer(threadName);
            consumer.setBroker(broker);

            Future<Integer> consumerStatus = threadPool.submit(consumer);
            consumers.add(consumerStatus);                            
        }   

        if(producerStatus.isDone())
        {
            Integer numFilesRead = producerStatus.get();
            System.out.println("[INFO] Total number of files downloaded by producer: " + numFilesRead);
        }

        int k=0,numIndexedByThread=0;
        while(!consumers.isEmpty())
        {
            final Iterator<Future<Integer>> i = consumers.iterator();
            while (i.hasNext())
            {                   
               Future<Integer> f = i.next();
               if (f.isDone())
               {
                  i.remove();
                  numIndexedByThread = f.get();
                  k += numIndexedByThread;
               } 
             }
        }
        System.out.println("[INFO] Total number of files indexed: " + k);
        threadPool.shutdown();

我看到的问题是程序没有执行。

4

1 回答 1

1

你已经序列化了结果的处理

以下行阻塞并序列化结果的整个处理:

Integer numConsumedByThread = consumerStatus.get();

正确的习惯是检查每一个,只有在返回 trueisDone()时才调用,否则阻塞循环,直到它有结果要返回。.get()isDone().get()

伪代码

while(!listOfFutures.isEmpty())
{
    final Iterator<Future> i = listofFutures.iterator;
    while (i.hasNext())
    {
       Future f = i.next();
       if (f.isDone())
       {
          i.remove();
          // call .get() and process the completed result 
       } 
     }
}

阅读 JavaDocsFuture.get()

于 2013-10-30T20:08:07.653 回答