我有一个生产者-消费者模式。1 个生产者(做一些工作)和 7 个消费者(在 8 核机器上)。生产者意味着一次下载 7 个文件(超过 2000 个)并等待 7 个线程处理它们,然后再继续下一个 7 个。设置大致如下:
ExecutorService threadPool = Executors.newFixedThreadPool(8);
int pollWait = 4;
int queSize = 7;
Broker broker = new Broker(queSize,pollWait);
Broker 实现了一个容量为 7 的 LinkedBlockingQueue。
Producer producer = new Producer();
producer.setBroker(broker);
Future<Integer> producerStatus = threadPool.submit(producer);
int numConsumerThreads = 7;
ArrayList<Future<Integer>> consumers = new ArrayList<Future<Integer>>();
for(int c=0; c < numConsumerThreads;c++)
{
String threadName = "consumer-"+(c+1);
Consumer consumer = new Consumer(threadName);
consumer.setBroker(broker);
Future<Integer> consumerStatus = threadPool.submit(consumer);
consumers.add(consumerStatus);
}
if(producerStatus.isDone())
{
Integer numFilesRead = producerStatus.get();
System.out.println("[INFO] Total number of files downloaded by producer: " + numFilesRead);
}
int k=0,numIndexedByThread=0;
while(!consumers.isEmpty())
{
final Iterator<Future<Integer>> i = consumers.iterator();
while (i.hasNext())
{
Future<Integer> f = i.next();
if (f.isDone())
{
i.remove();
numIndexedByThread = f.get();
k += numIndexedByThread;
}
}
}
System.out.println("[INFO] Total number of files indexed: " + k);
threadPool.shutdown();
我看到的问题是程序没有执行。