1

我基本上有一个Future<List<T>>从服务器批量获取的。对于某些客户,我想在未来完成时在加载整个集合时提供增量结果。

是否为此在某处定义了通用的 Future 扩展?这种期货存在哪些典型的模式/组合?

我假设IncrementalListFuture<T>我可以很容易地定义map操作。你还会想到什么?

4

5 回答 5

3

是否为此在某处定义了通用的 Future 扩展?

我假设您正在谈论来自ExecutorService. 您应该考虑使用一个可以让您在其中一个对象可以获取ExecutorCompletionService时立即得到通知的方法。Future

引用 javadocs:

CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
for (Callable<Result> s : solvers) {
    ecs.submit(s);
}
int n = solvers.size();
for (int i = 0; i < n; ++i) {
    // this waits for one of the futures to finish and provide a result
    Future<Result> future = ecs.take();
    Result result = future.get();
    if (result != null) {
        // do something with the result
    }
}

对不起。我最初误读了这个问题,并认为您在询问 List<Future<?>>。可能你可以重构你的代码以实际返回一些期货,所以我将把它留给后代。

在这种情况下,我不会在Future. 在工作完成之前,您将无法获得回报。

如果可能的话,我会传递某种形式,以便BlockingQueue调用者和线程都可以访问它:

final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
// build out job with the queue
threadPool.submit(new SomeJob(queue));
threadPool.shutdown();
// now we can consume from the queue as it is built:
while (true) {
   T result = queue.take();
   // you could some constant result object to mean that the job finished
   if (result == SOME_END_OBJECT) {
      break;
   }
   // provide intermediate results 
}

您还可以使用某种SomeJob.take()方法调用BlockingQueue作业类内部定义的方法。

// the blocking queue in this case is hidden inside your job object
T result = someJob.take();
...
于 2012-09-10T18:17:11.280 回答
1

这是我要做的:

  1. 在填充列表的线程中,通过使用包装列表使其成为线程安全的Collections.synchronizedList
  2. 使列表公开可用,但不能通过向返回列表的线程添加公共方法来修改,但由Collections.unmodifiableList
  3. 与其给客户一个 Future>,不如给他们一个线程句柄,或者它的某种包装器,这样他们就可以调用上面的公共方法。

或者,正如 Gray 所建议的那样,BlockingQueues 非常适合这样的线程协调。但是,这可能需要对您的客户端代码进行更多更改。

于 2012-09-10T18:47:21.097 回答
1

回答我自己的问题:最近这个领域有很多发展。其中最常用的是:Play iteratees ( http://www.playframework.org/documentation/2.0/Iteratees ) 和 Rx for .NET ( http://msdn.microsoft.com/en-us/data/gg577609.aspx )

他们定义的不是 Future,而是:

interface Observable<T> {
    Disposable subscribe(Observer<T> observer);
}

interface Observer<T> {
   void onCompleted();
   void onError(Exception error);
   void onNext(T value);
}

和很多组合器。

于 2013-01-30T19:18:47.720 回答
1

除了 Observables,您还可以查看 twitter 的方法。

他们使用Spool,这是Stream的异步版本。

基本上它是一个简单的特征,类似于List

trait Spool[+A] {

  def head: A

  /**
   * The (deferred) tail of the spool. Invalid for empty spools.
   */
  def tail: Future[Spool[A]]
}

这使您可以在其之上执行诸如map,filter之类的功能性操作。foreach

于 2013-02-13T07:49:06.873 回答
0

Future 真正设计为返回单个(原子)结果,而不是用于以这种方式传达中间结果。您真正想要做的是使用多个期货,每批一个。

我们有一个类似的要求,我们需要从不同的远程服务器获取一堆东西,每个东西都会在不同的时间返回。我们不想等到最后一个返回,而是按照返回的顺序处理它们。为此,我们创建了AsyncCompleter,它在迭代时接受Iterable<Callable<T>>并返回一个Iterable<T>阻塞,完全抽象了Future接口的使用。

如果您查看该类是如何实现的,您将了解如何使用 aCompletionService按可用顺序从 an 接收结果Executor,如果您需要自己构建它。

编辑:刚看到Gray的后半部分的回答也差不多,基本上是用了一个ExecutorCompletionService

于 2012-09-10T22:22:32.567 回答