我从对已转换为 Observable 的同一(MongoDB)数据库调用的重复调用中得到间歇性缺失值。我已经删除了所有数据库代码,以获得一个只有 Monix 位的最小测试用例,而且我偶尔仍然会丢失值 - 通常每 2,000 次测试一到两个。
根据文档 ConcurrentSubject 的意思是“不需要遵循背压合同”,但无论我是否这样做,我都会遇到类似的失败。
import monix.eval.Task
import monix.reactive.{MulticastStrategy, Observable}
import monix.reactive.subjects.ConcurrentSubject
import org.scalatest.FunSuite
import scala.concurrent.Await
import scala.concurrent.duration.Duration
class Test_JustMonix extends FunSuite {
implicit val scheduler = monix.execution.Scheduler.global
def build(): Observable[Boolean] = {
val subject = ConcurrentSubject(MulticastStrategy.publish[Boolean])
subject.doAfterSubscribe {
Task.eval {
subject.onNext(true)
subject.onComplete()
}
}
}
test("just monix") {
(0 until 20).foreach { loop =>
println(s"loop $loop")
val tOpts = (0 until 100).map { _ => build().firstOptionL }
val tDone = Task.gather(tOpts).map { list =>
val emptyCount = list.count(_.isEmpty)
assert(emptyCount === 0)
}
Await.result(tDone.runToFuture, Duration.Inf)
}
println("Finished")
}
}
在某些运行中,所有 20x100 循环都正确完成 - firstOptionL isDefined 用于所有 2,000 个结果。但是,超过 50% 的时间 assert(emptyCount === 0) 在值为 1 或有时为 2 时触发,这表明我偶尔会得到一个 None 值,好像 onComplete 发生在 onNext 之前?
这可能发生在 20 个循环中的任何一个中,因此它看起来像是一种竞争条件,或者我误解了所需的输入。我已经尝试了几乎所有主题 - PublishSubject,有和没有 BufferedSubscriber,并且都给出了相似的结果。
我也尝试过将 onComplete 延迟到 Ack via
subject.onNext(true).map(_=> subject.onComplete())
这似乎会稍早失败。
我也试过 MulticastStrategy.replay 没有区别。
我在 Scala 2.12.8 上使用 Monix 3.0.0-RC3。