假设我有一个 API,它会根据一些查询条件查找或构造一个小部件:
Widget getMatchingWidget(WidgetCriteria c) throws Throwable
(同步)客户端代码如下所示:
try {
Widget w = getMatchingWidget(criteria);
processWidget(w);
} catch (Throwable t) {
handleError(t);
}
现在说查找或构建一个小部件的成本出人意料,我不希望客户在等待它时阻塞。所以我把它改成:
CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c)
然后客户端可以写:
CompletableFuture<Widget> f = getMatchingWidget(criteria);
f.thenAccept(this::processWidget)
f.exceptionally(t -> { handleError(t); return null; })
或者:
getMatchingWidget(criteria).whenComplete((t, w) -> {
if (t != null) { handleError(t); }
else { processWidget(t); }
});
现在,假设同步 API 可以返回0 到 n 个小部件:
Stream<Widget> getMatchingWidgets(WidgetCriteria c)
天真地,我可以写:
CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c)
然而,这实际上并没有使代码成为非阻塞的,它只是推动阻塞——要么是Future
块直到所有Widgets
可用的块,要么是迭代Stream
等待每个块的代码Widget
。我想要的是让我在每个小部件到达时处理它们的东西,例如:
void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor)
但这不提供错误处理,即使我添加了一个额外的Consumer<Throwable> errorHandler
,它也不允许我将我的小部件检索与其他查询组合起来,或者转换结果。
所以我正在寻找一些可组合的东西,它结合了 a 的特性Stream
(可迭代性、可转换性)和 a 的特性CompletableFuture
(异步结果和错误处理)。(而且,当我们这样做时,背压可能会很好。)
这是java.util.concurrent.Flow.Publisher吗?一个io.reactivex.Observable?更复杂的东西?更简单的东西?