我正在使用Monix Task进行异步控制。
设想
- 任务并行执行
 - 如果故障发生 X 次以上
 - 停止所有尚未完成的任务(越快越好)
 
我的解决方案
我提出了在 1. 结果和 2. 错误计数器之间竞争的想法,并取消失败者。
通过Task.race如果错误计数器首先达到阈值,那么任务将被取消Task.race。
实验
关于菊石REPL
{
  import $ivy.`io.monix::monix:3.1.0`
  import monix.eval.Task
  import monix.execution.atomic.Atomic
  import scala.concurrent.duration._
  import monix.execution.Scheduler
  //import monix.execution.Scheduler.Implicits.global
  implicit val s = Scheduler.fixedPool("race", 2) // pool size
  val taskSize = 100
  val errCounter = Atomic(0)
  val threshold = 3
  val tasks = (1 to taskSize).map(_ => Task.sleep(100.millis).map(_ => errCounter.increment()))
  val guard = Task(f"stop because too many error: ${errCounter.get()}")
    .restartUntil(_ => errCounter.get() >= threshold)
  val race = Task
    .race(guard, Task.gather(tasks))
    .runToFuture
    .onComplete { case x => println(x); println(f"completed task: ${errCounter.get()}") }
}
问题
结果取决于线程池大小!?
对于池大小 1
 
,结果几乎总是任务成功,即没有停止。
Success(Right(.........))
completed task: 100 // all task success !
对于池大小 2
 
,成功和失败之间的不确定性非常不确定,并且取消也不准确。例如:
Success(Left(stop because too many error: 1))
completed task: 98
取消最晚至 98 个任务已完成。
错误计数很奇怪,小到阈值。  
默认全局调度程序获得相同的结果行为。
对于池大小 200
 
,它更具确定性,并且停止更早,因此在完成更少任务的意义上更准确。
Success(Left(stop because too many error: 2))
completed task: 8
池大小越大越好。
如果我Task.gather改为Task.sequence执行,所有问题都消失了!
这种依赖池大小的原因是什么?一旦发生太多错误,如何改进它或者是否有更好的选择来停止任务?