4

我有一个Set<Object>,对于每个条目,Set我必须进行一个 API 调用,将其作为参数传递。我必须处理每一个响应并用自己的逻辑填充另一个地图

示例顺序执行:

List<MyResponse> responses = newArrayList<>();
Set<StoreNode> nodes = // Assume we have a Set
nodes.forEach(storeNode -> responses.add(myAPI.myMethod(storeNode.getId()));
responses.forEach(response -> processResponse(response, myMap); // This is a common map & I have some custom logic to populate this map

如何使用 Observables 实现相同的目标?我想并行进行这些调用并填充我的通用地图myMap

我遇到了 map()、flatMap() 和 zip(),但我看到的大多数示例都是简单的示例,它们没有进行 API 调用并处理它们的响应。

4

1 回答 1

2

这取决于您使用的 RxJava 版本。如果它早于 2.0.5,那么你需要做flatMap,在那里你创建另一个Observable并确保那里的东西是平行的。在 StackOverflow 上查看这个答案。

否则,我建议使用Flowable,然后您可以使用parallel()将您更改FlowableParallelFlowable.

所以你可以这样做:

Flowable.fromIterable(nodes)
        .parallel() // you can also specify number of rails here
        .runOn(Schedulers.computation())
        .map(node -> myAPI.myMethod(node.getId()))
        .sequential()
        .subscribe(
                response -> processResponse(response, myMap),
                error -> log(error)
        );

有关更多信息,请参阅并行流文档。

于 2018-06-23T12:06:54.150 回答