12

好的,所以我知道这里的第一个答案/评论将是“使用一个ExecutorService并使用invokeAll”。但是,我们有充分的理由(我不会让人们感到厌烦)将线程池分开。

所以我有一个线程池列表(ExecutorServices),我需要做的是Callable在每个线程池上调用不同的使用submit(没有问题)。现在我有了这个Future实例集合,每个实例都单独创建ExecutorService,我想等待它们全部完成(并且能够提供一个超时,在该超时时取消任何未完成的操作)。

是否有一个现有的类可以执行此操作(包装Future实例列表并允许等待所有操作完成)?如果没有,将不胜感激有关有效机制的建议。

正在考虑每次调用get超时,但必须计算每次调用的总时间。

我看到这篇文章等待任何未来都完成,但这扩展Future而不是包装它们的列表。

4

4 回答 4

17

根据路易斯的评论,我正在寻找的是Futures.successfulAsList

这使我可以等待所有操作都完成,然后检查是否有任何失败的期货。

番石榴规则!

于 2012-11-02T09:55:04.757 回答
2

我不认为 JDK 提供了一个直接的 API 可以让你做到这一点。但是,我认为创建一个执行此操作的简单方法同样简单。您可能想查看 AbstractExecutorService.invokeAll() 的实现,以了解可以做到这一点。

本质上,您将在每个未来调用future.get(),将等待时间减少每次等待结果所需的时间,并在从方法返回之前取消所有未完成的未来。

于 2012-11-01T20:01:38.253 回答
1

可能我真的没看懂。但是,对我来说,这听起来仍然很简单

public <V> List<V> get(List<Future<V>> futures, long timeout, TimeUnit unit)
          throws InterruptedException, ExecutionException, TimeoutException {
    List<V> result = new ArrayList<V>();
    long end = System.nanoTime() + unit.toNanos(timeout);
    for (Future<V> f: futures) {
        result.add(f.get(end - System.nanoTime(), TimeUnit.NANOSECONDS));
    }
    return result;
}

我错了吗?

我认为您链接的问题要复杂得多,因为他们只想等待最快,当然不知道哪个会最快。

于 2012-11-01T20:14:11.960 回答
1

这可以使用一些清理,但它应该可以解决您的问题。(时间和空间省略了一些封装):

public static <T> LatchWithWrappedCallables<T> wrapCallables(Collection<Callable<T>> callablesToWrap)
{
    CountDownLatch latch = new CountDownLatch(callablesToWrap.size());
    List<Callable<T>> wrapped = new ArrayList<Callable<T>>(callablesToWrap.size());
    for (Callable<T> currCallable : callablesToWrap)
    {
        wrapped.add(new CallableCountdownWrapper<T>(currCallable, latch));
    }

    LatchWithWrappedCallables<T> returnVal = new LatchWithWrappedCallables<T>();
    returnVal.latch = latch;
    returnVal.wrappedCallables = wrapped;
    return returnVal;
}

public static class LatchWithWrappedCallables<T>
{
    public CountDownLatch latch;
    public Collection<Callable<T>> wrappedCallables;
}

public static class CallableCountdownWrapper<T> implements Callable<T>
{
    private final Callable<T> wrapped;

    private final CountDownLatch latch;

    public CallableCountdownWrapper(Callable<T> wrapped, CountDownLatch latch)
    {
        this.wrapped = wrapped;
        this.latch = latch;
    }

    @Override
    public T call() throws Exception
    {
        try
        {
            return wrapped.call();
        }
        finally
        {
            latch.countDown();
        }
    }
}

然后你的代码会这样称呼它:

Collection<Callable<String>> callablesToWrap = [Your callables that you need to wait for here];
LatchWithWrappedCallables<String> latchAndCallables = wrapCallables(callablesToWrap);

[Submit the wrapped callables to the executors here]

if(latchAndCallables.latch.await(timeToWaitInSec, TimeUnit.SECONDS))
{
    [Handling for timeout here]
}
于 2012-11-01T20:45:57.693 回答