我最近发现了一个在使用 Monix 时无法完全理解的案例Task
:
有两个函数(在队列消息处理程序中):
def handle(msg: RollbackMsg): Task[Unit] = {
logger.info(s"Attempting to rollback transaction ${msg.lockId}")
Task.defer(doRollback(msg)).onErrorRestart(5).foreachL { _ =>
logger.info(s"Transaction ${msg.lockId} rolled back")
}
}
private def doRollback(msg: RollbackMsg): Task[Unit] =
(for {
originalLock <- findOrigLock(msg.lockId)
existingClearanceOpt <- findExistingClearance(originalLock)
_ <- clearLock(originalLock, existingClearanceOpt)
} yield ()).transact(xa)
的 for-comprehension的内部doRollback
都是一组返回 monad 的doobie调用ConnectionIO[_]
,然后transact
在其上运行将组合变成 Monix Task
。
现在,如handle
函数所示,我希望整个过程在失败的情况下重试 5 次。神秘的部分是这个简单的调用:
doRollback(msg).onErrorRestart(5)
并没有真正重新启动异常操作(在测试中验证)。为了获得这种重试行为,我必须明确地将其包装在 中Task.defer
,或者Task
以任何其他方式将其置于“上下文”中。
这就是我不完全明白的一点:为什么会这样?doRollback
已经给了我Task
实例,所以我应该可以调用onErrorRestart
它,不是吗?如果不是这种情况,我如何确定Task
我从“某处”获得的实例是否可以重新启动?
我在这里想念什么?