3

最近发现projectreactor.io对Publisher的支持不错:

Flux.create(fluxSink -> {
           for (int i = 0; i < 10; i++)
            fluxSink.next(i);
           fluxSink.complete();
        })
                .map(...)
                .subscribe(...);

对处理器有什么好的支持吗? 我的意思是类似或类似的东西:

XXX process = new XXX((inputValue, output) -> {
    if(inputValue == 0)
       output.error();
    else
       output.next(inputValue);
});

publisher.subscribe(process);  
process.subscribe(...);

如果没有,我该如何实现自己的,或者为什么我不能这样做?

更新1:

经过讨论(见评论),似乎在我的用例中我需要使用flatMap(见答案),我的问题是处理器的良好实现,我的意思是一些功能,如果它失败了,我可以控制并发出错误。我认为flatMap会给你足够的功能。就我而言,我使用了:

        import org.jsoup.Jsoup;

        Flux.just("url")
            .flatMap(url -> {
                try {
                    Document document = Jsoup.connect(url).get();
                    return Flux.just(document);
                } catch (IOException e) {
                    return Flux.error(e);
                }
            })
            .subscribe();
4

3 回答 3

3

您可能正在寻找与reactorSubmissionPublisher中的实现类似的内容:Flux

AFlow.Publisher将提交的(非空)项目异步发布给当前订阅者,直到它被关闭。每个当前订阅者都以相同的顺序接收新提交的项目,除非遇到丢弃或异常。使用 aSubmissionPublisher 允许项目生成器充当兼容的反应流发布者,依赖丢弃处理和/或阻塞来进行流量控制。

注意:链接中共享了一个自定义示例Flow.Processor,可以根据您的用例的需要onErrorconsume

于 2017-11-17T21:27:51.337 回答
2

这真的取决于你想做什么。

大多数Flux创建此类处理器的方法只是将它们返回,以Flux确保它们以正确的方式订阅到上游Flux

因此,如果您Processor应该只为它收到的每个事件发出一个事件,但不同的事件map是您创建Processor. 如果它为每个接收到的事件创建多个(或没有)事件flatMap,等等。

您可以通过链接这些方法来创建更复杂的方法。我希望 99% 的用例都能以这种方式处理。

如果这还不够,subscribe请考虑可用于Consumer处理 a 元素Flux以及状态更改(如错误、完成和订阅)的各种重载。您可以将它们与其他组合在一起,Flux.create(fluxSink -> ...)以构建非常灵活Processors的 .

于 2017-11-17T12:45:51.943 回答
2

根据您对用例的描述,我不希望您真的需要Processor. 相反,用于flatMap触发异步 URL 获取。flatMap与所有 Reactive Streams 运算符一样,默认情况下会在出现错误时立即停止。

Flux<URL> 如果您事先不知道 URL,您可能需要处理器的唯一部分是生成(否则, a Flux.fromIterableorFlux.just(...)就可以了)。

如果您需要将结果分派给多个Subscriber而不重新触发请求,请查看publish().connect()and/or cache()

于 2017-11-20T10:47:02.097 回答