我正在做我的第一个多线程项目,因此有一些我不确定的事情。关于我的设置的详细信息是关于上一个问题,简而言之:我有一个由Executors.newFixedThreadPool(N)
. 给一个线程一个操作,该操作对本地和远程资源进行一系列查询并迭代填充一个ArrayBlockingQueue
,而其余线程调用take()
队列上的方法并处理队列中的对象。
尽管小型和有监督的测试似乎运行良好,但我不确定如何处理特殊情况,例如开始(队列还没有项目)、结束(队列已清空)以及任何最终InterruptedExceptions
的 . 我在这里阅读了一些关于 SO 的文章,这让我看到了Goetz和Kabutz的两篇非常好的文章。共识似乎是不应忽视这些例外。但是我不确定提供的示例与我的情况有何关系,我没有thread.interrupt()
在我的代码中调用任何地方......说到这里,我不确定我是否应该这样做......
综上所述,鉴于下面的代码,我如何最好地处理特殊情况,例如终止条件和 InterrruptedExceptions?希望这些问题有意义,否则我会尽我所能进一步描述它。
提前致谢,
编辑:我一直在努力实施一段时间,我遇到了一个新的问题,所以我想我会更新情况。我很不幸遇到了ConcurrentModificationException
这很可能是由于线程池的不完全关闭/终止。一旦我发现我可以使用isTerminated()
我就尝试了,然后我得到了一个IllegalMonitorStateException
由于不同步的wait()
. 代码的当前状态如下:
我遵循了@Jonathan 回答中的一些建议,但我认为他的建议与我需要/想要的不太一样。背景故事和我上面提到的一样,相关代码如下:
类持有/管理池,并提交可运行文件:
public void serve() {
try {
this.started = true;
pool.execute(new QueryingAction(pcqs));
for(;;){
PathwayImpl p = bq.take();
if (p.getId().equals("0")){
System.out.println("--DEBUG: Termination criteria found, shutdown initiated..");
pool.shutdown();
// give 3 minutes per item in queue to finish up
pool.awaitTermination(3 * bq.size(), TimeUnit.MINUTES);
break;
}
int sortMethod = AnalysisParameters.getInstance().getSort_method();
pool.submit(new AnalysisAction(p));
}
} catch (Exception ex) {
ex.printStackTrace();
System.err.println("Unexpected error in core analysis, terminating execution!");
System.exit(0);
}finally{ pool.shutdown(); }
}
public boolean isDone(){
if(this.started)
return pool.isTerminated();
else
return false;
}
元素通过位于单独类中的以下代码添加到队列中:
this.queue.offer(path, offer_wait, TimeUnit.MINUTES);
......背后的动机offer()
而不是take()
乔纳森提到的。由于我的分析需要很长时间,因此无法预料的块很烦人并且很难弄清楚。所以我需要相对快速地知道失败是由于坏块,还是只是处理数字......
最后;这是我的测试类中的代码,我在其中检查“并发服务”(此处命名为 cs)与要分析的其余对象之间的交互:
cs.serve();
synchronized (this) {
while(!cs.isDone())
this.wait(5000);
}
ReportGenerator rg = new ReportGenerator();
rg.doReports();
我意识到这是一个很长的问题,但我试图做到详细和具体。希望它不会太拖累,如果它是我道歉......