5

假设我有一个 API,它会根据一些查询条件查找或构造一个小部件:

Widget getMatchingWidget(WidgetCriteria c) throws Throwable

(同步)客户端代码如下所示:

try {
  Widget w = getMatchingWidget(criteria);
  processWidget(w);
} catch (Throwable t) {
  handleError(t);
}

现在说查找或构建一个小部件的成本出人意料,我不希望客户在等待它时阻塞。所以我把它改成:

CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c)

然后客户端可以写:

CompletableFuture<Widget> f = getMatchingWidget(criteria);
f.thenAccept(this::processWidget)
f.exceptionally(t -> { handleError(t); return null; })

或者:

getMatchingWidget(criteria).whenComplete((t, w) -> {
  if (t != null) { handleError(t); }
  else { processWidget(t); }
});

现在,假设同步 API 可以返回0 到 n 个小部件:

Stream<Widget> getMatchingWidgets(WidgetCriteria c)

天真地,我可以写:

CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c)

然而,这实际上并没有使代码成为非阻塞的,它只是推动阻塞——要么是Future块直到所有Widgets可用的块,要么是迭代Stream等待每个块的代码Widget。我想要的是让我在每个小部件到达时处理它们的东西,例如:

void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor)

但这不提供错误处理,即使我添加了一个额外的Consumer<Throwable> errorHandler,它也不允许我将我的小部件检索与其他查询组合起来,或者转换结果。

所以我正在寻找一些可组合的东西,它结合了 a 的特性Stream(可迭代性、可转换性)和 a 的特性CompletableFuture(异步结果和错误处理)。(而且,当我们这样做时,背压可能会很好。)

这是java.util.concurrent.Flow.Publisher吗?一个io.reactivex.Observable?更复杂的东西?更简单的东西?

4

1 回答 1

6

你的用例很自然地落入了 RxJava 正在解决的世界。如果我们有一个可观察的:

Observable<Widget> getMatchingWidgets(wc);

根据标准生成零个或多个小部件,然后您可以使用以下方法处理每个小部件:

getMatchingWidgets(wc)
  .subscribeOn( backgroundScheduler )
  .subscribe( w -> processWidget(w),
              error -> handleError(error) );

可观察链将在 上运行backgroundScheduler,它通常是线程池执行器服务的包装器。如果您需要对 UI 中的每个小部件进行最终处理,可以observeOn()在处理之前使用操作符切换到 UI 调度程序:

getMatchingWidgets(wc)
  .subscribeOn( backgroundScheduler )
  .observeOn( uiScheduler )
  .subscribe( w -> processWidget(w),
              error -> handleError(error) );

对我来说,RxJava 方法的优雅之处在于它以流畅的方式处理了管道管理的许多细节。看看那个观察者链,你就知道到底发生了什么以及在哪里。

于 2017-10-19T21:53:55.973 回答