1

我写了以下多线程程序。如果其中一个线程返回 false 作为返回,我想取消所有线程。但是,尽管我通过取消单个任务来取消线程。它不工作。为了取消线程,我需要进行哪些更改?

我写了以下多线程程序。如果其中一个线程返回 false 作为返回,我想取消所有线程。但是,尽管我通过取消单个任务来取消线程。它不工作。为了取消线程,我需要进行哪些更改?

import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;

public class BeamWorkerThread implements Callable<Boolean> {


  private List<BeamData> beamData;
  private String threadId;



 public BeamScallopingWorkerThread(
    List<BeamData> beamData, String threadId) {
   super();
   this.beamData = beamData;
   this.threadId = threadId;
  }





 @Override
  public Boolean call() throws Exception {

  Boolean result = true;

  DataValidator validator = new DataValidator();
   Iterator<BeamScallopingData> it = beamData.iterator();

  BeamData data = null;
  while(it.hasNext()){

   data = it.next();

   if(!validator.validateDensity(data.getBin_ll_lat(), data.getBin_ll_lon(), data.getBin_ur_lat(), data.getBin_ur_lon())){
     result = false;
     break;
    }
   }
   return result;
  }

}









   ExecutorService threadPool = Executors.newFixedThreadPool(100);
        List<Future<Boolean>> results = new ArrayList<Future<Boolean>>();

        long count = 0;
         final long RowLimt = 10000;
         long threadCount = 1;

        while ((beamData = csvReader.read(
           BeamData.class, headers1, processors)) != null) {

         if (count == 0) {
           beamDataList = new ArrayList<BeamData>();
          }

         beamDataList.add(beamData);
          count++;
          if (count == RowLimt) {
           results.add(threadPool
             .submit(new BeamWorkerThread(
               beamDataList, "thread:"
                 + (threadCount++))));
           count = 0;
          }
         }

        results.add(threadPool.submit(new BeamWorkerThread(
           beamDataList, "thread:" + (threadCount++))));
         System.out.println("Number of threads" + threadCount);
         for (Future<Boolean> fs : results)
          try {


           if(fs.get() == false){
            System.out.println("Thread is false");
            for(Future<Boolean> fs1 : results){
             fs1.cancel(true);
            }
           }
          } catch(CancellationException e){

         } catch (InterruptedException e) {

         } catch (ExecutionException e) {

         } finally {
           threadPool.shutdownNow();
          }

       }

我的评论

感谢大家的投入,我对回复感到不知所措。我确实知道,良好实现的线程会使应用程序达到高潮,而糟糕的实现也会使应用程序陷入困境。我同意我有好主意,但我没有其他选择。我有超过 1000 万的记录,因此我将受到内存限制和时间限制。我需要解决这两个问题。因此,我不是吞下整个数据,而是将其分解成块,而且如果一个数据无效,我不想浪费时间处理剩余的数百万个数据。我发现@Mark Peters 的建议是一种选择。相应地进行了更改,我的意思是添加标志来中断任务,我很困惑未来列表的工作方式。我的理解是,一旦所有线程返回其值,就会开始循环遍历未来列表的每个字段。在这种情况下,无法从主列表中途取消所有任务。我需要将对象的引用传递给每个线程。如果一个线程使用线程引用发现无效数据,则调用每个线程的取消方法来设置中断标志。

while(it.hasNext() && !cancelled) {
         if(!validate){
               // loop through each thread reference and call Cancel method
         }
      }
4

3 回答 3

7

无论您尝试取消所有剩余的任务,如果您的代码没有仔细编写为可中断的,它将失败。这究竟意味着什么不仅仅是一个 StackOverflow 答案。一些指导方针:

  • 不要吞咽InterruptedException。使其发生中断任务;
  • 如果您的代码在可中断方法上花费的时间不多,则必须插入显式Thread.interrupted()检查并做出适当的反应。

编写可中断代码通常不是初学者的事,所以要小心。

于 2013-08-13T18:58:52.337 回答
2

取消Future不会中断正在运行的代码。它主要用于防止任务首先运行。

虽然您可以提供 atrue作为参数,这将中断运行任务的线程,但只有当线程在抛出InterruptedException. 除此之外,没有任何东西隐式检查interrupted线程的状态。

就您而言,没有阻塞;忙碌的工作需要时间。一种选择是在循环的每个阶段检查一个 volatile 布尔值:

public class BeamWorkerThread implements Callable<Boolean> {

   private volatile boolean cancelled = false;
   @Override
   public Boolean call() throws Exception {
      //...
      while(it.hasNext() && !cancelled) {
         //...
      }
   }

   public void cancel() {
       cancelled = true;
   }
}

然后你会保留对你的BeamWorkerThread对象的引用并调用cancel()它来抢占它的执行。

为什么我不喜欢打断?

Marko 提到cancelled上面的标志本质上是在重新发明Thread.interrupted()。这是一个有效的批评。这就是为什么我不喜欢在这种情况下使用中断的原因。

1. 它依赖于某些线程配置。

如果您的任务表示可以提交给执行程序或直接调用的可取消代码段,则在一般情况下Thread.interrupt()使用取消执行假定接收中断的代码将是应该知道如何干净地取消任务的代码。

在这种情况下可能是这样,但我们之所以知道是因为我们知道取消和任务在内部是如何工作的。但是想象一下我们有这样的事情:

  • 任务完成工作
  • 侦听器会收到第一个工作的线程通知
    • 第一个侦听器决定使用取消任务Thread.interrupt()
    • 第二个听众做了一些可中断的工作,然后被打断了。它记录但忽略中断。
  • 任务没有收到中断,也没有取消任务。

换句话说,我觉得这种机制interrupt()过于全球化。像任何共享的全局状态一样,它对所有参与者做出假设。这就是我所说的使用interrupt()暴露/耦合来获取有关运行上下文的详细信息的意思。通过将其封装在cancel()仅适用于该任务实例的方法中,您可以消除该全局状态。

2. 这并不总是一种选择。

这里的经典示例是InputStream. 如果您有一项阻止从 读取的任务InputStreaminterrupt()则不会采取任何措施来解除阻止。解除阻塞的唯一方法是手动关闭流,这是cancel()任务本身的最佳方法。有一种方法来取消任务(例如Cancellable),不管它的实现如何,对我来说似乎是理想的。

于 2013-08-13T18:59:20.760 回答
1

使用ExecutorService.shutdownNow()方法。它将阻止执行者接受更多提交并返回Future正在进行的任务的对象,您可以调用这些对象cancel(true)来中断执行。当然,您将不得不丢弃此执行程序,因为它无法重新启动。

如果线程没有在监视器上等待(没有被中断地阻塞),并且如果您吞下将在这种情况下引发的 InterruptedException,则该cancel()方法可能不会立即终止执行。

于 2013-08-13T18:58:32.900 回答