0

我试图在 akka 中理解 Reactive Streams。我已经阅读了这个博客http://bryangilbert.com/blog/2015/02/04/akka-reactive-streams/,我想我对它的工作原理有了基本的了解。然而,我不明白的是处理器在这个概念中的目的。它是干什么用的?订阅者请求 N-Objects 和发布者使用 onNext() 发送它们还不够吗?

4

1 回答 1

2

假设您有一个真正简单的流程,只有一个 Source(发布者)和一个 Sink(订阅者)。您将这两者连接起来,接收器订阅发布者并开始请求数据,数据流向接收器。在此示例中,您真正需要的只是发布者和订阅者。但是在这个例子中,数据从源到接收器的过程中没有发生任何变化。它没有以任何方式转换,因此不是很有趣也不是很有用。

处理器结合了发布者和订阅者接口,因此可以同时充当这两个角色。处理器旨在嵌入源和接收器之间的处理流并转换数据。如果我将一个放入之前的源/接收器示例中,数据流以及谁订阅了哪些变化。现在接收器订阅该处理器,而处理器又订阅源。接收器向处理器请求元素,处理器将请求向上传播到源。当有元素满足需求时,它还负责将元素向下游推送到水槽。这就是为什么它必须实现这两个接口,因为它必须同时扮演这两个角色。

随着您添加的每个处理步骤,例如mapor filter,您正在添加另一个可以处理背压的位置。这些步骤不是数据的起始点(源)或目的地(汇)。他们要做的就是接收数据并对它做一些事情或改变它的流动并将元素发送到下游以满足需求。因为他们需要能够链接到他们需要发布和订阅功能的任何链,这就是处理器存在的原因。

于 2016-02-02T11:58:30.783 回答