1

我正在玩 Monix 流,并获得了我ObservableIterator. 在我看来,运行时它产生的元素比我预期的多 1 个。以下代码表明:

  val count = AtomicLong(0)
  def produceValue(): Long = {
    count.transformAndGet { i =>
      logger.info(s"Producing value: ${i + 1}")
      i + 1
    }
  }
  def more(): Boolean = count.get < 20

  lazy val iter = new Iterator[Long] {
    override def hasNext: Boolean = more()
    override def next(): Long     = produceValue()
  }    

  Observable
    .fromIterator(iter)
    .mapParallelUnordered(5) { x =>
      Task(x)
        .foreachL { x =>
          logger.info(s"Transforming $x")
        }
        .delayExecution(3.seconds)
    }
    .consumeWith(Consumer.complete)
    .runAsync

这个案子很简单。Iterator每次产生next值时都会打印日志。下游阶段是简单的延迟任务,并行计数为 5 以查看发生了什么。现在输出如下:

[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 1
[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 2
[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 3
[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 4
[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 5
[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] -  Transforming 4
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] -  Transforming 3
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Transforming 5
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] -  Transforming 2
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Transforming 1
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Producing value: 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Producing value: 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Producing value: 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Producing value: 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Producing value: 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] -  Transforming 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Transforming 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] -  Transforming 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Transforming 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] -  Transforming 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Producing value: 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Producing value: 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Producing value: 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Producing value: 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Producing value: 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] -  Transforming 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] -  Transforming 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Transforming 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] -  Transforming 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Transforming 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Producing value: 17
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Producing value: 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Producing value: 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Producing value: 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Transforming 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Transforming 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] -  Transforming 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] -  Transforming 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] -  Transforming 17

如您所见,最初流产生 6 个元素,而我只期望 5 个(因为下游阶段mapParallelUnordered只需要 5 个元素。实际上这没什么大不了的,但我只是想了解为什么会这样。

还有为什么初始值是在线程中产生的,而后续的值是在线程池中main调用的?execution-context不应该都使用用于运行整个流的调度程序吗?

4

1 回答 1

4

如您所见,最初流产生 6 个元素

低级通信协议是围绕 aSubscriber及其(继承的)方法 onNext 设计的,具有以下签名:

def onNext(elem: A): Future[Ack]

(资源)

如果我们将创建和转换都想象成一个阶段,那么可观察的源(fromIterator在您的情况下)其价值推低给订阅者,并且在得到确认时推动下一个。

所以会发生什么:

  • fromIterator阶段产生价值1
  • 1 值被推送到mapAsyncUnordered阶段,在那里它被接受(b/c 有空闲的工人),所以确认是立即Continue
  • 对值 2-5 重复上述步骤
  • fromIterator阶段生成值 6(这是您看到输出的时候)
  • 6 值被推到mapAsyncUnordered阶段。这一次,它不能立即被接受,所以要过Continue一段时间才能接受。在此之前,不会再生成任何值fromIterator

需要注意的是,mapAsyncUnordered将值拉出的不是stage fromIterator,而是自己fromIterator生成这些值,并且无法提前知道下游转换是否会立即接受该值。


不应该都使用用于运行整个流的调度程序吗?

Monix Observable 出于性能原因尝试尽可能多地同步工作(切换线程很昂贵)。通常,除非通过 , 等方法显式控制,否则executeAsyncexecuteOn将无法判断操作是否将在同一线程上执行。

于 2018-11-26T09:00:19.823 回答