0

我正在做我的第一个多线程项目,因此有一些我不确定的事情。关于我的设置的详细信息是关于上一个问题,简而言之:我有一个由Executors.newFixedThreadPool(N). 给一个线程一个操作,该操作对本地和远程资源进行一系列查询并迭代填充一个ArrayBlockingQueue,而其余线程调用take()队列上的方法并处理队列中的对象。

尽管小型和有监督的测试似乎运行良好,但我不确定如何处理特殊情况,例如开始(队列还没有项目)、结束(队列已清空)以及任何最终InterruptedExceptions的 . 我在这里阅读了一些关于 SO 的文章,这让我看到了GoetzKabutz的两篇非常好的文章。共识似乎是不应忽视这些例外。但是我不确定提供的示例与我的情况有何关系,我没有thread.interrupt()在我的代码中调用任何地方......说到这里,我不确定我是否应该这样做......

综上所述,鉴于下面的代码,我如何最好地处理特殊情况,例如终止条件和 InterrruptedExceptions?希望这些问题有意义,否则我会尽我所能进一步描述它。

提前致谢,


编辑:我一直在努力实施一段时间,我遇到了一个新的问题,所以我想我会更新情况。我很不幸遇到了ConcurrentModificationException这很可能是由于线程池的不完全关闭/终止。一旦我发现我可以使用isTerminated()我就尝试了,然后我得到了一个IllegalMonitorStateException由于不同步的wait(). 代码的当前状态如下:

我遵循了@Jonathan 回答中的一些建议,但我认为他的建议与我需要/想要的不太一样。背景故事和我上面提到的一样,相关代码如下:

类持有/管理池,并提交可运行文件:

public void serve() {
    try {
        this.started = true;
        pool.execute(new QueryingAction(pcqs));
        for(;;){
            PathwayImpl p = bq.take();

            if (p.getId().equals("0")){
                System.out.println("--DEBUG: Termination criteria found, shutdown initiated..");
                pool.shutdown();
                            // give 3 minutes per item in queue to finish up
                pool.awaitTermination(3 * bq.size(), TimeUnit.MINUTES);
                break;
            }
            int sortMethod = AnalysisParameters.getInstance().getSort_method();
            pool.submit(new AnalysisAction(p)); 
        }
      } catch (Exception ex) {
          ex.printStackTrace();
          System.err.println("Unexpected error in core analysis, terminating execution!");
          System.exit(0);
      }finally{   pool.shutdown();     }
}

public boolean isDone(){
    if(this.started)
        return pool.isTerminated();
    else
        return false;
    }

元素通过位于单独类中的以下代码添加到队列中:

this.queue.offer(path, offer_wait, TimeUnit.MINUTES);

......背后的动机offer()而不是take()乔纳森提到的。由于我的分析需要很长时间,因此无法预料的块很烦人并且很难弄清楚。所以我需要相对快速地知道失败是由于坏块,还是只是处理数字......


最后;这是我的测试类中的代码,我在其中检查“并发服务”(此处命名为 cs)与要分析的其余对象之间的交互:

cs.serve();
synchronized (this) {
    while(!cs.isDone())
    this.wait(5000);
}
ReportGenerator rg = new ReportGenerator();
rg.doReports();

我意识到这是一个很长的问题,但我试图做到详细和具体。希望它不会太拖累,如果它是我道歉......

4

1 回答 1

1

而不是使用take, which 块,使用更像这样的东西:

PathwayImpl p = null;
synchronized (bq) {
    try {
        while (bq.isEmpty() && !stopSignal) {
            bq.wait(3000); // Wait up to 3 seconds and check again
        }

        if (!stopSignal) {
            p = bq.poll();
        }
    }
    catch (InterruptedException ie) {
        // Broke us out of waiting, loop around to test the stopSignal again
    }
}

这假定该块包含在某种while (!stopSignal) {...}.

然后,在添加到队列的代码中,执行以下操作:

synchronized (bq) {
    bq.add(item);
    bq.notify();
}

至于InterruptedExceptions,它们适用于向线程发出立即测试停止信号的信号,而不是等到下一次超时和测试。我建议再次测试您的停止信号,并可能记录异常。

我在发出恐慌信号时使用它们,而不是正常关机,但这种情况很少需要。

于 2011-03-16T13:54:55.120 回答