1

我无法弄清楚为什么executeWithFork在以下示例中添加会阻止任务运行:

import java.util.concurrent.TimeUnit

import monix.execution.schedulers.SchedulerService
import monix.reactive.subjects.ConcurrentSubject

object Sandbox {

  def main(args: Array[String]): Unit = {
    implicit val scheduler: SchedulerService =
      monix.execution.Scheduler(java.util.concurrent.Executors.newCachedThreadPool())

    val input = ConcurrentSubject.publish[String]

    // prints nothing
    input.foreachL(println).executeWithFork.runAsync
    // this works:
    // input.foreachL(println).runAsync

    input.onNext("one")
    input.onNext("two")

    scheduler.shutdown()
    scheduler.awaitTermination(1, TimeUnit.MINUTES, monix.execution.Scheduler.Implicits.global)
  }
}
4

1 回答 1

1

您看到的行为是两个事实的结果:

  1. 使用executeWithFork为线程切换引入了一些额外的延迟

  2. 您使用ConcurrentSubject.publish(而不是replay例如)。如果您打开文档,PublishSubject您可能会看到

APublishSubject仅向订阅者发出那些在订阅时间之后由源发出的项目。

换句话说,您在发布"one"and的主线程"two"和必须订阅以input获取数据的分叉线程之间存在竞争条件。结果取决于哪个线程赢得比赛:订阅前发布的所有数据都会丢失。我几乎总是看到"two"甚至偶尔会看到我的硬件之一"one",您的结果可能会有所不同。

测试这一点的最简单方法是Thread.sleep(100)在第一个之前添加input.onNext,您应该看到每次都打印两个事件。您也可以尝试推送更多的事件而不是仅仅 2 个,以确保并非所有内容都丢失了。

于 2019-01-19T03:26:12.657 回答