我正在使用Monix Task
进行异步控制。
设想
- 任务并行执行
- 如果故障发生 X 次以上
- 停止所有尚未完成的任务(越快越好)
我的解决方案
我提出了在 1. 结果和 2. 错误计数器之间竞争的想法,并取消失败者。
通过Task.race
如果错误计数器首先达到阈值,那么任务将被取消Task.race
。
实验
关于菊石REPL
{
import $ivy.`io.monix::monix:3.1.0`
import monix.eval.Task
import monix.execution.atomic.Atomic
import scala.concurrent.duration._
import monix.execution.Scheduler
//import monix.execution.Scheduler.Implicits.global
implicit val s = Scheduler.fixedPool("race", 2) // pool size
val taskSize = 100
val errCounter = Atomic(0)
val threshold = 3
val tasks = (1 to taskSize).map(_ => Task.sleep(100.millis).map(_ => errCounter.increment()))
val guard = Task(f"stop because too many error: ${errCounter.get()}")
.restartUntil(_ => errCounter.get() >= threshold)
val race = Task
.race(guard, Task.gather(tasks))
.runToFuture
.onComplete { case x => println(x); println(f"completed task: ${errCounter.get()}") }
}
问题
结果取决于线程池大小!?
对于池大小 1
,结果几乎总是任务成功,即没有停止。
Success(Right(.........))
completed task: 100 // all task success !
对于池大小 2
,成功和失败之间的不确定性非常不确定,并且取消也不准确。例如:
Success(Left(stop because too many error: 1))
completed task: 98
取消最晚至 98 个任务已完成。
错误计数很奇怪,小到阈值。
默认全局调度程序获得相同的结果行为。
对于池大小 200
,它更具确定性,并且停止更早,因此在完成更少任务的意义上更准确。
Success(Left(stop because too many error: 2))
completed task: 8
池大小越大越好。
如果我Task.gather
改为Task.sequence
执行,所有问题都消失了!
这种依赖池大小的原因是什么?一旦发生太多错误,如何改进它或者是否有更好的选择来停止任务?