7

通常使用 CompletableFuture 我会在结果可用后立即调用 thenApply 或其他一些方法来执行某些操作。但是,我现在有一种情况,我想处理结果,直到收到肯定的结果,然后忽略所有进一步的结果。

如果我只想获取第一个可用结果,我可以使用 CompletableFuture.anyOf (尽管我讨厌将列表转换为数组只是为了调用 anyOf)。但这不是我想要的。我想获得第一个结果,如果它没有理想的结果,那么我想处理第二个可用的结果,依此类推,直到我得到理想的结果。

这是一个简单的示例,它遍历所有结果并返回它找到的第一个大于 9 的值。(请注意,这不是我真正的任务。这只是一个简单的示例。)

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    for(CompletableFuture<Integer> result : results) {
        Integer v = result.get();
        if(v > 9)
            return v;
    }
    return null;
}

当然,该示例从一开始就检查结果,而不是在完成时查看结果。所以这是一个完成我想要的东西,但代码更复杂的一个。

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    AtomicInteger finalResult = new AtomicInteger();
    CountDownLatch latch = new CountDownLatch(results.size());
    for(CompletableFuture<Integer> result : results) {
        result.whenComplete((v,e) -> {
            if(e!=null) {
                Logger.getLogger(getClass()).error("",e);
            } else if(v > 9) {
                finalResult.set(v);
                while(latch.getCount() > 0)
                    latch.countDown();
                return;
            }
            latch.countDown();
        });
    }
    latch.await();

    if(finalResult.get() > 9)
        return finalResult.get();
    return null;
}    

有没有我可以做到这一点的api?

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    Iterator<Integer> resultIt = getResultsAsAvailable(results);
    for(; resultIt.hasNext();) {
        Integer v = resultIt.next();
        if(v > 9)
            return v;
    }
    return null;
}

甚至更好:

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    return getFirstMatch(results, r -> {return r > 9;});
}
4

2 回答 2

4

我不知道 JDK 或其他地方有任何这样的 API。你可以自己滚动。

如果未来已经完成,您可以利用CompletableFuture#complete(and ) 什么都不做的事实。completeExceptionally

如果尚未完成,则将由get()和相关方法返回的值设置为给定值。

创建一个新的最终结果 CompletableFuture。如果您的条件适用,则向您的每个期货添加一个继续尝试complete最终结果。这个未来将以第一次成功告终。但是,如果没有成功,那么您显然需null要这样做。您可以创建一个CompletableFuturewithallOf也尝试complete使用null.

就像是

public static <T> CompletableFuture<T> firstOrNull(List<CompletableFuture<T>> futures, Predicate<T> condition) {
    CompletableFuture<T> finalResult = new CompletableFuture<>();
    // attempt to complete on success
    futures.stream().forEach(future -> future.thenAccept(successResult -> {
        if (condition.test(successResult))
            finalResult.complete(successResult);
    }));
    CompletableFuture<?> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    all.thenRun(() -> {
        finalResult.complete(null);
    });
    return finalResult;
}

您支付无操作调用的开销。

您可以酌情更改null为某些默认值或以不同方式处理异常(completeExceptionally一旦发生错误)。您必须使用whenCompleteorhandle 而不是thenAccept上述内容才能访问Exception.

于 2016-04-14T01:37:46.927 回答
3

您可以使用以下解决方案:

public static <T> CompletableFuture<T> anyMatch(
    List<? extends CompletionStage<? extends T>> l, Predicate<? super T> criteria) {

    CompletableFuture<T> result=new CompletableFuture<>();
    Consumer<T> whenMatching=v -> { if(criteria.test(v)) result.complete(v); };
    CompletableFuture.allOf(l.stream()
        .map(f -> f.thenAccept(whenMatching)).toArray(CompletableFuture<?>[]::new))
    .whenComplete((ignored, t) ->
        result.completeExceptionally(t!=null? t: new NoSuchElementException()));
    return result;
}

基本原理与Pillar 的答案相同,但是有一些区别:

  • 通用签名更灵活。
  • 所需的数组的创建CompletableFuture.allOf与源期货的后续动作的注册相结合。作为副作用,allOf操作的处理程序依赖于完成结果的所有尝试的完成,而不仅仅是原始的期货。这使得实际所需的依赖关系变得明确。这样,当我们用 s 替换所有thenAccepts时,它甚至可以工作thenAcceptAsync
  • 在没有结果符合标准的情况下,此解决方案以 aNoSuchElementException而不是返回来完成。null如果至少一个 future 异常完成并且没有成功完成并具有匹配的结果,则中继发生的异常之一。

你可以试试

List<CompletableFuture<Integer>> list=Arrays.asList(
    CompletableFuture.supplyAsync(()->5),
    CompletableFuture.supplyAsync(()->{throw new RuntimeException(); }),
    CompletableFuture.supplyAsync(()->42),
    CompletableFuture.completedFuture(0)
);
anyMatch(list, i -> i>9)
    .thenAccept(i->System.out.println("got "+i))
    // optionally chain with:
    .whenComplete((x,t)->{ if(t!=null) t.printStackTrace(); });
于 2016-04-14T10:48:17.370 回答