2

假设我们有两个 Akka Stream 流,每个流都在自己的 JVM 上运行。

// A reactive streams publisher running on JVM 1:
val stringPublisher: Publisher[String] = Source(() => "Lorem Ipsum".split("\\s").iterator).runWith(Sink.publisher[String])

// A reactive streams subscriber running on JVM 2:
def subscriber: Subscriber[String] = Sink.foreach[String](println(_)).runWith(Source.subscriber[String])

// Subscribe the second stream to the first stream
stringPublisher.subscribe(subscriber)

此示例在一个 JVM 上运行良好,但我如何订阅在不同 JVM 上运行的发布者?

我必须使用消息/队列中间件还是可以使用反应流 API 将两者连接在一起?

4

1 回答 1

3

反应流规范没有谈到分布式(跨网络)流,并且它的当前实现(例如 Akka Streams)都没有实现跨网络边界的流。做起来有点棘手(但可以做也可能会做),因为它需要透明的重新传递,以防消息丢失。

简短的回答:你(目前)不能。然而,由于 Akka HTTP 是基于流的并通过 TCP 应用背压,您可以通过基于流的 TCP 或 HTTP连接流,并且背压将按预期工作。

于 2015-06-07T11:49:58.850 回答