7

我正在寻找将 aListenableFuture<Iterable<A>>转换为个人序列的最佳方法ListenableFutures。这是我正在寻找的方法签名:

public <A, B> Iterable<ListenableFuture<B>> splitAndRun(
    final ListenableFuture<Iterable<A>> elements, 
    final Function<A, B> func, 
    final ListeningExecutorService executor
);

显然,如果我回来了,我可以这样做ListenableFuture<Iterable<ListenableFuture<B>>>,但我觉得我应该能够拆分并运行它并保持它的异步性。

这是我到目前为止的代码,但你会注意到最后的讨厌.get(),这会破坏事情。如果我把事情复杂化了,请原谅。

public class CallableFunction<I, O> implements Callable<O>{
  private final I input;
  private final Function<I, O> func;

  public CallableFunction(I input, Function<I, O> func) {
    this.input = input;
    this.func = func;
  }

  @Override public O call() throws Exception {
    return func.apply(input);
  }
}

public <A, B> Iterable<ListenableFuture<B>> splitAndRun(
    final ListenableFuture<Iterable<A>> elements, 
    final Function<A, B> func, 
    final ListeningExecutorService executor
) throws InterruptedException, ExecutionException {
  return Futures.transform(elements, 
      new Function<Iterable<A>, Iterable<ListenableFuture<B>>>() {
    @Override
    public Iterable<ListenableFuture<B>> apply(Iterable<A> input) {
      return Iterables.transform(input, new Function<A, ListenableFuture<B>>() {
        @Override
        public ListenableFuture<B> apply(A a) {
          return executor.submit(new CallableFunction<A, B>(a, func));
        }
      });
    }
  }, executor).get();
}
4

3 回答 3

3

如果你把它想象ListenableFuture成一个盒子,为了从那个盒子里得到一些东西,你必须阻塞线程(调用get方法)。

不确定实施splitAndRun方法是否会以某种方式使您受益。您仍然会有一个返回的异步操作ListenableFuture<Iterable<A>>

Iterable<ListenableFuture<A>>不过,作为单个加入的反向操作ListenableFuture<Iterable<A>>可能很有用。如果要将多个异步计算收集到一个中,则可能需要使用它。

于 2013-02-21T11:53:01.783 回答
2

(替代我原来的答案

但是,如果转换速度很慢,或者某些输入可能会失败但其他输入会成功怎么办?在这种情况下,我们希望单独转换每个输出。另外,我们要确保转换只发生一次。我们的集合转换方法不做保证。因此,在您的示例代码中,对输出的每次迭代都会向执行程序提交新任务,即使之前提交的任务可能已经完成。出于这个原因,我们Iterables.transform只在转换轻量级时推荐和朋友。(通常,如果你正在做一些重量级的事情,你的转换函数会抛出一个检查异常,它Function不允许。认为这是一个提示:) 当然,您的示例恰好没有触发提示。)

这在代码中意味着什么?基本上,我们将颠倒我在其他答案中给出的操作顺序。我们将从 转换Future<Iterable<A>>Iterable<Future<A>>第一。然后我们将为每个人提交一个单独的任务,A以将其转换为B. 对于后一步,我们将提供一个Executor 以便转换不会阻塞一些无辜的线程。(我们现在只需要Futures.transform,所以我静态导入了它。)

List<ListenableFuture<A>> individuals = newArrayList();
for (int i = 0; i < knownSize; i++) {
  final int index = i;
  individuals.add(transform(input, new Function<List<A>, A>() {
    @Override
    public A apply(List<A> values) {
      return values.get(index);
    }
  }));
}

List<ListenableFuture<B>> result = newArrayList();
for (ListenableFuture<A> original : individuals) {
  result.add(transform(original, function, executor));
}
return result;

反正就是这个想法。但是我的实现很愚蠢。我们可以轻松地同时执行这两个步骤:

List<ListenableFuture<B>> result = newArrayList();
for (int i = 0; i < knownSize; i++) {
  final int index = i;
  result.add(transform(input, new Function<List<A>, B>() {
    @Override
    public B apply(List<A> values) {
      return function.apply(values.get(index));
    }
  }, executor));
}
return result;

因为这使得 nFutures.transform调用而不是 1 并且因为它使用单独Executor的 ,所以如果转换是重量级的,它比我的其他解决方案更好,如果它是轻量级的,则更糟。另一个警告仍然存在:只有当您知道您将拥有多少输出时,这才会起作用。

于 2013-02-25T16:46:44.793 回答
1

这有两个部分。第一个是将 a 转换Future<Iterable<A>>为 a Future<Iterable<B>>。这可以通过嵌套Futures.transformIterables.transform如果转换速度很快——在替代答案中更多)来实现。这是代码(虽然我已经换成了Iterables.transformLists.transform这将需要 a Future<List<A>>,原因我稍后也会解释):

final ListenableFuture<List<B>> transformed =
    Futures.transform(input, new Function<List<A>, List<B>>() {
      @Override
      public List<B> apply(List<A> inputs) {
        return Lists.transform(inputs, function);
      }
    });

现在是从Future<Iterable<B>>到的转换Iterable<Future<B>>。正如 Mairbek 所说,这通常是不可能的:在我们知道将返回多少元素之前,我们无法知道输出将有多少元素Future。但是,如果我们确实知道输出中的元素数量,我们就可以让它工作。我们将应用另一个Futures.transform. (这就是我在List上面使用的原因。PlainCollection也可以工作,但代码会更难看。)

List<ListenableFuture<B>> result = newArrayList();
for (int i = 0; i < knownSize; i++) {
  final int index = i;
  result.add(Futures.transform(transformed, new Function<List<B>, B>() {
    @Override
    public B apply(List<B> values) {
      return values.get(index);
    }
  }));
}
return result;

当然,Future列表中的每一个都将同时完成——只有在完整输入Future完成后。也就是我们不能更早的得到结果;我们只能以不同的格式获取它们(即,单个Future实例而不是批量列表)。这是以Future<Iterable<A>>.

于 2013-02-25T16:11:32.993 回答