6

我正在使用Async Http Client 库(使用 Netty)向 RESTful API 发出异步 Http Get 请求。由于我想保留非阻塞行为,因此我将返回实例CompletableFuture<T>作为 Http Get 请求的结果。所以,当一个 RESTful API 端点返回一个 Json 数组时,我返回一个CompletableFuture<T[]>.

然而,根据 Erik Meijer 对编程中的四个基本效果所做的分类,我认为这Stream<T>更适合发出异步 Http Get 请求并返回 Json 数组的 Java 方法的结果。在这种情况下,我们可以将Stream<T>视为Observable<T>等价物,它是返回许多值的异步计算的结果

因此,考虑到resp保持响应,那么我可以得到CompletableFuture<Stream<T>>如下结果:

 CompletableFuture<T[]> resp = …
 return resp.thenApply(Arrays::stream);

但是,我想知道如何在不等待计算完成的情况下将其转换CompletableFuture<Stream<T>> resp为 a Stream<T>(即我不想阻止get()调用)?

我希望得到与以下表达式相同的结果,但不阻塞get()

return resp.thenApply(Arrays::stream).get();
4

2 回答 2

6

您可以构建一个Stream<T>延迟对该Future<T> get()方法的调用,就像这样:

CompletableFuture<T[]> resp = ...
return Stream
        .of(resp)                               // Stream<CompletableFuture<T[]>>
        .flatMap(f -> Arrays.stream(f.join())); // Stream<T>

为了简化使用,而不是get()join()用来避免检查异常。

于 2016-05-23T14:37:44.637 回答
4

只要将异步计算的结果作为数组传递,你就无法从这里的 Stream API 中受益,因为 Stream 操作在数组被移交之前无法开始处理元素,这意味着整个完成异步作业。

除非您重写异步作业以发布数组的各个元素,例如通过队列,否则您只能将同步推迟到流的终端操作开始时。换句话说,您可以在必须等待异步作业完成之前将中间操作链接到 Stream。由于链接不是一项昂贵的操作,因此收益将非常小。

如果您仍然想这样做,Miguel Gamboa 的解决方案Stream.of(resp).flatMap(f -> Arrays.stream(f.join()))做,而且很简洁。不幸的是,它可能具有超过延迟join操作的任何好处的性能缺陷。由于数组具有可预测的长度并支持平衡拆分,因此通过数组进行流式处理可以顺利进行,而嵌套流不仅缺少这些功能,当前的实现甚至还缺少短路处理

因此,与其利用flatMap延迟创建流,更建议更深入一层,直接支持延迟流创建:

static <T> Stream<T> getStream(CompletableFuture<T[]> resp) {
    return StreamSupport.stream(() -> Arrays.spliterator(resp.join()),
        Spliterator.ORDERED|Spliterator.SIZED|Spliterator.SUBSIZED|Spliterator.IMMUTABLE,
        false);
}

这将创建一个 Stream,它将join操作推迟到终端操作开始,但仍具有基于数组的 Stream 的性能特征。但是代码显然更复杂,并且好处仍然是,如前所述,只有在提供异步操作的数组仍在运行时链接中间操作的可能性。

于 2016-05-23T15:31:24.963 回答