我写了以下多线程程序。如果其中一个线程返回 false 作为返回,我想取消所有线程。但是,尽管我通过取消单个任务来取消线程。它不工作。为了取消线程,我需要进行哪些更改?
我写了以下多线程程序。如果其中一个线程返回 false 作为返回,我想取消所有线程。但是,尽管我通过取消单个任务来取消线程。它不工作。为了取消线程,我需要进行哪些更改?
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
public class BeamWorkerThread implements Callable<Boolean> {
private List<BeamData> beamData;
private String threadId;
public BeamScallopingWorkerThread(
List<BeamData> beamData, String threadId) {
super();
this.beamData = beamData;
this.threadId = threadId;
}
@Override
public Boolean call() throws Exception {
Boolean result = true;
DataValidator validator = new DataValidator();
Iterator<BeamScallopingData> it = beamData.iterator();
BeamData data = null;
while(it.hasNext()){
data = it.next();
if(!validator.validateDensity(data.getBin_ll_lat(), data.getBin_ll_lon(), data.getBin_ur_lat(), data.getBin_ur_lon())){
result = false;
break;
}
}
return result;
}
}
ExecutorService threadPool = Executors.newFixedThreadPool(100);
List<Future<Boolean>> results = new ArrayList<Future<Boolean>>();
long count = 0;
final long RowLimt = 10000;
long threadCount = 1;
while ((beamData = csvReader.read(
BeamData.class, headers1, processors)) != null) {
if (count == 0) {
beamDataList = new ArrayList<BeamData>();
}
beamDataList.add(beamData);
count++;
if (count == RowLimt) {
results.add(threadPool
.submit(new BeamWorkerThread(
beamDataList, "thread:"
+ (threadCount++))));
count = 0;
}
}
results.add(threadPool.submit(new BeamWorkerThread(
beamDataList, "thread:" + (threadCount++))));
System.out.println("Number of threads" + threadCount);
for (Future<Boolean> fs : results)
try {
if(fs.get() == false){
System.out.println("Thread is false");
for(Future<Boolean> fs1 : results){
fs1.cancel(true);
}
}
} catch(CancellationException e){
} catch (InterruptedException e) {
} catch (ExecutionException e) {
} finally {
threadPool.shutdownNow();
}
}
我的评论
感谢大家的投入,我对回复感到不知所措。我确实知道,良好实现的线程会使应用程序达到高潮,而糟糕的实现也会使应用程序陷入困境。我同意我有好主意,但我没有其他选择。我有超过 1000 万的记录,因此我将受到内存限制和时间限制。我需要解决这两个问题。因此,我不是吞下整个数据,而是将其分解成块,而且如果一个数据无效,我不想浪费时间处理剩余的数百万个数据。我发现@Mark Peters 的建议是一种选择。相应地进行了更改,我的意思是添加标志来中断任务,我很困惑未来列表的工作方式。我的理解是,一旦所有线程返回其值,就会开始循环遍历未来列表的每个字段。在这种情况下,无法从主列表中途取消所有任务。我需要将对象的引用传递给每个线程。如果一个线程使用线程引用发现无效数据,则调用每个线程的取消方法来设置中断标志。
while(it.hasNext() && !cancelled) {
if(!validate){
// loop through each thread reference and call Cancel method
}
}