1

我有以下示例代码,假设MyCallable("B")执行时间超过一秒,而其他代码执行速度超过一秒。因此在我调用的循环中Future.get(),它会抛出一个TimeoutException.

public static void main(String[] args) {
    ExecutorService es = Executors.newFixedThreadPool(2);

    List<Future<String>> futures = new ArrayList<Future<String>>();

    futures.add(es.submit(new MyCallable("A")));
    futures.add(es.submit(new MyCallable("B")));
    futures.add(es.submit(new MyCallable("C")));
    futures.add(es.submit(new MyCallable("D")));
    futures.add(es.submit(new MyCallable("E")));

    try {
        for(Future<String> f  : futures) {
            try {
                System.out.println("result " + f.get(1, TimeUnit.SECONDS));
            }
            catch (TimeoutException e) {
                // how do I know which MyCallable() has timed out?
            } catch (ExecutionException e) {
                System.out.println(e.getMessage());
            }
        }
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
    finally {
        es.shutdown();
    }
}

正如预期的那样,每个 MyCallable() 实例都会执行,但是对于超时的实例,我想执行一些错误处理,这需要知道哪个Callable与哪个关联Future

是否有这种关联的机制,还是由我Callable来处理它的方法内的所有错误处理call()

4

2 回答 2

1

似乎您可以简单地维护 aMap<Future<String>, Callable<String>>而不是 aList<Future<String>>并以这种方式检索原始的 Callable 。

如果您想变得非常聪明,您可以采用 OO 风格并扩展 ThreadPoolExecutor 并创建一个 Future 装饰器类。我认为这可能是矫枉过正,但你可以这样做:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;


public class FutureWithCallable<T> implements Future<T> {
    private final Callable<T> callable;
    private final Future<T> wrapped;

    public FutureWithCallable(Future<T> wrapped, Callable<T> callable) {
        this.callable = callable;
        this.wrapped = wrapped;
    }

    public Callable<T> getCallable() {
        return callable;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return wrapped.cancel(mayInterruptIfRunning);
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return wrapped.get();
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException {
        return wrapped.get(timeout, unit);
    }

    @Override
    public boolean isCancelled() {
        return wrapped.isCancelled();
    }

    @Override
    public boolean isDone() {
        return wrapped.isDone();
    }
}

进而:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

    public class ExecutorServiceWithCallable extends ThreadPoolExecutor {

        public ExecutorServiceWithCallable(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        public <T> FutureWithCallable submit(Callable<T> callable) {
            Future<T> future = super.submit(callable);
            return new FutureWithCallable<T>(future, callable);
        }

    }
于 2012-08-27T22:40:37.013 回答
-1

public class TimeoutException extends Exception 阻塞操作超时时抛出的异常。指定超时的阻塞操作需要一种方法来指示已发生超时。对于许多此类操作,可以返回一个指示超时的值;如果这是不可能或不可取的,则应声明并抛出 TimeoutException。

于 2015-11-25T07:18:54.433 回答