24
    ExecutorService pool=Executors.newFixedThreadPool(7);
        List<Future<Hotel>> future=new ArrayList<Future<Hotel>>();
        List<Callable<Hotel>> callList = new ArrayList<Callable<Hotel>>();

        for(int i=0;i<=diff;i++){

            String str="2013-"+(liDates.get(i).get(Calendar.MONTH)+1)+"-"+liDates.get(i).get(Calendar.DATE);

            callList.add(new HotelCheapestFare(str));

        }       
     future=pool.invokeAll(callList);
for(int i=0;i<=future.size();i++){

        System.out.println("name is:"+future.get(i).get().getName());
    }

现在我想invokeAll在进入 for 循环之前将所有任务池化,但是当我运行这个程序时,for 循环会在此之前执行invokeAll并抛出这个异常:

java.util.concurrent.ExecutionException: java.lang.NullPointerException at 
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at  
java.util.concurrent.FutureTask.get(Unknown Source) at 
com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:6‌​5)

Caused by: java.lang.NullPointerException at 
com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheap‌estFare(HotelCheapes‌​tFare.java:166) 
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:219)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:1) 
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(Unknow‌​n Source)
at java.lang.Thread.run
4

3 回答 3

21

an 的ExecutorService工作方式是,当您调用invokeAll它时,它会等待所有任务完成:

执行给定的任务,返回一个 Futures 列表,在所有完成时保存它们的状态和结果。Future.isDone() 对于返回列表的每个元素都是 true。请注意,已完成的任务可以正常终止,也可以通过引发异常终止。如果在此操作进行时修改了给定的集合,则此方法的结果是不确定的。1(强调添加)

这意味着您的任务已全部完成,但有些任务可能引发了异常。此异常是Future调用get的一部分,导致异常被重新抛出并包裹在ExecutionException.

来自你的堆栈跟踪

java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at 
                                ^^^ <-- from get

你可以看到确实是这样。您的一项任务因 NPE 而失败。ExecutorService捕获到异常并通过在调用时抛出一个来告诉ExecutionExceptionFuture.get

现在,如果您想在完成任务时接受任务您需要一个ExecutorCompletionService. 这BlockingQueue可以让您在任务完成时轮询任务。

public static void main(String[] args) throws Exception {
    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 100; ++i) {
                try {
                    final Future<String> myValue = completionService.take();
                    //do stuff with the Future
                    final String result = myValue.get();
                    System.out.println(result);
                } catch (InterruptedException ex) {
                    return;
                } catch (ExecutionException ex) {
                    System.err.println("TASK FAILED");
                }
            }
        }
    });
    for (int i = 0; i < 100; ++i) {
        completionService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("FAILED");
                }
                return "SUCCESS";
            }
        });
    }
    executorService.shutdown();
}

在此示例中,我有一个任务调用s,当它们可用take时,ExecutorCompletionService它会获取Futures,然后我将任务提交给ExecutorCompletionService.

这将允许您在失败时立即获取失败的任务,而不必等待所有任务一起失败或成功。

唯一的复杂性是很难告诉轮询线程所有任务都已完成,因为现在一切都是异步的。在这种情况下,我使用了将提交 100 个任务的知识,因此它只需要轮询 100 次。一种更通用Future的方法是也从方法中收集 s submit,然后遍历它们以查看是否一切都已完成。

于 2013-08-13T07:28:52.460 回答
11

Future.get()抛出以下异常。

CancellationException- 如果计算被取消

ExecutionException- 如果计算抛出异常

InterruptedException- 如果当前线程在等待时被中断

get()调用方法时捕获所有这些异常。

Callable我已经为某些任务模拟了除以零异常,但如果您捕获上述三个异常,则一个异常Callable不会影响Callable提交的其他任务,ExecutorService如示例代码中所示。

示例代码片段:

import java.util.concurrent.*;
import java.util.*;

public class InvokeAllUsage{
    public InvokeAllUsage(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(10);

        List<MyCallable> futureList = new ArrayList<MyCallable>();
        for ( int i=0; i<10; i++){
            MyCallable myCallable = new MyCallable((long)i+1);
            futureList.add(myCallable);
        }
        System.out.println("Start");
        try{
            List<Future<Long>> futures = service.invokeAll(futureList);  
            for(Future<Long> future : futures){
                try{
                    System.out.println("future.isDone = " + future.isDone());
                    System.out.println("future: call ="+future.get());
                }
                catch (CancellationException ce) {
                    ce.printStackTrace();
                } catch (ExecutionException ee) {
                    ee.printStackTrace();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        System.out.println("Completed");
        service.shutdown();
    }
    public static void main(String args[]){
        InvokeAllUsage demo = new InvokeAllUsage();
    }
    class MyCallable implements Callable<Long>{
        Long id = 0L;
        public MyCallable(Long val){
            this.id = val;
        }
        public Long call(){

            if ( id % 5 == 0){
                id = id / 0;
            }           
            return id;
        }
    }
}

输出:

creating service
Start
future.isDone = true
future: call =1
future.isDone = true
future: call =2
future.isDone = true
future: call =3
future.isDone = true
future: call =4
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:188)
        at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
        at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
future.isDone = true
future: call =6
future.isDone = true
future: call =7
future.isDone = true
future: call =8
future.isDone = true
future: call =9
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:188)
        at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
        at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Completed
于 2016-09-17T13:55:29.833 回答
0

invokeAll 是一个阻塞方法。这意味着——在所有线程都完成之前,JVM 不会继续执行下一行。所以我认为你的线程未来结果有问题。

System.out.println("name is:"+future.get(i).get().getName());

从这一行我认为有些期货没有结果并且可以为空,所以你应该检查你的代码,如果有一些期货为空,如果是这样,请在执行此行之前获取一个 if。

于 2013-08-13T08:38:13.837 回答