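I am trying to spawn a few threads and keep them in a list while they execute. As each one finishes processing, I want to collect its result for presentation. That way I can hold a list of many in-flight tasks and, as soon as a result is available, call future.get() and use the returned value.
For some reason I am missing many of the results. When I step through the code, f.get() is skipped when it should not be, and I cannot figure out why.
My code is as follows: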
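import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;

public class ThreadTesterRunner {
    // Written by several Reader tasks at once, so the list is synchronized.
    static List<Integer> randoms = Collections.synchronizedList(new ArrayList<>());

    public static void main(String[] args) throws InterruptedException {
        final Phaser cb = new Phaser();
        ThreadRunner tr = new ThreadRunner(cb);
        Thread t = new Thread(tr, "Thread Runner");
        t.start();
        boolean process = true;
        // wait until all threads process, then print reports
        while (process) {
            if (tr.isFinished()) {
                System.out.println("Print metrics");
                process = false;
            }
            Thread.sleep(1000);
        }
    }
}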
class ThreadRunner implements Runnable {
    private ExecutorService executorService = Executors.newFixedThreadPool(10);
    private final Phaser barrier;
    // volatile so the polling main thread reliably sees the update
    private volatile boolean finished = false;

    public ThreadRunner(Phaser phaser) {
        this.barrier = phaser;
    }

    public void run() {
        try {
            List<Future<Integer>> list = new ArrayList<>();
            boolean stillLoop = true;
            int i = 0;
            final Phaser p = this.barrier;
            Callable<Integer> task = new Callable<Integer>() {
                public Integer call() throws Exception {
                    return new Reader().doRun(p);
                }
            };
            List<Integer> randoms = new ArrayList<>();
            Integer size;
            while (stillLoop) {
                System.out.println("i " + i);
                list.add(executorService.submit(task));
                // check which of the submitted tasks have completed so far
                for (Future<Integer> f : list) {
                    if (f.isDone()) {
                        size = f.get();
                        System.out.println("size " + size);
                        list.remove(f);
                    } else {
                        // keep processing
                    }
                }
                if (i == 2) {
                    System.out.println("breaking out of loop");
                    stillLoop = false;
                }
                i++;
            }
            // wait until the phaser has advanced past phase 0
            this.barrier.awaitAdvance(0);
            this.finished = true;
        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }

    public boolean isFinished() {
        return this.finished;
    }
}
class Reader {
    private Phaser readBarrier;
    private ExecutorService executorService = Executors.newFixedThreadPool(20);

    public Reader() {
    }

    Random randomGenerator = new Random();

    public Integer doRun(Phaser phaser) throws Exception {
        phaser.register();
        this.readBarrier = phaser;
        System.out.println("Reading...");
        int i;
        int r = randomGenerator.nextInt(100);
        System.out.println("r " + r);
        ThreadTesterRunner.randoms.add(r);
        int a = this.readBarrier.arrive();
        return r; //i;
    }
}
Any ideas as to why this is happening?
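One thing that may be worth ruling out is the list.remove(f) call inside the for-each loop over an ArrayList. The sketch below is not the code above. It is a small, hypothetical RemovalDemo class (the class and method names are invented for illustration) that removes an element while iterating a two-element list. On OpenJDK's ArrayList the for-each version ends the loop early without an error in this particular case, while in other cases the same pattern throws ConcurrentModificationException. The Iterator.remove() version visits every element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class RemovalDemo {
    // Removes the value 1 from inside a for-each loop and records what was visited.
    static List<Integer> visitedWithForEachRemove(List<Integer> source) {
        List<Integer> list = new ArrayList<>(source);
        List<Integer> visited = new ArrayList<>();
        for (Integer n : list) {
            visited.add(n);
            if (n == 1) {
                list.remove(n); // structural change behind the iterator's back
            }
        }
        return visited;
    }

    // Same walk, but the removal goes through the iterator itself.
    static List<Integer> visitedWithIteratorRemove(List<Integer> source) {
        List<Integer> list = new ArrayList<>(source);
        List<Integer> visited = new ArrayList<>();
        for (Iterator<Integer> it = list.iterator(); it.hasNext();) {
            Integer n = it.next();
            visited.add(n);
            if (n == 1) {
                it.remove(); // keeps the iterator consistent with the list
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2);
        System.out.println("for-each remove visited: " + visitedWithForEachRemove(input));
        System.out.println("iterator remove visited: " + visitedWithIteratorRemove(input));
    }
}
```

If the real loop behaves the same way, a completed future that sits right after a removed one would never reach f.get() in that pass. The loop in the question also stops after three passes, so any task still running at that point is not collected either.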
Edit:
OK, I think I have it up and running:
import java.util.concurrent.CopyOnWriteArrayList; // additional import needed by this version

class ThreadRunner implements Runnable {
    // static int timeOutTime = 2;
    private ExecutorService executorService = Executors.newFixedThreadPool(10);
    private final Phaser barrier;
    // volatile so the polling main thread reliably sees the update
    private volatile boolean finished = false;

    public ThreadRunner(Phaser phaser) {
        this.barrier = phaser;
    }

    public void run() {
        try {
            // CopyOnWriteArrayList iterates over a snapshot, so remove() inside the loop is safe
            List<Future<Integer>> list = new CopyOnWriteArrayList<>();
            boolean stillLoop = true;
            int i = 0;
            final Phaser p = this.barrier;
            Callable<Integer> readerTask = new Callable<Integer>() {
                public Integer call() throws Exception {
                    return new Reader().doRun(p);
                }
            };
            List<Integer> randoms = new ArrayList<>();
            Integer size;
            while (stillLoop) {
                // submit three tasks in total, one per pass
                if (i <= 2) {
                    list.add(executorService.submit(readerTask));
                }
                // keep polling until every submitted future has been collected
                if (!list.isEmpty()) {
                    for (Future<Integer> f : list) {
                        if (f.isDone()) {
                            size = f.get();
                            randoms.add(size);
                            System.out.println("Process read with a size of " + size);
                            list.remove(f);
                        } else {
                            // System.out.println("skipping");
                        }
                    }
                } else {
                    stillLoop = false;
                }
                i++;
            }
            System.out.println("at barrier waiting");
            this.barrier.awaitAdvance(0);
            System.out.println("barrier crossed");
            this.finished = true;
        } catch (Exception e1) {
            e1.printStackTrace();
        } finally {
            // let the pool threads exit so the JVM can terminate
            executorService.shutdown();
        }
    }

    public boolean isFinished() {
        return this.finished;
    }
}
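
The polling loop above can probably also be written without isDone() checks by using java.util.concurrent.ExecutorCompletionService, which hands back futures in the order they complete. What follows is only a minimal sketch under some assumptions: CompletionDemo and collect are hypothetical names, and a stand-in task that returns a random number replaces the Reader and Phaser logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionDemo {
    // Submits taskCount stand-in tasks and gathers every result in completion order.
    static List<Integer> collect(int taskCount) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        CompletionService<Integer> completion = new ExecutorCompletionService<>(pool);
        try {
            for (int i = 0; i < taskCount; i++) {
                // stand-in for new Reader().doRun(p); an assumption for this sketch
                completion.submit(() -> new Random().nextInt(100));
            }
            List<Integer> results = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                // take() blocks until the next task has finished, so nothing is skipped
                results.add(completion.take().get());
            }
            return results;
        } finally {
            pool.shutdown(); // allow the JVM to exit once the tasks are done
        }
    }

    public static void main(String[] args) throws Exception {
        for (Integer size : collect(3)) {
            System.out.println("Process read with a size of " + size);
        }
    }
}
```

Because exactly one take() is issued per submitted task, the number of collected results matches the number of submissions, and no busy-waiting or list removal is needed.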