最近发现projectreactor.io对Publisher的支持不错:
Flux.create(fluxSink -> {
for (int i = 0; i < 10; i++)
fluxSink.next(i);
fluxSink.complete();
})
.map(...)
.subscribe(...);
对处理器有什么好的支持吗? 我的意思是类似或类似的东西:
XXX process = new XXX((inputValue, output) -> {
if(inputValue == 0)
output.error();
else
output.next(inputValue);
});
publisher.subscribe(process);
process.subscribe(...);
如果没有,我该如何实现自己的,或者为什么我不能这样做?
更新1:
经过讨论(见评论),似乎在我的用例中我需要使用flatMap
(见答案),我的问题是处理器的良好实现,我的意思是一些功能,如果它失败了,我可以控制并发出错误。我认为flatMap
会给你足够的功能。就我而言,我使用了:
import org.jsoup.Jsoup;
Flux.just("url")
.flatMap(url -> {
try {
Document document = Jsoup.connect(url).get();
return Flux.just(document);
} catch (IOException e) {
return Flux.error(e);
}
})
.subscribe();