2

我正在阅读有关观察者广告的Monix 文档,我遇到了以下示例:

或者您可以快速构建一个仅记录它接收到的事件的观察者。我们将在其他示例中使用它:

import monix.reactive.Observer

val out = Observer.dump("O")
// out: Observer.Sync[Any]

out.onNext(1)
//=> 0: O-->1
// res0: Ack = Continue

out.onNext(2)
//=> 1: O-->2
// res0: Ack = Continue

out.onComplete()
//=> 2: O completed

下一个非法示例:

喂食两个元素,然后停止。这是不合法的:

// BAD SAMPLE
observer.onNext(1)
observer.onNext(2)
observer.onComplete()

所以我们可以看到相同的onNext -> onNext -> onComplete链条。这不合法吗?为什么?

4

1 回答 1

1

在您链接的文档中,在示例之后直接进行了解释

这是这样做的合法方式:

observer.onNext(1).map {
  case Continue =>
    // We have permission to continue
    observer.onNext(2)
    // No back-pressure required here
    observer.onComplete()
    Stop
  case Stop =>
    // Nothing else to do
    Stop
}

正如您在评论中看到的,问题是背压。那么为什么有一个例子,使用.dump它似乎是非法的?

请注意该示例中的注释:

//=> 0: O-->1
// res0: Ack = Continue

这些评论显示了如果你在 Scala REPL 中运行它会得到什么。当您输入一个表达式并按回车键时,REPL 会打印类似的res0内容并让您知道最后一个命令的返回值是什么。

所以这个例子证明了:

  • 从 REPL 提供观察者
  • 每个.onNext都完成了Continue

以这种方式编写一个为观察者提供数据的程序是不正确的,但这是对提供观察者的合法执行的正确转录。

您可以在合约部分查看与背压相关的规则:

  1. 背压:每个 onNext 调用必须等待上一个 onNext 调用的 Future[Ack] 返回的 Continue 结果。
  2. onComplete 和 onError 的背压是可选的:当调用 onComplete 或 onError 时,您不需要等待前一个 onNext 的 Future[Ack]。

这是一个值得关注的好点,因为优雅的背压管理是响应式流的一大承诺。

于 2018-05-23T12:59:00.363 回答