我基本上有一个Future<List<T>>
从服务器批量获取的。对于某些客户,我想在未来完成时在加载整个集合时提供增量结果。
是否为此在某处定义了通用的 Future 扩展?这种期货存在哪些典型的模式/组合?
我假设IncrementalListFuture<T>
我可以很容易地定义map
操作。你还会想到什么?
我基本上有一个Future<List<T>>
从服务器批量获取的。对于某些客户,我想在未来完成时在加载整个集合时提供增量结果。
是否为此在某处定义了通用的 Future 扩展?这种期货存在哪些典型的模式/组合?
我假设IncrementalListFuture<T>
我可以很容易地定义map
操作。你还会想到什么?
是否为此在某处定义了通用的 Future 扩展?
我假设您正在谈论来自ExecutorService
. 您应该考虑使用一个可以让您在其中一个对象可以获取ExecutorCompletionService
时立即得到通知的方法。Future
引用 javadocs:
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
for (Callable<Result> s : solvers) {
ecs.submit(s);
}
int n = solvers.size();
for (int i = 0; i < n; ++i) {
// this waits for one of the futures to finish and provide a result
Future<Result> future = ecs.take();
Result result = future.get();
if (result != null) {
// do something with the result
}
}
对不起。我最初误读了这个问题,并认为您在询问 List<Future<?>>。可能你可以重构你的代码以实际返回一些期货,所以我将把它留给后代。
在这种情况下,我不会在Future
. 在工作完成之前,您将无法获得回报。
如果可能的话,我会传递某种形式,以便BlockingQueue
调用者和线程都可以访问它:
final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
// build out job with the queue
threadPool.submit(new SomeJob(queue));
threadPool.shutdown();
// now we can consume from the queue as it is built:
while (true) {
T result = queue.take();
// you could some constant result object to mean that the job finished
if (result == SOME_END_OBJECT) {
break;
}
// provide intermediate results
}
您还可以使用某种SomeJob.take()
方法调用BlockingQueue
作业类内部定义的方法。
// the blocking queue in this case is hidden inside your job object
T result = someJob.take();
...
这是我要做的:
Collections.synchronizedList
Collections.unmodifiableList
或者,正如 Gray 所建议的那样,BlockingQueue
s 非常适合这样的线程协调。但是,这可能需要对您的客户端代码进行更多更改。
回答我自己的问题:最近这个领域有很多发展。其中最常用的是:Play iteratees ( http://www.playframework.org/documentation/2.0/Iteratees ) 和 Rx for .NET ( http://msdn.microsoft.com/en-us/data/gg577609.aspx )
他们定义的不是 Future,而是:
interface Observable<T> {
Disposable subscribe(Observer<T> observer);
}
interface Observer<T> {
void onCompleted();
void onError(Exception error);
void onNext(T value);
}
和很多组合器。
Future 真正设计为返回单个(原子)结果,而不是用于以这种方式传达中间结果。您真正想要做的是使用多个期货,每批一个。
我们有一个类似的要求,我们需要从不同的远程服务器获取一堆东西,每个东西都会在不同的时间返回。我们不想等到最后一个返回,而是按照返回的顺序处理它们。为此,我们创建了AsyncCompleter,它在迭代时接受Iterable<Callable<T>>
并返回一个Iterable<T>
阻塞,完全抽象了Future
接口的使用。
如果您查看该类是如何实现的,您将了解如何使用 aCompletionService
按可用顺序从 an 接收结果Executor
,如果您需要自己构建它。
编辑:刚看到Gray的后半部分的回答也差不多,基本上是用了一个ExecutorCompletionService