我对整个项目反应器或反应式编程非常陌生,所以我可能做错了什么。我正在努力建立一个执行以下操作的流程:
给定一个类实体:
Entity {
private Map<String, String> items;
public Map<String, String> getItems() {
return items;
}
}
- 从数据库中读取实体 (
ListenableFuture<Entity> readEntity()
) - 对每个项目执行一些并行异步处理 (
boolean processItem(Map.Entry<String, String> item)
) - 当所有完成调用 doneProcessing(
void doneProcessing(boolean b)
)
目前我的代码是:
handler = this;
Mono
.fromFuture(readEntity())
.doOnError(t -> {
notifyError(“some err-msg” , t);
return;
})
.doOnSuccess(e -> log.info("Got the Entity: " + e))
.flatMap( e -> Flux.fromIterable(e.getItems().entrySet()))
.all(handler::processItem)
.consume(handler::doneProcessing);
事情有效,但handler::processItem
调用不会在所有项目上同时运行。我尝试使用dispatchOn
andpublishOn
和io
andasync
SchedulerGroup
和各种参数,但调用仍然在一个线程上串行运行。我究竟做错了什么?
此外,我相信总体上可以改进上述内容,因此任何建议都会受到赞赏。
谢谢